Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions src/ouroboros/runtime/controls.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from __future__ import annotations

from dataclasses import dataclass
import os
from pathlib import Path
from typing import Any

Expand Down Expand Up @@ -75,14 +76,23 @@ def load_runtime_controls(path: Path | str | None = None) -> RuntimeControls:

Behaviour:

- ``path is None`` → return ``RuntimeControls()`` with defaults.
- ``path is None`` → return defaults, optionally overridden by
``OUROBOROS_SESSION_WALL_CLOCK_SECONDS``.
- ``path`` exists but no ``runtime_controls`` key → defaults.
- YAML parse error → ``ValueError`` so callers fail loudly.
- Unrecognized keys under ``runtime_controls`` → ``ValueError``;
the v2 expansion path adds new keys only through code review.
"""
if path is None:
return RuntimeControls()
env_budget = os.environ.get("OUROBOROS_SESSION_WALL_CLOCK_SECONDS")
if env_budget is None:
return RuntimeControls()
try:
budget = int(env_budget)
except ValueError as exc:
msg = "OUROBOROS_SESSION_WALL_CLOCK_SECONDS must be an integer"
raise ValueError(msg) from exc
return RuntimeControls(session_wall_clock_seconds=budget)
resolved = Path(path)
if not resolved.is_file():
msg = f"runtime_controls config not found at {resolved}"
Expand All @@ -104,7 +114,17 @@ def load_runtime_controls(path: Path | str | None = None) -> RuntimeControls:
if not isinstance(block, dict):
msg = f"runtime_controls block at {resolved} must be a mapping; got {type(block).__name__}"
raise ValueError(msg)
allowed = {"session_wall_clock_seconds"}
# Keep compatibility with the existing runtime_controls config block.
# This loader consumes only the Auto session wall-clock knob and ignores
# the older generation/MCP controls owned by ouroboros.config.loader.
allowed = {
"session_wall_clock_seconds",
"mcp_tool_timeout_seconds",
"generation_idle_timeout_seconds",
"generation_no_progress_timeout_seconds",
"generation_safety_timeout_seconds",
"watchdog_poll_seconds",
}
unknown = set(block.keys()) - allowed
if unknown:
msg = (
Expand Down
81 changes: 78 additions & 3 deletions src/ouroboros/runtime/watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from collections.abc import Callable
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Protocol
from typing import Protocol, runtime_checkable

from ouroboros.events.base import BaseEvent
from ouroboros.runtime.controls import RuntimeControls
Expand Down Expand Up @@ -64,6 +64,7 @@
cancel event is observed."""


@runtime_checkable
class _EventAppender(Protocol):
"""Minimal protocol the watchdog needs from the EventStore.

Expand All @@ -75,6 +76,19 @@ class _EventAppender(Protocol):
async def append(self, event: BaseEvent) -> None: ...


@runtime_checkable
class _WatchdogEventReader(Protocol):
"""Optional EventStore replay hook used for restart idempotency."""

async def query_events(
self,
aggregate_id: str | None = None,
event_type: str | None = None,
limit: int = 50,
offset: int = 0,
) -> list[BaseEvent]: ...


@dataclass(frozen=True, slots=True)
class WatchdogDecision:
"""Outcome of a single :meth:`Watchdog.check` call when the budget
Expand Down Expand Up @@ -171,13 +185,21 @@ async def check(
``runtime.watchdog.cancel`` event is appended to the
configured event appender.
"""
if not self._controls.watchdog_enabled:
return None
if session_id in self._fired_sessions:
return None
now = self._now()
elapsed_seconds = int((now - session_started_at).total_seconds())
budget = self._controls.session_wall_clock_seconds
replayed = await self._replay_existing_decision(
session_id=session_id,
fired_at_default=now,
elapsed_seconds_default=elapsed_seconds,
budget_default=budget,
)
if replayed is not None:
return replayed
if not self._controls.watchdog_enabled:
return None
if elapsed_seconds <= budget:
return None
decision = WatchdogDecision(
Expand All @@ -202,7 +224,60 @@ async def check(
self._fired_sessions.add(session_id)
return decision

async def _replay_existing_decision(
self,
*,
session_id: str,
fired_at_default: datetime,
elapsed_seconds_default: int,
budget_default: int,
) -> WatchdogDecision | None:
"""Replay a persisted cancel event before evaluating current controls."""
if isinstance(self._appender, _WatchdogEventReader):
existing = await self._appender.query_events(
aggregate_id=session_id,
event_type=WATCHDOG_CANCEL_EVENT_TYPE,
limit=1,
)
if existing:
self._fired_sessions.add(session_id)
event = existing[0]
fired_at = _coerce_datetime(event.data.get("fired_at")) or fired_at_default
return WatchdogDecision(
session_id=session_id,
fired_at=fired_at,
elapsed_seconds=_coerce_int(
event.data.get("elapsed_seconds"),
default=elapsed_seconds_default,
),
configured_budget_seconds=_coerce_int(
event.data.get("configured_budget_seconds"),
default=budget_default,
),
)
return None

def has_fired_for(self, session_id: str) -> bool:
"""Test/inspection helper — True iff this instance has already
fired the watchdog for *session_id*."""
return session_id in self._fired_sessions


def _coerce_datetime(value: object) -> datetime | None:
if not isinstance(value, str):
return None
try:
return datetime.fromisoformat(value)
except ValueError:
return None


def _coerce_int(value: object, *, default: int) -> int:
if isinstance(value, int):
return value
if isinstance(value, str):
try:
return int(value)
except ValueError:
return default
return default
79 changes: 79 additions & 0 deletions tests/unit/auto/test_pipeline_watchdog_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,22 @@ def __init__(self) -> None:
async def append(self, event: BaseEvent) -> None:
self.events.append(event)

async def query_events(
self,
aggregate_id: str | None = None,
event_type: str | None = None,
limit: int = 50,
offset: int = 0,
) -> list[BaseEvent]:
del offset
events = [
event
for event in self.events
if (aggregate_id is None or event.aggregate_id == aggregate_id)
and (event_type is None or event.type == event_type)
]
return events[:limit]


def _fill_ready(ledger: SeedDraftLedger) -> None:
from ouroboros.auto.ledger import LedgerEntry, LedgerSource, LedgerStatus
Expand Down Expand Up @@ -249,6 +265,69 @@ async def generate_seed(_session_id: str) -> Seed: # pragma: no cover - watchdo
assert event.data["reason"] == "wall_clock_exceeded"


@pytest.mark.asyncio
@pytest.mark.parametrize("resumed_budget", [0, 10_000])
async def test_pipeline_blocks_when_prior_watchdog_cancel_event_exists(
tmp_path,
resumed_budget: int,
) -> None:
"""Replay must preserve a watchdog cancellation even if state was not saved."""

async def start(goal: str, cwd: str) -> InterviewTurn: # noqa: ARG001
raise AssertionError("interview should not start after watchdog cancellation replay")

async def answer(session_id: str, text: str) -> InterviewTurn: # noqa: ARG001
raise AssertionError("interview should not answer after watchdog cancellation replay")

async def generate_seed(_session_id: str) -> Seed:
raise AssertionError("seed_generator should not run after watchdog cancellation replay")

state = AutoPipelineState(goal="Build a CLI", cwd=str(tmp_path))
started = datetime.fromisoformat(state.created_at)
fired_at = started + timedelta(seconds=120)

appender = _CapturingAppender()
appender.events.append(
BaseEvent(
type=WATCHDOG_CANCEL_EVENT_TYPE,
aggregate_type=WATCHDOG_AGGREGATE_TYPE,
aggregate_id=state.auto_session_id,
data={
"reason": "wall_clock_exceeded",
"session_started_at": started.isoformat(),
"fired_at": fired_at.isoformat(),
"elapsed_seconds": 120,
"configured_budget_seconds": 60,
},
)
)
watchdog = Watchdog(
controls=RuntimeControls(session_wall_clock_seconds=resumed_budget),
event_appender=appender,
now=lambda: started + timedelta(seconds=180),
)

driver = AutoInterviewDriver(
FunctionInterviewBackend(start, answer),
store=AutoStore(tmp_path),
max_rounds=1,
)
pipeline = AutoPipeline(
driver,
generate_seed,
store=AutoStore(tmp_path),
watchdog=watchdog,
)

result = await pipeline.run(state)

assert result.status == "blocked"
assert state.phase is AutoPhase.BLOCKED
assert result.stop_reason_code == WATCHDOG_STOP_REASON_CODE
assert "120s" in (state.last_error or "")
assert len(appender.events) == 1


@pytest.mark.asyncio
async def test_pipeline_with_disabled_watchdog_never_fires(tmp_path) -> None:
"""A watchdog with ``session_wall_clock_seconds=0`` is the opt-out
Expand Down
35 changes: 29 additions & 6 deletions tests/unit/runtime/test_controls.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ def test_load_returns_defaults_when_path_is_none() -> None:
assert load_runtime_controls(None) == RuntimeControls()


def test_load_path_none_uses_env_override(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("OUROBOROS_SESSION_WALL_CLOCK_SECONDS", "0")

controls = load_runtime_controls(None)

assert controls == RuntimeControls(session_wall_clock_seconds=0)


def test_load_path_none_rejects_bad_env_override(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("OUROBOROS_SESSION_WALL_CLOCK_SECONDS", "soon")

with pytest.raises(ValueError, match="OUROBOROS_SESSION_WALL_CLOCK_SECONDS"):
load_runtime_controls(None)


def test_load_missing_file_raises(tmp_path: Path) -> None:
with pytest.raises(FileNotFoundError):
load_runtime_controls(tmp_path / "nope.yaml")
Expand Down Expand Up @@ -89,16 +104,24 @@ def test_load_non_mapping_block_raises(tmp_path: Path) -> None:
load_runtime_controls(fixture)


def test_load_unknown_key_rejected(tmp_path: Path) -> None:
"""Unknown keys under ``runtime_controls`` are rejected — v2 expansion
keys (idle/no_progress/safety, directive vocabulary, …) require code
review to land, not silent acceptance."""
fixture = tmp_path / "extra_key.yaml"
def test_load_existing_runtime_controls_keys_ignored(tmp_path: Path) -> None:
"""Existing config-owned runtime_controls keys remain compatible."""
fixture = tmp_path / "existing_keys.yaml"
fixture.write_text(
"runtime_controls:\n"
" session_wall_clock_seconds: 600\n"
" idle_timeout_seconds: 60\n" # v2 expansion path; not yet here.
" mcp_tool_timeout_seconds: 0\n"
" generation_idle_timeout_seconds: 7200\n"
" generation_no_progress_timeout_seconds: 14400\n"
" generation_safety_timeout_seconds: 0\n"
" watchdog_poll_seconds: 15\n"
)
assert load_runtime_controls(fixture) == RuntimeControls(session_wall_clock_seconds=600)


def test_load_unknown_key_rejected(tmp_path: Path) -> None:
fixture = tmp_path / "extra_key.yaml"
fixture.write_text("runtime_controls:\n idle_timeout_seconds: 60\n")
with pytest.raises(ValueError, match="idle_timeout_seconds"):
load_runtime_controls(fixture)

Expand Down
Loading
Loading