Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
322 changes: 312 additions & 10 deletions snapagent/agent/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from __future__ import annotations

import asyncio
import json
import os
import shutil
from contextlib import AsyncExitStack
from pathlib import Path
from typing import TYPE_CHECKING, Any, Awaitable, Callable
Expand Down Expand Up @@ -320,11 +323,12 @@ async def _handle_doctor(self, msg: InboundMessage) -> None:
task = self._doctor_tasks.get(key)
task_status = "running" if task and not task.done() else "idle"
mode = "on" if session.metadata.get("doctor_mode") else "off"
codex_session = session.metadata.get("doctor_codex_session_id") or "-"
await self.bus.publish_outbound(
OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=f"🩺 Doctor status: {task_status} (mode={mode}).",
content=f"🩺 Doctor status: {task_status} (mode={mode}, codex_session={codex_session}).",
run_id=run_id,
turn_id=turn_id,
)
Expand All @@ -334,6 +338,7 @@ async def _handle_doctor(self, msg: InboundMessage) -> None:
if action == "cancel":
total = await self._cancel_session_tasks(key, msg.chat_id)
session.metadata.pop("doctor_mode", None)
session.metadata.pop("doctor_codex_session_id", None)
self.sessions.save(session)
await self.bus.publish_outbound(
OutboundMessage(
Expand All @@ -348,6 +353,7 @@ async def _handle_doctor(self, msg: InboundMessage) -> None:

if action == "resume":
session.metadata.pop("doctor_mode", None)
session.metadata.pop("doctor_codex_session_id", None)
self.sessions.save(session)
await self.bus.publish_outbound(
OutboundMessage(
Expand All @@ -362,7 +368,9 @@ async def _handle_doctor(self, msg: InboundMessage) -> None:

note = text[len("/doctor") :].strip() if text.lower().startswith("/doctor") else ""
total = await self._cancel_session_tasks(key, msg.chat_id)
guidance = self._doctor_setup_guidance()
session.metadata.pop("doctor_codex_session_id", None)
doctor_cli_available = self._doctor_cli_available()
guidance = None if doctor_cli_available else self._doctor_setup_guidance()
if guidance:
session.metadata.pop("doctor_mode", None)
self.sessions.save(session)
Expand Down Expand Up @@ -408,8 +416,251 @@ async def _handle_doctor(self, msg: InboundMessage) -> None:
)
task = asyncio.create_task(self._dispatch(follow_up))
self._doctor_tasks[key] = task
self._active_tasks.setdefault(follow_up.session_key, []).append(task)
task.add_done_callback(lambda t, k=follow_up.session_key: self._cleanup_task(k, t))
self._active_tasks.setdefault(key, []).append(task)
task.add_done_callback(lambda t, k=key: self._cleanup_task(k, t))

def _doctor_cli_available(self) -> bool:
    """Report whether a ``codex`` executable can be resolved through PATH."""
    codex_path = shutil.which("codex")
    return codex_path is not None

def _doctor_codex_model(self) -> str:
    """Pick the Codex CLI model used for doctor runs.

    The ``SNAPAGENT_DOCTOR_CODEX_MODEL`` environment variable wins when it
    holds a non-blank value; otherwise a fixed default is returned.
    """
    configured = os.environ.get("SNAPAGENT_DOCTOR_CODEX_MODEL", "").strip()
    if configured:
        return configured
    return "gpt-5.3-codex"

def _build_doctor_codex_command(
    self, prompt: str, *, resume_session_id: str | None = None
) -> list[str]:
    """Assemble the argv list used to launch Codex CLI for a doctor run.

    Args:
        prompt: Text passed to the CLI as the final positional argument.
        resume_session_id: When truthy, an ``exec resume`` invocation is built
            with this ID placed just before the prompt; otherwise a fresh
            ``exec`` invocation with the sandbox flags is built.

    Returns:
        The argv list, starting with ``"codex"``.
    """
    # Flags shared by both invocation shapes, kept in their original order.
    shared_flags = ["--json", "--skip-git-repo-check", "--model", self._doctor_codex_model()]
    config_overrides = [
        "-c",
        'approval_policy="never"',
        "-c",
        'model_reasoning_effort="high"',
    ]

    if not resume_session_id:
        return [
            "codex",
            "exec",
            *shared_flags,
            "--sandbox",
            "workspace-write",
            "--full-auto",
            *config_overrides,
            prompt,
        ]

    return [
        "codex",
        "exec",
        "resume",
        *shared_flags,
        *config_overrides,
        resume_session_id,
        prompt,
    ]

async def _run_doctor_via_codex_cli(
    self,
    *,
    msg: InboundMessage,
    prompt: str,
    run_id: str,
    turn_id: str,
    session_key: str | None = None,
    publish: bool = True,
) -> tuple[str, bool]:
    """Run doctor diagnostics through Codex CLI.

    Args:
        msg: Inbound message whose ``channel``/``chat_id`` address any reply.
        prompt: Prompt text handed to the CLI.
        run_id: Run identifier attached to published messages.
        turn_id: Turn identifier attached to published messages.
        session_key: When given, a stored Codex session ID for this key is
            used to resume, and a newly reported session ID is stored back.
        publish: When true, the final text is also published on ``self.bus``.

    Returns:
        (final_message, success)

    Raises:
        asyncio.CancelledError: Re-raised after the child process has been
            stopped, so task cancellation still propagates to the caller.
    """

    async def finish(text: str, ok: bool) -> tuple[str, bool]:
        """Publish ``text`` if requested and hand back the result pair."""
        if publish:
            await self.bus.publish_outbound(
                OutboundMessage(
                    channel=msg.channel,
                    chat_id=msg.chat_id,
                    content=text,
                    run_id=run_id,
                    turn_id=turn_id,
                )
            )
        return text, ok

    resume_session_id = self._get_doctor_codex_session_id(session_key) if session_key else None
    cmd = self._build_doctor_codex_command(prompt, resume_session_id=resume_session_id)
    stderr_task: asyncio.Task[bytes] | None = None

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(self.workspace),
        )
    except FileNotFoundError:
        return await finish(
            "🩺 Doctor failed: codex CLI not found on PATH. "
            "Please install Codex CLI or use provider-based diagnostics.",
            False,
        )
    except Exception as e:
        return await finish(f"🩺 Doctor failed to start codex CLI: {e}", False)

    try:
        if proc.stderr is not None:
            # Drain stderr concurrently to avoid subprocess pipe backpressure deadlocks.
            stderr_task = asyncio.create_task(proc.stderr.read())
        output, session_id = await self._read_codex_cli_output(proc.stdout)
        exit_code = await proc.wait()
        stderr_text = ""
        if stderr_task is not None:
            stderr_text = (await stderr_task).decode("utf-8", "replace").strip()

        if exit_code == 0:
            final = output or "Doctor completed via Codex CLI, but no final message was captured."
        else:
            detail = stderr_text or output or f"exited with code {exit_code}"
            final = f"🩺 Doctor via Codex CLI failed: {detail}"

        if session_id:
            if session_key:
                self._set_doctor_codex_session_id(session_key, session_id)
            final = f"{final}\n\n(codex session: {session_id})"

        return await finish(final, exit_code == 0)
    except asyncio.CancelledError:
        await self._stop_doctor_cli_process(proc, stderr_task)
        raise
    except Exception as e:
        # Without this the child would keep running (and possibly block on a
        # full stdout pipe) after we stop reading from it.
        await self._stop_doctor_cli_process(proc, stderr_task)
        return await finish(f"🩺 Doctor via Codex CLI errored: {e}", False)

async def _stop_doctor_cli_process(
    self,
    proc: asyncio.subprocess.Process,
    stderr_task: asyncio.Task[bytes] | None,
) -> None:
    """Stop a still-running Codex CLI child and cancel its stderr drain task.

    Sends terminate, waits up to two seconds, then kills and reaps the child
    so no process is left running or unreaped.
    """
    if proc.returncode is None:
        try:
            proc.terminate()
        except ProcessLookupError:
            pass  # Child exited between the returncode check and the signal.
        try:
            await asyncio.wait_for(proc.wait(), timeout=2.0)
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            await proc.wait()
    if stderr_task is not None and not stderr_task.done():
        stderr_task.cancel()

def _get_doctor_codex_session_id(self, session_key: str) -> str | None:
    """Look up the Codex session ID stored in a chat session's metadata.

    Returns the stripped ID, or ``None`` when the stored value is missing,
    not a string, or blank.
    """
    stored = self.sessions.get_or_create(session_key).metadata.get("doctor_codex_session_id")
    if not isinstance(stored, str):
        return None
    cleaned = stored.strip()
    return cleaned or None

def _set_doctor_codex_session_id(self, session_key: str, session_id: str) -> None:
    """Store a Codex session ID in the chat session's metadata and save it.

    A blank ``session_id`` is ignored; nothing is written or saved.
    """
    cleaned = session_id.strip()
    if cleaned:
        record = self.sessions.get_or_create(session_key)
        record.metadata["doctor_codex_session_id"] = cleaned
        self.sessions.save(record)

async def _read_codex_cli_output(
    self, stream: asyncio.StreamReader | None
) -> tuple[str | None, str | None]:
    """Read Codex `--json` stdout and return (assistant_message, session_id).

    The stream is consumed line by line until EOF. Lines that are blank, not
    valid JSON, not JSON objects, or longer than the stream's line limit are
    skipped so one malformed event cannot abort the whole run.

    Args:
        stream: The child's stdout reader, or ``None`` when no pipe exists.

    Returns:
        The last non-empty assistant message seen and the ``thread_id`` from
        the last ``thread.started`` event; each is ``None`` if never seen.
    """
    if stream is None:
        return None, None

    def output_text(parts: object) -> str:
        """Join the ``output_text`` entries of a message content list."""
        if not isinstance(parts, list):
            return ""
        texts = []
        for part in parts:
            if isinstance(part, dict) and part.get("type") == "output_text":
                text = part.get("text", "")
                texts.append(text if isinstance(text, str) else "")
        return "\n".join(texts).strip()

    last_message: str | None = None
    session_id: str | None = None

    while True:
        try:
            line = await stream.readline()
        except ValueError:
            # readline() raises when a line exceeds the reader's limit and
            # discards the oversized data; keep reading the events after it.
            continue
        if not line:
            break

        text = line.decode("utf-8", "replace").strip()
        if not text:
            continue

        try:
            event = json.loads(text)
        except json.JSONDecodeError:
            continue
        if not isinstance(event, dict):
            # Valid JSON that is not an object (list, string, number, ...).
            continue

        kind = event.get("type")

        if kind == "thread.started":
            thread_id = event.get("thread_id")
            if isinstance(thread_id, str) and thread_id.strip():
                session_id = thread_id.strip()
            continue

        if kind == "item.completed" and isinstance(event.get("item"), dict):
            item = event["item"]
            msg = ""
            if item.get("type") == "message":
                msg = output_text(item.get("content", []))
            elif item.get("type") == "agent_message":
                raw_text = item.get("text", "")
                # A JSON null must not surface as the literal string "None".
                msg = "" if raw_text is None else str(raw_text).strip()
            if msg:
                last_message = msg
            continue

        payload = event.get("payload")
        if (
            kind == "response_item"
            and isinstance(payload, dict)
            and payload.get("type") == "message"
            and payload.get("role") == "assistant"
        ):
            msg = output_text(payload.get("content", []))
            if msg:
                last_message = msg

    return last_message, session_id

async def _cancel_session_tasks(self, session_key: str, chat_id: str) -> int:
"""Cancel all active tasks, doctor task, and subagents for one session."""
Expand Down Expand Up @@ -747,16 +998,67 @@ async def _process_message(
turn_id=turn_id,
)
elif session.metadata.get("doctor_mode") and not cmd.startswith("/"):
doctor_prompt = (
"[Doctor Mode] Diagnose issues using evidence first. "
"Use doctor_check with check=health/status/logs/events as needed. "
"Cite observed evidence and then propose next actions.\n\n"
+ msg.content
)
if self._doctor_cli_available():
codex_final, codex_ok = await self._run_doctor_via_codex_cli(
msg=msg,
prompt=doctor_prompt,
run_id=run_id,
turn_id=turn_id,
session_key=key,
publish=False,
)
if codex_ok:
return OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=codex_final,
metadata={
**(msg.metadata or {}),
"run_id": run_id,
"turn_id": turn_id,
},
run_id=run_id,
turn_id=turn_id,
)
logger.warning(
"Codex CLI doctor run failed; falling back to provider diagnostics for {}",
key,
)
fallback_notice = (
"🩺 Codex CLI diagnostics failed; "
"falling back to provider-based diagnostics.\n\n"
f"{codex_final}"
)
if on_progress is not None:
await on_progress(fallback_notice)
elif msg.channel != "cli":
await self.bus.publish_outbound(
OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content=fallback_notice,
metadata={
**(msg.metadata or {}),
"_progress": True,
"run_id": run_id,
"turn_id": turn_id,
},
run_id=run_id,
turn_id=turn_id,
)
)

msg = InboundMessage(
channel=msg.channel,
sender_id=msg.sender_id,
chat_id=msg.chat_id,
content=(
"[Doctor Mode] Diagnose issues using evidence first. "
"Use doctor_check with check=health/status/logs/events as needed. "
"Cite observed evidence and then propose next actions.\n\n"
+ msg.content
),
content=doctor_prompt,
timestamp=msg.timestamp,
media=msg.media,
metadata=msg.metadata,
Expand Down
Loading