From 3cd853a87c31b10876fce52f462a76cf0df62a85 Mon Sep 17 00:00:00 2001 From: Q00 Date: Sun, 24 May 2026 19:24:08 +0900 Subject: [PATCH 1/3] fix: make watchdog controls replay safe --- src/ouroboros/runtime/controls.py | 26 +++++++++++++++++--- src/ouroboros/runtime/watchdog.py | 25 ++++++++++++++++++- tests/unit/runtime/test_controls.py | 35 +++++++++++++++++++++----- tests/unit/runtime/test_watchdog.py | 38 +++++++++++++++++++++++++++++ 4 files changed, 114 insertions(+), 10 deletions(-) diff --git a/src/ouroboros/runtime/controls.py b/src/ouroboros/runtime/controls.py index 94555e3c1..5d759ef83 100644 --- a/src/ouroboros/runtime/controls.py +++ b/src/ouroboros/runtime/controls.py @@ -14,6 +14,7 @@ from __future__ import annotations from dataclasses import dataclass +import os from pathlib import Path from typing import Any @@ -75,14 +76,23 @@ def load_runtime_controls(path: Path | str | None = None) -> RuntimeControls: Behaviour: - - ``path is None`` → return ``RuntimeControls()`` with defaults. + - ``path is None`` → return defaults, optionally overridden by + ``OUROBOROS_SESSION_WALL_CLOCK_SECONDS``. - ``path`` exists but no ``runtime_controls`` key → defaults. - YAML parse error → ``ValueError`` so callers fail loudly. - Unrecognized keys under ``runtime_controls`` → ``ValueError``; the v2 expansion path adds new keys only through code review. """ if path is None: - return RuntimeControls() + env_budget = os.environ.get("OUROBOROS_SESSION_WALL_CLOCK_SECONDS") + if env_budget is None: + return RuntimeControls() + try: + budget = int(env_budget) + except ValueError as exc: + msg = "OUROBOROS_SESSION_WALL_CLOCK_SECONDS must be an integer" + raise ValueError(msg) from exc + return RuntimeControls(session_wall_clock_seconds=budget) resolved = Path(path) if not resolved.is_file(): msg = f"runtime_controls config not found at {resolved}" @@ -104,7 +114,17 @@ def load_runtime_controls(path: Path | str | None = None) -> RuntimeControls: if not isinstance(block, dict): msg = f"runtime_controls block at {resolved} must be a mapping; got {type(block).__name__}" raise ValueError(msg) - allowed = {"session_wall_clock_seconds"} + # Keep compatibility with the existing runtime_controls config block. + # This loader consumes only the Auto session wall-clock knob and ignores + # the older generation/MCP controls owned by ouroboros.config.loader. + allowed = { + "session_wall_clock_seconds", + "mcp_tool_timeout_seconds", + "generation_idle_timeout_seconds", + "generation_no_progress_timeout_seconds", + "generation_safety_timeout_seconds", + "watchdog_poll_seconds", + } unknown = set(block.keys()) - allowed if unknown: msg = ( diff --git a/src/ouroboros/runtime/watchdog.py b/src/ouroboros/runtime/watchdog.py index fe245d276..af91e5a76 100644 --- a/src/ouroboros/runtime/watchdog.py +++ b/src/ouroboros/runtime/watchdog.py @@ -36,7 +36,7 @@ from collections.abc import Callable from dataclasses import dataclass from datetime import UTC, datetime -from typing import Protocol +from typing import Protocol, runtime_checkable from ouroboros.events.base import BaseEvent from ouroboros.runtime.controls import RuntimeControls @@ -64,6 +64,7 @@ cancel event is observed.""" +@runtime_checkable class _EventAppender(Protocol): """Minimal protocol the watchdog needs from the EventStore. @@ -75,6 +76,19 @@ class _EventAppender(Protocol): async def append(self, event: BaseEvent) -> None: ... +@runtime_checkable +class _WatchdogEventReader(Protocol): + """Optional EventStore replay hook used for restart idempotency.""" + + async def query_events( + self, + aggregate_id: str | None = None, + event_type: str | None = None, + limit: int = 50, + offset: int = 0, + ) -> list[BaseEvent]: ... + + @dataclass(frozen=True, slots=True) class WatchdogDecision: """Outcome of a single :meth:`Watchdog.check` call when the budget @@ -180,6 +194,15 @@ async def check( budget = self._controls.session_wall_clock_seconds if elapsed_seconds <= budget: return None + if isinstance(self._appender, _WatchdogEventReader): + existing = await self._appender.query_events( + aggregate_id=session_id, + event_type=WATCHDOG_CANCEL_EVENT_TYPE, + limit=1, + ) + if existing: + self._fired_sessions.add(session_id) + return None decision = WatchdogDecision( session_id=session_id, fired_at=now, diff --git a/tests/unit/runtime/test_controls.py b/tests/unit/runtime/test_controls.py index 807233606..04794f852 100644 --- a/tests/unit/runtime/test_controls.py +++ b/tests/unit/runtime/test_controls.py @@ -43,6 +43,21 @@ def test_load_returns_defaults_when_path_is_none() -> None: assert load_runtime_controls(None) == RuntimeControls() +def test_load_path_none_uses_env_override(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("OUROBOROS_SESSION_WALL_CLOCK_SECONDS", "0") + + controls = load_runtime_controls(None) + + assert controls == RuntimeControls(session_wall_clock_seconds=0) + + +def test_load_path_none_rejects_bad_env_override(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("OUROBOROS_SESSION_WALL_CLOCK_SECONDS", "soon") + + with pytest.raises(ValueError, match="OUROBOROS_SESSION_WALL_CLOCK_SECONDS"): + load_runtime_controls(None) + + def test_load_missing_file_raises(tmp_path: Path) -> None: with pytest.raises(FileNotFoundError): load_runtime_controls(tmp_path / "nope.yaml") @@ -89,16 +104,24 @@ def test_load_non_mapping_block_raises(tmp_path: Path) -> None: load_runtime_controls(fixture) -def test_load_unknown_key_rejected(tmp_path: Path) -> None: - """Unknown keys under ``runtime_controls`` are rejected — v2 expansion - keys (idle/no_progress/safety, directive vocabulary, …) require code - review to land, not silent acceptance.""" - fixture = tmp_path / "extra_key.yaml" +def test_load_existing_runtime_controls_keys_ignored(tmp_path: Path) -> None: + """Existing config-owned runtime_controls keys remain compatible.""" + fixture = tmp_path / "existing_keys.yaml" fixture.write_text( "runtime_controls:\n" " session_wall_clock_seconds: 600\n" - " idle_timeout_seconds: 60\n" # v2 expansion path; not yet here. + " mcp_tool_timeout_seconds: 0\n" + " generation_idle_timeout_seconds: 7200\n" + " generation_no_progress_timeout_seconds: 14400\n" + " generation_safety_timeout_seconds: 0\n" + " watchdog_poll_seconds: 15\n" ) + assert load_runtime_controls(fixture) == RuntimeControls(session_wall_clock_seconds=600) + + +def test_load_unknown_key_rejected(tmp_path: Path) -> None: + fixture = tmp_path / "extra_key.yaml" + fixture.write_text("runtime_controls:\n idle_timeout_seconds: 60\n") with pytest.raises(ValueError, match="idle_timeout_seconds"): load_runtime_controls(fixture) diff --git a/tests/unit/runtime/test_watchdog.py b/tests/unit/runtime/test_watchdog.py index d4c379620..f8e9499a5 100644 --- a/tests/unit/runtime/test_watchdog.py +++ b/tests/unit/runtime/test_watchdog.py @@ -26,6 +26,22 @@ def __init__(self) -> None: async def append(self, event: BaseEvent) -> None: self.events.append(event) + async def query_events( + self, + aggregate_id: str | None = None, + event_type: str | None = None, + limit: int = 50, + offset: int = 0, + ) -> list[BaseEvent]: + del offset + events = [ + event + for event in self.events + if (aggregate_id is None or event.aggregate_id == aggregate_id) + and (event_type is None or event.type == event_type) + ] + return events[:limit] + def _fixed_now(value: datetime): return lambda: value @@ -124,6 +140,28 @@ async def test_idempotent_within_instance() -> None: assert watchdog.has_fired_for("auto_dup") +@pytest.mark.asyncio +async def test_idempotent_across_new_instance_when_event_exists() -> None: + started = datetime(2026, 5, 22, 10, 0, 0, tzinfo=UTC) + appender = _CapturingAppender() + first = Watchdog( + controls=RuntimeControls(session_wall_clock_seconds=10), + event_appender=appender, + now=_fixed_now(started + timedelta(seconds=60)), + ) + second = Watchdog( + controls=RuntimeControls(session_wall_clock_seconds=10), + event_appender=appender, + now=_fixed_now(started + timedelta(seconds=120)), + ) + + assert await first.check(session_id="auto_restart", session_started_at=started) is not None + assert await second.check(session_id="auto_restart", session_started_at=started) is None + + assert len(appender.events) == 1 + assert second.has_fired_for("auto_restart") + + @pytest.mark.asyncio async def test_multiple_sessions_tracked_separately() -> None: started = datetime(2026, 5, 22, 10, 0, 0, tzinfo=UTC) From 5915c27a758c1932f986f3a679053af4caee74f6 Mon Sep 17 00:00:00 2001 From: Q00 Date: Mon, 25 May 2026 19:11:11 +0900 Subject: [PATCH 2/3] fix(runtime): replay watchdog cancel decisions Return a WatchdogDecision when a persisted cancel event already exists so resumed auto sessions still transition to BLOCKED without appending duplicate events. This preserves cancellation semantics across the EventStore/state-save crash window.\n\nServices: shared\nAffected files:\n- src/ouroboros/runtime/watchdog.py\n- tests/unit/auto/test_pipeline_watchdog_integration.py\n- tests/unit/runtime/test_watchdog.py --- src/ouroboros/runtime/watchdog.py | 35 ++++++++- .../test_pipeline_watchdog_integration.py | 75 +++++++++++++++++++ tests/unit/runtime/test_watchdog.py | 6 +- 3 files changed, 114 insertions(+), 2 deletions(-) diff --git a/src/ouroboros/runtime/watchdog.py b/src/ouroboros/runtime/watchdog.py index af91e5a76..ff06fc509 100644 --- a/src/ouroboros/runtime/watchdog.py +++ b/src/ouroboros/runtime/watchdog.py @@ -202,7 +202,20 @@ async def check( ) if existing: self._fired_sessions.add(session_id) - return None + event = existing[0] + fired_at = _coerce_datetime(event.data.get("fired_at")) or now + return WatchdogDecision( + session_id=session_id, + fired_at=fired_at, + elapsed_seconds=_coerce_int( + event.data.get("elapsed_seconds"), + default=elapsed_seconds, + ), + configured_budget_seconds=_coerce_int( + event.data.get("configured_budget_seconds"), + default=budget, + ), + ) decision = WatchdogDecision( session_id=session_id, fired_at=now, @@ -229,3 +242,23 @@ def has_fired_for(self, session_id: str) -> bool: """Test/inspection helper — True iff this instance has already fired the watchdog for *session_id*.""" return session_id in self._fired_sessions + + +def _coerce_datetime(value: object) -> datetime | None: + if not isinstance(value, str): + return None + try: + return datetime.fromisoformat(value) + except ValueError: + return None + + +def _coerce_int(value: object, *, default: int) -> int: + if isinstance(value, int): + return value + if isinstance(value, str): + try: + return int(value) + except ValueError: + return default + return default diff --git a/tests/unit/auto/test_pipeline_watchdog_integration.py b/tests/unit/auto/test_pipeline_watchdog_integration.py index 25f78ea7e..b5f46ed3e 100644 --- a/tests/unit/auto/test_pipeline_watchdog_integration.py +++ b/tests/unit/auto/test_pipeline_watchdog_integration.py @@ -77,6 +77,22 @@ def __init__(self) -> None: async def append(self, event: BaseEvent) -> None: self.events.append(event) + async def query_events( + self, + aggregate_id: str | None = None, + event_type: str | None = None, + limit: int = 50, + offset: int = 0, + ) -> list[BaseEvent]: + del offset + events = [ + event + for event in self.events + if (aggregate_id is None or event.aggregate_id == aggregate_id) + and (event_type is None or event.type == event_type) + ] + return events[:limit] + def _fill_ready(ledger: SeedDraftLedger) -> None: from ouroboros.auto.ledger import LedgerEntry, LedgerSource, LedgerStatus @@ -249,6 +265,65 @@ async def generate_seed(_session_id: str) -> Seed: # pragma: no cover - watchdo assert event.data["reason"] == "wall_clock_exceeded" +@pytest.mark.asyncio +async def test_pipeline_blocks_when_prior_watchdog_cancel_event_exists(tmp_path) -> None: + """Replay must preserve a watchdog cancellation even if state was not saved.""" + + async def start(goal: str, cwd: str) -> InterviewTurn: # noqa: ARG001 + raise AssertionError("interview should not start after watchdog cancellation replay") + + async def answer(session_id: str, text: str) -> InterviewTurn: # noqa: ARG001 + raise AssertionError("interview should not answer after watchdog cancellation replay") + + async def generate_seed(_session_id: str) -> Seed: + raise AssertionError("seed_generator should not run after watchdog cancellation replay") + + state = AutoPipelineState(goal="Build a CLI", cwd=str(tmp_path)) + started = datetime.fromisoformat(state.created_at) + fired_at = started + timedelta(seconds=120) + + appender = _CapturingAppender() + appender.events.append( + BaseEvent( + type=WATCHDOG_CANCEL_EVENT_TYPE, + aggregate_type=WATCHDOG_AGGREGATE_TYPE, + aggregate_id=state.auto_session_id, + data={ + "reason": "wall_clock_exceeded", + "session_started_at": started.isoformat(), + "fired_at": fired_at.isoformat(), + "elapsed_seconds": 120, + "configured_budget_seconds": 60, + }, + ) + ) + watchdog = Watchdog( + controls=RuntimeControls(session_wall_clock_seconds=60), + event_appender=appender, + now=lambda: started + timedelta(seconds=180), + ) + + driver = AutoInterviewDriver( + FunctionInterviewBackend(start, answer), + store=AutoStore(tmp_path), + max_rounds=1, + ) + pipeline = AutoPipeline( + driver, + generate_seed, + store=AutoStore(tmp_path), + watchdog=watchdog, + ) + + result = await pipeline.run(state) + + assert result.status == "blocked" + assert state.phase is AutoPhase.BLOCKED + assert result.stop_reason_code == WATCHDOG_STOP_REASON_CODE + assert "120s" in (state.last_error or "") + assert len(appender.events) == 1 + + @pytest.mark.asyncio async def test_pipeline_with_disabled_watchdog_never_fires(tmp_path) -> None: """A watchdog with ``session_wall_clock_seconds=0`` is the opt-out diff --git a/tests/unit/runtime/test_watchdog.py b/tests/unit/runtime/test_watchdog.py index f8e9499a5..368f99807 100644 --- a/tests/unit/runtime/test_watchdog.py +++ b/tests/unit/runtime/test_watchdog.py @@ -142,6 +142,7 @@ async def test_idempotent_within_instance() -> None: @pytest.mark.asyncio async def test_idempotent_across_new_instance_when_event_exists() -> None: + """A restarted watchdog replays cancellation without appending again.""" started = datetime(2026, 5, 22, 10, 0, 0, tzinfo=UTC) appender = _CapturingAppender() first = Watchdog( @@ -156,8 +157,11 @@ async def test_idempotent_across_new_instance_when_event_exists() -> None: ) assert await first.check(session_id="auto_restart", session_started_at=started) is not None - assert await second.check(session_id="auto_restart", session_started_at=started) is None + replayed = await second.check(session_id="auto_restart", session_started_at=started) + assert replayed is not None + assert replayed.session_id == "auto_restart" + assert replayed.elapsed_seconds == 60 assert len(appender.events) == 1 assert second.has_fired_for("auto_restart") From 807bdd96057a8878fb85b78301a9e5913cc3b0da Mon Sep 17 00:00:00 2001 From: Q00 Date: Mon, 25 May 2026 20:51:54 +0900 Subject: [PATCH 3/3] fix(runtime): replay watchdog cancels before controls --- src/ouroboros/runtime/watchdog.py | 67 ++++++++++++------- .../test_pipeline_watchdog_integration.py | 8 ++- tests/unit/runtime/test_watchdog.py | 40 +++++++++++ 3 files changed, 89 insertions(+), 26 deletions(-) diff --git a/src/ouroboros/runtime/watchdog.py b/src/ouroboros/runtime/watchdog.py index ff06fc509..ae8bbfbb4 100644 --- a/src/ouroboros/runtime/watchdog.py +++ b/src/ouroboros/runtime/watchdog.py @@ -185,37 +185,23 @@ async def check( ``runtime.watchdog.cancel`` event is appended to the configured event appender. """ - if not self._controls.watchdog_enabled: - return None if session_id in self._fired_sessions: return None now = self._now() elapsed_seconds = int((now - session_started_at).total_seconds()) budget = self._controls.session_wall_clock_seconds + replayed = await self._replay_existing_decision( + session_id=session_id, + fired_at_default=now, + elapsed_seconds_default=elapsed_seconds, + budget_default=budget, + ) + if replayed is not None: + return replayed + if not self._controls.watchdog_enabled: + return None if elapsed_seconds <= budget: return None - if isinstance(self._appender, _WatchdogEventReader): - existing = await self._appender.query_events( - aggregate_id=session_id, - event_type=WATCHDOG_CANCEL_EVENT_TYPE, - limit=1, - ) - if existing: - self._fired_sessions.add(session_id) - event = existing[0] - fired_at = _coerce_datetime(event.data.get("fired_at")) or now - return WatchdogDecision( - session_id=session_id, - fired_at=fired_at, - elapsed_seconds=_coerce_int( - event.data.get("elapsed_seconds"), - default=elapsed_seconds, - ), - configured_budget_seconds=_coerce_int( - event.data.get("configured_budget_seconds"), - default=budget, - ), - ) decision = WatchdogDecision( session_id=session_id, fired_at=now, @@ -238,6 +224,39 @@ async def check( self._fired_sessions.add(session_id) return decision + async def _replay_existing_decision( + self, + *, + session_id: str, + fired_at_default: datetime, + elapsed_seconds_default: int, + budget_default: int, + ) -> WatchdogDecision | None: + """Replay a persisted cancel event before evaluating current controls.""" + if isinstance(self._appender, _WatchdogEventReader): + existing = await self._appender.query_events( + aggregate_id=session_id, + event_type=WATCHDOG_CANCEL_EVENT_TYPE, + limit=1, + ) + if existing: + self._fired_sessions.add(session_id) + event = existing[0] + fired_at = _coerce_datetime(event.data.get("fired_at")) or fired_at_default + return WatchdogDecision( + session_id=session_id, + fired_at=fired_at, + elapsed_seconds=_coerce_int( + event.data.get("elapsed_seconds"), + default=elapsed_seconds_default, + ), + configured_budget_seconds=_coerce_int( + event.data.get("configured_budget_seconds"), + default=budget_default, + ), + ) + return None + def has_fired_for(self, session_id: str) -> bool: """Test/inspection helper — True iff this instance has already fired the watchdog for *session_id*.""" diff --git a/tests/unit/auto/test_pipeline_watchdog_integration.py b/tests/unit/auto/test_pipeline_watchdog_integration.py index b5f46ed3e..e3f66a5cc 100644 --- a/tests/unit/auto/test_pipeline_watchdog_integration.py +++ b/tests/unit/auto/test_pipeline_watchdog_integration.py @@ -266,7 +266,11 @@ async def generate_seed(_session_id: str) -> Seed: # pragma: no cover - watchdo @pytest.mark.asyncio -async def test_pipeline_blocks_when_prior_watchdog_cancel_event_exists(tmp_path) -> None: +@pytest.mark.parametrize("resumed_budget", [0, 10_000]) +async def test_pipeline_blocks_when_prior_watchdog_cancel_event_exists( + tmp_path, + resumed_budget: int, +) -> None: """Replay must preserve a watchdog cancellation even if state was not saved.""" async def start(goal: str, cwd: str) -> InterviewTurn: # noqa: ARG001 @@ -298,7 +302,7 @@ async def generate_seed(_session_id: str) -> Seed: ) ) watchdog = Watchdog( - controls=RuntimeControls(session_wall_clock_seconds=60), + controls=RuntimeControls(session_wall_clock_seconds=resumed_budget), event_appender=appender, now=lambda: started + timedelta(seconds=180), ) diff --git a/tests/unit/runtime/test_watchdog.py b/tests/unit/runtime/test_watchdog.py index 368f99807..0105743e7 100644 --- a/tests/unit/runtime/test_watchdog.py +++ b/tests/unit/runtime/test_watchdog.py @@ -166,6 +166,46 @@ async def test_idempotent_across_new_instance_when_event_exists() -> None: assert second.has_fired_for("auto_restart") +@pytest.mark.asyncio +@pytest.mark.parametrize("resumed_budget", [0, 10_000]) +async def test_existing_cancel_replays_before_current_budget_gates(resumed_budget: int) -> None: + """Persisted cancellation remains terminal even if resume controls change.""" + started = datetime(2026, 5, 22, 10, 0, 0, tzinfo=UTC) + fired_at = started + timedelta(seconds=60) + appender = _CapturingAppender() + appender.events.append( + BaseEvent( + type=WATCHDOG_CANCEL_EVENT_TYPE, + aggregate_type=WATCHDOG_AGGREGATE_TYPE, + aggregate_id="auto_resume_control_drift", + data={ + "reason": "wall_clock_exceeded", + "session_started_at": started.isoformat(), + "fired_at": fired_at.isoformat(), + "elapsed_seconds": 60, + "configured_budget_seconds": 10, + }, + ) + ) + watchdog = Watchdog( + controls=RuntimeControls(session_wall_clock_seconds=resumed_budget), + event_appender=appender, + now=_fixed_now(started + timedelta(seconds=120)), + ) + + replayed = await watchdog.check( + session_id="auto_resume_control_drift", + session_started_at=started, + ) + + assert replayed is not None + assert replayed.fired_at == fired_at + assert replayed.elapsed_seconds == 60 + assert replayed.configured_budget_seconds == 10 + assert len(appender.events) == 1 + assert watchdog.has_fired_for("auto_resume_control_drift") + + @pytest.mark.asyncio async def test_multiple_sessions_tracked_separately() -> None: started = datetime(2026, 5, 22, 10, 0, 0, tzinfo=UTC)