Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/ouroboros/orchestrator/parallel_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3325,13 +3325,25 @@ async def execute_parallel(
)
# Invalid indices will be skipped in the execution loop below

dependency_edges = [
{"ac_index": idx, "depends_on": deps}
for idx in range(total_acs)
if (deps := tuple(execution_plan.get_dependencies(idx)))
]
log.info(
"parallel_executor.execution.started",
session_id=session_id,
total_acs=total_acs,
total_levels=total_levels,
levels=execution_plan.execution_levels,
)
log.info(
"parallel_executor.dependency_graph",
session_id=session_id,
execution_id=execution_id,
total_acs=total_acs,
dependency_edges=dependency_edges,
)

# Emit initial progress for TUI
await self._emit_workflow_progress(
Expand Down Expand Up @@ -5205,6 +5217,28 @@ async def _execute_atomic_ac(
result_final_message = final_message
if fat_harness_error is not None:
success = False
log.warning(
"parallel_executor.ac.verifier_rejected",
session_id=session_id,
execution_id=execution_id,
ac_index=ac_index,
depth=depth,
reason=fat_harness_error,
typed_evidence_present=typed_evidence is not None,
typed_evidence_valid=(
typed_validation.ok if typed_validation is not None else False
),
verifier_ran=verifier_verdict is not None,
verifier_passed=(
verifier_verdict.passed if verifier_verdict is not None else False
),
verifier_reasons=(
list(verifier_verdict.reasons) if verifier_verdict is not None else []
),
verifier_failure_class=(
verifier_verdict.failure_class if verifier_verdict is not None else None
),
)
result_final_message = (
f"{fat_harness_error}\n\nRuntime final message:\n{final_message}"
if final_message
Expand Down
75 changes: 67 additions & 8 deletions tests/unit/orchestrator/test_parallel_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5612,16 +5612,32 @@ def _rejecting_verifier(**kwargs: object) -> VerifierVerdict:
atomic_verifier=_rejecting_verifier,
)

result = await executor._execute_atomic_ac(
ac_index=0,
ac_content="Implement AC 1",
with patch("ouroboros.orchestrator.parallel_executor.log") as log_mock:
result = await executor._execute_atomic_ac(
ac_index=0,
ac_content="Implement AC 1",
session_id="orch_123",
tools=["Read"],
tool_catalog=(MCPToolDefinition(name="Read", description="Read a file."),),
system_prompt="system",
seed_goal="Ship the feature",
depth=0,
start_time=datetime.now(UTC),
)

log_mock.warning.assert_any_call(
"parallel_executor.ac.verifier_rejected",
session_id="orch_123",
tools=["Read"],
tool_catalog=(MCPToolDefinition(name="Read", description="Read a file."),),
system_prompt="system",
seed_goal="Ship the feature",
execution_id="",
ac_index=0,
depth=0,
start_time=datetime.now(UTC),
reason="Fat-harness verifier failed (claimed test command did not support the AC).",
typed_evidence_present=True,
typed_evidence_valid=True,
verifier_ran=True,
verifier_passed=False,
verifier_reasons=["claimed test command did not support the AC"],
verifier_failure_class="FABRICATION_SUSPECTED",
)

assert result.success is False
Expand Down Expand Up @@ -6412,6 +6428,49 @@ async def test_execute_parallel_skips_externally_satisfied_acs(self) -> None:
assert "abc1234" in result.results[0].final_message
executor._execute_ac_batch.assert_awaited_once()

@pytest.mark.asyncio
async def test_execute_parallel_logs_dependency_edges(self) -> None:
"""The inferred dependency graph should be visible before cascaded skips."""
seed = _make_seed("AC 0 foundation", "AC 1 dependent flow")
dependency_graph = DependencyGraph(
nodes=(
ACNode(index=0, content=seed.acceptance_criteria[0], depends_on=()),
ACNode(index=1, content=seed.acceptance_criteria[1], depends_on=(0,)),
),
execution_levels=((0,), (1,)),
)
executor = _make_executor()
executor._execute_ac_batch = AsyncMock(
return_value=[
ACExecutionResult(
ac_index=0,
ac_content=seed.acceptance_criteria[0],
success=False,
error="Foundation failed",
outcome=ACExecutionOutcome.FAILED,
)
]
)

with patch("ouroboros.orchestrator.parallel_executor.log") as log_mock:
await executor.execute_parallel(
seed=seed,
execution_plan=dependency_graph.to_execution_plan(),
session_id="orch_dependency_log",
execution_id="exec_dependency_log",
tools=["Read"],
tool_catalog=None,
system_prompt="system",
)

log_mock.info.assert_any_call(
"parallel_executor.dependency_graph",
session_id="orch_dependency_log",
execution_id="exec_dependency_log",
total_acs=2,
dependency_edges=[{"ac_index": 1, "depends_on": (0,)}],
)

@pytest.mark.asyncio
async def test_externally_satisfied_ac_blocked_when_dependency_failed(self) -> None:
"""Externally satisfied ACs must be BLOCKED when an upstream dep failed.
Expand Down