-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
1390 lines (1153 loc) · 50.5 KB
/
utils.py
File metadata and controls
1390 lines (1153 loc) · 50.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import numpy as np
import random
import copy
from typing import Tuple, List, Optional, Dict, Any
import os
from base import Board, Player, compare_strength
from collections import namedtuple
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用设备: {device}")
def manhattan_distance(pos1: Tuple[int, int], pos2: Tuple[int, int]) -> int:
return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])
class BaseReward:
"""奖励函数基类"""
def get_piece_value(self, strength: int) -> float:
"""根据棋子强度返回价值权重"""
# 象和鼠有特殊价值,中等强度棋子也很重要
value_map = {1: 1.8, 2: 1.0, 3: 1.5, 4: 2.0, 5: 2.5, 6: 3.0, 7: 3.5, 8: 4.0}
return value_map.get(strength, 1.0)
def calculate_reward(
self,
board_before: Board,
board_after: Board,
action: Tuple,
player_id: int,
done: bool,
) -> float:
"""计算奖励值"""
raise NotImplementedError
class RewardFunction(BaseReward):
"""斗兽棋智能奖励函数"""
def __init__(self):
# 奖励权重配置
self.weights = {
"win_game": 100.0, # 获胜
"lose_game": -100.0, # 失败
"draw_game": 0.0, # 平局
"capture_piece": 10.0, # 吃掉对方棋子基础奖励
"be_captured": -8.0, # 被吃掉基础惩罚
"mutual_destruction": -0.5, # 同归于尽
"reveal_piece": 1.0, # 翻开棋子
"survival_penalty": -0.1,
}
# def can_capture(self, attacker_strength: int, defender_strength: int) -> bool:
# """判断攻击方是否能吃掉防守方"""
# if attacker_strength == 8 and defender_strength == 1: # 象吃鼠 x
# return False
# if attacker_strength > defender_strength:
# return True
# if attacker_strength == 1 and defender_strength == 8: # 鼠吃象
# return True
# return False
def find_closest_enemy(
self, board: Board, pos: Tuple[int, int], player_id: int
) -> Tuple[Optional[Tuple[int, int]], Optional[int]]:
"""
找到距离指定位置最近的敌方棋子
返回: (敌方位置, 距离) 或 (None, None)
"""
current_piece = board.get_piece(pos[0], pos[1])
if not current_piece:
return None, None
min_distance = float("inf")
closest_enemy_pos = None
# 遍历棋盘寻找已翻开的敌方棋子
for r in range(7):
for c in range(8):
enemy = board.get_piece(r, c)
if enemy and enemy.player != player_id and enemy.revealed:
dist = manhattan_distance(pos, (r, c))
# 更新最近敌人
if dist < min_distance:
min_distance = dist
closest_enemy_pos = (r, c)
return closest_enemy_pos, min_distance if closest_enemy_pos else None
def evaluate_position(
self, board: Board, pos: Tuple[int, int], player_id: int
) -> Tuple[float, float]:
"""
评估位置的威胁和机会程度
Returns:
Tuple[float, float]: (威胁值, 机会值)
"""
piece = board.get_piece(pos[0], pos[1])
if not piece or not piece.revealed:
return 0.0, 0.0
enemy_pos, distance = self.find_closest_enemy(board, pos, player_id)
if not enemy_pos or not distance:
return 0.0, 0.0
enemy = board.get_piece(enemy_pos[0], enemy_pos[1])
threat, opportunity = 0.0, 0.0
# 威胁评估: 敌人能吃掉我方棋子
if compare_strength(piece.strength, enemy.strength) == -1:
# 威胁随距离增加而减小
threat = 4.0 / (distance + 1)
# 对高价值棋子增加威胁程度
if self.get_piece_value(piece.strength) >= 3.0:
threat *= 1.5
threat = -threat # 转换为负值
# 机会评估: 我方棋子能吃掉敌人
if compare_strength(piece.strength, enemy.strength) == 1:
# 机会随距离增加而减小
opportunity = 3.0 / (distance + 1)
# 对高价值敌方棋子增加机会价值
if self.get_piece_value(enemy.strength) >= 3.0:
opportunity *= 1.5
return threat, opportunity
def _evaluate_revealed_piece(
self,
pos: Tuple[int, int],
threats: Dict[Tuple[int, int], int],
opportunities: Dict[Tuple[int, int], int],
strength: int,
is_self: bool,
) -> float:
"""
评估翻开棋子的价值
Args:
threats: 预计算的威胁字典
opportunities: 预计算的机会字典
"""
value = 0.0
# 根据敌我计算基础分
base_value = self.weights["reveal"] if is_self else -self.weights["reveal"]
threat_value = threats.get(
pos, 0
) # 需要改,如果是敌人,则威胁的是附近的己方棋子
opportunity_value = opportunities.get(pos, 0) # 同样,机会也是己方棋子的
value = base_value + threat_value + opportunity_value
return value * self.get_piece_value(strength)
def estimate_reveal_reward(
self,
board: Board,
pos: Tuple[int, int],
player_id: int,
threats: Dict[Tuple[int, int], int],
opportunities: Dict[Tuple[int, int], int],
) -> float:
"""估算翻开棋子的期望奖励"""
# 获取所有未翻开棋子的可能性
unrevealed_pieces = []
for r in range(7):
for c in range(8):
piece = board.get_piece(r, c)
if piece and not piece.revealed:
unrevealed_pieces.append((piece.player, piece.strength))
if not unrevealed_pieces:
return 0.0
# 计算期望奖励
total_reward = 0.0
probability = 1.0 / len(unrevealed_pieces)
for player, strength in unrevealed_pieces:
is_self = player == player_id
reward = self._evaluate_revealed_piece(
pos, threats, opportunities, strength, is_self
)
total_reward += reward * probability
return total_reward
def _is_defensive_move(
self,
board_after: Board,
start_pos: Tuple[int, int],
end_pos: Tuple[int, int],
player_id: int,
threats_before: Dict[Tuple[int, int], int],
threats_after: Dict[Tuple[int, int], int],
) -> bool:
"""
判断是否为防守性移动
Args:
threats_before: 移动前的威胁字典
threats_after: 移动后的威胁字典
"""
piece = board_after.get_piece(end_pos[0], end_pos[1])
if not piece or piece.player != player_id or not piece.revealed:
return False
# 检查移动是否减少了威胁
old_threats = threats_before.get(start_pos, 0)
new_threats = threats_after.get(end_pos, 0)
# 高价值棋子(强度>=3)的威胁减少
if new_threats < old_threats and self.get_piece_value(piece.strength) >= 3.0:
return True
return False
def calculate_reward(
self,
board_before: Board,
board_after: Board,
action: Tuple[str, Tuple[int, int], Optional[Tuple[int, int]]],
player_id: int,
result: int,
) -> float:
"""计算奖励值"""
action_type, pos1, pos2 = action
total_reward = self.weights["survival_penalty"]
if result == player_id:
return self.weights["win_game"]
if result == 1 - player_id:
return self.weights["lose_game"]
if result == 2:
return self.weights["draw_game"]
# 翻开棋子的动作
if action_type == "reveal":
r, c = pos1
piece = board_after.get_piece(r, c)
if piece:
# 使用合并后的评估函数
threat, opportunity = self.evaluate_position(
board_after, pos1, player_id
)
# 由于翻开后处于后手,威胁的影响更大
total_reward = self.weights["reveal_piece"] + threat * 1.2 + opportunity
# 移动棋子的动作
else: # move
start_pos, end_pos = pos1, pos2
moving_piece_before = board_before.get_piece(start_pos[0], start_pos[1])
target_piece_before = board_before.get_piece(end_pos[0], end_pos[1])
piece_after = board_after.get_piece(end_pos[0], end_pos[1])
# 吃子奖励计算
if target_piece_before and target_piece_before.player != player_id:
if piece_after and piece_after.player == player_id:
# 成功吃子
capture_value = self.get_piece_value(target_piece_before.strength)
total_reward += self.weights["capture_piece"] * capture_value
elif not piece_after:
# 同归于尽
total_reward += self.weights["mutual_destruction"]
else:
# 被吃
lost_value = self.get_piece_value(moving_piece_before.strength)
total_reward += self.weights["be_captured"] * lost_value
# 评估移动后的位置价值变化
if piece_after and piece_after.player == player_id:
# 使用合并后的评估函数
old_threat, old_opportunity = self.evaluate_position(
board_before, start_pos, player_id
)
new_threat, new_opportunity = self.evaluate_position(
board_after, end_pos, player_id
)
# 威胁减少或机会增加时给予奖励
if new_threat > old_threat: # threat是负值
total_reward += (new_threat - old_threat) * 1.2
if new_opportunity > old_opportunity:
total_reward += (new_opportunity - old_opportunity) * 1.5
return total_reward
# class SimpleReward(BaseReward):
# """斗兽棋简单奖励函数"""
# def __init__(self):
# self.weights = {
# "win_game": 10.0, # 获胜
# "lose_game": -10.0, # 失败
# "draw_game": 0.0, # 平局
# "capture_piece": 1.0, # 吃掉对方棋子基础奖励
# "be_captured": -1.0, # 被吃掉基础惩罚
# "reveal_piece": 0.5, # 翻开棋子
# "survival_penalty": -0.1,
# }
# def calculate_reward(
# self,
# board_before: Board,
# board_after: Board,
# action: Tuple[str, Tuple[int, int], Optional[Tuple[int, int]]],
# player_id: int,
# result: int,
# ) -> float:
# """计算简单奖励值"""
# action_type, pos1, pos2 = action
# if result == player_id:
# return self.weights["win_game"]
# if result == 1 - player_id:
# return self.weights["lose_game"]
# if result == 2:
# return self.weights["draw_game"]
# total_reward = 0.0
# if action_type == "reveal":
# total_reward += self.weights["reveal_piece"]
# elif action_type == "move":
# start_pos, end_pos = pos1, pos2
# target_piece = board_after.get_piece(end_pos[0], end_pos[1])
# if target_piece and target_piece.player != player_id:
# total_reward += self.weights["capture_piece"]
# elif not target_piece:
# total_reward += self.weights["be_captured"]
# return total_reward
class SimpleReward:
"""重新设计的简单奖励函数"""
def __init__(self):
self.weights = {
"win_game": 100.0, # 获胜
"lose_game": -100.0, # 失败
"draw_game": 0.0, # 平局
"capture_piece": 5.0, # 吃掉对方棋子基础奖励
"be_captured": -5.0, # 被吃掉基础惩罚
"mutual_destruction": -0.5, # 同归于尽:中性偏负
"step_penalty": -0.02, # 每步小惩罚,避免拖延
}
def count_immediate_threats_opportunities(
self, board: Board, player_id: int
) -> Tuple[float, float]:
"""计算当前棋盘上的即时威胁和机会总和"""
total_threats = 0.0
total_opportunities = 0.0
my_pieces = board.get_player_pieces(player_id)
for r, c in my_pieces:
piece = board.get_piece(r, c)
if piece and piece.revealed:
# 检查四周的直接威胁和机会
for dr, dc in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
nr, nc = r + dr, c + dc
if 0 <= nr < 7 and 0 <= nc < 8:
neighbor = board.get_piece(nr, nc)
if (
neighbor
and neighbor.revealed
and neighbor.player != player_id
):
compare_result = compare_strength(piece.strength, neighbor.strength)
if compare_result == -1: # 邻居能吃掉我
total_threats += self.get_piece_value(piece.strength)
elif compare_result == 1: # 我能吃掉邻居
total_opportunities += self.get_piece_value(
neighbor.strength
)
return total_threats, total_opportunities
def calculate_reward(
self,
board_before: Board,
board_after: Board,
action: Tuple[str, Tuple[int, int], Optional[Tuple[int, int]]],
player_id: int,
result: int,
) -> float:
"""计算重新设计的奖励值"""
try:
# 处理动作解包
if len(action) == 2:
action_type, pos1 = action
pos2 = None
elif len(action) == 3:
action_type, pos1, pos2 = action
else:
return -2.0
# 游戏结束奖励(大奖励,稀疏)
if result == player_id:
return self.weights["win_game"]
if result == 1 - player_id:
return self.weights["lose_game"]
if result == 2:
return self.weights["draw_game"]
# 基础步数惩罚,避免拖延
total_reward = self.weights["step_penalty"]
# 动作特定奖励
if action_type == "reveal":
# 翻开动作本身不给特殊奖励,让威胁机会变化来评估
pass
elif action_type == "move" and pos2 is not None:
start_pos, end_pos = pos1, pos2
moving_piece_before = board_before.get_piece(start_pos[0], start_pos[1])
target_piece_before = board_before.get_piece(end_pos[0], end_pos[1])
piece_after = board_after.get_piece(end_pos[0], end_pos[1])
# 吃子奖励计算
if target_piece_before and target_piece_before.player != player_id:
target_value = self.get_piece_value(target_piece_before.strength)
my_value = self.get_piece_value(moving_piece_before.strength)
if piece_after and piece_after.player == player_id:
# 成功吃子
total_reward += self.weights["capture_piece"] * target_value
# 额外奖励:用低价值棋子吃高价值棋子
if target_value > my_value:
total_reward += (target_value - my_value) * 0.5
elif not piece_after:
# 同归于尽:精确的价值交换评估
value_diff = target_value - my_value
if value_diff > 0:
# 有利交换
total_reward += (
self.weights["mutual_destruction"] + value_diff * 1.0
)
elif value_diff == 0:
# 等价交换,中性偏负
total_reward += self.weights["mutual_destruction"]
else:
# 不利交换
total_reward += (
self.weights["mutual_destruction"] + value_diff * 1.0
) # value_diff是负数
else:
# 被吃掉
total_reward += self.weights["be_captured"] * my_value
# === 统一的威胁机会变化评估(适用于所有动作类型)===
threats_before, opportunities_before = (
self.count_immediate_threats_opportunities(board_before, player_id)
)
threats_after, opportunities_after = (
self.count_immediate_threats_opportunities(board_after, player_id)
)
# 威胁减少是好事,机会增加是好事
threat_improvement = threats_before - threats_after # 威胁减少为正值
opportunity_improvement = (
opportunities_after - opportunities_before
) # 机会增加为正值
# 综合战略改善奖励
strategic_improvement = threat_improvement + opportunity_improvement
total_reward += strategic_improvement * 0.4
return total_reward
except Exception as e:
print(f"计算奖励时出错: {e}, 动作: {action}")
raise e
class ImprovedReward(BaseReward):
"""改进的奖励函数 - 解决反复移动问题"""
def __init__(self):
self.weights = {
"win_game": 50.0, # 降低获胜奖励,避免过大梯度
"lose_game": -50.0, # 降低失败惩罚
"draw_game": 0.0,
"capture_piece": 3.0, # 降低吃子奖励
"be_captured": -3.0, # 降低被吃惩罚
"mutual_destruction": -0.5,
"step_penalty": -0.1, # 增加步数惩罚,强制快速结束
"repetition_penalty": -1.0, # 减少重复动作惩罚
"progress_reward": 0.5, # 减少进度奖励
"position_improvement": 0.2, # 减少位置改善奖励
"no_progress_penalty": -0.5, # 新增:无进展惩罚
}
# 记录最近的动作历史
self.action_history = []
self.position_history = []
self.max_history = 10
self.last_capture_step = 0 # 记录上次吃子的步数
self.current_step = 0 # 当前步数
def detect_repetition(self, action: Tuple) -> float:
"""检测重复动作并返回惩罚"""
if len(action) < 3:
return 0.0
action_type, pos1, pos2 = action
if action_type != "move" or pos2 is None:
return 0.0
# 检查是否在最近几步中有相同或相反的移动
repetition_penalty = 0.0
# 当前移动
current_move = (pos1, pos2)
reverse_move = (pos2, pos1)
# 检查历史中的重复
recent_actions = self.action_history[-4:] # 只检查最近4步,减少惩罚
for i, hist_action in enumerate(recent_actions):
if len(hist_action) >= 3 and hist_action[0] == "move":
hist_move = (hist_action[1], hist_action[2])
# 完全相同的移动
if hist_move == current_move:
repetition_penalty += self.weights["repetition_penalty"] * (
1.0 - i * 0.2
)
# 相反的移动(往返移动)
elif hist_move == reverse_move:
repetition_penalty += (
self.weights["repetition_penalty"] * 1.2 * (1.0 - i * 0.2)
)
return repetition_penalty
def calculate_position_value(
self, board, pos: Tuple[int, int], player_id: int
) -> float:
"""计算位置的战略价值"""
r, c = pos
value = 0.0
# 简化中心位置价值计算
center_distance = abs(r - 3) + abs(c - 3.5)
center_value = max(0, 1 - center_distance * 0.1) # 降低中心价值
value += center_value
# 检查周围的威胁和机会
piece = board.get_piece(r, c)
if piece and piece.player == player_id and piece.revealed:
for dr, dc in [(-1, 0), (1, 0), (0, -1), (0, 1)]:
nr, nc = r + dr, c + dc
if 0 <= nr < 7 and 0 <= nc < 8:
neighbor = board.get_piece(nr, nc)
if neighbor and neighbor.revealed and neighbor.player != player_id:
# 能攻击敌人增加价值
if compare_strength(piece.strength, neighbor.strength) == 1:
value += 1.0 # 降低攻击价值
# 被威胁减少价值
elif compare_strength(piece.strength, neighbor.strength) == -1:
value -= 0.8 # 降低威胁惩罚
return value
def calculate_progress_reward(
self, board_before, board_after, action: Tuple, player_id: int
) -> float:
"""计算游戏进度奖励"""
progress_reward = 0.0
# 1. 翻开新棋子给予奖励
if len(action) >= 2 and action[0] == "reveal":
progress_reward += self.weights["progress_reward"]
# 2. 移动到新位置给予奖励
elif len(action) >= 3 and action[0] == "move":
pos1, pos2 = action[1], action[2]
# 简化位置历史检查
if pos2 not in self.position_history[-10:]: # 减少到最近10步
progress_reward += self.weights["progress_reward"] * 0.3
# 简化位置价值改善奖励
old_value = self.calculate_position_value(board_before, pos1, player_id)
new_value = self.calculate_position_value(board_after, pos2, player_id)
if new_value > old_value:
progress_reward += self.weights["position_improvement"] * (
new_value - old_value
)
return progress_reward
def update_history(self, action: Tuple):
"""更新动作和位置历史"""
self.action_history.append(action)
if len(self.action_history) > self.max_history:
self.action_history.pop(0)
# 记录移动的目标位置
if len(action) >= 3 and action[0] == "move":
self.position_history.append(action[2])
if len(self.position_history) > self.max_history * 2:
self.position_history.pop(0)
def calculate_reward(
self,
board_before,
board_after,
action: Tuple[str, Tuple[int, int], Optional[Tuple[int, int]]],
player_id: int,
result: int,
) -> float:
"""改进的奖励计算 - 重点解决步长过长问题"""
try:
self.current_step += 1
# 处理动作解包
if len(action) == 2:
action_type, pos1 = action
pos2 = None
action = (action_type, pos1, pos2)
elif len(action) == 3:
action_type, pos1, pos2 = action
else:
return -2.0
# 游戏结束奖励
if result == player_id:
# 奖励快速获胜
step_bonus = max(0, (500 - self.current_step) * 0.02)
return self.weights["win_game"] + step_bonus
if result == 1 - player_id:
return self.weights["lose_game"]
if result == 2:
return self.weights["draw_game"]
# 基础步数惩罚(递增)
step_penalty = self.weights["step_penalty"]
if self.current_step > 200:
step_penalty *= 2 # 200步后加倍惩罚
if self.current_step > 400:
step_penalty *= 3 # 400步后三倍惩罚
total_reward = step_penalty
# 无进展惩罚:如果很长时间没有吃子
steps_since_capture = self.current_step - self.last_capture_step
if steps_since_capture > 50:
total_reward += self.weights["no_progress_penalty"] * (
steps_since_capture / 50.0
)
# 检测重复动作惩罚
repetition_penalty = self.detect_repetition(action)
total_reward += repetition_penalty
# 简化进度奖励
progress_reward = self.calculate_progress_reward(
board_before, board_after, action, player_id
)
total_reward += progress_reward
# 处理移动动作的战斗奖励
if action_type == "move" and pos2 is not None:
start_pos, end_pos = pos1, pos2
moving_piece_before = board_before.get_piece(start_pos[0], start_pos[1])
target_piece_before = board_before.get_piece(end_pos[0], end_pos[1])
piece_after = board_after.get_piece(end_pos[0], end_pos[1])
# 战斗结果奖励
if target_piece_before and target_piece_before.player != player_id:
target_value = self.get_piece_value(target_piece_before.strength)
my_value = self.get_piece_value(moving_piece_before.strength)
# 记录吃子时间
self.last_capture_step = self.current_step
if piece_after and piece_after.player == player_id:
# 成功吃子
total_reward += self.weights["capture_piece"] * target_value
if target_value > my_value:
total_reward += (
target_value - my_value
) * 0.3 # 降低额外奖励
elif not piece_after:
# 同归于尽
value_diff = target_value - my_value
total_reward += (
self.weights["mutual_destruction"] + value_diff * 0.5
)
else:
# 被吃掉
total_reward += self.weights["be_captured"] * my_value
# 更新历史记录
self.update_history(action)
# 限制奖励范围,避免梯度爆炸
total_reward = np.clip(total_reward, -10.0, 10.0)
return total_reward
except Exception as e:
print(f"计算奖励时出错: {e}, 动作: {action}")
return -2.0
def reset_history(self):
"""重置历史记录(每局游戏开始时调用)"""
self.action_history = []
self.position_history = []
self.last_capture_step = 0
self.current_step = 0
class AggressiveReward(BaseReward):
"""激进的奖励函数 - 强制学习攻击性策略"""
def __init__(self):
self.weights = {
"win_game": 100.0,
"lose_game": -100.0,
"draw_game": -10.0, # 平局也是负奖励,鼓励主动进攻
"capture_piece": 15.0, # 大幅提高吃子奖励
"be_captured": -8.0,
"mutual_destruction": 2.0, # 同归于尽变为正奖励(鼓励交换)
"step_penalty": -0.2, # 增加步数惩罚
"repetition_penalty": -3.0, # 大幅增加重复惩罚
"no_progress_penalty": -2.0, # 增加无进展惩罚
"reveal_bonus": 2.0, # 翻开奖励
"attack_bonus": 5.0, # 新增:主动攻击奖励
"retreat_penalty": -2.0, # 新增:后退惩罚
}
self.action_history = []
self.position_history = []
self.last_capture_step = 0
self.current_step = 0
self.board_center = (3, 4) # 棋盘中心
def is_attacking_move(
self, board_before: Board, action: Tuple, player_id: int
) -> bool:
"""判断是否为攻击性移动"""
if len(action) < 3 or action[0] != "move":
return False
_, pos1, pos2 = action
if pos2 is None:
return False
# 检查目标位置是否有敌方棋子
target_piece = board_before.get_piece(pos2[0], pos2[1])
if target_piece and target_piece.player != player_id:
return True
# 检查是否向敌方棋子靠近
moving_piece = board_before.get_piece(pos1[0], pos1[1])
if not moving_piece or not moving_piece.revealed:
return False
# 找最近的敌方棋子,看是否在靠近
min_dist_before = float("inf")
min_dist_after = float("inf")
for r in range(7):
for c in range(8):
piece = board_before.get_piece(r, c)
if piece and piece.player != player_id and piece.revealed:
dist_before = abs(r - pos1[0]) + abs(c - pos1[1])
dist_after = abs(r - pos2[0]) + abs(c - pos2[1])
min_dist_before = min(min_dist_before, dist_before)
min_dist_after = min(min_dist_after, dist_after)
return min_dist_after < min_dist_before
def is_retreating_move(
self, board_before: Board, action: Tuple, player_id: int
) -> bool:
"""判断是否为后退移动"""
if len(action) < 3 or action[0] != "move":
return False
_, pos1, pos2 = action
if pos2 is None:
return False
moving_piece = board_before.get_piece(pos1[0], pos1[1])
if not moving_piece or not moving_piece.revealed:
return False
# 检查是否远离所有敌方棋子
total_dist_before = 0
total_dist_after = 0
enemy_count = 0
for r in range(7):
for c in range(8):
piece = board_before.get_piece(r, c)
if piece and piece.player != player_id and piece.revealed:
dist_before = abs(r - pos1[0]) + abs(c - pos1[1])
dist_after = abs(r - pos2[0]) + abs(c - pos2[1])
total_dist_before += dist_before
total_dist_after += dist_after
enemy_count += 1
if enemy_count == 0:
return False
avg_dist_before = total_dist_before / enemy_count
avg_dist_after = total_dist_after / enemy_count
return avg_dist_after > avg_dist_before + 0.5 # 明显远离
def detect_repetition(self, action: Tuple) -> float:
"""更严厉的重复检测"""
if len(action) < 3 or action[0] != "move":
return 0.0
_, pos1, pos2 = action
if pos2 is None:
return 0.0
current_move = (pos1, pos2)
reverse_move = (pos2, pos1)
repetition_penalty = 0.0
recent_actions = self.action_history[-8:] # 检查最近8步
for i, hist_action in enumerate(recent_actions):
if len(hist_action) >= 3 and hist_action[0] == "move":
hist_move = (hist_action[1], hist_action[2])
if hist_move == current_move:
# 完全相同的移动,惩罚递增
repetition_penalty += self.weights["repetition_penalty"] * (
2 ** (len(recent_actions) - i)
)
elif hist_move == reverse_move:
# 往返移动,更严重的惩罚
repetition_penalty += (
self.weights["repetition_penalty"]
* 2
* (2 ** (len(recent_actions) - i))
)
return repetition_penalty
def calculate_reward(
self, board_before, board_after, action: Tuple, player_id: int, result: int
) -> float:
"""激进的奖励计算"""
try:
self.current_step += 1
# 处理动作解包
if len(action) == 2:
action_type, pos1 = action
pos2 = None
action = (action_type, pos1, pos2)
elif len(action) == 3:
action_type, pos1, pos2 = action
else:
return -5.0
# 游戏结束奖励
if result == player_id:
step_bonus = max(
0, (300 - self.current_step) * 0.1
) # 更大的快速获胜奖励
return self.weights["win_game"] + step_bonus
if result == 1 - player_id:
return self.weights["lose_game"]
if result == 2:
return self.weights["draw_game"]
# 基础步数惩罚(更严厉)
step_penalty = self.weights["step_penalty"]
if self.current_step > 150:
step_penalty *= 3
if self.current_step > 300:
step_penalty *= 5
total_reward = step_penalty
# 无进展惩罚
steps_since_capture = self.current_step - self.last_capture_step
if steps_since_capture > 30: # 降低到30步
total_reward += self.weights["no_progress_penalty"] * (
steps_since_capture / 30.0
)
# 重复动作惩罚
repetition_penalty = self.detect_repetition(action)
total_reward += repetition_penalty
# 动作特定奖励
if action_type == "reveal":
total_reward += self.weights["reveal_bonus"]
elif action_type == "move" and pos2 is not None:
start_pos, end_pos = pos1, pos2
moving_piece_before = board_before.get_piece(start_pos[0], start_pos[1])
target_piece_before = board_before.get_piece(end_pos[0], end_pos[1])
piece_after = board_after.get_piece(end_pos[0], end_pos[1])
# 战斗奖励
if target_piece_before and target_piece_before.player != player_id:
target_value = self.get_piece_value(target_piece_before.strength)
my_value = self.get_piece_value(moving_piece_before.strength)
self.last_capture_step = self.current_step
if piece_after and piece_after.player == player_id:
# 成功吃子 - 大奖励
total_reward += self.weights["capture_piece"] * target_value
total_reward += self.weights["attack_bonus"] # 额外攻击奖励
if target_value > my_value:
total_reward += (
target_value - my_value
) * 2.0 # 更大的以小博大奖励
elif not piece_after:
# 同归于尽 - 现在是正奖励
total_reward += self.weights["mutual_destruction"] * min(
target_value, my_value
)
else:
# 被吃掉 - 仍然惩罚,但减少惩罚以鼓励尝试
total_reward += self.weights["be_captured"] * my_value * 0.5
# 移动行为分析
else:
# 检查是否为攻击性移动
if self.is_attacking_move(board_before, action, player_id):
total_reward += (
self.weights["attack_bonus"] * 0.5
) # 较小的攻击奖励
# 检查是否为后退移动
elif self.is_retreating_move(board_before, action, player_id):
total_reward += self.weights["retreat_penalty"]
# 更新历史
self.action_history.append(action)
if len(self.action_history) > 10:
self.action_history.pop(0)
if action_type == "move" and pos2 is not None:
self.position_history.append(pos2)
if len(self.position_history) > 20:
self.position_history.pop(0)
# 限制奖励范围
return np.clip(total_reward, -20.0, 20.0)
except Exception as e:
print(f"计算奖励时出错: {e}, 动作: {action}")
return -5.0
def reset_history(self):
"""重置历史记录"""
self.action_history = []
self.position_history = []
self.last_capture_step = 0
self.current_step = 0
class FeatureExtractor:
"""改进的特征提取器"""
def __init__(self):
self.reward_function = RewardFunction()
def extract_features(
self, board: Board, player_id: int, action: Optional[Tuple] = None
) -> np.ndarray:
"""提取状态-动作特征"""
features = []
# 1. 基础棋盘特征 (8维)
features.extend(self._extract_board_features(board, player_id))
# 2. 威胁和机会特征 (8维)
features.extend(self._extract_threat_opportunity_features(board, player_id))
# 3. 位置控制特征 (4维)