diff --git a/parallel_orchestrator.py b/parallel_orchestrator.py index 856e33cb..cb6626ea 100644 --- a/parallel_orchestrator.py +++ b/parallel_orchestrator.py @@ -27,6 +27,7 @@ import subprocess import sys import threading +import time from datetime import datetime, timezone from pathlib import Path from typing import Any, Callable, Literal @@ -135,6 +136,7 @@ def _dump_database_state(feature_dicts: list[dict], label: str = ""): POLL_INTERVAL = 5 # seconds between checking for ready features MAX_FEATURE_RETRIES = 3 # Maximum times to retry a failed feature INITIALIZER_TIMEOUT = 1800 # 30 minutes timeout for initializer +AGENT_INACTIVITY_TIMEOUT = 1200 # 20 minutes - kill agents with no output activity class ParallelOrchestrator: @@ -199,6 +201,10 @@ def __init__( # Track feature failures to prevent infinite retry loops self._failure_counts: dict[int, int] = {} + # Track last activity time per agent for stuck detection + # Updated whenever an agent produces output + self._last_activity: dict[int, float] = {} + # Track recently tested feature IDs to avoid redundant re-testing. # Cleared when all passing features have been covered at least once. 
def _check_stuck_agents(self) -> list[int]:
    """Check for and kill agents that have been inactive for too long.

    An agent is considered stuck if it hasn't produced any output for more
    than AGENT_INACTIVITY_TIMEOUT seconds. This catches agents that hang
    without crashing (e.g., waiting indefinitely for a response). Agents
    with no recorded activity timestamp are treated as active right now
    and are never killed by this check.

    The agent tables are only *read* while ``self._lock`` is held; the
    actual kills run after the lock has been released. Each kill may block
    for up to its 5 second timeout, and the output-reader threads need the
    same lock for every line they read, so killing under the lock would
    stall every healthy agent's reader for the duration.

    Returns:
        Feature IDs of the *coding* agents that were targeted for a kill
        (an ID is included even if the kill attempt raised and was only
        logged). Stuck testing agents are killed as well but are not part
        of the returned list.
    """
    current_time = time.time()

    # Phase 1 (under lock): take a consistent snapshot of which agents are
    # past the inactivity threshold. No blocking work happens here.
    stuck_coding = []
    stuck_testing = []
    with self._lock:
        # Coding agents: process table and activity are keyed by feature_id.
        for feature_id, proc in list(self.running_coding_agents.items()):
            last_activity = self._last_activity.get(feature_id, current_time)
            inactive_seconds = current_time - last_activity
            if inactive_seconds > AGENT_INACTIVITY_TIMEOUT:
                stuck_coding.append((feature_id, proc, inactive_seconds))

        # Testing agents: process table keyed by PID, activity by -PID so
        # the keys cannot collide with coding-agent feature IDs.
        for pid, (feature_id, proc) in list(self.running_testing_agents.items()):
            last_activity = self._last_activity.get(-pid, current_time)
            inactive_seconds = current_time - last_activity
            if inactive_seconds > AGENT_INACTIVITY_TIMEOUT:
                stuck_testing.append((feature_id, proc, inactive_seconds))

    # Phase 2 (lock released): report and kill. Bookkeeping for the killed
    # agents is not touched here; only the processes are terminated.
    killed_features = []

    for feature_id, proc, inactive_seconds in stuck_coding:
        inactive_minutes = int(inactive_seconds // 60)
        print(f"WARNING: Feature #{feature_id} agent stuck - no output for {inactive_minutes} minutes. Killing...", flush=True)
        debug_log.log("STUCK", f"Killing stuck coding agent for feature #{feature_id}",
                      inactive_minutes=inactive_minutes,
                      pid=proc.pid)

        # Best effort: a failed kill is logged and must not abort the sweep
        # over the remaining agents.
        try:
            kill_process_tree(proc, timeout=5.0)
        except Exception as e:
            debug_log.log("STUCK", f"Error killing stuck agent for feature #{feature_id}", error=str(e))

        killed_features.append(feature_id)

    for feature_id, proc, inactive_seconds in stuck_testing:
        inactive_minutes = int(inactive_seconds // 60)
        print(f"WARNING: Testing agent for feature #{feature_id} stuck - no output for {inactive_minutes} minutes. Killing...", flush=True)
        debug_log.log("STUCK", f"Killing stuck testing agent for feature #{feature_id}",
                      inactive_minutes=inactive_minutes,
                      pid=proc.pid)

        try:
            kill_process_tree(proc, timeout=5.0)
        except Exception as e:
            debug_log.log("STUCK", f"Error killing stuck testing agent for feature #{feature_id}", error=str(e))

    return killed_features