Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions autoforge_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
assistant.db-shm
.agent.lock
.devserver.lock
.pause_drain
.claude_settings.json
.claude_assistant_settings.json
.claude_settings.expand.*.json
Expand Down Expand Up @@ -145,6 +146,15 @@ def get_claude_assistant_settings_path(project_dir: Path) -> Path:
return _resolve_path(project_dir, ".claude_assistant_settings.json")


def get_pause_drain_path(project_dir: Path) -> Path:
    """Build the location of the ``.pause_drain`` signal file.

    The file is created to request a graceful pause (drain mode). Because it
    is a transient signal file, its path is always placed in the new
    ``.autoforge`` directory under *project_dir*.

    Args:
        project_dir: Root directory of the project.

    Returns:
        ``<project_dir>/.autoforge/.pause_drain``. Nothing is created on disk
        and existence is not checked here.
    """
    return project_dir.joinpath(".autoforge", ".pause_drain")


def get_progress_cache_path(project_dir: Path) -> Path:
    """Return the resolved location of ``.progress_cache`` for *project_dir*.

    Resolution is delegated entirely to ``_resolve_path``; this wrapper only
    supplies the file name.
    """
    cache_filename = ".progress_cache"
    return _resolve_path(project_dir, cache_filename)
Expand Down
45 changes: 45 additions & 0 deletions parallel_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ def __init__(
# Signal handlers only set this flag; cleanup happens in the main loop
self._shutdown_requested = False

# Graceful pause (drain mode) flag
self._drain_requested = False

# Session tracking for logging/debugging
self.session_start_time: datetime | None = None

Expand Down Expand Up @@ -1368,6 +1371,9 @@ async def run_loop(self):
# Must happen before any debug_log.log() calls
debug_log.start_session()

# Clear any stale drain signal from a previous session
self._clear_drain_signal()

# Log startup to debug file
debug_log.section("ORCHESTRATOR STARTUP")
debug_log.log("STARTUP", "Orchestrator run_loop starting",
Expand Down Expand Up @@ -1489,6 +1495,34 @@ async def run_loop(self):
print("\nAll features complete!", flush=True)
break

# --- Graceful pause (drain mode) ---
if not self._drain_requested and self._check_drain_signal():
self._drain_requested = True
print("Graceful pause requested - draining running agents...", flush=True)
debug_log.log("DRAIN", "Graceful pause requested, draining running agents")

if self._drain_requested:
with self._lock:
coding_count = len(self.running_coding_agents)
testing_count = len(self.running_testing_agents)

if coding_count == 0 and testing_count == 0:
print("All agents drained - paused.", flush=True)
debug_log.log("DRAIN", "All agents drained, entering paused state")
# Wait until signal file is removed (resume) or shutdown
while self._check_drain_signal() and self.is_running and not self._shutdown_requested:
await asyncio.sleep(1)
if not self.is_running or self._shutdown_requested:
break
self._drain_requested = False
print("Resuming from graceful pause...", flush=True)
debug_log.log("DRAIN", "Resuming from graceful pause")
continue
else:
debug_log.log("DRAIN", f"Waiting for agents to finish: coding={coding_count}, testing={testing_count}")
await self._wait_for_agent_completion()
continue

# Maintain testing agents independently (runs every iteration)
self._maintain_testing_agents(feature_dicts)

Expand Down Expand Up @@ -1613,6 +1647,17 @@ def get_status(self) -> dict:
"yolo_mode": self.yolo_mode,
}

def _check_drain_signal(self) -> bool:
    """Report whether the graceful pause (drain) signal file is present.

    Returns:
        ``True`` when the file at ``get_pause_drain_path(self.project_dir)``
        exists, ``False`` otherwise.
    """
    # Imported at call time, as in the rest of this class's drain helpers.
    from autoforge_paths import get_pause_drain_path

    signal_file = get_pause_drain_path(self.project_dir)
    return signal_file.exists()

def _clear_drain_signal(self) -> None:
    """Remove the drain signal file (if any) and clear the drain flag.

    A missing file is not an error. The in-memory ``_drain_requested`` flag
    is only reset after the unlink call has returned.
    """
    from autoforge_paths import get_pause_drain_path

    signal_file = get_pause_drain_path(self.project_dir)
    signal_file.unlink(missing_ok=True)
    self._drain_requested = False

def cleanup(self) -> None:
"""Clean up database resources. Safe to call multiple times.

Expand Down
28 changes: 28 additions & 0 deletions server/routers/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,31 @@ async def resume_agent(project_name: str):
status=manager.status,
message=message,
)


@router.post("/graceful-pause", response_model=AgentActionResponse)
async def graceful_pause_agent(project_name: str):
    """Request a graceful pause (drain mode): finish current work, then pause.

    Looks up the manager for *project_name*, awaits its ``graceful_pause()``
    coroutine, and reports the outcome together with the manager's status as
    read after that call has returned.
    """
    project_manager = get_project_manager(project_name)
    ok, detail = await project_manager.graceful_pause()

    response_fields = {
        "success": ok,
        "status": project_manager.status,
        "message": detail,
    }
    return AgentActionResponse(**response_fields)


@router.post("/graceful-resume", response_model=AgentActionResponse)
async def graceful_resume_agent(project_name: str):
    """Resume an agent from a graceful pause.

    Looks up the manager for *project_name*, awaits its ``graceful_resume()``
    coroutine, and returns the result along with the manager's status as read
    after that call has returned.
    """
    project_manager = get_project_manager(project_name)
    outcome = await project_manager.graceful_resume()
    ok, detail = outcome

    return AgentActionResponse(
        success=ok,
        status=project_manager.status,
        message=detail,
    )
2 changes: 1 addition & 1 deletion server/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def validate_testing_ratio(cls, v: int | None) -> int | None:

class AgentStatus(BaseModel):
"""Current agent status."""
status: Literal["stopped", "running", "paused", "crashed"]
status: Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"]
pid: int | None = None
started_at: datetime | None = None
yolo_mode: bool = False
Expand Down
77 changes: 71 additions & 6 deletions server/services/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __init__(
self.project_dir = project_dir
self.root_dir = root_dir
self.process: subprocess.Popen | None = None
self._status: Literal["stopped", "running", "paused", "crashed"] = "stopped"
self._status: Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"] = "stopped"
self.started_at: datetime | None = None
self._output_task: asyncio.Task | None = None
self.yolo_mode: bool = False # YOLO mode for rapid prototyping
Expand All @@ -96,11 +96,11 @@ def __init__(
self.lock_file = get_agent_lock_path(self.project_dir)

@property
def status(self) -> Literal["stopped", "running", "paused", "crashed"]:
def status(self) -> Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"]:
return self._status

@status.setter
def status(self, value: Literal["stopped", "running", "paused", "crashed"]):
def status(self, value: Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"]):
old_status = self._status
self._status = value
if old_status != value:
Expand Down Expand Up @@ -308,6 +308,12 @@ async def _stream_output(self) -> None:
for help_line in AUTH_ERROR_HELP.strip().split('\n'):
await self._broadcast_output(help_line)

# Detect graceful pause status transitions from orchestrator output
if "All agents drained - paused." in decoded:
self.status = "paused_graceful"
elif "Resuming from graceful pause..." in decoded:
self.status = "running"

await self._broadcast_output(sanitized)

except asyncio.CancelledError:
Expand Down Expand Up @@ -355,7 +361,7 @@ async def start(
Returns:
Tuple of (success, message)
"""
if self.status in ("running", "paused"):
if self.status in ("running", "paused", "pausing", "paused_graceful"):
return False, f"Agent is already {self.status}"

if not self._check_lock():
Expand Down Expand Up @@ -481,6 +487,12 @@ async def stop(self) -> tuple[bool, str]:

self._cleanup_stale_features()
self._remove_lock()
# Clean up drain signal file if present
try:
from autoforge_paths import get_pause_drain_path
get_pause_drain_path(self.project_dir).unlink(missing_ok=True)
except Exception:
pass
self.status = "stopped"
self.process = None
self.started_at = None
Expand Down Expand Up @@ -541,6 +553,47 @@ async def resume(self) -> tuple[bool, str]:
logger.exception("Failed to resume agent")
return False, f"Failed to resume agent: {e}"

async def graceful_pause(self) -> tuple[bool, str]:
    """Ask the orchestrator to drain and then pause.

    A signal file is written (containing this manager's process PID) for the
    orchestrator to poll; running agents finish their current work before the
    orchestrator enters a paused state. On success the manager's status
    becomes ``"pausing"``.

    Returns:
        Tuple of (success, message). The request is refused when there is no
        process or the status is anything other than ``"running"``; any
        exception raised while creating the signal file is logged and
        reported as a failure rather than propagated.
    """
    # Guard clauses: only a running agent with a live process handle can drain.
    if not self.process:
        return False, "Agent is not running"
    if self.status != "running":
        return False, "Agent is not running"

    try:
        from autoforge_paths import get_pause_drain_path

        signal_file = get_pause_drain_path(self.project_dir)
        # The containing directory may not exist yet.
        signal_file.parent.mkdir(parents=True, exist_ok=True)
        pid_text = str(self.process.pid)
        signal_file.write_text(pid_text)
        self.status = "pausing"
        return True, "Graceful pause requested"
    except Exception as err:
        logger.exception("Failed to request graceful pause")
        return False, f"Failed to request graceful pause: {err}"

async def graceful_resume(self) -> tuple[bool, str]:
    """Leave a graceful pause by deleting the drain signal file.

    On success the manager's status is set back to ``"running"``.

    Returns:
        Tuple of (success, message). The request is refused when there is no
        process or the status is neither ``"pausing"`` nor
        ``"paused_graceful"``; any exception raised while removing the signal
        file is logged and reported as a failure rather than propagated.
    """
    resumable_states = ("pausing", "paused_graceful")
    if not self.process:
        return False, "Agent is not in a graceful pause state"
    if self.status not in resumable_states:
        return False, "Agent is not in a graceful pause state"

    try:
        from autoforge_paths import get_pause_drain_path

        signal_file = get_pause_drain_path(self.project_dir)
        # A file that is already gone is fine.
        signal_file.unlink(missing_ok=True)
        self.status = "running"
        return True, "Agent resumed from graceful pause"
    except Exception as err:
        logger.exception("Failed to resume from graceful pause")
        return False, f"Failed to resume: {err}"

async def healthcheck(self) -> bool:
"""
Check if the agent process is still alive.
Expand All @@ -556,8 +609,14 @@ async def healthcheck(self) -> bool:
poll = self.process.poll()
if poll is not None:
# Process has terminated
if self.status in ("running", "paused"):
if self.status in ("running", "paused", "pausing", "paused_graceful"):
self._cleanup_stale_features()
# Clean up drain signal file if present
try:
from autoforge_paths import get_pause_drain_path
get_pause_drain_path(self.project_dir).unlink(missing_ok=True)
except Exception:
pass
self.status = "crashed"
self._remove_lock()
return False
Expand Down Expand Up @@ -642,8 +701,14 @@ def cleanup_orphaned_locks() -> int:
if not project_path.exists():
continue

# Clean up stale drain signal files
from autoforge_paths import get_autoforge_dir, get_pause_drain_path
drain_file = get_pause_drain_path(project_path)
if drain_file.exists():
drain_file.unlink(missing_ok=True)
logger.info("Removed stale drain signal file for project '%s'", name)

# Check both legacy and new locations for lock files
from autoforge_paths import get_autoforge_dir
lock_locations = [
project_path / ".agent.lock",
get_autoforge_dir(project_path) / ".agent.lock",
Expand Down
27 changes: 27 additions & 0 deletions server/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@
'testing_complete': re.compile(r'Feature #(\d+) testing (completed|failed)'),
'all_complete': re.compile(r'All features complete'),
'blocked_features': re.compile(r'(\d+) blocked by dependencies'),
'drain_start': re.compile(r'Graceful pause requested'),
'drain_complete': re.compile(r'All agents drained'),
'drain_resume': re.compile(r'Resuming from graceful pause'),
}


Expand Down Expand Up @@ -562,6 +565,30 @@ async def process_line(self, line: str) -> dict | None:
'All features complete!'
)

# Graceful pause (drain mode) events
elif ORCHESTRATOR_PATTERNS['drain_start'].search(line):
self.state = 'draining'
update = self._create_update(
'drain_start',
'Draining active agents...'
)

elif ORCHESTRATOR_PATTERNS['drain_complete'].search(line):
self.state = 'paused'
self.coding_agents = 0
self.testing_agents = 0
update = self._create_update(
'drain_complete',
'All agents drained. Paused.'
)

elif ORCHESTRATOR_PATTERNS['drain_resume'].search(line):
self.state = 'scheduling'
update = self._create_update(
'drain_resume',
'Resuming feature scheduling'
)

return update

def _create_update(
Expand Down
Loading