From 5246f6aa7a83a54ccfc2b90ad8b9023338156c72 Mon Sep 17 00:00:00 2001 From: pikaxinge <2392811793@qq.com> Date: Sat, 28 Feb 2026 06:13:55 +0000 Subject: [PATCH 1/4] feat(doctor): run diagnostics via codex cli with resume support --- snapagent/agent/loop.py | 288 ++++++++++++++++++++++++++++++++--- tests/test_doctor_command.py | 146 ++++++++++++++++++ 2 files changed, 415 insertions(+), 19 deletions(-) diff --git a/snapagent/agent/loop.py b/snapagent/agent/loop.py index 30a8d42..bb4c2be 100644 --- a/snapagent/agent/loop.py +++ b/snapagent/agent/loop.py @@ -3,6 +3,9 @@ from __future__ import annotations import asyncio +import json +import os +import shutil from contextlib import AsyncExitStack from pathlib import Path from typing import TYPE_CHECKING, Any, Awaitable, Callable @@ -320,11 +323,12 @@ async def _handle_doctor(self, msg: InboundMessage) -> None: task = self._doctor_tasks.get(key) task_status = "running" if task and not task.done() else "idle" mode = "on" if session.metadata.get("doctor_mode") else "off" + codex_session = session.metadata.get("doctor_codex_session_id") or "-" await self.bus.publish_outbound( OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, - content=f"🩺 Doctor status: {task_status} (mode={mode}).", + content=f"🩺 Doctor status: {task_status} (mode={mode}, codex_session={codex_session}).", run_id=run_id, turn_id=turn_id, ) @@ -334,6 +338,7 @@ async def _handle_doctor(self, msg: InboundMessage) -> None: if action == "cancel": total = await self._cancel_session_tasks(key, msg.chat_id) session.metadata.pop("doctor_mode", None) + session.metadata.pop("doctor_codex_session_id", None) self.sessions.save(session) await self.bus.publish_outbound( OutboundMessage( @@ -348,6 +353,7 @@ async def _handle_doctor(self, msg: InboundMessage) -> None: if action == "resume": session.metadata.pop("doctor_mode", None) + session.metadata.pop("doctor_codex_session_id", None) self.sessions.save(session) await self.bus.publish_outbound( OutboundMessage( @@ -362,6 +368,7 @@ async def _handle_doctor(self, msg: InboundMessage) -> None: note = text[len("/doctor") :].strip() if text.lower().startswith("/doctor") else "" total = await self._cancel_session_tasks(key, msg.chat_id) + session.metadata.pop("doctor_codex_session_id", None) guidance = self._doctor_setup_guidance() if guidance: session.metadata.pop("doctor_mode", None) @@ -397,19 +404,251 @@ async def _handle_doctor(self, msg: InboundMessage) -> None: "Please self-diagnose this session. Collect evidence with doctor_check " "(health/status/logs/events) before conclusions." ) - follow_up = InboundMessage( - channel=msg.channel, - sender_id=msg.sender_id, - chat_id=msg.chat_id, - content=bootstrap, - media=[], - metadata=dict(msg.metadata or {}), - session_key_override=msg.session_key_override, - ) - task = asyncio.create_task(self._dispatch(follow_up)) + if self._doctor_cli_available(): + task = asyncio.create_task( + self._run_doctor_via_codex_cli( + msg=msg, + prompt=bootstrap, + run_id=run_id, + turn_id=turn_id, + session_key=key, + ) + ) + else: + logger.warning("Codex CLI unavailable; falling back to provider-based doctor flow") + follow_up = InboundMessage( + channel=msg.channel, + sender_id=msg.sender_id, + chat_id=msg.chat_id, + content=bootstrap, + media=[], + metadata=dict(msg.metadata or {}), + session_key_override=msg.session_key_override, + ) + task = asyncio.create_task(self._dispatch(follow_up)) self._doctor_tasks[key] = task - self._active_tasks.setdefault(follow_up.session_key, []).append(task) - task.add_done_callback(lambda t, k=follow_up.session_key: self._cleanup_task(k, t)) + self._active_tasks.setdefault(key, []).append(task) + task.add_done_callback(lambda t, k=key: self._cleanup_task(k, t)) + + def _doctor_cli_available(self) -> bool: + """Return whether `codex` CLI is available on PATH.""" + return shutil.which("codex") is not None + + def _doctor_codex_model(self) -> str: + """Resolve doctor Codex CLI model from env with a stable default.""" + model = os.environ.get("SNAPAGENT_DOCTOR_CODEX_MODEL", "").strip() + return model or "gpt-5.3-codex" + + def _build_doctor_codex_command( + self, prompt: str, *, resume_session_id: str | None = None + ) -> list[str]: + """Build Codex CLI command for doctor diagnostics.""" + if resume_session_id: + return [ + "codex", + "exec", + "resume", + "--json", + "--skip-git-repo-check", + "--model", + self._doctor_codex_model(), + "-c", + 'approval_policy="never"', + "-c", + 'model_reasoning_effort="high"', + resume_session_id, + prompt, + ] + return [ + "codex", + "exec", + "--json", + "--skip-git-repo-check", + "--model", + self._doctor_codex_model(), + "--sandbox", + "workspace-write", + "--full-auto", + "-c", + 'approval_policy="never"', + "-c", + 'model_reasoning_effort="high"', + prompt, + ] + + async def _run_doctor_via_codex_cli( + self, + *, + msg: InboundMessage, + prompt: str, + run_id: str, + turn_id: str, + session_key: str | None = None, + ) -> None: + """Run doctor diagnostics through Codex CLI and publish final result.""" + resume_session_id = self._get_doctor_codex_session_id(session_key) if session_key else None + cmd = self._build_doctor_codex_command(prompt, resume_session_id=resume_session_id) + proc: asyncio.subprocess.Process | None = None + + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=str(self.workspace), + ) + except FileNotFoundError: + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=( + "🩺 Doctor failed: codex CLI not found on PATH. " + "Please install Codex CLI or use provider-based diagnostics." + ), + run_id=run_id, + turn_id=turn_id, + ) + ) + return + except Exception as e: + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=f"🩺 Doctor failed to start codex CLI: {e}", + run_id=run_id, + turn_id=turn_id, + ) + ) + return + + try: + output, session_id = await self._read_codex_cli_output(proc.stdout) + exit_code = await proc.wait() + stderr_text = "" + if proc.stderr is not None: + stderr_text = (await proc.stderr.read()).decode("utf-8", "replace").strip() + + if exit_code == 0: + final = output or "Doctor completed via Codex CLI, but no final message was captured." + else: + detail = stderr_text or output or f"exited with code {exit_code}" + final = f"🩺 Doctor via Codex CLI failed: {detail}" + + if session_id: + if session_key: + self._set_doctor_codex_session_id(session_key, session_id) + final = f"{final}\n\n(codex session: {session_id})" + + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=final, + run_id=run_id, + turn_id=turn_id, + ) + ) + except asyncio.CancelledError: + if proc.returncode is None: + proc.terminate() + try: + await asyncio.wait_for(proc.wait(), timeout=2.0) + except asyncio.TimeoutError: + proc.kill() + raise + except Exception as e: + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=f"🩺 Doctor via Codex CLI errored: {e}", + run_id=run_id, + turn_id=turn_id, + ) + ) + + def _get_doctor_codex_session_id(self, session_key: str) -> str | None: + """Get stored Codex session ID for a doctor-mode chat session.""" + session = self.sessions.get_or_create(session_key) + raw = session.metadata.get("doctor_codex_session_id") + if isinstance(raw, str) and raw.strip(): + return raw.strip() + return None + + def _set_doctor_codex_session_id(self, session_key: str, session_id: str) -> None: + """Persist Codex session ID for doctor-mode resume calls.""" + if not session_id.strip(): + return + session = self.sessions.get_or_create(session_key) + session.metadata["doctor_codex_session_id"] = session_id.strip() + self.sessions.save(session) + + async def _read_codex_cli_output( + self, stream: asyncio.StreamReader | None + ) -> tuple[str | None, str | None]: + """Read Codex `--json` stdout and return (assistant_message, session_id).""" + if stream is None: + return None, None + + last_message: str | None = None + session_id: str | None = None + + while True: + line = await stream.readline() + if not line: + break + + text = line.decode("utf-8", "replace").strip() + if not text: + continue + + try: + event = json.loads(text) + except json.JSONDecodeError: + continue + + if event.get("type") == "thread.started": + thread_id = event.get("thread_id") + if isinstance(thread_id, str) and thread_id.strip(): + session_id = thread_id.strip() + continue + + if event.get("type") == "item.completed" and isinstance(event.get("item"), dict): + item = event["item"] + msg = "" + if item.get("type") == "message": + content_parts = item.get("content", []) + texts = [ + p.get("text", "") + for p in content_parts + if isinstance(p, dict) and p.get("type") == "output_text" + ] + msg = "\n".join(texts).strip() + elif item.get("type") == "agent_message": + msg = str(item.get("text", "")).strip() + if msg: + last_message = msg + continue + + if ( + event.get("type") == "response_item" + and isinstance(event.get("payload"), dict) + and event["payload"].get("type") == "message" + and event["payload"].get("role") == "assistant" + ): + content_parts = event["payload"].get("content", []) + texts = [ + p.get("text", "") + for p in content_parts + if isinstance(p, dict) and p.get("type") == "output_text" + ] + msg = "\n".join(texts).strip() + if msg: + last_message = msg + + return last_message, session_id async def _cancel_session_tasks(self, session_key: str, chat_id: str) -> int: """Cancel all active tasks, doctor task, and subagents for one session.""" @@ -747,16 +986,27 @@ async def _process_message( turn_id=turn_id, ) elif session.metadata.get("doctor_mode") and not cmd.startswith("/"): + doctor_prompt = ( + "[Doctor Mode] Diagnose issues using evidence first. " + "Use doctor_check with check=health/status/logs/events as needed. " + "Cite observed evidence and then propose next actions.\n\n" + + msg.content + ) + if self._doctor_cli_available(): + await self._run_doctor_via_codex_cli( + msg=msg, + prompt=doctor_prompt, + run_id=run_id, + turn_id=turn_id, + session_key=key, + ) + return None + msg = InboundMessage( channel=msg.channel, sender_id=msg.sender_id, chat_id=msg.chat_id, - content=( - "[Doctor Mode] Diagnose issues using evidence first. " - "Use doctor_check with check=health/status/logs/events as needed. " - "Cite observed evidence and then propose next actions.\n\n" - + msg.content - ), + content=doctor_prompt, timestamp=msg.timestamp, media=msg.media, metadata=msg.metadata, diff --git a/tests/test_doctor_command.py b/tests/test_doctor_command.py index ce6cfb4..ab0a5e8 100644 --- a/tests/test_doctor_command.py +++ b/tests/test_doctor_command.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import json from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -63,6 +64,40 @@ async def slow_task(): assert "doctor mode" in out.content.lower() +@pytest.mark.asyncio +async def test_doctor_start_prefers_codex_cli_when_available(): + loop, bus, session = _make_loop() + loop._doctor_setup_guidance = MagicMock(return_value=None) + loop._doctor_cli_available = MagicMock(return_value=True) + loop._run_doctor_via_codex_cli = AsyncMock(return_value=None) + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/doctor") + await loop._handle_doctor(msg) + await asyncio.sleep(0) + + assert session.metadata.get("doctor_mode") is True + loop._run_doctor_via_codex_cli.assert_awaited_once() + loop._dispatch.assert_not_awaited() + out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert "doctor mode" in out.content.lower() + + +@pytest.mark.asyncio +async def test_doctor_start_falls_back_when_codex_cli_unavailable(): + loop, _bus, session = _make_loop() + loop._doctor_setup_guidance = MagicMock(return_value=None) + loop._doctor_cli_available = MagicMock(return_value=False) + loop._run_doctor_via_codex_cli = AsyncMock(return_value=None) + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/doctor") + await loop._handle_doctor(msg) + await asyncio.sleep(0) + + assert session.metadata.get("doctor_mode") is True + loop._run_doctor_via_codex_cli.assert_not_awaited() + assert loop._dispatch.await_count == 1 + + @pytest.mark.asyncio async def test_doctor_status_reports_idle(): loop, bus, _session = _make_loop() @@ -132,3 +167,114 @@ async def test_run_does_not_route_doctor_typo_to_doctor_handler(): loop._handle_doctor.assert_not_awaited() assert loop._dispatch.await_count == 1 + + +@pytest.mark.asyncio +async def test_read_codex_cli_output_parses_session_and_message(): + loop, _bus, _session = _make_loop() + reader = asyncio.StreamReader() + reader.feed_data( + ( + json.dumps({"type": "thread.started", "thread_id": "th_123"}) + "\n" + + json.dumps( + { + "type": "response_item", + "payload": { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "diag ok"}], + }, + } + ) + + "\n" + ).encode("utf-8") + ) + reader.feed_eof() + + output, session_id = await loop._read_codex_cli_output(reader) + assert output == "diag ok" + assert session_id == "th_123" + + +def test_build_doctor_codex_command_with_resume_session(): + loop, _bus, _session = _make_loop() + loop._doctor_codex_model = MagicMock(return_value="gpt-5.3-codex") + + cmd = loop._build_doctor_codex_command( + "check status", + resume_session_id="th_abc", + ) + assert cmd[:4] == ["codex", "exec", "resume", "--json"] + assert "th_abc" in cmd + assert cmd[-1] == "check status" + + +@pytest.mark.asyncio +async def test_run_doctor_via_codex_cli_persists_session_id(monkeypatch): + loop, bus, session = _make_loop() + + class _FakeProc: + def __init__(self): + self.stdout = asyncio.StreamReader() + self.stdout.feed_data( + ( + json.dumps({"type": "thread.started", "thread_id": "th_saved"}) + "\n" + + json.dumps( + { + "type": "response_item", + "payload": { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "done"}], + }, + } + ) + + "\n" + ).encode("utf-8") + ) + self.stdout.feed_eof() + self.stderr = asyncio.StreamReader() + self.stderr.feed_eof() + self.returncode = None + + async def wait(self): + self.returncode = 0 + return 0 + + def terminate(self): + self.returncode = -15 + + def kill(self): + self.returncode = -9 + + async def _fake_spawn(*_args, **_kwargs): + return _FakeProc() + + monkeypatch.setattr(asyncio, "create_subprocess_exec", _fake_spawn) + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="diag") + await loop._run_doctor_via_codex_cli( + msg=msg, + prompt="diag", + run_id="r1", + turn_id="t1", + session_key="test:c1", + ) + + assert session.metadata.get("doctor_codex_session_id") == "th_saved" + out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert "codex session: th_saved" in out.content + + +@pytest.mark.asyncio +async def test_process_message_doctor_mode_routes_to_codex_cli(): + loop, _bus, session = _make_loop() + session.metadata["doctor_mode"] = True + loop._doctor_cli_available = MagicMock(return_value=True) + loop._run_doctor_via_codex_cli = AsyncMock(return_value=None) + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="check logs") + result = await loop._process_message(msg) + + assert result is None + loop._run_doctor_via_codex_cli.assert_awaited_once() From 2b0e4e51d588da6abb65eec0cadaa7c1119cb72e Mon Sep 17 00:00:00 2001 From: pikaxinge <2392811793@qq.com> Date: Sat, 28 Feb 2026 06:29:08 +0000 Subject: [PATCH 2/4] fix: gate doctor precheck by CLI and chunk Feishu long replies --- snapagent/agent/loop.py | 5 +-- snapagent/channels/feishu.py | 70 +++++++++++++++++++++++++++++------- tests/test_doctor_command.py | 18 ++++++++++ tests/test_feishu_channel.py | 47 ++++++++++++++++++++++++ 4 files changed, 126 insertions(+), 14 deletions(-) create mode 100644 tests/test_feishu_channel.py diff --git a/snapagent/agent/loop.py b/snapagent/agent/loop.py index bb4c2be..40f3159 100644 --- a/snapagent/agent/loop.py +++ b/snapagent/agent/loop.py @@ -369,7 +369,8 @@ async def _handle_doctor(self, msg: InboundMessage) -> None: note = text[len("/doctor") :].strip() if text.lower().startswith("/doctor") else "" total = await self._cancel_session_tasks(key, msg.chat_id) session.metadata.pop("doctor_codex_session_id", None) - guidance = self._doctor_setup_guidance() + doctor_cli_available = self._doctor_cli_available() + guidance = None if doctor_cli_available else self._doctor_setup_guidance() if guidance: session.metadata.pop("doctor_mode", None) self.sessions.save(session) @@ -404,7 +405,7 @@ async def _handle_doctor(self, msg: InboundMessage) -> None: "Please self-diagnose this session. Collect evidence with doctor_check " "(health/status/logs/events) before conclusions." ) - if self._doctor_cli_available(): + if doctor_cli_available: task = asyncio.create_task( self._run_doctor_via_codex_cli( msg=msg, diff --git a/snapagent/channels/feishu.py b/snapagent/channels/feishu.py index 890a8a8..e4973e0 100644 --- a/snapagent/channels/feishu.py +++ b/snapagent/channels/feishu.py @@ -46,6 +46,51 @@ "sticker": "[sticker]", } +FEISHU_CARD_CHUNK_MAX_LEN = 2000 + + +def _split_message(content: str, max_len: int = FEISHU_CARD_CHUNK_MAX_LEN) -> list[str]: + """Split content into chunks within max_len, preferring paragraph and line breaks.""" + if not content: + return [] + if len(content) <= max_len: + return [content] + + chunks: list[str] = [] + remaining = content + while remaining: + if len(remaining) <= max_len: + chunks.append(remaining) + break + + cut = remaining[:max_len] + sep_len = 0 + pos = cut.rfind("\n\n") + if pos > 0: + sep_len = 2 + if pos <= 0: + pos = cut.rfind("\n") + if pos > 0: + sep_len = 1 + if pos <= 0: + pos = cut.rfind(" ") + if pos > 0: + sep_len = 1 + if pos <= 0: + pos = max_len + sep_len = 0 + else: + pos += sep_len + + chunk = remaining[:pos] + if not chunk: + pos = max_len + chunk = remaining[:pos] + + chunks.append(chunk) + remaining = remaining[pos:] + return chunks + def _extract_share_card_content(content_json: dict, msg_type: str) -> str: """Extract text representation from share cards and interactive messages.""" @@ -709,18 +754,19 @@ async def send(self, msg: OutboundMessage) -> None: ) if msg.content and msg.content.strip(): - card = { - "config": {"wide_screen_mode": True}, - "elements": self._build_card_elements(msg.content), - } - await loop.run_in_executor( - None, - self._send_message_sync, - receive_id_type, - msg.chat_id, - "interactive", - json.dumps(card, ensure_ascii=False), - ) + for chunk in _split_message(msg.content): + card = { + "config": {"wide_screen_mode": True}, + "elements": self._build_card_elements(chunk), + } + await loop.run_in_executor( + None, + self._send_message_sync, + receive_id_type, + msg.chat_id, + "interactive", + json.dumps(card, ensure_ascii=False), + ) except Exception as e: logger.error("Error sending Feishu message: {}", e) diff --git a/tests/test_doctor_command.py b/tests/test_doctor_command.py index ab0a5e8..fb5faeb 100644 --- a/tests/test_doctor_command.py +++ b/tests/test_doctor_command.py @@ -129,6 +129,7 @@ async def test_doctor_cancel_disables_mode(): async def test_doctor_start_shows_setup_guidance_when_provider_not_ready(): loop, bus, session = _make_loop() loop._doctor_setup_guidance = MagicMock(return_value="setup guide") + loop._doctor_cli_available = MagicMock(return_value=False) msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/doctor") await loop._handle_doctor(msg) @@ -140,6 +141,23 @@ async def test_doctor_start_shows_setup_guidance_when_provider_not_ready(): loop._dispatch.assert_not_called() +@pytest.mark.asyncio +async def test_doctor_start_skips_setup_guidance_when_codex_cli_available(): + loop, _bus, session = _make_loop() + loop._doctor_setup_guidance = MagicMock(return_value="setup guide") + loop._doctor_cli_available = MagicMock(return_value=True) + loop._run_doctor_via_codex_cli = AsyncMock(return_value=None) + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/doctor") + await loop._handle_doctor(msg) + await asyncio.sleep(0) + + assert session.metadata.get("doctor_mode") is True + loop._doctor_setup_guidance.assert_not_called() + loop._run_doctor_via_codex_cli.assert_awaited_once() + loop._dispatch.assert_not_awaited() + + @pytest.mark.asyncio async def test_help_includes_doctor_commands(): loop, _bus, _session = _make_loop() diff --git a/tests/test_feishu_channel.py b/tests/test_feishu_channel.py new file mode 100644 index 0000000..80d8d3a --- /dev/null +++ b/tests/test_feishu_channel.py @@ -0,0 +1,47 @@ +"""Feishu channel chunking behavior tests.""" + +from __future__ import annotations + +import pytest + +from snapagent.bus.events import OutboundMessage +from snapagent.bus.queue import MessageBus +from snapagent.channels.feishu import FeishuChannel, _split_message +from snapagent.config.schema import FeishuConfig + + +def _make_channel() -> FeishuChannel: + channel = FeishuChannel(config=FeishuConfig(), bus=MessageBus()) + channel._client = object() + return channel + + +def test_split_message_force_splits_when_no_separator(): + content = "x" * 23 + chunks = _split_message(content, max_len=8) + assert chunks == ["x" * 8, "x" * 8, "x" * 7] + + +def test_split_message_preserves_whitespace_and_newlines(): + content = "line1\n\nline2 with space \nline3" + chunks = _split_message(content, max_len=10) + assert "".join(chunks) == content + + +@pytest.mark.asyncio +async def test_feishu_send_splits_long_content_into_multiple_cards(): + channel = _make_channel() + calls: list[dict] = [] + + def _fake_send(_receive_id_type: str, _receive_id: str, msg_type: str, content: str) -> bool: + calls.append({"msg_type": msg_type, "content": content}) + return True + + channel._send_message_sync = _fake_send # type: ignore[method-assign] + long_content = "a" * 9000 + await channel.send(OutboundMessage(channel="feishu", chat_id="ou_test", content=long_content)) + + chunks = _split_message(long_content) + assert len(calls) == len(chunks) + assert all(c["msg_type"] == "interactive" for c in calls) + assert all(len(chunk) <= 2000 for chunk in chunks) From b17974b5b9f70575dc9b2c9f4c239701cf232103 Mon Sep 17 00:00:00 2001 From: pikaxinge <2392811793@qq.com> Date: Sat, 28 Feb 2026 06:59:37 +0000 Subject: [PATCH 3/4] fix(doctor): serialize bootstrap flow and fallback on codex runtime failure --- snapagent/agent/loop.py | 146 ++++++++++++++++++++--------------- snapagent/channels/feishu.py | 2 + tests/test_doctor_command.py | 57 ++++++++++++-- tests/test_feishu_channel.py | 10 +++ 4 files changed, 144 insertions(+), 71 deletions(-) diff --git a/snapagent/agent/loop.py b/snapagent/agent/loop.py index 40f3159..8f66764 100644 --- a/snapagent/agent/loop.py +++ b/snapagent/agent/loop.py @@ -405,28 +405,16 @@ async def _handle_doctor(self, msg: InboundMessage) -> None: "Please self-diagnose this session. Collect evidence with doctor_check " "(health/status/logs/events) before conclusions." ) - if doctor_cli_available: - task = asyncio.create_task( - self._run_doctor_via_codex_cli( - msg=msg, - prompt=bootstrap, - run_id=run_id, - turn_id=turn_id, - session_key=key, - ) - ) - else: - logger.warning("Codex CLI unavailable; falling back to provider-based doctor flow") - follow_up = InboundMessage( - channel=msg.channel, - sender_id=msg.sender_id, - chat_id=msg.chat_id, - content=bootstrap, - media=[], - metadata=dict(msg.metadata or {}), - session_key_override=msg.session_key_override, - ) - task = asyncio.create_task(self._dispatch(follow_up)) + follow_up = InboundMessage( + channel=msg.channel, + sender_id=msg.sender_id, + chat_id=msg.chat_id, + content=bootstrap, + media=[], + metadata=dict(msg.metadata or {}), + session_key_override=msg.session_key_override, + ) + task = asyncio.create_task(self._dispatch(follow_up)) self._doctor_tasks[key] = task self._active_tasks.setdefault(key, []).append(task) task.add_done_callback(lambda t, k=key: self._cleanup_task(k, t)) @@ -485,8 +473,13 @@ async def _run_doctor_via_codex_cli( run_id: str, turn_id: str, session_key: str | None = None, - ) -> None: - """Run doctor diagnostics through Codex CLI and publish final result.""" + publish: bool = True, + ) -> tuple[str, bool]: + """Run doctor diagnostics through Codex CLI. + + Returns: + (final_message, success) + """ resume_session_id = self._get_doctor_codex_session_id(session_key) if session_key else None cmd = self._build_doctor_codex_command(prompt, resume_session_id=resume_session_id) proc: asyncio.subprocess.Process | None = None @@ -499,30 +492,34 @@ async def _run_doctor_via_codex_cli( cwd=str(self.workspace), ) except FileNotFoundError: - await self.bus.publish_outbound( - OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=( - "🩺 Doctor failed: codex CLI not found on PATH. " - "Please install Codex CLI or use provider-based diagnostics." - ), - run_id=run_id, - turn_id=turn_id, - ) + final = ( + "🩺 Doctor failed: codex CLI not found on PATH. " + "Please install Codex CLI or use provider-based diagnostics." ) - return + if publish: + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=final, + run_id=run_id, + turn_id=turn_id, + ) + ) + return final, False except Exception as e: - await self.bus.publish_outbound( - OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=f"🩺 Doctor failed to start codex CLI: {e}", - run_id=run_id, - turn_id=turn_id, + final = f"🩺 Doctor failed to start codex CLI: {e}" + if publish: + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=final, + run_id=run_id, + turn_id=turn_id, + ) ) - ) - return + return final, False try: output, session_id = await self._read_codex_cli_output(proc.stdout) @@ -542,15 +539,18 @@ async def _run_doctor_via_codex_cli( self._set_doctor_codex_session_id(session_key, session_id) final = f"{final}\n\n(codex session: {session_id})" - await self.bus.publish_outbound( - OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=final, - run_id=run_id, - turn_id=turn_id, + success = exit_code == 0 + if publish: + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=final, + run_id=run_id, + turn_id=turn_id, + ) ) - ) + return final, success except asyncio.CancelledError: if proc.returncode is None: proc.terminate() @@ -560,15 +560,18 @@ async def _run_doctor_via_codex_cli( proc.kill() raise except Exception as e: - await self.bus.publish_outbound( - OutboundMessage( - channel=msg.channel, - chat_id=msg.chat_id, - content=f"🩺 Doctor via Codex CLI errored: {e}", - run_id=run_id, - turn_id=turn_id, + final = f"🩺 Doctor via Codex CLI errored: {e}" + if publish: + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=final, + run_id=run_id, + turn_id=turn_id, + ) ) - ) + return final, False def _get_doctor_codex_session_id(self, session_key: str) -> str | None: """Get stored Codex session ID for a doctor-mode chat session.""" @@ -994,14 +997,31 @@ async def _process_message( + msg.content ) if self._doctor_cli_available(): - await self._run_doctor_via_codex_cli( + codex_final, codex_ok = await self._run_doctor_via_codex_cli( msg=msg, prompt=doctor_prompt, run_id=run_id, turn_id=turn_id, session_key=key, + publish=False, + ) + if codex_ok: + return OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=codex_final, + metadata={ + **(msg.metadata or {}), + "run_id": run_id, + "turn_id": turn_id, + }, + run_id=run_id, + turn_id=turn_id, + ) + logger.warning( + "Codex CLI doctor run failed; falling back to provider diagnostics for {}", + key, ) - return None msg = InboundMessage( channel=msg.channel, diff --git a/snapagent/channels/feishu.py b/snapagent/channels/feishu.py index e4973e0..804272c 100644 --- a/snapagent/channels/feishu.py +++ b/snapagent/channels/feishu.py @@ -53,6 +53,8 @@ def _split_message(content: str, max_len: int = FEISHU_CARD_CHUNK_MAX_LEN) -> li """Split content into chunks within max_len, preferring paragraph and line breaks.""" if not content: return [] + if max_len <= 0: + raise ValueError("max_len must be greater than 0") if len(content) <= max_len: return [content] diff --git a/tests/test_doctor_command.py b/tests/test_doctor_command.py index fb5faeb..b9c8ad2 100644 --- a/tests/test_doctor_command.py +++ b/tests/test_doctor_command.py @@ -69,15 +69,13 @@ async def test_doctor_start_prefers_codex_cli_when_available(): loop, bus, session = _make_loop() loop._doctor_setup_guidance = MagicMock(return_value=None) loop._doctor_cli_available = MagicMock(return_value=True) - loop._run_doctor_via_codex_cli = AsyncMock(return_value=None) msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/doctor") await loop._handle_doctor(msg) await asyncio.sleep(0) assert session.metadata.get("doctor_mode") is True - loop._run_doctor_via_codex_cli.assert_awaited_once() - loop._dispatch.assert_not_awaited() + loop._dispatch.assert_awaited_once() out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) assert "doctor mode" in out.content.lower() @@ -146,7 +144,6 @@ async def test_doctor_start_skips_setup_guidance_when_codex_cli_available(): loop, _bus, session = _make_loop() loop._doctor_setup_guidance = MagicMock(return_value="setup guide") loop._doctor_cli_available = MagicMock(return_value=True) - loop._run_doctor_via_codex_cli = AsyncMock(return_value=None) msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/doctor") await loop._handle_doctor(msg) @@ -154,8 +151,7 @@ async def test_doctor_start_skips_setup_guidance_when_codex_cli_available(): assert session.metadata.get("doctor_mode") is True loop._doctor_setup_guidance.assert_not_called() - loop._run_doctor_via_codex_cli.assert_awaited_once() - loop._dispatch.assert_not_awaited() + loop._dispatch.assert_awaited_once() @pytest.mark.asyncio @@ -289,10 +285,55 @@ async def test_process_message_doctor_mode_routes_to_codex_cli(): loop, _bus, session = _make_loop() session.metadata["doctor_mode"] = True loop._doctor_cli_available = MagicMock(return_value=True) - loop._run_doctor_via_codex_cli = AsyncMock(return_value=None) + loop._run_doctor_via_codex_cli = AsyncMock(return_value=("diag ok", True)) msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="check logs") result = await loop._process_message(msg) - assert result is None + assert result is not None + assert result.content == "diag ok" + assert result.channel == "test" + assert result.chat_id == "c1" + loop._run_doctor_via_codex_cli.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_process_message_doctor_mode_falls_back_to_provider_when_codex_fails(): + loop, _bus, session = _make_loop() + session.metadata["doctor_mode"] = True + loop._doctor_cli_available = MagicMock(return_value=True) + loop._run_doctor_via_codex_cli = AsyncMock(return_value=("codex failed", False)) + loop._run_agent_loop = AsyncMock( + return_value=( + "provider diag", + [], + [{"role": "assistant", "content": "provider diag"}], + ) + ) + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="check logs") + result = await loop._process_message(msg) + + assert result is not None + assert result.content == "provider diag" + loop._run_doctor_via_codex_cli.assert_awaited_once() + loop._run_agent_loop.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_process_direct_returns_doctor_cli_output_without_empty_fallback(): + loop, _bus, session = _make_loop() + session.metadata["doctor_mode"] = True + loop._doctor_cli_available = MagicMock(return_value=True) + loop._run_doctor_via_codex_cli = AsyncMock(return_value=("diag via cli", True)) + loop._connect_mcp = AsyncMock(return_value=None) + + response = await loop.process_direct( + "check logs", + session_key="test:c1", + channel="cli", + chat_id="c1", + ) + + assert response == "diag via cli" loop._run_doctor_via_codex_cli.assert_awaited_once() diff --git a/tests/test_feishu_channel.py b/tests/test_feishu_channel.py index 80d8d3a..bf5b68d 100644 --- a/tests/test_feishu_channel.py +++ b/tests/test_feishu_channel.py @@ -2,6 +2,8 @@ from __future__ import annotations +import json + import pytest from snapagent.bus.events import OutboundMessage @@ -28,6 +30,11 @@ def test_split_message_preserves_whitespace_and_newlines(): assert "".join(chunks) == content +def test_split_message_rejects_non_positive_max_len(): + with pytest.raises(ValueError): + _split_message("abc", max_len=0) + + @pytest.mark.asyncio async def test_feishu_send_splits_long_content_into_multiple_cards(): channel = _make_channel() @@ -45,3 +52,6 @@ def _fake_send(_receive_id_type: str, _receive_id: str, msg_type: str, content: assert len(calls) == len(chunks) assert all(c["msg_type"] == "interactive" for c in calls) assert all(len(chunk) <= 2000 for chunk in chunks) + for i, call in enumerate(calls): + payload = json.loads(call["content"]) + assert payload["elements"] == [{"tag": "markdown", "content": chunks[i]}] From d2f5b4635d1337091660c0acc5982e8045a54d0b Mon Sep 17 00:00:00 2001 From: pikaxinge <2392811793@qq.com> Date: Sat, 28 Feb 2026 07:27:35 +0000 Subject: [PATCH 4/4] fix(doctor): stabilize codex fallback signaling and feishu chunk rendering --- snapagent/agent/loop.py | 35 ++++++++++++++++- snapagent/channels/feishu.py | 11 +++--- tests/test_doctor_command.py | 76 +++++++++++++++++++++++++++++++++++- tests/test_feishu_channel.py | 9 +++++ 4 files changed, 123 insertions(+), 8 deletions(-) diff --git a/snapagent/agent/loop.py b/snapagent/agent/loop.py index 8f66764..ef137d4 100644 --- a/snapagent/agent/loop.py +++ b/snapagent/agent/loop.py @@ -483,6 +483,7 @@ async def _run_doctor_via_codex_cli( resume_session_id = self._get_doctor_codex_session_id(session_key) if session_key else None cmd = self._build_doctor_codex_command(prompt, resume_session_id=resume_session_id) proc: asyncio.subprocess.Process | None = None + stderr_task: asyncio.Task[bytes] | None = None try: proc = await asyncio.create_subprocess_exec( @@ -522,11 +523,14 @@ async def _run_doctor_via_codex_cli( return final, False try: + if proc.stderr is not None: + # Drain stderr concurrently to avoid subprocess pipe backpressure deadlocks. + stderr_task = asyncio.create_task(proc.stderr.read()) output, session_id = await self._read_codex_cli_output(proc.stdout) exit_code = await proc.wait() stderr_text = "" - if proc.stderr is not None: - stderr_text = (await proc.stderr.read()).decode("utf-8", "replace").strip() + if stderr_task is not None: + stderr_text = (await stderr_task).decode("utf-8", "replace").strip() if exit_code == 0: final = output or "Doctor completed via Codex CLI, but no final message was captured." @@ -558,8 +562,12 @@ async def _run_doctor_via_codex_cli( await asyncio.wait_for(proc.wait(), timeout=2.0) except asyncio.TimeoutError: proc.kill() + if stderr_task is not None and not stderr_task.done(): + stderr_task.cancel() raise except Exception as e: + if stderr_task is not None and not stderr_task.done(): + stderr_task.cancel() final = f"🩺 Doctor via Codex CLI errored: {e}" if publish: await self.bus.publish_outbound( @@ -1022,6 +1030,29 @@ async def _process_message( "Codex CLI doctor run failed; falling back to provider diagnostics for {}", key, ) + fallback_notice = ( + "🩺 Codex CLI diagnostics failed; " + "falling back to provider-based diagnostics.\n\n" + f"{codex_final}" + ) + if on_progress is not None: + await on_progress(fallback_notice) + elif msg.channel != "cli": + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=fallback_notice, + metadata={ + **(msg.metadata or {}), + "_progress": True, + "run_id": run_id, + "turn_id": turn_id, + }, + run_id=run_id, + turn_id=turn_id, + ) + ) msg = InboundMessage( channel=msg.channel, diff --git a/snapagent/channels/feishu.py b/snapagent/channels/feishu.py index 9804674..465d551 100644 --- a/snapagent/channels/feishu.py +++ b/snapagent/channels/feishu.py @@ -488,8 +488,8 @@ def _split_headings(self, content: str) -> list[dict]: elements = [] last_end = 0 for m in self._HEADING_RE.finditer(protected): - before = protected[last_end : m.start()].strip() - if before: + before = protected[last_end : m.start()] + if before.strip(): elements.append({"tag": "markdown", "content": before}) text = m.group(2).strip() elements.append( @@ -502,8 +502,8 @@ def _split_headings(self, content: str) -> list[dict]: } ) last_end = m.end() - remaining = protected[last_end:].strip() - if remaining: + remaining = protected[last_end:] + if remaining.strip(): elements.append({"tag": "markdown", "content": remaining}) for i, cb in enumerate(code_blocks): @@ -789,7 +789,8 @@ async def send(self, msg: OutboundMessage) -> None: ) if msg.content and msg.content.strip(): - for chunk in _split_message(msg.content): + chunks = _split_message(msg.content) + for chunk in chunks: card = { "config": {"wide_screen_mode": True}, "elements": self._build_card_elements(chunk), diff --git a/tests/test_doctor_command.py b/tests/test_doctor_command.py index b9c8ad2..8416bff 100644 --- a/tests/test_doctor_command.py +++ b/tests/test_doctor_command.py @@ -280,6 +280,48 @@ async def _fake_spawn(*_args, **_kwargs): assert "codex session: th_saved" in out.content +@pytest.mark.asyncio +async def test_run_doctor_via_codex_cli_returns_stderr_detail_on_failure(monkeypatch): + loop, _bus, _session = _make_loop() + + class _FakeProc: + def __init__(self): + self.stdout = asyncio.StreamReader() + self.stdout.feed_eof() + self.stderr = asyncio.StreamReader() + self.stderr.feed_data(b"auth failed") + self.stderr.feed_eof() + self.returncode = None + + async def wait(self): + self.returncode = 2 + return 2 + + def terminate(self): + self.returncode = -15 + + def kill(self): + self.returncode = -9 + + async def _fake_spawn(*_args, **_kwargs): + return _FakeProc() + + monkeypatch.setattr(asyncio, "create_subprocess_exec", _fake_spawn) + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="diag") + final, ok = await loop._run_doctor_via_codex_cli( + msg=msg, + prompt="diag", + run_id="r1", + turn_id="t1", + session_key="test:c1", + publish=False, + ) + + assert ok is False + assert "auth failed" in final + + @pytest.mark.asyncio async def test_process_message_doctor_mode_routes_to_codex_cli(): loop, _bus, session = _make_loop() @@ -299,7 +341,7 @@ async def test_process_message_doctor_mode_routes_to_codex_cli(): @pytest.mark.asyncio async def test_process_message_doctor_mode_falls_back_to_provider_when_codex_fails(): - loop, _bus, session = _make_loop() + loop, bus, session = _make_loop() session.metadata["doctor_mode"] = True loop._doctor_cli_available = MagicMock(return_value=True) loop._run_doctor_via_codex_cli = AsyncMock(return_value=("codex failed", False)) @@ -318,6 +360,38 @@ async def test_process_message_doctor_mode_falls_back_to_provider_when_codex_fai assert result.content == "provider diag" loop._run_doctor_via_codex_cli.assert_awaited_once() loop._run_agent_loop.assert_awaited_once() + notice = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert "falling back to provider-based diagnostics" in notice.content + assert "codex failed" in notice.content + assert notice.metadata.get("_progress") is True + assert notice.metadata.get("run_id") + assert notice.metadata.get("turn_id") + + +@pytest.mark.asyncio +async def test_process_message_doctor_mode_fallback_uses_on_progress_callback(): + loop, bus, session = _make_loop() + session.metadata["doctor_mode"] = True + loop._doctor_cli_available = MagicMock(return_value=True) + loop._run_doctor_via_codex_cli = AsyncMock(return_value=("codex failed", False)) + loop._run_agent_loop = AsyncMock( + return_value=( + "provider diag", + [], + [{"role": "assistant", "content": "provider diag"}], + ) + ) + on_progress = AsyncMock(return_value=None) + + msg = InboundMessage(channel="cli", sender_id="u1", chat_id="c1", content="check logs") + result = await loop._process_message(msg, on_progress=on_progress) + + assert result is not None + assert result.content == "provider diag" + on_progress.assert_awaited() + assert "falling back to provider-based diagnostics" in on_progress.await_args.args[0] + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(bus.consume_outbound(), timeout=0.05) @pytest.mark.asyncio diff --git a/tests/test_feishu_channel.py b/tests/test_feishu_channel.py index 2e46fcc..1333bca 100644 --- a/tests/test_feishu_channel.py +++ b/tests/test_feishu_channel.py @@ -40,6 +40,15 @@ def test_split_message_rejects_non_positive_max_len(): _split_message("abc", max_len=0) +def test_split_headings_preserves_boundary_whitespace(): + channel = _make_channel() + content = "alpha\n\n# Title\n\nbeta" + elements = channel._split_headings(content) + markdown_chunks = [e["content"] for e in elements if e.get("tag") == "markdown"] + assert markdown_chunks[0] == "alpha\n\n" + assert markdown_chunks[-1] == "\n\nbeta" + + @pytest.mark.asyncio async def test_feishu_send_splits_long_content_into_multiple_cards(): channel = _make_channel()