Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions parallel_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import subprocess
import sys
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Literal
Expand Down Expand Up @@ -135,6 +136,7 @@ def _dump_database_state(feature_dicts: list[dict], label: str = ""):
POLL_INTERVAL = 5 # seconds between checking for ready features
MAX_FEATURE_RETRIES = 3 # Maximum times to retry a failed feature
INITIALIZER_TIMEOUT = 1800 # 30 minutes timeout for initializer
AGENT_INACTIVITY_TIMEOUT = 1200 # 20 minutes - kill agents with no output activity


class ParallelOrchestrator:
Expand Down Expand Up @@ -199,6 +201,10 @@ def __init__(
# Track feature failures to prevent infinite retry loops
self._failure_counts: dict[int, int] = {}

# Track last activity time per agent for stuck detection
# Updated whenever an agent produces output
self._last_activity: dict[int, float] = {}

# Track recently tested feature IDs to avoid redundant re-testing.
# Cleared when all passing features have been covered at least once.
self._recently_tested: set[int] = set()
Expand Down Expand Up @@ -867,6 +873,8 @@ def _spawn_coding_agent(self, feature_id: int) -> tuple[bool, str]:
with self._lock:
self.running_coding_agents[feature_id] = proc
self.abort_events[feature_id] = abort_event
# Initialize activity timestamp for stuck detection
self._last_activity[feature_id] = time.time()

# Start output reader thread
threading.Thread(
Expand Down Expand Up @@ -1027,6 +1035,8 @@ def _spawn_testing_agent(self) -> tuple[bool, str]:
# when multiple agents test the same feature
self.running_testing_agents[proc.pid] = (primary_feature_id, proc)
testing_count = len(self.running_testing_agents)
# Initialize activity timestamp for stuck detection (negative PID to distinguish from coding)
self._last_activity[-proc.pid] = time.time()

# Start output reader thread with primary feature ID for log attribution
threading.Thread(
Expand Down Expand Up @@ -1139,6 +1149,11 @@ def _read_output(
if abort.is_set():
break
line = line.rstrip()
# Update activity timestamp for stuck detection
if feature_id is not None:
activity_key = -proc.pid if agent_type == "testing" else feature_id
with self._lock:
self._last_activity[activity_key] = time.time()
# Detect when a batch agent claims a new feature
claim_match = self._CLAIM_FEATURE_PATTERN.search(line)
if claim_match:
Expand Down Expand Up @@ -1206,6 +1221,59 @@ async def _wait_for_agent_completion(self, timeout: float = POLL_INTERVAL):
# Timeout reached without agent completion - this is normal, just check anyway
pass

def _check_stuck_agents(self) -> list[int]:
"""Check for and kill agents that have been inactive for too long.

An agent is considered stuck if it hasn't produced any output for
AGENT_INACTIVITY_TIMEOUT seconds. This catches agents that hang without
crashing (e.g., waiting indefinitely for a response).

Returns:
List of feature IDs that were killed due to inactivity.
"""
current_time = time.time()
killed_features = []

with self._lock:
# Check coding agents (keyed by feature_id)
for feature_id, proc in list(self.running_coding_agents.items()):
last_activity = self._last_activity.get(feature_id, current_time)
inactive_seconds = current_time - last_activity

if inactive_seconds > AGENT_INACTIVITY_TIMEOUT:
inactive_minutes = int(inactive_seconds // 60)
print(f"WARNING: Feature #{feature_id} agent stuck - no output for {inactive_minutes} minutes. Killing...", flush=True)
debug_log.log("STUCK", f"Killing stuck coding agent for feature #{feature_id}",
inactive_minutes=inactive_minutes,
pid=proc.pid)

# Kill the stuck agent
try:
kill_process_tree(proc, timeout=5.0)
except Exception as e:
debug_log.log("STUCK", f"Error killing stuck agent for feature #{feature_id}", error=str(e))

killed_features.append(feature_id)

# Check testing agents (keyed by PID, activity keyed by -PID)
for pid, (feature_id, proc) in list(self.running_testing_agents.items()):
last_activity = self._last_activity.get(-pid, current_time)
inactive_seconds = current_time - last_activity

if inactive_seconds > AGENT_INACTIVITY_TIMEOUT:
inactive_minutes = int(inactive_seconds // 60)
print(f"WARNING: Testing agent for feature #{feature_id} stuck - no output for {inactive_minutes} minutes. Killing...", flush=True)
debug_log.log("STUCK", f"Killing stuck testing agent for feature #{feature_id}",
inactive_minutes=inactive_minutes,
pid=proc.pid)

try:
kill_process_tree(proc, timeout=5.0)
except Exception as e:
debug_log.log("STUCK", f"Error killing stuck testing agent for feature #{feature_id}", error=str(e))

return killed_features

def _on_agent_complete(
self,
feature_id: int | None,
Expand All @@ -1228,6 +1296,8 @@ def _on_agent_complete(
with self._lock:
# Remove by PID
self.running_testing_agents.pop(proc.pid, None)
# Clean up activity tracking (negative PID key for testing agents)
self._last_activity.pop(-proc.pid, None)

status = "completed" if return_code == 0 else "failed"
print(f"Feature #{feature_id} testing {status}", flush=True)
Expand All @@ -1252,6 +1322,8 @@ def _on_agent_complete(
self._feature_to_primary.pop(fid, None)
self.running_coding_agents.pop(feature_id, None)
self.abort_events.pop(feature_id, None)
# Clean up activity tracking
self._last_activity.pop(feature_id, None)

all_feature_ids = batch_ids or [feature_id]

Expand Down Expand Up @@ -1489,6 +1561,14 @@ async def run_loop(self):
print("\nAll features complete!", flush=True)
break

# Check for stuck agents (no output for AGENT_INACTIVITY_TIMEOUT)
stuck_features = self._check_stuck_agents()
if stuck_features:
debug_log.log("STUCK", f"Killed {len(stuck_features)} stuck agent(s)",
feature_ids=stuck_features)
# Brief pause to allow process cleanup before continuing
await asyncio.sleep(1)

# Maintain testing agents independently (runs every iteration)
self._maintain_testing_agents(feature_dicts)

Expand Down