diff --git a/snapagent/agent/loop.py b/snapagent/agent/loop.py index 30a8d42..ef137d4 100644 --- a/snapagent/agent/loop.py +++ b/snapagent/agent/loop.py @@ -3,6 +3,9 @@ from __future__ import annotations import asyncio +import json +import os +import shutil from contextlib import AsyncExitStack from pathlib import Path from typing import TYPE_CHECKING, Any, Awaitable, Callable @@ -320,11 +323,12 @@ async def _handle_doctor(self, msg: InboundMessage) -> None: task = self._doctor_tasks.get(key) task_status = "running" if task and not task.done() else "idle" mode = "on" if session.metadata.get("doctor_mode") else "off" + codex_session = session.metadata.get("doctor_codex_session_id") or "-" await self.bus.publish_outbound( OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, - content=f"🩺 Doctor status: {task_status} (mode={mode}).", + content=f"🩺 Doctor status: {task_status} (mode={mode}, codex_session={codex_session}).", run_id=run_id, turn_id=turn_id, ) @@ -334,6 +338,7 @@ async def _handle_doctor(self, msg: InboundMessage) -> None: if action == "cancel": total = await self._cancel_session_tasks(key, msg.chat_id) session.metadata.pop("doctor_mode", None) + session.metadata.pop("doctor_codex_session_id", None) self.sessions.save(session) await self.bus.publish_outbound( OutboundMessage( @@ -348,6 +353,7 @@ async def _handle_doctor(self, msg: InboundMessage) -> None: if action == "resume": session.metadata.pop("doctor_mode", None) + session.metadata.pop("doctor_codex_session_id", None) self.sessions.save(session) await self.bus.publish_outbound( OutboundMessage( @@ -362,7 +368,9 @@ async def _handle_doctor(self, msg: InboundMessage) -> None: note = text[len("/doctor") :].strip() if text.lower().startswith("/doctor") else "" total = await self._cancel_session_tasks(key, msg.chat_id) - guidance = self._doctor_setup_guidance() + session.metadata.pop("doctor_codex_session_id", None) + doctor_cli_available = self._doctor_cli_available() + guidance = None if doctor_cli_available else self._doctor_setup_guidance() if guidance: session.metadata.pop("doctor_mode", None) self.sessions.save(session) @@ -408,8 +416,251 @@ async def _handle_doctor(self, msg: InboundMessage) -> None: ) task = asyncio.create_task(self._dispatch(follow_up)) self._doctor_tasks[key] = task - self._active_tasks.setdefault(follow_up.session_key, []).append(task) - task.add_done_callback(lambda t, k=follow_up.session_key: self._cleanup_task(k, t)) + self._active_tasks.setdefault(key, []).append(task) + task.add_done_callback(lambda t, k=key: self._cleanup_task(k, t)) + + def _doctor_cli_available(self) -> bool: + """Return whether `codex` CLI is available on PATH.""" + return shutil.which("codex") is not None + + def _doctor_codex_model(self) -> str: + """Resolve doctor Codex CLI model from env with a stable default.""" + model = os.environ.get("SNAPAGENT_DOCTOR_CODEX_MODEL", "").strip() + return model or "gpt-5.3-codex" + + def _build_doctor_codex_command( + self, prompt: str, *, resume_session_id: str | None = None + ) -> list[str]: + """Build Codex CLI command for doctor diagnostics.""" + if resume_session_id: + return [ + "codex", + "exec", + "resume", + "--json", + "--skip-git-repo-check", + "--model", + self._doctor_codex_model(), + "-c", + 'approval_policy="never"', + "-c", + 'model_reasoning_effort="high"', + resume_session_id, + prompt, + ] + return [ + "codex", + "exec", + "--json", + "--skip-git-repo-check", + "--model", + self._doctor_codex_model(), + "--sandbox", + "workspace-write", + "--full-auto", + "-c", + 'approval_policy="never"', + "-c", + 'model_reasoning_effort="high"', + prompt, + ] + + async def _run_doctor_via_codex_cli( + self, + *, + msg: InboundMessage, + prompt: str, + run_id: str, + turn_id: str, + session_key: str | None = None, + publish: bool = True, + ) -> tuple[str, bool]: + """Run doctor diagnostics through Codex CLI. + + Returns: + (final_message, success) + """ + resume_session_id = self._get_doctor_codex_session_id(session_key) if session_key else None + cmd = self._build_doctor_codex_command(prompt, resume_session_id=resume_session_id) + proc: asyncio.subprocess.Process | None = None + stderr_task: asyncio.Task[bytes] | None = None + + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=str(self.workspace), + ) + except FileNotFoundError: + final = ( + "🩺 Doctor failed: codex CLI not found on PATH. " + "Please install Codex CLI or use provider-based diagnostics." + ) + if publish: + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=final, + run_id=run_id, + turn_id=turn_id, + ) + ) + return final, False + except Exception as e: + final = f"🩺 Doctor failed to start codex CLI: {e}" + if publish: + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=final, + run_id=run_id, + turn_id=turn_id, + ) + ) + return final, False + + try: + if proc.stderr is not None: + # Drain stderr concurrently to avoid subprocess pipe backpressure deadlocks. + stderr_task = asyncio.create_task(proc.stderr.read()) + output, session_id = await self._read_codex_cli_output(proc.stdout) + exit_code = await proc.wait() + stderr_text = "" + if stderr_task is not None: + stderr_text = (await stderr_task).decode("utf-8", "replace").strip() + + if exit_code == 0: + final = output or "Doctor completed via Codex CLI, but no final message was captured." + else: + detail = stderr_text or output or f"exited with code {exit_code}" + final = f"🩺 Doctor via Codex CLI failed: {detail}" + + if session_id: + if session_key: + self._set_doctor_codex_session_id(session_key, session_id) + final = f"{final}\n\n(codex session: {session_id})" + + success = exit_code == 0 + if publish: + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=final, + run_id=run_id, + turn_id=turn_id, + ) + ) + return final, success + except asyncio.CancelledError: + if proc.returncode is None: + proc.terminate() + try: + await asyncio.wait_for(proc.wait(), timeout=2.0) + except asyncio.TimeoutError: + proc.kill() + if stderr_task is not None and not stderr_task.done(): + stderr_task.cancel() + raise + except Exception as e: + if stderr_task is not None and not stderr_task.done(): + stderr_task.cancel() + final = f"🩺 Doctor via Codex CLI errored: {e}" + if publish: + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=final, + run_id=run_id, + turn_id=turn_id, + ) + ) + return final, False + + def _get_doctor_codex_session_id(self, session_key: str) -> str | None: + """Get stored Codex session ID for a doctor-mode chat session.""" + session = self.sessions.get_or_create(session_key) + raw = session.metadata.get("doctor_codex_session_id") + if isinstance(raw, str) and raw.strip(): + return raw.strip() + return None + + def _set_doctor_codex_session_id(self, session_key: str, session_id: str) -> None: + """Persist Codex session ID for doctor-mode resume calls.""" + if not session_id.strip(): + return + session = self.sessions.get_or_create(session_key) + session.metadata["doctor_codex_session_id"] = session_id.strip() + self.sessions.save(session) + + async def _read_codex_cli_output( + self, stream: asyncio.StreamReader | None + ) -> tuple[str | None, str | None]: + """Read Codex `--json` stdout and return (assistant_message, session_id).""" + if stream is None: + return None, None + + last_message: str | None = None + session_id: str | None = None + + while True: + line = await stream.readline() + if not line: + break + + text = line.decode("utf-8", "replace").strip() + if not text: + continue + + try: + event = json.loads(text) + except json.JSONDecodeError: + continue + + if event.get("type") == "thread.started": + thread_id = event.get("thread_id") + if isinstance(thread_id, str) and thread_id.strip(): + session_id = thread_id.strip() + continue + + if event.get("type") == "item.completed" and isinstance(event.get("item"), dict): + item = event["item"] + msg = "" + if item.get("type") == "message": + content_parts = item.get("content", []) + texts = [ + p.get("text", "") + for p in content_parts + if isinstance(p, dict) and p.get("type") == "output_text" + ] + msg = "\n".join(texts).strip() + elif item.get("type") == "agent_message": + msg = str(item.get("text", "")).strip() + if msg: + last_message = msg + continue + + if ( + event.get("type") == "response_item" + and isinstance(event.get("payload"), dict) + and event["payload"].get("type") == "message" + and event["payload"].get("role") == "assistant" + ): + content_parts = event["payload"].get("content", []) + texts = [ + p.get("text", "") + for p in content_parts + if isinstance(p, dict) and p.get("type") == "output_text" + ] + msg = "\n".join(texts).strip() + if msg: + last_message = msg + + return last_message, session_id async def _cancel_session_tasks(self, session_key: str, chat_id: str) -> int: """Cancel all active tasks, doctor task, and subagents for one session.""" @@ -747,16 +998,67 @@ async def _process_message( turn_id=turn_id, ) elif session.metadata.get("doctor_mode") and not cmd.startswith("/"): + doctor_prompt = ( + "[Doctor Mode] Diagnose issues using evidence first. " + "Use doctor_check with check=health/status/logs/events as needed. " + "Cite observed evidence and then propose next actions.\n\n" + + msg.content + ) + if self._doctor_cli_available(): + codex_final, codex_ok = await self._run_doctor_via_codex_cli( + msg=msg, + prompt=doctor_prompt, + run_id=run_id, + turn_id=turn_id, + session_key=key, + publish=False, + ) + if codex_ok: + return OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=codex_final, + metadata={ + **(msg.metadata or {}), + "run_id": run_id, + "turn_id": turn_id, + }, + run_id=run_id, + turn_id=turn_id, + ) + logger.warning( + "Codex CLI doctor run failed; falling back to provider diagnostics for {}", + key, + ) + fallback_notice = ( + "🩺 Codex CLI diagnostics failed; " + "falling back to provider-based diagnostics.\n\n" + f"{codex_final}" + ) + if on_progress is not None: + await on_progress(fallback_notice) + elif msg.channel != "cli": + await self.bus.publish_outbound( + OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=fallback_notice, + metadata={ + **(msg.metadata or {}), + "_progress": True, + "run_id": run_id, + "turn_id": turn_id, + }, + run_id=run_id, + turn_id=turn_id, + ) + ) + msg = InboundMessage( channel=msg.channel, sender_id=msg.sender_id, chat_id=msg.chat_id, - content=( - "[Doctor Mode] Diagnose issues using evidence first. " - "Use doctor_check with check=health/status/logs/events as needed. " - "Cite observed evidence and then propose next actions.\n\n" - + msg.content - ), + content=doctor_prompt, timestamp=msg.timestamp, media=msg.media, metadata=msg.metadata, diff --git a/snapagent/channels/feishu.py b/snapagent/channels/feishu.py index 67708e2..465d551 100644 --- a/snapagent/channels/feishu.py +++ b/snapagent/channels/feishu.py @@ -46,6 +46,53 @@ "sticker": "[sticker]", } +FEISHU_CARD_CHUNK_MAX_LEN = 2000 + + +def _split_message(content: str, max_len: int = FEISHU_CARD_CHUNK_MAX_LEN) -> list[str]: + """Split content into chunks within max_len, preferring paragraph and line breaks.""" + if not content: + return [] + if max_len <= 0: + raise ValueError("max_len must be greater than 0") + if len(content) <= max_len: + return [content] + + chunks: list[str] = [] + remaining = content + while remaining: + if len(remaining) <= max_len: + chunks.append(remaining) + break + + cut = remaining[:max_len] + sep_len = 0 + pos = cut.rfind("\n\n") + if pos > 0: + sep_len = 2 + if pos <= 0: + pos = cut.rfind("\n") + if pos > 0: + sep_len = 1 + if pos <= 0: + pos = cut.rfind(" ") + if pos > 0: + sep_len = 1 + if pos <= 0: + pos = max_len + sep_len = 0 + else: + pos += sep_len + + chunk = remaining[:pos] + if not chunk: + pos = max_len + chunk = remaining[:pos] + + chunks.append(chunk) + remaining = remaining[pos:] + return chunks + def _extract_share_card_content(content_json: dict, msg_type: str) -> str: """Extract text representation from share cards and interactive messages.""" @@ -441,8 +488,8 @@ def _split_headings(self, content: str) -> list[dict]: elements = [] last_end = 0 for m in self._HEADING_RE.finditer(protected): - before = protected[last_end : m.start()].strip() - if before: + before = protected[last_end : m.start()] + if before.strip(): elements.append({"tag": "markdown", "content": before}) text = m.group(2).strip() elements.append( @@ -455,8 +502,8 @@ def _split_headings(self, content: str) -> list[dict]: } ) last_end = m.end() - remaining = protected[last_end:].strip() - if remaining: + remaining = protected[last_end:] + if remaining.strip(): elements.append({"tag": "markdown", "content": remaining}) for i, cb in enumerate(code_blocks): @@ -742,18 +789,20 @@ async def send(self, msg: OutboundMessage) -> None: ) if msg.content and msg.content.strip(): - card = { - "config": {"wide_screen_mode": True}, - "elements": self._build_card_elements(msg.content), - } - await loop.run_in_executor( - None, - self._send_message_sync, - receive_id_type, - msg.chat_id, - "interactive", - json.dumps(card, ensure_ascii=False), - ) + chunks = _split_message(msg.content) + for chunk in chunks: + card = { + "config": {"wide_screen_mode": True}, + "elements": self._build_card_elements(chunk), + } + await loop.run_in_executor( + None, + self._send_message_sync, + receive_id_type, + msg.chat_id, + "interactive", + json.dumps(card, ensure_ascii=False), + ) except Exception as e: logger.error("Error sending Feishu message: {}", e) diff --git a/tests/test_doctor_command.py b/tests/test_doctor_command.py index ce6cfb4..8416bff 100644 --- a/tests/test_doctor_command.py +++ b/tests/test_doctor_command.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import json from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -63,6 +64,38 @@ async def slow_task(): assert "doctor mode" in out.content.lower() +@pytest.mark.asyncio +async def test_doctor_start_prefers_codex_cli_when_available(): + loop, bus, session = _make_loop() + loop._doctor_setup_guidance = MagicMock(return_value=None) + loop._doctor_cli_available = MagicMock(return_value=True) + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/doctor") + await loop._handle_doctor(msg) + await asyncio.sleep(0) + + assert session.metadata.get("doctor_mode") is True + loop._dispatch.assert_awaited_once() + out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert "doctor mode" in out.content.lower() + + +@pytest.mark.asyncio +async def test_doctor_start_falls_back_when_codex_cli_unavailable(): + loop, _bus, session = _make_loop() + loop._doctor_setup_guidance = MagicMock(return_value=None) + loop._doctor_cli_available = MagicMock(return_value=False) + loop._run_doctor_via_codex_cli = AsyncMock(return_value=None) + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/doctor") + await loop._handle_doctor(msg) + await asyncio.sleep(0) + + assert session.metadata.get("doctor_mode") is True + loop._run_doctor_via_codex_cli.assert_not_awaited() + assert loop._dispatch.await_count == 1 + + @pytest.mark.asyncio async def test_doctor_status_reports_idle(): loop, bus, _session = _make_loop() @@ -94,6 +127,7 @@ async def test_doctor_cancel_disables_mode(): async def test_doctor_start_shows_setup_guidance_when_provider_not_ready(): loop, bus, session = _make_loop() loop._doctor_setup_guidance = MagicMock(return_value="setup guide") + loop._doctor_cli_available = MagicMock(return_value=False) msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/doctor") await loop._handle_doctor(msg) @@ -105,6 +139,21 @@ async def test_doctor_start_shows_setup_guidance_when_provider_not_ready(): loop._dispatch.assert_not_called() +@pytest.mark.asyncio +async def test_doctor_start_skips_setup_guidance_when_codex_cli_available(): + loop, _bus, session = _make_loop() + loop._doctor_setup_guidance = MagicMock(return_value="setup guide") + loop._doctor_cli_available = MagicMock(return_value=True) + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="/doctor") + await loop._handle_doctor(msg) + await asyncio.sleep(0) + + assert session.metadata.get("doctor_mode") is True + loop._doctor_setup_guidance.assert_not_called() + loop._dispatch.assert_awaited_once() + + @pytest.mark.asyncio async def test_help_includes_doctor_commands(): loop, _bus, _session = _make_loop() @@ -132,3 +181,233 @@ async def test_run_does_not_route_doctor_typo_to_doctor_handler(): loop._handle_doctor.assert_not_awaited() assert loop._dispatch.await_count == 1 + + +@pytest.mark.asyncio +async def test_read_codex_cli_output_parses_session_and_message(): + loop, _bus, _session = _make_loop() + reader = asyncio.StreamReader() + reader.feed_data( + ( + json.dumps({"type": "thread.started", "thread_id": "th_123"}) + "\n" + + json.dumps( + { + "type": "response_item", + "payload": { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "diag ok"}], + }, + } + ) + + "\n" + ).encode("utf-8") + ) + reader.feed_eof() + + output, session_id = await loop._read_codex_cli_output(reader) + assert output == "diag ok" + assert session_id == "th_123" + + +def test_build_doctor_codex_command_with_resume_session(): + loop, _bus, _session = _make_loop() + loop._doctor_codex_model = MagicMock(return_value="gpt-5.3-codex") + + cmd = loop._build_doctor_codex_command( + "check status", + resume_session_id="th_abc", + ) + assert cmd[:4] == ["codex", "exec", "resume", "--json"] + assert "th_abc" in cmd + assert cmd[-1] == "check status" + + +@pytest.mark.asyncio +async def test_run_doctor_via_codex_cli_persists_session_id(monkeypatch): + loop, bus, session = _make_loop() + + class _FakeProc: + def __init__(self): + self.stdout = asyncio.StreamReader() + self.stdout.feed_data( + ( + json.dumps({"type": "thread.started", "thread_id": "th_saved"}) + "\n" + + json.dumps( + { + "type": "response_item", + "payload": { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "done"}], + }, + } + ) + + "\n" + ).encode("utf-8") + ) + self.stdout.feed_eof() + self.stderr = asyncio.StreamReader() + self.stderr.feed_eof() + self.returncode = None + + async def wait(self): + self.returncode = 0 + return 0 + + def terminate(self): + self.returncode = -15 + + def kill(self): + self.returncode = -9 + + async def _fake_spawn(*_args, **_kwargs): + return _FakeProc() + + monkeypatch.setattr(asyncio, "create_subprocess_exec", _fake_spawn) + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="diag") + await loop._run_doctor_via_codex_cli( + msg=msg, + prompt="diag", + run_id="r1", + turn_id="t1", + session_key="test:c1", + ) + + assert session.metadata.get("doctor_codex_session_id") == "th_saved" + out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert "codex session: th_saved" in out.content + + +@pytest.mark.asyncio +async def test_run_doctor_via_codex_cli_returns_stderr_detail_on_failure(monkeypatch): + loop, _bus, _session = _make_loop() + + class _FakeProc: + def __init__(self): + self.stdout = asyncio.StreamReader() + self.stdout.feed_eof() + self.stderr = asyncio.StreamReader() + self.stderr.feed_data(b"auth failed") + self.stderr.feed_eof() + self.returncode = None + + async def wait(self): + self.returncode = 2 + return 2 + + def terminate(self): + self.returncode = -15 + + def kill(self): + self.returncode = -9 + + async def _fake_spawn(*_args, **_kwargs): + return _FakeProc() + + monkeypatch.setattr(asyncio, "create_subprocess_exec", _fake_spawn) + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="diag") + final, ok = await loop._run_doctor_via_codex_cli( + msg=msg, + prompt="diag", + run_id="r1", + turn_id="t1", + session_key="test:c1", + publish=False, + ) + + assert ok is False + assert "auth failed" in final + + +@pytest.mark.asyncio +async def test_process_message_doctor_mode_routes_to_codex_cli(): + loop, _bus, session = _make_loop() + session.metadata["doctor_mode"] = True + loop._doctor_cli_available = MagicMock(return_value=True) + loop._run_doctor_via_codex_cli = AsyncMock(return_value=("diag ok", True)) + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="check logs") + result = await loop._process_message(msg) + + assert result is not None + assert result.content == "diag ok" + assert result.channel == "test" + assert result.chat_id == "c1" + loop._run_doctor_via_codex_cli.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_process_message_doctor_mode_falls_back_to_provider_when_codex_fails(): + loop, bus, session = _make_loop() + session.metadata["doctor_mode"] = True + loop._doctor_cli_available = MagicMock(return_value=True) + loop._run_doctor_via_codex_cli = AsyncMock(return_value=("codex failed", False)) + loop._run_agent_loop = AsyncMock( + return_value=( + "provider diag", + [], + [{"role": "assistant", "content": "provider diag"}], + ) + ) + + msg = InboundMessage(channel="test", sender_id="u1", chat_id="c1", content="check logs") + result = await loop._process_message(msg) + + assert result is not None + assert result.content == "provider diag" + loop._run_doctor_via_codex_cli.assert_awaited_once() + loop._run_agent_loop.assert_awaited_once() + notice = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + assert "falling back to provider-based diagnostics" in notice.content + assert "codex failed" in notice.content + assert notice.metadata.get("_progress") is True + assert notice.metadata.get("run_id") + assert notice.metadata.get("turn_id") + + +@pytest.mark.asyncio +async def test_process_message_doctor_mode_fallback_uses_on_progress_callback(): + loop, bus, session = _make_loop() + session.metadata["doctor_mode"] = True + loop._doctor_cli_available = MagicMock(return_value=True) + loop._run_doctor_via_codex_cli = AsyncMock(return_value=("codex failed", False)) + loop._run_agent_loop = AsyncMock( + return_value=( + "provider diag", + [], + [{"role": "assistant", "content": "provider diag"}], + ) + ) + on_progress = AsyncMock(return_value=None) + + msg = InboundMessage(channel="cli", sender_id="u1", chat_id="c1", content="check logs") + result = await loop._process_message(msg, on_progress=on_progress) + + assert result is not None + assert result.content == "provider diag" + on_progress.assert_awaited() + assert "falling back to provider-based diagnostics" in on_progress.await_args.args[0] + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(bus.consume_outbound(), timeout=0.05) + + +@pytest.mark.asyncio +async def test_process_direct_returns_doctor_cli_output_without_empty_fallback(): + loop, _bus, session = _make_loop() + session.metadata["doctor_mode"] = True + loop._doctor_cli_available = MagicMock(return_value=True) + loop._run_doctor_via_codex_cli = AsyncMock(return_value=("diag via cli", True)) + loop._connect_mcp = AsyncMock(return_value=None) + + response = await loop.process_direct( + "check logs", + session_key="test:c1", + channel="cli", + chat_id="c1", + ) + + assert response == "diag via cli" + loop._run_doctor_via_codex_cli.assert_awaited_once() diff --git a/tests/test_feishu_channel.py b/tests/test_feishu_channel.py index a175d43..1333bca 100644 --- a/tests/test_feishu_channel.py +++ b/tests/test_feishu_channel.py @@ -1,10 +1,14 @@ +"""Feishu channel send/chunk behavior tests.""" + +from __future__ import annotations + import json import pytest from snapagent.bus.events import OutboundMessage from snapagent.bus.queue import MessageBus -from snapagent.channels.feishu import FeishuChannel +from snapagent.channels.feishu import FeishuChannel, _split_message from snapagent.config.schema import FeishuConfig @@ -19,6 +23,54 @@ def _make_channel(*, workspace=None) -> FeishuChannel: return channel +def test_split_message_force_splits_when_no_separator(): + content = "x" * 23 + chunks = _split_message(content, max_len=8) + assert chunks == ["x" * 8, "x" * 8, "x" * 7] + + +def test_split_message_preserves_whitespace_and_newlines(): + content = "line1\n\nline2 with space \nline3" + chunks = _split_message(content, max_len=10) + assert "".join(chunks) == content + + +def test_split_message_rejects_non_positive_max_len(): + with pytest.raises(ValueError): + _split_message("abc", max_len=0) + + +def test_split_headings_preserves_boundary_whitespace(): + channel = _make_channel() + content = "alpha\n\n# Title\n\nbeta" + elements = channel._split_headings(content) + markdown_chunks = [e["content"] for e in elements if e.get("tag") == "markdown"] + assert markdown_chunks[0] == "alpha\n\n" + assert markdown_chunks[-1] == "\n\nbeta" + + +@pytest.mark.asyncio +async def test_feishu_send_splits_long_content_into_multiple_cards(): + channel = _make_channel() + calls: list[dict] = [] + + def _fake_send(_receive_id_type: str, _receive_id: str, msg_type: str, content: str) -> bool: + calls.append({"msg_type": msg_type, "content": content}) + return True + + channel._send_message_sync = _fake_send # type: ignore[method-assign] + long_content = "a" * 9000 + await channel.send(OutboundMessage(channel="feishu", chat_id="ou_test", content=long_content)) + + chunks = _split_message(long_content) + assert len(calls) == len(chunks) + assert all(c["msg_type"] == "interactive" for c in calls) + assert all(len(chunk) <= 2000 for chunk in chunks) + for i, call in enumerate(calls): + payload = json.loads(call["content"]) + assert payload["elements"] == [{"tag": "markdown", "content": chunks[i]}] + + @pytest.mark.asyncio async def test_send_resolves_relative_media_path_from_workspace(tmp_path) -> None: workspace = tmp_path / "workspace"