Commit 3bfa608

bpiwowar and claude committed
fix(scheduler): dedupe ExperimentJobStateEvent writes per job
notify_job_state is called on every JobProgressEvent forwarded from the job process. Each call appended a duplicate ExperimentJobStateEvent to the experiment event log even when the scheduler_state had not changed, producing huge logs. Track the last written (scheduler_state, failure_reason) per (experiment, run, job) and skip the file write when unchanged.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent dd80a93 commit 3bfa608

1 file changed

Lines changed: 22 additions & 1 deletion

src/experimaestro/scheduler/base.py

@@ -172,6 +172,15 @@ def __init__(self, name: str = "Global"):
         # Track which workspaces have logging setup
         self._workspace_logging_setup: set[Path] = set()

+        # Last (scheduler_state, failure_reason) written to each experiment's
+        # event file per job. Used to deduplicate ExperimentJobStateEvent
+        # writes in notify_job_state (avoids huge logs when notify_job_state
+        # is called repeatedly without an actual scheduler_state change, e.g.
+        # on every JobProgressEvent from the job process).
+        self._last_written_scheduler_state: Dict[
+            tuple[str, str, str], tuple[str, Optional[str]]
+        ] = {}
+
     @staticmethod
     def has_instance() -> bool:
         """Check if a scheduler instance exists without creating one"""
@@ -890,7 +899,19 @@ def notify_job_state(self, job: Job):
             )
             self._notify_state_listeners_async(event)

-            # Write to experiment event file for offline monitoring
+            # Write to experiment event file for offline monitoring,
+            # but only when the scheduler_state actually changed for this job.
+            # notify_job_state may be called many times for the same job
+            # (e.g. on every progress event from the job process); without
+            # this dedup we would append a duplicate ExperimentJobStateEvent
+            # to the experiment event log every time.
+            last_key = (xp.experiment_id, xp.run_id, job.identifier)
+            last = self._last_written_scheduler_state.get(last_key)
+            current = (event.scheduler_state, failure_reason)
+            if last == current:
+                continue
+            self._last_written_scheduler_state[last_key] = current
+
             if xp._event_writer is not None:
                 try:
                     xp._event_writer.write_event(event)
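
The dedup pattern in this change can be sketched in isolation, outside the scheduler. This is a minimal illustration, not the experimaestro implementation: `DedupingEventLog`, `notify`, and the in-memory `written` list are hypothetical stand-ins for the scheduler, `notify_job_state`, and the experiment event file. Only the key shape (experiment, run, job) and the value shape (scheduler_state, failure_reason) come from the diff.

```python
from typing import Dict, List, Optional, Tuple

# Key and value shapes mirror the diff; the names themselves are hypothetical.
JobKey = Tuple[str, str, str]            # (experiment_id, run_id, job_id)
StateValue = Tuple[str, Optional[str]]   # (scheduler_state, failure_reason)


class DedupingEventLog:
    """Hypothetical stand-in for the scheduler's event-writing path."""

    def __init__(self) -> None:
        # Stands in for the experiment event file.
        self.written: List[Tuple[JobKey, StateValue]] = []
        # Last value written per job, used to skip unchanged writes.
        self._last_written: Dict[JobKey, StateValue] = {}

    def notify(
        self,
        experiment_id: str,
        run_id: str,
        job_id: str,
        scheduler_state: str,
        failure_reason: Optional[str] = None,
    ) -> bool:
        """Record the state; return True only if something was written."""
        key = (experiment_id, run_id, job_id)
        current = (scheduler_state, failure_reason)
        # A missing key yields None, which never equals a tuple,
        # so the first notification for a job is always written.
        if self._last_written.get(key) == current:
            return False
        self._last_written[key] = current
        self.written.append((key, current))
        return True


log = DedupingEventLog()
results = [
    log.notify("xp", "run-1", "job-a", "running"),           # first write
    log.notify("xp", "run-1", "job-a", "running"),           # unchanged, skipped
    log.notify("xp", "run-1", "job-a", "error", "timeout"),  # state changed
    log.notify("xp", "run-1", "job-b", "running"),           # different job
]
```

In this sketch the cache is only updated when a write is recorded, and it is never pruned, so it holds one entry per job for the lifetime of the object.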
