From 9721368188ca0f04c8c36030ce0a6552f0ac3780 Mon Sep 17 00:00:00 2001 From: Caitlyn Byrne Date: Sun, 8 Feb 2026 13:25:37 -0500 Subject: [PATCH] feat: add graceful pause (drain mode) for running agents File-based signal (.pause_drain) lets the orchestrator finish current work before pausing instead of hard-freezing the process tree. New status states pausing/paused_graceful flow through WebSocket to the UI where a Pause button, draining indicator, and Resume button are shown. Co-Authored-By: Claude Opus 4.6 --- autoforge_paths.py | 10 +++ parallel_orchestrator.py | 45 ++++++++++ server/routers/agent.py | 28 +++++++ server/schemas.py | 2 +- server/services/process_manager.py | 77 ++++++++++++++++-- server/websocket.py | 27 ++++++ ui/package-lock.json | 14 ++++ ui/src/components/AgentControl.tsx | 86 ++++++++++++++++---- ui/src/components/OrchestratorStatusCard.tsx | 8 ++ ui/src/hooks/useProjects.ts | 22 +++++ ui/src/lib/api.ts | 12 +++ ui/src/lib/types.ts | 4 +- 12 files changed, 311 insertions(+), 24 deletions(-) diff --git a/autoforge_paths.py b/autoforge_paths.py index 8283a9b9..c782f2c6 100644 --- a/autoforge_paths.py +++ b/autoforge_paths.py @@ -39,6 +39,7 @@ assistant.db-shm .agent.lock .devserver.lock +.pause_drain .claude_settings.json .claude_assistant_settings.json .claude_settings.expand.*.json @@ -145,6 +146,15 @@ def get_claude_assistant_settings_path(project_dir: Path) -> Path: return _resolve_path(project_dir, ".claude_assistant_settings.json") +def get_pause_drain_path(project_dir: Path) -> Path: + """Return the path to the ``.pause_drain`` signal file. + + This file is created to request a graceful pause (drain mode). + Always uses the new location since it's a transient signal file. + """ + return project_dir / ".autoforge" / ".pause_drain" + + def get_progress_cache_path(project_dir: Path) -> Path: """Resolve the path to ``.progress_cache``.""" return _resolve_path(project_dir, ".progress_cache") diff --git a/parallel_orchestrator.py b/parallel_orchestrator.py index 856e33cb..5112b4a9 100644 --- a/parallel_orchestrator.py +++ b/parallel_orchestrator.py @@ -212,6 +212,9 @@ def __init__( # Signal handlers only set this flag; cleanup happens in the main loop self._shutdown_requested = False + # Graceful pause (drain mode) flag + self._drain_requested = False + # Session tracking for logging/debugging self.session_start_time: datetime | None = None @@ -1368,6 +1371,9 @@ async def run_loop(self): # Must happen before any debug_log.log() calls debug_log.start_session() + # Clear any stale drain signal from a previous session + self._clear_drain_signal() + # Log startup to debug file debug_log.section("ORCHESTRATOR STARTUP") debug_log.log("STARTUP", "Orchestrator run_loop starting", @@ -1489,6 +1495,34 @@ async def run_loop(self): print("\nAll features complete!", flush=True) break + # --- Graceful pause (drain mode) --- + if not self._drain_requested and self._check_drain_signal(): + self._drain_requested = True + print("Graceful pause requested - draining running agents...", flush=True) + debug_log.log("DRAIN", "Graceful pause requested, draining running agents") + + if self._drain_requested: + with self._lock: + coding_count = len(self.running_coding_agents) + testing_count = len(self.running_testing_agents) + + if coding_count == 0 and testing_count == 0: + print("All agents drained - paused.", flush=True) + debug_log.log("DRAIN", "All agents drained, entering paused state") + # Wait until signal file is removed (resume) or shutdown + while self._check_drain_signal() and self.is_running and not self._shutdown_requested: + await asyncio.sleep(1) + if not self.is_running or self._shutdown_requested: + break + self._drain_requested = False + print("Resuming from graceful pause...", flush=True) + debug_log.log("DRAIN", "Resuming from graceful pause") + continue + else: + debug_log.log("DRAIN", f"Waiting for agents to finish: coding={coding_count}, testing={testing_count}") + await self._wait_for_agent_completion() + continue + # Maintain testing agents independently (runs every iteration) self._maintain_testing_agents(feature_dicts) @@ -1613,6 +1647,17 @@ def get_status(self) -> dict: "yolo_mode": self.yolo_mode, } + def _check_drain_signal(self) -> bool: + """Check if the graceful pause (drain) signal file exists.""" + from autoforge_paths import get_pause_drain_path + return get_pause_drain_path(self.project_dir).exists() + + def _clear_drain_signal(self) -> None: + """Delete the drain signal file and reset the flag.""" + from autoforge_paths import get_pause_drain_path + get_pause_drain_path(self.project_dir).unlink(missing_ok=True) + self._drain_requested = False + def cleanup(self) -> None: """Clean up database resources. Safe to call multiple times. diff --git a/server/routers/agent.py b/server/routers/agent.py index 26605e4b..ea961666 100644 --- a/server/routers/agent.py +++ b/server/routers/agent.py @@ -175,3 +175,31 @@ async def resume_agent(project_name: str): status=manager.status, message=message, ) + + +@router.post("/graceful-pause", response_model=AgentActionResponse) +async def graceful_pause_agent(project_name: str): + """Request a graceful pause (drain mode) - finish current work then pause.""" + manager = get_project_manager(project_name) + + success, message = await manager.graceful_pause() + + return AgentActionResponse( + success=success, + status=manager.status, + message=message, + ) + + +@router.post("/graceful-resume", response_model=AgentActionResponse) +async def graceful_resume_agent(project_name: str): + """Resume from a graceful pause.""" + manager = get_project_manager(project_name) + + success, message = await manager.graceful_resume() + + return AgentActionResponse( + success=success, + status=manager.status, + message=message, + ) diff --git a/server/schemas.py b/server/schemas.py index 5f546e2b..d470d499 100644 --- a/server/schemas.py +++ b/server/schemas.py @@ -217,7 +217,7 @@ def validate_testing_ratio(cls, v: int | None) -> int | None: class AgentStatus(BaseModel): """Current agent status.""" - status: Literal["stopped", "running", "paused", "crashed"] + status: Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"] pid: int | None = None started_at: datetime | None = None yolo_mode: bool = False diff --git a/server/services/process_manager.py b/server/services/process_manager.py index d38d9001..0af7cbab 100644 --- a/server/services/process_manager.py +++ b/server/services/process_manager.py @@ -77,7 +77,7 @@ def __init__( self.project_dir = project_dir self.root_dir = root_dir self.process: subprocess.Popen | None = None - self._status: Literal["stopped", "running", "paused", "crashed"] = "stopped" + self._status: Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"] = "stopped" self.started_at: datetime | None = None self._output_task: asyncio.Task | None = None self.yolo_mode: bool = False # YOLO mode for rapid prototyping @@ -96,11 +96,11 @@ def __init__( self.lock_file = get_agent_lock_path(self.project_dir) @property - def status(self) -> Literal["stopped", "running", "paused", "crashed"]: + def status(self) -> Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"]: return self._status @status.setter - def status(self, value: Literal["stopped", "running", "paused", "crashed"]): + def status(self, value: Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"]): old_status = self._status self._status = value if old_status != value: @@ -308,6 +308,12 @@ async def _stream_output(self) -> None: for help_line in AUTH_ERROR_HELP.strip().split('\n'): await self._broadcast_output(help_line) + # Detect graceful pause status transitions from orchestrator output + if "All agents drained - paused." in decoded: + self.status = "paused_graceful" + elif "Resuming from graceful pause..." in decoded: + self.status = "running" + await self._broadcast_output(sanitized) except asyncio.CancelledError: @@ -355,7 +361,7 @@ async def start( Returns: Tuple of (success, message) """ - if self.status in ("running", "paused"): + if self.status in ("running", "paused", "pausing", "paused_graceful"): return False, f"Agent is already {self.status}" if not self._check_lock(): @@ -481,6 +487,12 @@ async def stop(self) -> tuple[bool, str]: self._cleanup_stale_features() self._remove_lock() + # Clean up drain signal file if present + try: + from autoforge_paths import get_pause_drain_path + get_pause_drain_path(self.project_dir).unlink(missing_ok=True) + except Exception: + pass self.status = "stopped" self.process = None self.started_at = None @@ -541,6 +553,47 @@ async def resume(self) -> tuple[bool, str]: logger.exception("Failed to resume agent") return False, f"Failed to resume agent: {e}" + async def graceful_pause(self) -> tuple[bool, str]: + """Request a graceful pause (drain mode). + + Creates a signal file that the orchestrator polls. Running agents + finish their current work before the orchestrator enters a paused state. + + Returns: + Tuple of (success, message) + """ + if not self.process or self.status not in ("running",): + return False, "Agent is not running" + + try: + from autoforge_paths import get_pause_drain_path + drain_path = get_pause_drain_path(self.project_dir) + drain_path.parent.mkdir(parents=True, exist_ok=True) + drain_path.write_text(str(self.process.pid)) + self.status = "pausing" + return True, "Graceful pause requested" + except Exception as e: + logger.exception("Failed to request graceful pause") + return False, f"Failed to request graceful pause: {e}" + + async def graceful_resume(self) -> tuple[bool, str]: + """Resume from a graceful pause by removing the drain signal file. + + Returns: + Tuple of (success, message) + """ + if not self.process or self.status not in ("pausing", "paused_graceful"): + return False, "Agent is not in a graceful pause state" + + try: + from autoforge_paths import get_pause_drain_path + get_pause_drain_path(self.project_dir).unlink(missing_ok=True) + self.status = "running" + return True, "Agent resumed from graceful pause" + except Exception as e: + logger.exception("Failed to resume from graceful pause") + return False, f"Failed to resume: {e}" + async def healthcheck(self) -> bool: """ Check if the agent process is still alive. @@ -556,8 +609,14 @@ async def healthcheck(self) -> bool: poll = self.process.poll() if poll is not None: # Process has terminated - if self.status in ("running", "paused"): + if self.status in ("running", "paused", "pausing", "paused_graceful"): self._cleanup_stale_features() + # Clean up drain signal file if present + try: + from autoforge_paths import get_pause_drain_path + get_pause_drain_path(self.project_dir).unlink(missing_ok=True) + except Exception: + pass self.status = "crashed" self._remove_lock() return False @@ -642,8 +701,14 @@ def cleanup_orphaned_locks() -> int: if not project_path.exists(): continue + # Clean up stale drain signal files + from autoforge_paths import get_autoforge_dir, get_pause_drain_path + drain_file = get_pause_drain_path(project_path) + if drain_file.exists(): + drain_file.unlink(missing_ok=True) + logger.info("Removed stale drain signal file for project '%s'", name) + # Check both legacy and new locations for lock files - from autoforge_paths import get_autoforge_dir lock_locations = [ project_path / ".agent.lock", get_autoforge_dir(project_path) / ".agent.lock", diff --git a/server/websocket.py b/server/websocket.py index e6600643..ef57bf03 100644 --- a/server/websocket.py +++ b/server/websocket.py @@ -78,6 +78,9 @@ 'testing_complete': re.compile(r'Feature #(\d+) testing (completed|failed)'), 'all_complete': re.compile(r'All features complete'), 'blocked_features': re.compile(r'(\d+) blocked by dependencies'), + 'drain_start': re.compile(r'Graceful pause requested'), + 'drain_complete': re.compile(r'All agents drained'), + 'drain_resume': re.compile(r'Resuming from graceful pause'), } @@ -562,6 +565,30 @@ async def process_line(self, line: str) -> dict | None: 'All features complete!' ) + # Graceful pause (drain mode) events + elif ORCHESTRATOR_PATTERNS['drain_start'].search(line): + self.state = 'draining' + update = self._create_update( + 'drain_start', + 'Draining active agents...' + ) + + elif ORCHESTRATOR_PATTERNS['drain_complete'].search(line): + self.state = 'paused' + self.coding_agents = 0 + self.testing_agents = 0 + update = self._create_update( + 'drain_complete', + 'All agents drained. Paused.' + ) + + elif ORCHESTRATOR_PATTERNS['drain_resume'].search(line): + self.state = 'scheduling' + update = self._create_update( + 'drain_resume', + 'Resuming feature scheduling' + ) + return update def _create_update( diff --git a/ui/package-lock.json b/ui/package-lock.json index 56a3de21..6c11fce3 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -96,6 +96,7 @@ "integrity": "sha512-e7jT4DxYvIDLk1ZHmU/m/mB19rex9sv0c2ftBtjSBv+kVM/902eh0fINUzD7UwLLNR+jU585GxUJ8/EBfAM5fw==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@babel/code-frame": "^7.27.1", "@babel/generator": "^7.28.5", @@ -2825,6 +2826,7 @@ "integrity": "sha512-MciR4AKGHWl7xwxkBa6xUGxQJ4VBOmPTF7sL+iGzuahOFaO0jHCsuEfS80pan1ef4gWId1oWOweIhrDEYLuaOw==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "undici-types": "~6.21.0" } @@ -2834,6 +2836,7 @@ "resolved": "https://registry.npmjs.org/@types/react/-/react-19.2.9.tgz", "integrity": "sha512-Lpo8kgb/igvMIPeNV2rsYKTgaORYdO1XGVZ4Qz3akwOj0ySGYMPlQWa8BaLn0G63D1aSaAQ5ldR06wCpChQCjA==", "license": "MIT", + "peer": true, "dependencies": { "csstype": "^3.2.2" } @@ -2844,6 +2847,7 @@ "integrity": "sha512-jp2L/eY6fn+KgVVQAOqYItbF0VY/YApe5Mz2F0aykSO8gx31bYCZyvSeYxCHKvzHG5eZjc+zyaS5BrBWya2+kQ==", "devOptional": true, "license": "MIT", + "peer": true, "peerDependencies": { "@types/react": "^19.2.0" } @@ -2899,6 +2903,7 @@ "integrity": "sha512-3xP4XzzDNQOIqBMWogftkwxhg5oMKApqY0BAflmLZiFYHqyhSOxv/cd/zPQLTcCXr4AkaKb25joocY0BD1WC6A==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.51.0", "@typescript-eslint/types": "8.51.0", @@ -3209,6 +3214,7 @@ "integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==", "dev": true, "license": "MIT", + "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -3340,6 +3346,7 @@ } ], "license": "MIT", + "peer": true, "dependencies": { "baseline-browser-mapping": "^2.9.0", "caniuse-lite": "^1.0.30001759", @@ -3611,6 +3618,7 @@ "resolved": "https://registry.npmjs.org/d3-selection/-/d3-selection-3.0.0.tgz", "integrity": "sha512-fmTRWbNMmsmWq6xJV8D19U/gw/bwrHfNXxrIN+HfZgnzqTHp9jOmKMhsTUjXOJnZOdZY9Q28y4yebKzqDKlxlQ==", "license": "ISC", + "peer": true, "engines": { "node": ">=12" } @@ -3836,6 +3844,7 @@ "integrity": "sha512-LEyamqS7W5HB3ujJyvi0HQK/dtVINZvd5mAAp9eT5S/ujByGjiZLCzPcHVzuXbpJDJF/cxwHlfceVUDZ2lnSTw==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.1", @@ -5836,6 +5845,7 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", + "peer": true, "engines": { "node": ">=12" }, @@ -5951,6 +5961,7 @@ "resolved": "https://registry.npmjs.org/react/-/react-19.2.3.tgz", "integrity": "sha512-Ku/hhYbVjOQnXDZFv2+RibmLFGwFdeeKHFcOTlrt7xplBnya5OGn/hIRDsqDiSUcfORsDC7MPxwork8jBwsIWA==", "license": "MIT", + "peer": true, "engines": { "node": ">=0.10.0" } @@ -5960,6 +5971,7 @@ "resolved": "https://registry.npmjs.org/react-dom/-/react-dom-19.2.3.tgz", "integrity": "sha512-yELu4WmLPw5Mr/lmeEpox5rw3RETacE++JgHqQzd2dg+YbJuat3jH4ingc+WPZhxaoFzdv9y33G+F7Nl5O0GBg==", "license": "MIT", + "peer": true, "dependencies": { "scheduler": "^0.27.0" }, @@ -6424,6 +6436,7 @@ "integrity": "sha512-84MVSjMEHP+FQRPy3pX9sTVV/INIex71s9TL2Gm5FG/WG1SqXeKyZ0k7/blY/4FdOzI12CBy1vGc4og/eus0fw==", "dev": true, "license": "Apache-2.0", + "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -6677,6 +6690,7 @@ "integrity": "sha512-w+N7Hifpc3gRjZ63vYBXA56dvvRlNWRczTdmCBBa+CotUzAPf5b7YMdMR/8CQoeYE5LX3W4wj6RYTgonm1b9DA==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "esbuild": "^0.27.0", "fdir": "^6.5.0", diff --git a/ui/src/components/AgentControl.tsx b/ui/src/components/AgentControl.tsx index 3529c03d..7dc7e0a3 100644 --- a/ui/src/components/AgentControl.tsx +++ b/ui/src/components/AgentControl.tsx @@ -1,8 +1,10 @@ import { useState, useEffect, useRef, useCallback } from 'react' -import { Play, Square, Loader2, GitBranch, Clock } from 'lucide-react' +import { Play, Square, Loader2, GitBranch, Clock, Pause, PlayCircle } from 'lucide-react' import { useStartAgent, useStopAgent, + useGracefulPauseAgent, + useGracefulResumeAgent, useSettings, useUpdateProjectSettings, } from '../hooks/useProjects' @@ -60,12 +62,14 @@ export function AgentControl({ projectName, status, defaultConcurrency = 3 }: Ag const startAgent = useStartAgent(projectName) const stopAgent = useStopAgent(projectName) + const gracefulPause = useGracefulPauseAgent(projectName) + const gracefulResume = useGracefulResumeAgent(projectName) const { data: nextRun } = useNextScheduledRun(projectName) const [showScheduleModal, setShowScheduleModal] = useState(false) - const isLoading = startAgent.isPending || stopAgent.isPending - const isRunning = status === 'running' || status === 'paused' + const isLoading = startAgent.isPending || stopAgent.isPending || gracefulPause.isPending || gracefulResume.isPending + const isRunning = status === 'running' || status === 'paused' || status === 'pausing' || status === 'paused_graceful' const isLoadingStatus = status === 'loading' const isParallel = concurrency > 1 @@ -126,7 +130,7 @@ export function AgentControl({ projectName, status, defaultConcurrency = 3 }: Ag )} - {/* Start/Stop button */} + {/* Start/Stop/Pause/Resume buttons */} {isLoadingStatus ? ( ) : ( - + + {/* Paused indicator + Resume button */} + {status === 'paused_graceful' && ( + <> + + Paused + + + + )} + + {/* Graceful pause button (only when running normally) */} + {status === 'running' && ( + + )} + + {/* Stop button (always available) */} + + )} {/* Clock button to open schedule modal */} diff --git a/ui/src/components/OrchestratorStatusCard.tsx b/ui/src/components/OrchestratorStatusCard.tsx index dedeaa94..860abfd7 100644 --- a/ui/src/components/OrchestratorStatusCard.tsx +++ b/ui/src/components/OrchestratorStatusCard.tsx @@ -25,6 +25,10 @@ function getStateText(state: OrchestratorState): string { return 'Watching progress...' case 'complete': return 'Mission accomplished!' + case 'draining': + return 'Draining agents...' + case 'paused': + return 'Paused' default: return 'Orchestrating...' } @@ -42,6 +46,10 @@ function getStateColor(state: OrchestratorState): string { return 'text-primary' case 'initializing': return 'text-yellow-600 dark:text-yellow-400' + case 'draining': + return 'text-amber-600 dark:text-amber-400' + case 'paused': + return 'text-muted-foreground' default: return 'text-muted-foreground' } diff --git a/ui/src/hooks/useProjects.ts b/ui/src/hooks/useProjects.ts index f69d90f9..4ed44364 100644 --- a/ui/src/hooks/useProjects.ts +++ b/ui/src/hooks/useProjects.ts @@ -197,6 +197,28 @@ export function useResumeAgent(projectName: string) { }) } +export function useGracefulPauseAgent(projectName: string) { + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: () => api.gracefulPauseAgent(projectName), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['agent-status', projectName] }) + }, + }) +} + +export function useGracefulResumeAgent(projectName: string) { + const queryClient = useQueryClient() + + return useMutation({ + mutationFn: () => api.gracefulResumeAgent(projectName), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['agent-status', projectName] }) + }, + }) +} + // ============================================================================ // Setup // ============================================================================ diff --git a/ui/src/lib/api.ts b/ui/src/lib/api.ts index 23e9973f..739d5ff8 100644 --- a/ui/src/lib/api.ts +++ b/ui/src/lib/api.ts @@ -271,6 +271,18 @@ export async function resumeAgent(projectName: string): Promise { + return fetchJSON(`/projects/${encodeURIComponent(projectName)}/agent/graceful-pause`, { + method: 'POST', + }) +} + +export async function gracefulResumeAgent(projectName: string): Promise { + return fetchJSON(`/projects/${encodeURIComponent(projectName)}/agent/graceful-resume`, { + method: 'POST', + }) +} + // ============================================================================ // Spec Creation API // ============================================================================ diff --git a/ui/src/lib/types.ts b/ui/src/lib/types.ts index ba8eab94..5a82a63f 100644 --- a/ui/src/lib/types.ts +++ b/ui/src/lib/types.ts @@ -120,7 +120,7 @@ export interface FeatureUpdate { } // Agent types -export type AgentStatus = 'stopped' | 'running' | 'paused' | 'crashed' | 'loading' +export type AgentStatus = 'stopped' | 'running' | 'paused' | 'crashed' | 'loading' | 'pausing' | 'paused_graceful' export interface AgentStatusResponse { status: AgentStatus @@ -216,6 +216,8 @@ export type OrchestratorState = | 'spawning' | 'monitoring' | 'complete' + | 'draining' + | 'paused' // Orchestrator event for recent activity export interface OrchestratorEvent {