-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRecoTrainer.py
More file actions
6479 lines (5572 loc) · 258 KB
/
RecoTrainer.py
File metadata and controls
6479 lines (5572 loc) · 258 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# WIP -- perfect kiting
import pydirectinput
import time
import pymem
import pymem.exception
import threading
import frida
import pyautogui
import bresenham
import json
import random
import tkinter as tk
# ===== RecoTrainer additions: GUI log routing =====
LOG_SINK = None  # set by GUI


def rt_log(msg: str):
    """Emit one timestamped log line to the GUI sink, or to stdout.

    The line is formatted as ``[HH:MM:SS] <msg>``. When ``LOG_SINK`` is set
    it is called with the finished line; otherwise (or if the sink raises)
    the line is written with the real built-in ``print``.

    Args:
        msg: Anything convertible with ``str()``.
    """
    # This module shadows ``print`` with its own wrapper, so the real function
    # must be reached through the ``builtins`` module. ``__builtins__`` is the
    # module object (not a dict) when this file runs as a script, so the old
    # ``__builtins__['print']`` lookup raised TypeError in that case.
    import builtins

    try:
        import time as _t
        ts = _t.strftime('%H:%M:%S')
    except Exception:
        ts = '??:??:??'
    line = f"[{ts}] " + str(msg)
    try:
        if LOG_SINK is not None:
            LOG_SINK(line)
        else:
            builtins.print(line)
    except Exception:
        # Best effort: a broken sink must not lose the message.
        builtins.print(line)
_builtin_print = print


def print(*args, **kwargs):
    """Module-wide ``print`` replacement that routes text through ``rt_log``.

    Passing ``_raw=True`` bypasses the log routing and forwards the call to
    the original built-in ``print`` unchanged. Otherwise the positional
    arguments are stringified, joined with single spaces and handed to
    ``rt_log``; if that fails for any reason the original ``print`` is used.
    """
    bypass = kwargs.pop('_raw', False)
    if bypass:
        return _builtin_print(*args, **kwargs)
    try:
        text = ' '.join(map(str, args))
        rt_log(text)
    except Exception:
        _builtin_print(*args, **kwargs)
from ttkthemes import ThemedTk
from pynput import mouse, keyboard
from tkinter import ttk
from queue import PriorityQueue
from pathlib import Path
import os, sys
from collections import deque
import math
import re
# Memory addresses / offsets (the code that consumes them is outside this section)
walkAddress = "0xEEC70"
npcAddress = "0x1F450E"
directionalAddress = "0x6DACA"
EXP_HOOK_ADDR = 0x577414
varOffset = 0xA0
totalExp = None
WEIGHT_WRITE_ADDRS = [0x100DF5, 0x100454]
KILL_FLAG_DEBOUNCE = 0.25
SPAWN_UID_OFFSET = 0xA0
MOB_ID_OFFSET = 0x98 # we read short at address_x - 0x98
# unique_id values that count as combat targets (422 is listed twice; harmless in a set literal)
MOB_ID_FILTERS = {285, 302, 188, 422, 538, 286, 292, 672, 301, 543, 325, 321, 319, 305, 502, 511, 512, 513, 514, 754, 413, 15, 324, 412, 84, 421, 422, 417}
# --- Configurable cleanup settings (GUI can edit these) ---
LAST_MOVED_TIMER_SECONDS = 35 # how long before unmoved NPC is stale
FLANK_RANGE = 1
INVALID_COORD_LIMIT = 100 # max valid X/Y (anything higher is junk)
POLL_INTERVAL = 0.05 # how often cleanup runs (seconds)
RANGE_MODE = {}
map_data_lock = threading.Lock()
SPEECH_ASSOC_RADIUS = 6 # tiles
SPEECH_TTL_S = 7.0 # bubble lifetime
recent_speech_log = deque(maxlen=120)
npc_last_speech = {} # addr_hex -> {"text": str, "ts": float}
speech_quarantine_until = 0.0
TRIGGERS = [r'\bmax\b', r'\bnecklace\b', r'\b5\b'] # optional regex triggers
HARVEST_MODE = False
_harvest_thread = None
HARVEST_CANCEL = threading.Event()
# ═══════════════════════════════════════════════════════════════
# TASK MODE: Task management (replaces harvest mode)
# ═══════════════════════════════════════════════════════════════
TASK_MODE = False
_task_thread = None
TASK_CANCEL = threading.Event()
_task_loops_remaining = 1000
_tasks = [] # List of task dictionaries
# Blocked tile tracking (from npc4)
_task_temporary_blocked_tiles = {} # tile -> expire_time
_task_permanent_blocked_tiles = set() # permanently blocked tiles
_task_current_navigation_target = None # Track final target for obstacle handling
_task_last_success_direction = {} # task key -> preferred facing direction
FACING_NAME_TO_CODE = {"DOWN": 0, "LEFT": 1, "UP": 2, "RIGHT": 3}
# Resolved runtime addresses; None until something assigns them
x_address = None
y_address = None
directional_address = None
combat_baseline_exp = None
current_target_npc = None
xstarted = 0
debug = 0
pyautogui.PAUSE = 0
last_direction = None
wandering_target = None
stuck_timer_start = None
last_position = None
pm = None # Global pymem instance
pause_flag = False
stat_base = None
CLICKING_ENABLED = True
resurrect_points = []
# movement_allowed starts set; _pause_movement() clears it and _resume_movement() sets it again
movement_allowed = threading.Event()
movement_allowed.set()
clicks_in_progress = threading.Event()
WANDER_TIMEOUT_S = 15.0 # max time to wander with no targets before re-running Home
game_win = None
STOP_HOME_IF_TARGET = True
IGNORE_PROTECTION = False # False = protection ON, True = ignore (boss mode)
POST_PROTECT_GRACE_S = 2.0 # tiny debounce after protection ends
# ===================== HOME ROUTINE CONFIG =====================
HOME_POS = (3, 12) # <<< SET THIS to your desired (X, Y) "home" tile
RUN_HOME_AFTER_KILL = True
HOME_NEAR_THRESH = 1 # distance in tiles considered "arrived"
HOME_TRAVEL_TIMEOUT = 6.0 # seconds to give the walk before giving up
home_routine_running = threading.Event()
VANISH_GRACE_SEC = 0.0 # how long we wait after a target vanishes before assuming kill
RESPAWN_CHANGE_SEC = 0.4 # debounce if unique_id flips rapidly on respawn
WALKABLE_ARG = sys.argv[1] if len(sys.argv) > 1 else None
IMMUNITY_SEC = 0.0 # first N seconds to ignore death detection for a fresh target
_target_immunity_until = {} # addr_hex -> monotonic deadline
# [REMOVED] boss_aggro_removed_TOGGLE stripped
SIT_REMOVED = 70 # <<< Configurable sit timer in seconds
CONFIG_FILE = "cerafrog_config.json" # <<< Settings persistence file
# Seconds a killed NPC stays muted. At 0.0 the "(now - ts) < KILL_QUARANTINE_SEC"
# checks in this file never pass, so the quarantine is effectively off.
KILL_QUARANTINE_SEC = 0.0
RECENTLY_KILLED: dict[str, float] = {} # addr_hex -> time.monotonic()
_last_kill_ts: dict[str, float] = {}
_kill_lock = threading.Lock()
KILL_DEBOUNCE_SEC = 0.25
DIRECTIONAL_LOOT = True # pick 1 of 4, based on facing
LOOT_HOLD_SECONDS = 6.0 # how long to keep clicking that one spot
FAST_CLICK = False
FAST_CLICK_BURST_COUNT = 3
FAST_CLICK_GAP_S = 0.12
ATTACK_RECENCY_S = 0.3
PRE_HIT_GRACE_S = 0.3
last_attack_time = 0.0
last_attack_addr = None
NO_CLICK_UNTIL = 0.0 # time.monotonic() deadline; only ever pushed later by block_clicks_for()
F5_TAP_COUNT = 24
HOME_AFTER_KILLS_N = 1
KILLS_SINCE_HOME = 0
FORCE_MOVEMENT_SECONDS = 4.05
COMBAT_TASK_DURATION = 120 # default combat runtime in seconds
COMBAT_CLEAR_GRACE_SECONDS = 5.0 # how long area must stay empty before ending combat
# Facing code -> index into a 4-point list stored in [Up, Right, Down, Left] order
DIR_TO_SLOT = {
    0: 2, # 0 = Down -> points[2]
    1: 3, # 1 = Left -> points[3]
    2: 0, # 2 = Up -> points[0]
    3: 1, # 3 = Right -> points[1]
}
# Global bot control
bot_running = True # <<< Global flag to control bot threads
# ——— all stat offsets relative to EXP base ———
# read_stat() adds these to the global stat_base and reads an int at the result.
# The trailing hex values are example absolute addresses (they correspond to a
# base of 0x0293F40C); only the offsets are used by the code.
STAT_OFFSETS = {
    'exp': 0x000, # at exp_base itself
    'weight': -0x008, # 0x0293F404
    'level': 0x008, # 0x0293F414
    'tnl': 0x010, # 0x0293F41C
    'eon': 0x034, # 0x0293F440
    'vit': 0x188, # 0x0293F594
    'dex': 0x184, # 0x0293F590
    'acc': 0x180, # 0x0293F58C
    'def': 0x17C, # 0x0293F588
    'pwr': 0x178, # 0x0293F584
    'crit': 0x174, # 0x0293F580
    'armor': 0x170, # 0x0293F57C
    'eva': 0x16C, # 0x0293F578
    'hit_rate': 0x168, # 0x0293F574
    'max_dmg': 0x164, # 0x0293F570
    'min_dmg': 0x160, # 0x0293F56C
    'aura': 0x18C, # 0x0293F598
    'max_hp': 0x358, # 0x0293F764
    'max_mana': 0x4F0, # 0x0293F8FC
}
def save_settings():
    """Write the current tunable settings to ``CONFIG_FILE`` as JSON.

    The snapshot is taken from the module-level globals. Any failure (missing
    global, unwritable file, unserialisable value) is caught and reported via
    ``print``; the function never raises and always returns None.
    """
    try:
        config = {}
        # No definition of this toggle is visible in this part of the module,
        # and reading it by bare name raised NameError, which made every save
        # fail inside the except below. Persist it only when it really exists.
        if "boss_aggro_removed_TOGGLE" in globals():
            config["boss_aggro_removed_TOGGLE"] = globals()["boss_aggro_removed_TOGGLE"]
        config.update({
            "SIT_REMOVED": SIT_REMOVED,
            "LAST_MOVED_TIMER_SECONDS": LAST_MOVED_TIMER_SECONDS,
            "MOB_ID_FILTERS": list(MOB_ID_FILTERS),  # sets are not JSON-serialisable
            "HOME_POS": HOME_POS,
            "FLANK_RANGE": FLANK_RANGE,
            "RUN_HOME_AFTER_KILL": RUN_HOME_AFTER_KILL,
            "CLICKING_ENABLED": CLICKING_ENABLED,
            "FAST_CLICK": FAST_CLICK,
            "FAST_CLICK_BURST_COUNT": FAST_CLICK_BURST_COUNT,
            "FAST_CLICK_GAP_S": FAST_CLICK_GAP_S,
            "IGNORE_PROTECTION": bool(globals().get("IGNORE_PROTECTION", False)),
            "F5_TAP_COUNT": int(F5_TAP_COUNT),
            "WANDER_TIMEOUT_S": float(WANDER_TIMEOUT_S),
            "HOME_AFTER_KILLS_N": int(HOME_AFTER_KILLS_N),
            "FORCE_MOVEMENT_SECONDS": float(FORCE_MOVEMENT_SECONDS),
            "RESURRECT_POINTS": list(resurrect_points),  # Save click locations [Up, Right, Down, Left]
            "COMBAT_TASK_DURATION": int(COMBAT_TASK_DURATION),
        })
        with open(CONFIG_FILE, 'w') as f:
            json.dump(config, f, indent=2)
        print(f"[CONFIG] Settings saved to {CONFIG_FILE}")
    except Exception as e:
        print(f"[CONFIG] Failed to save settings: {e}")
def load_settings():
    """Load persisted settings from ``CONFIG_FILE`` into the module globals.

    Keys missing from the file keep their current in-memory value. If the
    file does not exist the defaults are kept and a notice is printed. Any
    error (bad JSON, bad value type) is caught and reported via ``print``;
    settings assigned before the failing line stay applied.

    ``FORCE_MOVEMENT_SECONDS`` is restored here as well: ``save_settings``
    writes it, but it was previously never read back.
    """
    global SIT_REMOVED, LAST_MOVED_TIMER_SECONDS, MOB_ID_FILTERS, HOME_POS
    global FLANK_RANGE, RUN_HOME_AFTER_KILL, CLICKING_ENABLED, FAST_CLICK
    global FAST_CLICK_BURST_COUNT, FAST_CLICK_GAP_S, IGNORE_PROTECTION
    global HOME_AFTER_KILLS_N, F5_TAP_COUNT, WANDER_TIMEOUT_S, resurrect_points
    global COMBAT_TASK_DURATION, FORCE_MOVEMENT_SECONDS
    try:
        if os.path.exists(CONFIG_FILE):
            with open(CONFIG_FILE, 'r') as f:
                config = json.load(f)
            # [REMOVED] boss_aggro_removed_TOGGLE stripped
            SIT_REMOVED = config.get("SIT_REMOVED", SIT_REMOVED)
            LAST_MOVED_TIMER_SECONDS = config.get("LAST_MOVED_TIMER_SECONDS", LAST_MOVED_TIMER_SECONDS)
            # JSON has no set/tuple types, so convert back from lists.
            MOB_ID_FILTERS = set(config.get("MOB_ID_FILTERS", list(MOB_ID_FILTERS)))
            HOME_POS = tuple(config.get("HOME_POS", list(HOME_POS)))
            RUN_HOME_AFTER_KILL = config.get("RUN_HOME_AFTER_KILL", RUN_HOME_AFTER_KILL)
            CLICKING_ENABLED = config.get("CLICKING_ENABLED", CLICKING_ENABLED)
            FAST_CLICK = config.get("FAST_CLICK", FAST_CLICK)
            FAST_CLICK_BURST_COUNT = int(config.get("FAST_CLICK_BURST_COUNT", FAST_CLICK_BURST_COUNT))
            FAST_CLICK_GAP_S = float(config.get("FAST_CLICK_GAP_S", FAST_CLICK_GAP_S))
            F5_TAP_COUNT = int(config.get("F5_TAP_COUNT", F5_TAP_COUNT))
            WANDER_TIMEOUT_S = float(config.get("WANDER_TIMEOUT_S", WANDER_TIMEOUT_S))
            HOME_AFTER_KILLS_N = int(config.get("HOME_AFTER_KILLS_N", HOME_AFTER_KILLS_N))
            FORCE_MOVEMENT_SECONDS = float(config.get("FORCE_MOVEMENT_SECONDS", FORCE_MOVEMENT_SECONDS))
            v = config.get("FLANK_RANGE")
            if v is not None:
                FLANK_RANGE = int(v)
            IGNORE_PROTECTION = bool(config.get("IGNORE_PROTECTION", IGNORE_PROTECTION))
            COMBAT_TASK_DURATION = int(config.get("COMBAT_TASK_DURATION", COMBAT_TASK_DURATION))
            # Load click locations (resurrect_points) - [Up, Right, Down, Left]
            saved_points = config.get("RESURRECT_POINTS", [])
            if saved_points and len(saved_points) == 4:
                # Convert list of lists to list of tuples; mutate in place so
                # existing references to the list see the loaded points.
                resurrect_points[:] = [tuple(p) if isinstance(p, list) else p for p in saved_points]
                print(f"[CONFIG] Click locations loaded: {resurrect_points}")
            print(f"[CONFIG] Settings loaded from {CONFIG_FILE}")
        else:
            print(f"[CONFIG] No config file found, using defaults")
    except Exception as e:
        print(f"[CONFIG] Failed to load settings: {e}")
# --- Unified live-XY navigation core (targets for combat & harvest) ---
from typing import Callable, Optional, Tuple, Protocol
# Keep these tunables (you can tweak to taste)
# STEP_TIMEOUT_S: seconds allowed per tile before a step is treated as failed.
# STEP_POLL_S: sleep between live-XY polls while waiting for a step to land.
STEP_TIMEOUT_S = 0.20 # shorter = snappier; don't go too low or steps may miss
STEP_POLL_S = 0.008 # faster polling confirms steps sooner
REPLAN_LIMIT = 10 # how many failed replans before giving up
class Target(Protocol):
    """Structural interface for anything ``go_to_target`` can navigate to."""

    def get_xy(self) -> Optional[Tuple[int, int]]:
        """Return the target's current tile, or None if it has no position."""
        ...

    def is_valid(self) -> bool:
        """Return False once the target should no longer be pursued."""
        ...

    def on_arrival(self) -> bool:
        """Run the arrival action; True means navigation is complete."""
        ...
def _live_xy() -> Optional[Tuple[int,int]]:
    """Return the player's live tile as an ``(x, y)`` pair of ints.

    Returns None when ``_get_xy_safe`` could not supply either coordinate.
    """
    raw_x, raw_y = _get_xy_safe()
    if raw_x is not None and raw_y is not None:
        return (int(raw_x), int(raw_y))
    return None
def _nudge_toward(cur: Tuple[int,int], nxt: Tuple[int,int]) -> None:
    """Press one arrow key that moves from ``cur`` toward ``nxt``.

    A horizontal difference takes priority over a vertical one; nothing is
    pressed when both tiles are equal.
    """
    (cur_x, cur_y), (next_x, next_y) = cur, nxt
    if next_x > cur_x:
        arrow = 'right'
    elif next_x < cur_x:
        arrow = 'left'
    elif next_y > cur_y:
        arrow = 'down'
    elif next_y < cur_y:
        arrow = 'up'
    else:
        return
    press_key(arrow, 1, 0.02)
def _move_one_tile(cur: Tuple[int,int], nxt: Tuple[int,int]) -> bool:
    """Nudge toward ``nxt`` and report whether live XY reached it in time.

    Polls every ``STEP_POLL_S`` seconds for up to ``STEP_TIMEOUT_S`` seconds.
    Returns True as soon as the live position equals ``nxt``, else False.
    """
    import time as _clock
    _nudge_toward(cur, nxt)
    give_up_at = _clock.monotonic() + STEP_TIMEOUT_S
    while _clock.monotonic() < give_up_at:
        if _live_xy() == nxt:
            return True
        _clock.sleep(STEP_POLL_S)
    # The step did not land before the deadline.
    return False
def _path_or_none(start: Tuple[int,int], goal: Tuple[int,int], walkable: set):
    """Return the A* path from ``start`` to ``goal``, or None if unusable.

    A path is considered unusable when it is empty/falsy or has fewer than
    two entries.
    """
    route = astar_pathfinding(start, goal, walkable)
    if route and len(route) >= 2:
        return route
    return None
def go_to_target(
    target: Target,
    *,
    near_thresh: float = 0.6,
    timeout_s: float = 40.0,
    tag: str = "NAV",
) -> bool:
    """Watchdog-free navigation: only replans after a failed step.
    Integrated fixes:
    - Adjacency guard before choosing a direction (prevents backwards picks).
    - Early stop while holding if we've reached/passed the segment end.
    - Overshoot promotion: if we land further along the path, advance i instead of backtracking.

    Args:
        target: Object providing get_xy(), is_valid() and on_arrival().
        near_thresh: Distance (as computed by _distance_tiles) at or below
            which target.on_arrival() is attempted.
        timeout_s: Overall wall-clock budget in seconds (time.time()).
        tag: Prefix used in the log messages.

    Returns:
        True only when target.on_arrival() returns True while within
        near_thresh. False on an invalid start position, a missing or
        unwalkable goal, an invalid/lost target, no path, an unwalkable
        path step, or when timeout_s elapses.
    """
    import time
    # Walkable tiles are loaded once per call and reused for every replan.
    walkable = set(load_walkable_tiles())
    cur = _live_xy()
    if cur is None or cur not in walkable:
        print(f"[{tag}] invalid start XY {cur}; abort"); return False
    goal = target.get_xy()
    if goal is None:
        print(f"[{tag}] target has no position"); return False
    if goal not in walkable:
        print(f"[{tag}] goal {goal} not walkable"); return False
    t0 = time.time()
    # Outer loop: one iteration per (re)plan.
    while time.time() - t0 < timeout_s:
        if not target.is_valid():
            print(f"[{tag}] target invalid/despawned"); return False
        # Refresh goal (cheap for static harvest nodes)
        g = target.get_xy()
        if g is None:
            print(f"[{tag}] target lost"); return False
        goal = g
        # Arrival gate: near + action commits
        cur = _live_xy()
        if cur is not None and _distance_tiles(cur, goal) <= near_thresh:
            if target.on_arrival():
                return True
        if cur is None:
            # No live position this pass; retry immediately (no sleep here).
            continue
        path = _path_or_none(cur, goal, walkable)
        if path is None:
            print(f"[{tag}] no path {cur}->{goal}"); return False
        progressed = False
        # Stream straight segments up to 4 tiles to avoid micro-pauses
        i = 1  # path[0] is the start tile, so the first step is path[1]
        while i < len(path):
            nxt = path[i]
            if nxt not in walkable:
                print(f"[{tag}] path step {nxt} not walkable"); return False
            # Build a straight run starting at current index
            run_end = i
            if i + 1 < len(path):
                dx0 = path[i][0] - path[i-1][0]
                dy0 = path[i][1] - path[i-1][1]
                while run_end + 1 < len(path):
                    ndx = path[run_end+1][0] - path[run_end][0]
                    ndy = path[run_end+1][1] - path[run_end][1]
                    # cap run len to 4 tiles total (i..run_end inclusive)
                    if (ndx, ndy) == (dx0, dy0) and (run_end - i) < 3:
                        run_end += 1
                    else:
                        break
            # Decide direction key from delta of the first step (adjacency guard)
            dx = path[i][0] - cur[0]
            dy = path[i][1] - cur[1]
            if abs(dx) + abs(dy) != 1:
                # Not adjacent anymore — replan from current position
                break
            if dx > 0 and dy == 0: key = 'right'
            elif dx < 0 and dy == 0: key = 'left'
            elif dy > 0 and dx == 0: key = 'down'
            elif dy < 0 and dx == 0: key = 'up'
            else:
                break # safety: non-cardinal (shouldn't happen)
            # Hold the key across the segment and poll live XY
            hold_key(key)
            # Keep your global STEP_TIMEOUT_S behavior, but allow early exit if we pass run_end
            deadline = time.monotonic() + STEP_TIMEOUT_S * max(1, (run_end - i + 1))
            try:
                while time.monotonic() < deadline:
                    xy = _live_xy()
                    if xy:
                        cur = xy
                        # If we've reached or progressed past run_end along this path, stop holding now
                        # 'past' means cur appears later in the same path sequence.
                        try:
                            idx_in_path = path.index(cur)
                            if idx_in_path >= run_end:
                                break
                        except ValueError:
                            # cur not on this computed path snapshot; keep polling until timeout
                            pass
                        # still aiming for exact run_end otherwise
                        if cur == path[run_end]:
                            break
                    time.sleep(STEP_POLL_S)
            finally:
                # Always release the held key, even if polling raised.
                release_key(key)
            # Overshoot promotion: if we landed further along the path, advance i instead of backtracking
            promoted = False
            if cur in path:
                cur_idx = path.index(cur)
                if cur_idx >= i:
                    progressed = True
                    i = cur_idx + 1
                    promoted = True
                    if _distance_tiles(cur, goal) <= near_thresh:
                        if target.on_arrival():
                            return True
            if promoted:
                continue
            if cur == path[run_end]:
                progressed = True
                i = run_end + 1
                if _distance_tiles(cur, goal) <= near_thresh:
                    if target.on_arrival():
                        return True
                continue
            else:
                # fallback: single-tile nudge toward path[i]
                if not _move_one_tile(cur, path[i]):
                    # Step failed: refresh position and fall out to replan.
                    cur = _live_xy() or cur
                    break
                cur = path[i]
                progressed = True
                i += 1
                if _distance_tiles(cur, goal) <= near_thresh:
                    if target.on_arrival():
                        return True
                continue
        # If we never moved during this path attempt, let the outer loop replan
        if not progressed:
            # small backoff to avoid hammering when path is unstable
            time.sleep(0.01)
    print(f"[{tag}] timeout navigating to {goal}")
    return False
def start_bot():
    """Set the global ``bot_running`` flag to True and log a start message.

    This function only flips the flag; it does not spawn any thread itself.
    """
    global bot_running
    bot_running = True
    print("[BOT] Time to Kill ****!")
def stop_bot():
    """Set the global ``bot_running`` flag to False and log a stop message.

    This function only flips the flag; it does not join or kill any thread.
    """
    global bot_running
    bot_running = False
    print("[BOT] STOP Damnit!")
def is_bot_running():
    """Return the current value of the global ``bot_running`` flag."""
    running_now = bot_running
    return running_now
def _combat_targets_present() -> bool:
    """Report whether at least one combat-eligible NPC is in ``map_data``.

    An entry is eligible when its type is "npc", its unique_id is in
    ``MOB_ID_FILTERS`` and it is not inside the kill-quarantine window.
    """
    try:
        with map_data_lock:
            entries = list(map_data)
    except Exception:
        # Fall back to an unlocked copy if the locked read failed.
        entries = list(map_data)
    checked_at = time.monotonic()

    def _eligible(entry) -> bool:
        if entry.get("type") != "npc":
            return False
        if entry.get("unique_id") not in MOB_ID_FILTERS:
            return False
        key = entry.get("addr_hex")
        quarantined = bool(key) and (checked_at - RECENTLY_KILLED.get(key, 0.0)) < KILL_QUARANTINE_SEC
        return not quarantined

    return any(_eligible(entry) for entry in entries)
def _run_combat_task(task, task_id, duration_s: int) -> bool:
    """Run combat until the area is clear or the timer expires.

    Args:
        task: Task object; not used anywhere in this function body.
        task_id: Identifier used only in the log messages.
        duration_s: Time limit in seconds; a value <= 0 disables the timer.

    Returns:
        True when combat ended by timer or by the area staying clear for
        COMBAT_CLEAR_GRACE_SECONDS; False when it was cancelled.
    """
    # Remember whether the bot was already running so we only stop what we started.
    was_running = is_bot_running()
    if not was_running:
        start_bot()
        time.sleep(0.2) # allow combat thread to spin up
    start_ts = time.time()
    # None means no target has been seen since this task started.
    last_target_seen = time.time() if _combat_targets_present() else None
    reason = None
    success = True
    print(
        f"[TASK] Task #{task_id} Combat engagement started "
        f"(limit={'∞' if duration_s <= 0 else f'{duration_s}s'})"
    )
    try:
        while True:
            if TASK_CANCEL.is_set() or not TASK_MODE:
                reason = "cancelled"
                success = False
                break
            now = time.time()
            if duration_s > 0 and (now - start_ts) >= duration_s:
                reason = "timer"
                break
            if _combat_targets_present():
                last_target_seen = now
            else:
                if last_target_seen is None:
                    # Never saw a target: measure the grace period from task start.
                    if (now - start_ts) >= COMBAT_CLEAR_GRACE_SECONDS:
                        reason = "cleared"
                        break
                elif (now - last_target_seen) >= COMBAT_CLEAR_GRACE_SECONDS:
                    reason = "cleared"
                    break
            # _sleep_with_cancel returns True when a cancel flag was observed.
            if _sleep_with_cancel(0.5):
                reason = "cancelled"
                success = False
                break
    finally:
        if not was_running:
            stop_bot()
        # Release any held keys regardless of how the loop ended.
        _all_stop()
    if success:
        if reason == "timer":
            print(f"[TASK] Task #{task_id} Combat timer reached {duration_s}s — moving on.")
        elif reason == "cleared":
            print(
                f"[TASK] Task #{task_id} Combat area clear "
                f"(no targets for {COMBAT_CLEAR_GRACE_SECONDS:.1f}s)."
            )
        else:
            print(f"[TASK] Task #{task_id} Combat finished.")
    else:
        print(f"[TASK] Task #{task_id} Combat cancelled.")
    return success
def record_direction_points():
    """Capture four mouse clicks and store them in ``resurrect_points``.

    The user is prompted for UP, RIGHT, DOWN and LEFT in that order; each
    prompt blocks until one mouse-button press is seen. The global list is
    cleared first and then filled in place in the same order.
    """
    global resurrect_points
    resurrect_points.clear()
    print("[SETUP] Click the following in your game window when prompted.")
    from pynput import mouse
    clicks = []

    def _handler_for(label):
        # Build the listener callback for one prompt.
        def on_click(x, y, button, pressed):
            if not pressed:
                return None
            clicks.append((x, y))
            print(f" {label} recorded at {(x, y)}")
            # Returning False stops the listener after the first press.
            return False
        return on_click

    for label in ("UP", "RIGHT", "DOWN", "LEFT"):
        print(f" ➜ Click the {label} spot now...")
        with mouse.Listener(on_click=_handler_for(label)) as listener:
            listener.join()
    resurrect_points[:] = clicks # [Up, Right, Down, Left]
    print("[SETUP] Click points set (Up,Right,Down,Left):", resurrect_points)
# --- HARD FREEZE HELPERS ---
def _all_stop():
    """Release ctrl and the four arrow keys, then pause briefly.

    Errors from individual releases are ignored so every key is attempted.
    """
    for held in ('ctrl', 'up', 'down', 'left', 'right'):
        try:
            release_key(held)
        except Exception:
            continue
    # Give the key-up events time to register.
    time.sleep(0.08)
def _pause_movement():
    """Clear the ``movement_allowed`` gate, release all keys and debounce."""
    try:
        # Close the gate first so nothing new is started before keys are released.
        movement_allowed.clear()
    except Exception:
        pass
    finally:
        _all_stop()
    # Short debounce so in-flight taps finish.
    time.sleep(0.07)
def _resume_movement():
    """Release all keys once more, then set the ``movement_allowed`` gate."""
    # Defensive extra release before movement is re-enabled.
    _all_stop()
    try:
        movement_allowed.set()
    except Exception:
        return None
def _quarantine_cleanup_loop():
    """Forever drop ``RECENTLY_KILLED`` entries older than the quarantine.

    Runs an endless loop with a 0.25 s sleep between sweeps; it never returns.
    """
    import time as _clock
    while True:
        sweep_time = _clock.monotonic()
        # Iterate over a copied item list so the dict can be mutated safely.
        for addr_key, killed_at in list(RECENTLY_KILLED.items()):
            age = sweep_time - killed_at
            if age > KILL_QUARANTINE_SEC:
                RECENTLY_KILLED.pop(addr_key, None)
        _clock.sleep(0.25)
def _sleep_cancellable(seconds: float, step: float = 0.05):
    """Sleep up to ``seconds``, waking early once ``HARVEST_CANCEL`` is set.

    The wait is chopped into ``step``-second slices.
    """
    import time
    wake_at = time.monotonic() + float(seconds)
    while time.monotonic() < wake_at and not HARVEST_CANCEL.is_set():
        time.sleep(step)
def _sleep_with_cancel(seconds: float, step: float = 0.05) -> bool:
    """Sleep up to ``seconds`` in ``step`` slices, watching the cancel flags.

    Returns True as soon as ``TASK_CANCEL`` is set, ``TASK_MODE`` is falsy or
    ``HARVEST_CANCEL`` is set; returns False if the full time elapsed.
    """
    import time
    wake_at = time.monotonic() + float(seconds)
    while time.monotonic() < wake_at:
        cancelled = TASK_CANCEL.is_set() or not TASK_MODE or HARVEST_CANCEL.is_set()
        if cancelled:
            return True
        time.sleep(step)
    return False
def block_clicks_for(seconds: float):
    """Push the global ``NO_CLICK_UNTIL`` deadline at least ``seconds`` ahead.

    The deadline is on the ``time.monotonic()`` clock and is never moved
    earlier than its current value.
    """
    import time
    global NO_CLICK_UNTIL
    proposed_deadline = time.monotonic() + float(seconds)
    if proposed_deadline > NO_CLICK_UNTIL:
        NO_CLICK_UNTIL = proposed_deadline
def _has_available_target() -> bool:
"""True if any valid target NPC is currently visible on the map."""
try:
if not STOP_HOME_IF_TARGET:
return False
if not map_data:
return False
# Filter to targetable npcs by your MOB_ID_FILTERS and ignore recently killed
now = time.monotonic()
valid = []
for item in map_data:
if item.get("type") != "npc":
continue
uid = item.get("unique_id")
if uid not in MOB_ID_FILTERS:
continue
addr_hex = item.get("addr_hex")
# Optional: ignore just-killed entries in the quarantine window
if addr_hex and (now - RECENTLY_KILLED.get(addr_hex, 0.0)) < KILL_QUARANTINE_SEC:
continue
valid.append(item)
return bool(valid)
except Exception:
return False
def is_attacking() -> bool:
    """Return the truthiness of the module global ``ctrl_pressed``.

    False when that global does not exist.
    """
    held = globals().get('ctrl_pressed', False)
    return True if held else False
def recently_attacking(addr: str | None, window: float = ATTACK_RECENCY_S) -> bool:
    """Report whether ``addr`` was the last attacked address within ``window``.

    Compares against the module globals ``last_attack_addr`` and
    ``last_attack_time`` (``time.time()`` clock). False when ``addr`` is None.
    """
    if addr is None:
        return False
    module_state = globals()
    if not (addr == module_state.get('last_attack_addr')):
        return False
    elapsed = time.time() - module_state.get('last_attack_time', 0.0)
    return elapsed < window
def read_stat(name):
    """Read one stat as an int at ``stat_base + STAT_OFFSETS[name]``.

    Returns None when ``stat_base`` is unset or the memory read fails. An
    unknown ``name`` raises KeyError, because the offset lookup happens
    outside the guarded read.
    """
    global stat_base
    if stat_base is None:
        return None
    location = stat_base + STAT_OFFSETS[name]
    try:
        value = pm.read_int(location)
    except Exception:
        value = None
    return value
def read_all_stats():
    """Return a dict mapping every ``STAT_OFFSETS`` key to its ``read_stat`` value."""
    return {stat_name: read_stat(stat_name) for stat_name in STAT_OFFSETS}
def update_stats_gui():
    """Endlessly print a stats summary every 0.5 s while ``stat_base`` is set.

    Never returns. Nothing is printed while ``stat_base`` is falsy.
    """
    while True:
        stats = read_all_stats() if stat_base else None
        if stats:
            # Example simple print (replace with your GUI update call)
            summary_lines = [
                f"EXP: {stats['exp']} | LVL: {stats['level']} | TNL: {stats['tnl']}\n",
                f"HP: {stats['max_hp']} | MANA: {stats['max_mana']}\n",
                f"DEF: {stats['def']} | PWR: {stats['pwr']} | CRIT: {stats['crit']}\n",
            ]
            gui_text = "".join(summary_lines)
            print(gui_text) # replace with your GUI label update
        time.sleep(0.5)
def _read_facing_live() -> int | None:
"""Return current facing 0..3 from directional_address, or None if unavailable."""
try:
if directional_address and pm:
return int(pm.read_uchar(directional_address)) & 3
except Exception:
pass
return None
def tap_key(key_name: str, times: int = 1, gap: float = 0.08):
    """Tap ``key_name`` ``times`` times, sleeping ``gap`` seconds after each.

    Uses the module's ``press_key``/``release_key`` pair when both exist,
    otherwise the ``keyboard`` package if it is already imported; when
    neither is available a warning is printed. Failures are logged and do
    not stop the remaining taps.
    """
    import time, sys
    for _ in range(times):
        try:
            module_names = globals()
            has_own_sender = 'press_key' in module_names and 'release_key' in module_names
            if has_own_sender:
                press_key(key_name)
                time.sleep(0.03)
                release_key(key_name)
            elif 'keyboard' in sys.modules:
                import keyboard
                keyboard.press_and_release(key_name)
            else:
                print(f"[WARN] No key sender available for {key_name}")
        except Exception as e:
            print(f"[WARN] tap_key({key_name}) failed: {e}")
        time.sleep(gap)
# ═══════════════════════════════════════════════════════════════
# TASK MODE: Helper functions for reading position/direction from RecoTrainer addresses
# ═══════════════════════════════════════════════════════════════
def _read_player_position_cerabot():
"""Read player position from RecoTrainer's direct memory addresses"""
global pm, x_address, y_address
try:
if pm and x_address and y_address:
x = pm.read_short(x_address)
y = pm.read_short(y_address)
return (int(x), int(y))
except Exception as e:
print(f"⚠️ Error reading position: {e}")
return None
def _read_player_direction_cerabot():
"""Read player direction from RecoTrainer's direct memory address"""
global pm, directional_address
try:
if pm and directional_address:
dir_bytes = pm.read_bytes(directional_address, 1)
if dir_bytes and len(dir_bytes) > 0:
d = dir_bytes[0] & 3 # Mask to 0-3
if 0 <= d <= 3:
return d
except Exception as e:
print(f"⚠️ Error reading direction: {e}")
return None
def _read_player_state_cerabot():
    """Return ``(x, y, direction)`` for the player, or None if any part is missing.

    The direction is only read after a position was obtained.
    """
    tile = _read_player_position_cerabot()
    if tile is None:
        return None
    facing = _read_player_direction_cerabot()
    return None if facing is None else (tile[0], tile[1], facing)
def _get_xy_safe():
"""Try to read player coords if your script exposes it; otherwise return (None, None)."""
# Only call if present
fn = globals().get("get_player_coords")
if callable(fn):
try:
return fn(pm)
except Exception:
pass
try:
return (pm.read_short(x_address), pm.read_short(y_address))
except Exception:
return (None, None)
def _distance_tiles(a, b):
if a[0] is None or b[0] is None:
return None
dx = a[0] - b[0]
dy = a[1] - b[1]
return (dx*dx + dy*dy) ** 0.5
def _call_first_available(fn_names, *args):
"""Try a list of possible movement functions that may exist in your codebase."""
for name in fn_names:
fn = globals().get(name)
if callable(fn):
try:
# Try (x, y) signature, fall back to ((x,y),) if needed
try:
return fn(*args)
except TypeError:
return fn(args[0])
except Exception as e:
print(f"[WARN] {name}{args} failed: {e}")
raise RuntimeError("No compatible movement function found. "
"Implement one of: walk_to_tile / walk_to / navigate_to / move_to_tile / goto_tile")
def _go_to_home_blocking():
    """Walk to ``HOME_POS`` with ``go_to_target`` and report a status string.

    Returns one of:
        "SKIP"    - ``HOME_POS`` is not a 2-tuple.
        "ABORT"   - a target was visible before travel or during the
                    post-navigation polling.
        "ARRIVED" - the live position came within ``HOME_NEAR_THRESH``.
        "TIMEOUT" - the navigator raised, or ``HOME_TRAVEL_TIMEOUT`` seconds
                    of polling passed without arriving.
    """
    home_is_pair = isinstance(HOME_POS, tuple) and len(HOME_POS) == 2
    if not home_is_pair:
        print("[HOME] HOME_POS not set. Skipping travel.")
        return "SKIP"
    # Abort before starting if a target is already visible
    if _has_available_target():
        print("[HOME] Abort: target spotted before travel.")
        return "ABORT"
    home_xy = (int(HOME_POS[0]), int(HOME_POS[1]))

    class _HomeTarget:
        """Fixed-position target: always valid, arrival needs no action."""

        def get_xy(self):
            return home_xy

        def is_valid(self):
            return True

        def on_arrival(self):
            return True

    module_state = globals()
    # Use the same navigator as the other targets; its return value is not
    # consulted, the polling loop below decides the status.
    try:
        go_to_target(
            _HomeTarget(),
            tag="HOME",
            near_thresh=module_state.get("HOME_NEAR_THRESH", 0.8),
            timeout_s=module_state.get("HOME_TRAVEL_TIMEOUT", 30.0),
        )
    except Exception as e:
        print(f"[HOME] Navigator error: {e!r}")
        return "TIMEOUT"
    # Poll until close or timeout so the caller gets a status
    import time
    began = time.time()
    travel_budget = float(module_state.get("HOME_TRAVEL_TIMEOUT", 30.0))
    arrive_within = float(module_state.get("HOME_NEAR_THRESH", 0.8))
    while time.time() - began < travel_budget:
        # abort if a target pops up during the trip
        if _has_available_target():
            print("[HOME] Abort: target spotted during travel.")
            return "ABORT"
        here = _get_xy_safe()
        gap = _distance_tiles(here, home_xy)
        if gap is not None and gap <= arrive_within:
            print(f"[HOME] Arrived at HOME {HOME_POS}")
            return "ARRIVED"
        time.sleep(0.2)
    print("[HOME] Travel timeout.")
    return "TIMEOUT"
def _home_routine():
"""
Pause combat/wandering, wait for any pickup clicks to finish,
go home, run F11/F5 sequence (interruptible), then resume.
Behavior:
- If a target is visible at ANY point, immediately abort buffs/sit and resume combat.
- Travel to home can also abort early if a target appears (handled in _go_to_home_blocking()).
- F5 is applied in small steps so the routine can bail out mid-buff.
"""
if home_routine_running.is_set():
return
home_routine_running.set()
print("[HOME] Starting safety routine…")
# Helper: wait until clicks are done (with a safety timeout)
def _wait_for_clicks(label: str, timeout_s: float = 6.0):
try:
start = time.time()
while clicks_in_progress.is_set():
if timeout_s and (time.time() - start) > timeout_s:
print(f"[HOME] {label}: waited {timeout_s}s for clicks; continuing.")
break
time.sleep(0.01)
except NameError:
# clicks_in_progress not defined; nothing to wait for
pass
def _finish():
# Resume normal behavior and clear guard
try:
_resume_movement()
except Exception:
pass
home_routine_running.clear()
try:
# If a loot sweep is in progress, wait for it to finish
_wait_for_clicks("pre-travel")
# Freeze movement/attacks
try:
_pause_movement()
except Exception:
pass
try:
release_key('ctrl')
except Exception:
pass
# Focus game window if available (best-effort)
try:
if game_win:
try:
game_win.activate()
time.sleep(0.2)
except Exception:
pass
except NameError:
pass
# ——— Immediate abort if a target is already available ———
try:
if _has_available_target():
print("[HOME] Abort: target visible before travel; resuming combat.")
return
except NameError:
# If helper/toggle not present, fall back to legacy flow
pass
# ——— Travel to home tile (blocking & abortable) ———
travel_status = None
try:
travel_status = _go_to_home_blocking()
except Exception as e:
print(f"[HOME] Travel error: {e!r}")
travel_status = "TIMEOUT"
if travel_status in ("ABORT",):
print("[HOME] Travel aborted due to target; resuming combat.")
return
if travel_status in ("SKIP", "TIMEOUT"):
try:
if _has_available_target():
print("[HOME] Target visible after travel failure; resuming combat.")
return
except NameError:
pass
# Force the normal sit/buff flow to run *right here* on timeout
print(f"[HOME] Travel status={travel_status}; forcing sit/buffs here.")
# (no return — fall through to the F11/F5 sit/buff logic below
# Just in case a new click sweep started during travel, wait again
_wait_for_clicks("pre-buff")
# ——— Abort if target visible before sit/buffs ———
try:
if _has_available_target():
print("[HOME] Abort: target visible pre-buff; resuming combat.")
return
except NameError:
pass
# ——— F11 (sit), then F5 in small, interruptible steps ———
try:
tap_key('f11', times=1)
block_clicks_for(3.0)
except Exception:
pass