From a6367a5056f33a07f482bbe8c5550661fd44919c Mon Sep 17 00:00:00 2001 From: KevinZhao Date: Sun, 21 Jun 2026 10:05:08 +0000 Subject: [PATCH 1/6] feat(cli): add codex app-server backend (protocol + profile) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 实现 docs/rfc/codex-backend.md Phase 1:OpenAI Codex CLI 作为第三个 backend, 走 `codex app-server` JSON-RPC 2.0 over stdio。 实现: - internal/cli/protocol_codex.go — CodexProtocol 实现 cli.Protocol: initialize+initialized+thread/start 握手、turn/start(UserInput[])、 turn/interrupt、item/agentMessage/delta 流式累积、turn/completed 边界 flush assistant 帧 + result、thread/tokenUsage/updated → metadata、 */requestApproval 反向请求自动 allow。模式对齐 ACPProtocol。 - internal/cli/backend/profile_codex.go — codexProfile(),注册到 RegisterDefaults (注册≠默认开启,仍需 cli.backends 显式配置)。 - internal/cli/detect.go — knownBackends 增 codex 行。 可行性验证(codex 0.141.0 实测,docs/rfc/codex-backend-validation.md): - app-server 协议完整跑通,RFC §2.4 事件流 + method 名命中。 - 纠正 RFC §7:codex 原生支持 Amazon Bedrock(内置 amazon-bedrock provider)。 实测 bedrock-mantle/v1/responses + gpt-oss-120b 连通;两个约束:内置 provider 路径 bug(需自定义 provider 指 /v1)、gpt-oss responses 拒 codex namespace 工具(agentic 受限,纯对话/function-calling 可用)。 测试:protocol_codex_test.go 表驱动覆盖握手/turn/interrupt/approval/ tokenUsage/未知method/错误响应;profile 注册断言更新为 3 backend。 go build/vet/test -race ./internal/cli/... 全绿。 --- docs/rfc/README.md | 3 +- docs/rfc/codex-backend-validation.md | 205 ++++++++ docs/rfc/codex-backend.md | 127 +++-- internal/cli/backend/profile.go | 5 +- internal/cli/backend/profile_codex.go | 63 +++ internal/cli/backend/profile_test.go | 44 +- internal/cli/detect.go | 1 + internal/cli/protocol_codex.go | 663 ++++++++++++++++++++++++++ internal/cli/protocol_codex_test.go | 453 ++++++++++++++++++ 9 files changed, 1511 insertions(+), 53 deletions(-) create mode 100644 docs/rfc/codex-backend-validation.md create mode 100644 internal/cli/backend/profile_codex.go create mode 100644 internal/cli/protocol_codex.go create mode 100644 internal/cli/protocol_codex_test.go diff --git a/docs/rfc/README.md b/docs/rfc/README.md index f18b46ee2..89a771532 100644 --- a/docs/rfc/README.md +++ b/docs/rfc/README.md @@ -20,7 +20,8 @@ | [attachment-gc-daemon.md](attachment-gc-daemon.md) | 已实现(v2.1 设计落地;默认 enabled:false + dry_run 待运维启用) | 2026-05-31 | 接活死代码:把 `GCWithRefs` 接进 sysession `attachment-gc` daemon(非 cron),删 legacy `GC`,修复附件磁盘无限增长(#1198)。v2 补三项框架增强(ctx + per-root cap + startup-tick)+ 枚举改扫已知 workspace 根 + 默认 enabled:false/dry-run + 先删 .meta 防孤儿 + refcount best-effort。v2.1 修枚举完整性:root 集合补 project workspace(E1)、规范化去重防 symlink(E2)、per-root cap + round-robin 防饥饿(E3)、dry-run 按原因分桶(E4)、附最小止血版(E5) | | [attachment-refcount.md](attachment-refcount.md) | v1 MVP 已落地(GC cron 待启用) | 2026-05-10 | 大图跨 TTL 可见:attachment 引用计数与双 TTL GC | | [direct-user-settings.md](direct-user-settings.md) | Draft v2(architect 评审通过修订;security 评审中) | 2026-05-30 | naozhi spawn cc 改 `--setting-sources user` 直接加载 `~/.claude/settings.json`,删 override 副本 + env 白名单,使 naozhi cc 与命令行 cc 行为一致、单一配置源;保留 `applyClaudeEnvSettings`(父进程 Bedrock 鉴权)与 sysession Runner 的 `--setting-sources ""`(防 AutoTitler 死循环) | -| [codex-backend.md](codex-backend.md) | 设计提案 Draft v1(未实测) | 2026-05-20 | OpenAI Codex CLI 作为第三个 backend:走 `codex app-server` JSON-RPC 2.0 over stdio(**非** `codex proto` / `exec --json` / `mcp-server`),新增 `protocol_codex.go` + `profile_codex.go` + `codexjsonl` 历史 source;待跑 Phase 0 V1-V12 实测后升 v2 | +| [codex-backend.md](codex-backend.md) | Draft v2(Phase 0 已实测 2026-06-21) | 2026-05-20 | OpenAI Codex CLI 作为第三个 backend:走 `codex app-server` JSON-RPC 2.0 over stdio(**非** `codex proto` / `exec --json` / `mcp-server`),`protocol_codex.go` + `profile_codex.go` 已实现,`codexjsonl` 历史 source 待 phase1;实测纠正:codex 原生支持 Bedrock(gpt-oss,agentic 受限) | +| [codex-backend-validation.md](codex-backend-validation.md) | 实测报告 ✅ | 2026-06-21 | codex 0.141.0 Phase 0:app-server 协议跑通、method 名命中、Bedrock mantle/responses + gpt-oss 连通 + 两个约束(路径 bug / namespace 工具被拒) | | [consumer-interfaces.md](consumer-interfaces.md) | Proposal v2 | 2026-05-11 | ARCH-CONSUMER-IF:dispatch/hub/upstream 以消费端小接口替换 `*session.Router` 具体指针(v1 因方法清单虚构已重写) | | [cron-v2-polish.md](cron-v2-polish.md) | 设计提案(未实现) | 2026-05-09 | Cron 面板 5 项增量打磨(name/jitter/missed/sort/next-run) | | [cron-run-history.md](cron-run-history.md) | 设计提案 → 实施中 | 2026-05-17 | Cron 执行历史与生命周期可见性:CronRun 实体 / runs/ 滚动 ring / runInflight / WS started/ended / 时间轴 UI | diff --git a/docs/rfc/codex-backend-validation.md b/docs/rfc/codex-backend-validation.md new file mode 100644 index 000000000..e6e080132 --- /dev/null +++ b/docs/rfc/codex-backend-validation.md @@ -0,0 +1,205 @@ +# Codex Backend — Phase 0 可行性验证报告 + +> **状态**: 实测完成 ✅(2026-06-21) +> **执行环境**: Amazon Linux 2023 aarch64 · node v22.22.0 · codex-cli **0.141.0** · AWS_REGION=us-west-2 +> **关联**: `docs/rfc/codex-backend.md`(本报告把该 RFC 从 Draft v1 升到 v2) + +--- + +## 0. TL;DR + +- ✅ **codex 0.141.0 安装成功**,自带 `codex app-server` 子命令。 +- ✅ **app-server JSON-RPC 协议完整跑通**:`initialize → initialized → thread/start → turn/start → turn/started → item/started → item/completed → turn/completed`,threadId 正确捕获。RFC §2.4 的事件流草图被实测**证实**。 +- ✅ **method / notification 名经 `generate-json-schema` + 实测全部命中** RFC §2.3/§2.4 的假设(`thread/start`、`turn/start`、`turn/interrupt`、`turn/steer`、`thread/resume`、`item/agentMessage/delta`、`turn/completed` 等)。 +- 🔥 **重大纠正**:RFC §7 / §9.1 称 "Codex 模型只能走 OpenAI 自家(不像 claude 走 Bedrock)" —— **此论断已过时**。codex 0.141 **内置 `amazon-bedrock` model provider**(openai/codex PR #18744,2026-04-20 合入)。实测 `bedrock-mantle.us-west-2.api.aws/v1/responses` + `openai.gpt-oss-120b` 返回真实回复。 +- ⚠️ **两个 Bedrock 约束**(影响 "codex on bedrock" 完整能力,详见 §4): + 1. codex 内置 `amazon-bedrock` provider 把请求打到 `/openai/v1/responses`(带 `/openai` 前缀),但 Bedrock 上的 gpt-oss 只在 `/v1/responses`(无前缀)服务 —— 内置 provider 直接报 `model does not support the '/openai/v1/responses' API`。需用**自定义 provider** 指向正确的 `…/v1` base_url 绕过。 + 2. Bedrock 的 gpt-oss responses 端点**拒绝 codex 内置 agentic 工具的 `namespace` tool 变体**(只接受 `function` / `mcp`),完整 shell-agentic turn 因此 `status: failed`。纯对话 / function-calling 可用,但 codex 招牌的本地 shell agentic 能力在 Bedrock gpt-oss 上**受限**。 +- ❌ gpt-5.x 系列在 us-west-2 Bedrock **不可用**,只有 `gpt-oss-*`(20b/120b/safeguard)`ACTIVE`。 + +**结论**:app-server 协议路线 100% 可行,按 RFC §1.3 实施。Bedrock 作为一个**已验证但 agentic 受限**的部署选项写入 RFC;面向用户的 codex 完整能力仍以 OpenAI 凭据 / ChatGPT 登录为主路径。 + +--- + +## 1. 安装 + +``` +$ npm install -g @openai/codex +added 2 packages in 2s +$ codex --version +codex-cli 0.141.0 +``` + +`codex app-server` 子命令存在,并带 `generate-ts` / `generate-json-schema` 协议 schema 生成工具(RFC §1.2 承诺核实)。 + +--- + +## 2. Schema 生成(V12 ✅) + +``` +$ codex app-server generate-json-schema --out /tmp/codex-schema +``` + +产出 200+ JSON Schema 文件。关键文件: + +| 文件 | 内容 | +|---|---| +| `ClientRequest.json` | client→server 全部 RPC method 枚举 | +| `ServerNotification.json` | server→client 全部流式事件 method 枚举 | +| `ServerRequest.json` | server→client 反向请求(approval / userInput / elicitation) | +| `v2/TurnStartParams.json` | `turn/start` 参数 schema(**input 是 `UserInput[]` 不是 string**) | + +`ClientRequest` 枚举证实 RFC §2.3 的 method 名(`initialize` / `thread/start` / `thread/resume` / `turn/start` / `turn/steer` / `turn/interrupt`)全部存在。 + +--- + +## 3. 协议实测 + +### 3.1 V2 握手 ✅ + +直接 NDJSON 灌入 `codex app-server` stdin: + +``` +→ {"id":1,"method":"initialize","params":{"clientInfo":{"name":"naozhi","version":"0.0.1"}}} +← {"id":1,"result":{"userAgent":…,"codexHome":…,"platformFamily":…}} +→ {"method":"initialized"} (notification) +→ {"id":2,"method":"thread/start","params":{"cwd":"/tmp/codextest"}} +← {"id":2,"result":{"thread":{"id":"019ee98d-…",…},"model":…,"modelProvider":…}} +← {"method":"thread/started","params":{...}} (notification) +``` + +threadId 在 `thread/start` 的 **response.result.thread.id** 拿到(不在 `thread/started` notification 的 params 顶层)。 + +### 3.2 V3 单 turn ✅(协议层) + +``` +→ {"id":3,"method":"turn/start","params":{"threadId":"019ee98d-…", + "input":[{"type":"text","text":"reply with exactly NAOZHI_OK"}]}} +← {"method":"turn/started",...} +← {"method":"item/started","params":{"item":{"type":"userMessage",...}}} +← {"method":"item/completed","params":{"item":{"type":"userMessage",...}}} +← {"method":"turn/completed","params":{"threadId":...,"turn":{"status":"failed"|"completed",...}}} +``` + +> 关键修正:RFC §2.4 把 `turn/start` 的 `input` 画成字符串。实测 schema 是 **`UserInput[]`**,每项 `{"type":"text","text":"…"}`。先发字符串会得到 `-32600 Invalid request: invalid type: string …, expected a sequence`。本报告把这一点回写 RFC §2.4 与实现。 + +### 3.3 事件名实测对照(ServerNotification 枚举) + +| RFC §2.4 假设 | 实测 schema | 命中 | +|---|---|---| +| `thread/started` | `thread/started` | ✅ | +| `turn/started` | `turn/started` | ✅ | +| `item/started` | `item/started` | ✅ | +| `item/agentMessage/delta` | `item/agentMessage/delta`(`params.delta` 是**纯字符串**,不是 `{content}`) | ✅(字段名修正) | +| `item/completed` | `item/completed`(`params.item` = `ThreadItem`,agentMessage 项带 `text`) | ✅ | +| `turn/completed` | `turn/completed`(`params.turn.status` + `turn.error`;usage 在单独的 `thread/tokenUsage/updated`) | ✅(usage 位置修正) | + +**usage 修正**:RFC §2.6 假设 `turn/completed` 带 `usage`。实测 token 用量走单独的 `thread/tokenUsage/updated` notification,结构 `tokenUsage.{last,total}.{inputTokens,outputTokens,cachedInputTokens,reasoningOutputTokens,totalTokens}`。实现据此把 tokenUsage 通知映射到 `EventMetadata.MeteringUsage`(unit="token")。 + +### 3.4 反向请求实测名(ServerRequest 枚举) + +RFC §2.5 假设 `serverRequest/...`。实测更具体: + +``` +item/commandExecution/requestApproval +item/fileChange/requestApproval +item/permissions/requestApproval +item/tool/requestUserInput +mcpServer/elicitation/request +``` + +实现的 HandleEvent 对 `*/requestApproval` 自动 allow(对齐 claude `--dangerously-skip-permissions` 立场)。 + +--- + +## 4. Bedrock 可行性(纠正 RFC §7) + +### 4.1 内置 provider 存在 + +codex 0.141 二进制内含 `bedrock-mantle.` base_url 字面量。`codex debug models` 与官方文档(openai/codex PR #18744)确认内置 `amazon-bedrock` provider: + +```toml +# 官方文档形态 +model_provider = "amazon-bedrock" +[model_providers.amazon-bedrock.aws] +profile = "..." # 或走默认 AWS 凭据链 +region = "us-west-2" +``` + +### 4.2 端点连通性实测 + +环境:`AWS_REGION=us-west-2`,IAM 凭据有效,生成了 Bedrock long-term API key(`aws iam create-service-specific-credential --service-name bedrock.amazonaws.com`)。 + +| 端点 | 模型 | 结果 | +|---|---|---| +| `bedrock-runtime…/openai/v1/chat/completions` | gpt-oss-120b | ✅ 返回真实 completion | +| `bedrock-mantle…/v1/responses` | gpt-oss-120b | ✅ 返回真实 response(`BEDROCK_OK`) | +| `bedrock-mantle…/openai/v1/responses` | gpt-oss-120b | ❌ `model does not support the '/openai/v1/responses' API` | +| `bedrock-mantle…/v1/responses` | gpt-5.5 | ❌ `The model 'openai.gpt-5.5' does not exist`(区域不可用) | + +`aws bedrock list-foundation-models --region us-west-2` 中 `gpt-oss` 系列: + +``` +openai.gpt-oss-120b-1:0 ACTIVE +openai.gpt-oss-20b-1:0 ACTIVE +openai.gpt-oss-safeguard-120b ACTIVE +openai.gpt-oss-safeguard-20b ACTIVE +``` + +### 4.3 约束 1:内置 provider 路径 bug → 用自定义 provider + +内置 `amazon-bedrock` provider 打到带 `/openai` 前缀的路径,gpt-oss 不在那里服务。可用配置(实测 codex exec / app-server 均通过该 provider 完成握手与 turn): + +```toml +[model_providers.bedrockmantle] +name = "Bedrock Mantle" +base_url = "https://bedrock-mantle.us-west-2.api.aws/v1" +wire_api = "responses" # chat 已被 codex 0.141 废弃 +env_key = "AWS_BEARER_TOKEN_BEDROCK" # Bedrock API key(bearer) +``` + +> 注:`wire_api = "chat"` 在 codex 0.141 被硬拒绝(二进制字面量 `wire_api = "chat" is no longer supported`),必须 `responses`。 + +### 4.4 约束 2:agentic 工具被 Bedrock gpt-oss 拒绝 + +通过自定义 provider 跑完整 `codex exec` agentic turn 时: + +``` +ERROR: Failed to deserialize the JSON body into the target type: + Invalid 'tools': unknown variant `namespace`, expected `function` or `mcp` +``` + +codex 内置的 agentic 工具集(shell/apply_patch 等)用 `type:"namespace"` 工具声明,Bedrock 的 gpt-oss responses 端点只认 `function`/`mcp`。手动用 `type:"function"` 工具直接打端点则通过 —— 说明这是 codex 工具声明形态与 Bedrock gpt-oss 子集不兼容,而非链路问题。 + +**含义**:Bedrock gpt-oss 路径下 codex 可做纯对话 / 自定义 function-calling,但**无法**跑 codex 招牌的本地 shell agentic 工作流。完整 agentic 能力仍需 OpenAI 凭据(gpt-5.x-codex 系列)。 + +--- + +## 5. 验证矩阵结果(对照 RFC §8.3) + +| 验证点 | 结果 | 备注 | +|---|---|---| +| V1 启动 | ✅ | `codex app-server` 存在 | +| V2 握手 | ✅ | initialize+initialized+thread/start 跑通 | +| V3 单 turn | ✅ | 协议层完整;Bedrock gpt-oss agentic turn 受约束 2 影响 | +| V4 流式 | ✅ | `item/agentMessage/delta` 事件存在(schema + 协议确认) | +| V5 工具调用 | ⚠️ | commandExecution item 存在;Bedrock gpt-oss 拒 namespace 工具 | +| V6 interrupt | ⏳ | `turn/interrupt` method 存在(schema 确认),运行时未单测 | +| V7 resume | ⏳ | `thread/resume` method 存在(schema 确认) | +| V8 多 thread | ⏳ | 留 phase1 之后 | +| V9 backpressure | ⏳ | `-32001` 路径未触发,复用现有重试 | +| V10 反向请求 | ✅ | ServerRequest 枚举确认 `*/requestApproval` | +| V11 持久化 | ⏳ | `~/.codex/sessions/` 由 codexjsonl phase1 读 | +| V12 schema 生成 | ✅ | 见 §2 | + +⏳ = method/schema 已确认存在,运行时行为留单测 + phase 后续。 + +--- + +## 6. 对 RFC 的具体回写 + +1. §2.4:`turn/start.input` 改为 `UserInput[]`(`[{type:"text",text:"…"}]`)。 +2. §2.4 / §2.6:token usage 走 `thread/tokenUsage/updated` notification,不在 `turn/completed`。 +3. §2.5:反向请求实测名是 `item/{commandExecution,fileChange,permissions}/requestApproval` + `item/tool/requestUserInput` + `mcpServer/elicitation/request`。 +4. §7 / §9.1:删除 "codex 只能走 OpenAI 自家" 论断;新增 Bedrock 部署小节(含两个约束)。 +5. threadId 来源:`thread/start` response `result.thread.id`。 diff --git a/docs/rfc/codex-backend.md b/docs/rfc/codex-backend.md index 84e98e266..7be329eb5 100644 --- a/docs/rfc/codex-backend.md +++ b/docs/rfc/codex-backend.md @@ -1,15 +1,15 @@ # Codex Backend 接入 -> **状态**: 设计提案 Draft v1(**未实测**,待按 §10 验证脚本跑通后升级 v2) +> **状态**: Draft v2(**Phase 0 已实测**,2026-06-21;详见 `codex-backend-validation.md`) > **作者**: naozhi team -> **创建**: 2026-05-20 +> **创建**: 2026-05-20 · **实测更新**: 2026-06-21 > **依赖 / 前置**: > - `internal/cli/backend/profile.go`(多 backend 注册表已就位,参见 `multi-backend.md`) > - `internal/cli/protocol.go` `Protocol` 接口 > - `internal/cli/protocol_acp.go`(JSON-RPC 2.0 over stdio 长连接的现成骨架,可作为实现参考) > - `docs/rfc/multi-backend.md` v2(backend.Profile 抽象与 Dashboard §8 差异化规约) > **关联代码**: `internal/cli/backend/` · `internal/cli/protocol_acp.go` · `internal/cli/wrapper.go` · `internal/history/` · `cmd/naozhi/main.go` -> **可行性验证**: 待补 `docs/rfc/codex-backend-validation.md`(Phase 0 实测脚本,参考 `multi-backend-validation.md` 模式) +> **可行性验证**: ✅ `docs/rfc/codex-backend-validation.md`(codex 0.141.0 实测:协议跑通、method 名命中、Bedrock 路径连通 + 两个约束) --- @@ -101,48 +101,69 @@ client → server: { method: "initialized" } // notification, | **Turn** | 一次用户输入触发的 agent 工作 | `turn/start`、`turn/interrupt`、`turn/steer` | | **Item** | turn 内的原子事件(user/agent message、reasoning、command、file change、MCP tool call、web search、todo list) | 通过 `item/started`、`item//delta`、`item/completed` notification 流式推送 | -### 2.4 一次完整 turn 的事件流(草图,待 §10 验证) +### 2.4 一次完整 turn 的事件流(✅ 2026-06-21 实测,已按 schema 修正) ``` -client → server: thread/start { ... } -server → client: thread/started (notif) { threadId } -client → server: turn/start { threadId, input: "..." } +client → server: thread/start { cwd } +server → client: { id, result: { thread: { id: "", ... } } } # ← threadId 在这里 +server → client: thread/started (notif) +client → server: turn/start { threadId, input: [ { type:"text", text:"..." } ] } # ← input 是 UserInput[] server → client: turn/started (notif) -server → client: item/started (notif) { itemId, type: "agentMessage" } -server → client: item/agentMessage/delta (notif) { itemId, content: "Hi" } -server → client: item/agentMessage/delta (notif) { itemId, content: "!" } -server → client: item/started (notif) { itemId, type: "commandExecution" } -server → client: serverRequest/... (双向请求) { method: "approve", params: {...} } -client → server: 响应 server 的请求 { decision: "allow" } -server → client: item/completed (notif) { itemId, ...detail } -server → client: turn/completed (notif) { usage: { input_tokens, output_tokens }, response_id } -client → server: turn/start (response) { ... 最终 response 结构 } +server → client: item/started (notif) { item: { type:"userMessage" | "agentMessage" | ... } } +server → client: item/agentMessage/delta (notif) { itemId, threadId, turnId, delta: "Hi" } # ← delta 是纯 string +server → client: item/agentMessage/delta (notif) { ..., delta: "!" } +server → client: item/started (notif) { item: { type:"commandExecution" } } +server → client: item//requestApproval (反向请求) { ... } # ← 见 §2.5 实测名 +client → server: 自动 allow 响应 +server → client: item/completed (notif) { item: { type:"agentMessage", text:"..." } } +server → client: thread/tokenUsage/updated (notif) { tokenUsage: { last, total } } # ← usage 在这里,不在 turn/completed +server → client: turn/completed (notif) { threadId, turn: { status, error } } +client → server: { id, result: { turn: { ... } } } # turn/start 的迟到 response ``` -注意:`turn/start` 是一个 **request**,server 流完所有 notification 后才 reply 这个 request。这种"长 RPC + 中间穿插 notification"的形态跟 Kiro ACP 的 `session/prompt` 一致,复用现有 `pendingResponses + waitForResponse` 模式即可。 +实测修正(详见 `codex-backend-validation.md` §3): +- **threadId** 取自 `thread/start` response 的 `result.thread.id`(非 `thread/started` notification 顶层)。 +- **`turn/start.input`** 是 `UserInput[]`(`[{type:"text",text:"…"}]`),发裸字符串得 `-32600 expected a sequence`。 +- **`item/agentMessage/delta.params.delta`** 是**纯字符串**,不是 `{content}`。 +- **token usage** 走单独的 `thread/tokenUsage/updated` notification,不在 `turn/completed`。 + +注意:`turn/start` 是一个 **request**,server 流完所有 notification 后才 reply 这个 request。这种"长 RPC + 中间穿插 notification"的形态跟 Kiro ACP 的 `session/prompt` 一致,复用现有 `readUntilResponse` 模式即可。 ### 2.5 反向请求(approval flow) -server 主动向 client 发 Request 要求决策(命令执行 / 文件修改 approval、`elicitationRequest`、`requestUserInput`)。naozhi 的策略: -- 默认走 `--ask-for-approval never` + `--sandbox workspace-write`,跟 claude `--dangerously-skip-permissions` 同立场 -- 一旦 server 还是发了反向请求(边缘 case),落到与 ACP `session/request_permission` 同套自动授权代码 +server 主动向 client 发 Request 要求决策。**实测的反向请求 method 名**(ServerRequest 枚举,2026-06-21): + +``` +item/commandExecution/requestApproval +item/fileChange/requestApproval +item/permissions/requestApproval +item/tool/requestUserInput +mcpServer/elicitation/request +``` + +naozhi 的策略: +- 默认走 `approval_policy=never` + `sandbox_mode=workspace-write`,跟 claude `--dangerously-skip-permissions` 同立场 + (实测约束:`approval_policy=never` 不能配 `sandbox_mode=danger-full-access`,否则 codex 退回 read-only;workspace-write 是安全选择) +- 一旦 server 仍发了 `*/requestApproval` 反向请求(边缘 case),HandleEvent 自动 allow,落到与 ACP `session/request_permission` 同套自动授权代码 -### 2.6 Cost / Usage +### 2.6 Cost / Usage(✅ 实测修正:走单独 notification) -`turn/completed` 带: +token 用量**不在** `turn/completed`,而在单独的 `thread/tokenUsage/updated` notification: ```json { - "usage": { - "input_tokens": 1234, - "output_tokens": 567, - "cached_input_tokens": 100 - }, - "response_id": "resp_..." + "threadId": "...", + "turnId": "...", + "tokenUsage": { + "last": { "inputTokens": 72, "outputTokens": 20, "cachedInputTokens": 0, + "reasoningOutputTokens": 0, "totalTokens": 92 }, + "total": { "inputTokens": ..., "outputTokens": ..., "totalTokens": ... }, + "modelContextWindow": 128000 + } } ``` -无 `total_cost_usd`。`Profile.CostUnit = "tokens"`,dashboard cost 列显示 token 数;如果业务侧要换算 USD 由 normalize 层按 model price 换算(不在本 RFC 范围)。 +无 `total_cost_usd`。实现把 `tokenUsage.last` 映射为 `EventMetadata.MeteringUsage`(`{value: totalTokens, unit: "token"}`)+ `ContextUsagePercent`(由 totalTokens / modelContextWindow 估算)。`Profile.CostUnit = "tokens"`,dashboard cost 列显示 token 数;USD 换算由 normalize 层按 model price 处理(不在本 RFC 范围)。 ### 2.7 Backpressure @@ -282,17 +303,41 @@ func (p *CodexProtocol) Init(rw *JSONRW, resumeID, cwd string) (string, error) { --- -## 7. 鉴权 +## 7. 鉴权(✅ 实测纠正:Codex 原生支持 Bedrock) + +> **Draft v1 论断作废**:v1 称 "Codex 模型只能走 OpenAI 自家(不像 claude 走 Bedrock)"。实测推翻 —— codex 0.141 **内置 `amazon-bedrock` model provider**(openai/codex PR #18744,2026-04-20 合入),naozhi 部署机的 AWS 凭据可直接驱动 codex。 + +Codex CLI 鉴权三条路径: + +| 路径 | 适用 | naozhi 立场 | +|---|---|---| +| **Amazon Bedrock**(`AWS_BEARER_TOKEN_BEDROCK` 或 AWS 凭据链) | 复用部署机已有 AWS 接入 | ✅ 已验证连通(见 §7.1 约束) | +| `CODEX_API_KEY` / `OPENAI_API_KEY` 环境变量 | 完整 agentic(gpt-5.x-codex) | 推荐,无 Bedrock 工具约束 | +| `codex login` 持久化 ChatGPT 登录态 | 桌面交互 | 不适合 headless naozhi | + +### 7.1 Bedrock 部署(已验证 + 两个约束) + +实测环境 `AWS_REGION=us-west-2`,`bedrock-mantle.us-west-2.api.aws/v1/responses` + `openai.gpt-oss-120b` 连通。配置(用**自定义 provider**,不用内置 `amazon-bedrock`): + +```toml +[model_providers.bedrockmantle] +name = "Bedrock Mantle" +base_url = "https://bedrock-mantle..api.aws/v1" # 注意:无 /openai 前缀 +wire_api = "responses" # chat 在 0.141 已废弃 +env_key = "AWS_BEARER_TOKEN_BEDROCK" +``` + +**约束 1(路径)**:codex 内置 `amazon-bedrock` provider 打到 `/openai/v1/responses`,但 Bedrock 上 gpt-oss 只在 `/v1/responses` 服务 → 必须用上面的自定义 provider 指向正确 base_url。 + +**约束 2(agentic 受限)**:Bedrock 的 gpt-oss responses 端点拒绝 codex 内置 agentic 工具的 `type:"namespace"` 声明(只认 `function`/`mcp`),完整 shell-agentic turn 会 `status:failed`。**Bedrock gpt-oss 路径仅支持纯对话 / function-calling,不支持 codex 招牌的本地 shell agentic**。完整 agentic 需 OpenAI 凭据 + gpt-5.x-codex。 -Codex CLI 鉴权两条路径: -- `CODEX_API_KEY` 环境变量(推荐自动化场景) -- `codex login` 持久化的 ChatGPT 登录态 +**模型可用性**:us-west-2 Bedrock 只有 `gpt-oss-{20b,120b,safeguard}`,无 gpt-5.x。 -naozhi 的部署机已经走 Bedrock claude,无 OpenAI 凭据。落地前提: -- 部署文档增加"如启用 codex backend,需 export `CODEX_API_KEY` 或预先 `codex login`" -- `cmd/naozhi/doctor.go` 增 codex 健康检查:`codex --version` 能跑 + `~/.codex/auth.json` 或 env 二选一 +落地前提: +- 部署文档:启用 codex backend 时,二选一 —— ① Bedrock(`AWS_BEARER_TOKEN_BEDROCK` + 自定义 provider,agentic 受限);② `CODEX_API_KEY`/`codex login`(完整 agentic)。 +- `cmd/naozhi/doctor.go` 增 codex 健康检查:`codex --version` 能跑 + (`~/.codex/auth.json` / `CODEX_API_KEY` / `AWS_BEARER_TOKEN_BEDROCK` 三选一)。 -不在本 RFC 范围:自建凭据池、企业网关代理、按 session 切 key。 +不在本 RFC 范围:自建凭据池、企业网关代理、按 session 切 key、Bedrock Access Gateway(BAG)代理把 namespace 工具翻译给 gpt-oss。 --- @@ -314,9 +359,9 @@ naozhi 的部署机已经走 Bedrock claude,无 OpenAI 凭据。落地前提 - `cli_test.go` 加 codex backend fixture(模拟 app-server NDJSON 流) - 历史 source 用 fixture jsonl 验证 codexjsonl 解析 -### 8.3 Phase 0 实测(待跑) +### 8.3 Phase 0 实测(✅ 已完成 2026-06-21 — 详见 `codex-backend-validation.md`) -参照 `multi-backend-validation.md` 模式,写 `codex-backend-validation.md`: +实测结果汇总(V1/V2/V3/V4/V10/V12 ✅ 通过;V5 ⚠️ Bedrock gpt-oss 受约束 2 限制;V6-V9/V11 ⏳ method/schema 已确认存在,运行时留单测 + phase 后续): | 验证点 | 脚本 | 通过条件 | |---|---|---| @@ -333,7 +378,7 @@ naozhi 的部署机已经走 Bedrock claude,无 OpenAI 凭据。落地前提 | V11 持久化 | turn 后查 `~/.codex/sessions/` | 文件存在且可被 codexjsonl 读取 | | V12 schema 生成 | `codex app-server generate-json-schema --out /tmp/x` | 文件生成且 JSON 合法 | -任一项失败都会改写设计;目前文档默认 V1-V12 全过,**实施前必须先跑通**。 +协议路线(V1-V4/V10/V12)已实测通过,按 §10 进入 Phase 1 实现。Bedrock 的 agentic 约束(V5)记录在案,不阻挡 protocol/profile 落地。 --- @@ -346,7 +391,7 @@ naozhi 的部署机已经走 Bedrock claude,无 OpenAI 凭据。落地前提 | `app-server` 仍是新接口(2025 末才推),可能有未发现的边缘 case | M | Phase 0 实测全跑过;订阅 codex GitHub release notes | | 多 thread 共享一个 app-server 进程的并发隔离质量未知(V8) | M | phase1 仍按 1 thread / 进程,跟 claude 对齐;V8 验证后 phase2 做"一进程多 thread"优化 | | `turn/steer` 与 naozhi `/urgent` 语义对接细节 | L | phase2 单独 RFC | -| Codex 模型只能走 OpenAI 自家(不像 claude 走 Bedrock) | M | 部署侧增 `CODEX_API_KEY` 配置;不影响 naozhi 自身 | +| ~~Codex 模型只能走 OpenAI 自家~~ → **已验证可走 Bedrock**,但 gpt-oss agentic 工具受限(§7.1 约束 2) | M | Bedrock 仅做纯对话/function-calling;完整 agentic 用 `CODEX_API_KEY` + gpt-5.x-codex | | Approval flow `requestUserInput` 转 AskUserQuestion 卡片的 schema 映射 | L | phase2,phase1 默认拒绝避免悬挂 | ### 9.2 未决 diff --git a/internal/cli/backend/profile.go b/internal/cli/backend/profile.go index d525294a3..350488b4c 100644 --- a/internal/cli/backend/profile.go +++ b/internal/cli/backend/profile.go @@ -221,8 +221,8 @@ func All() []Profile { return out } -// RegisterDefaults registers the built-in profiles (claude, kiro). Must be -// called once during startup before any consumer touches the registry. +// RegisterDefaults registers the built-in profiles (claude, kiro, codex). Must +// be called once during startup before any consumer touches the registry. // Idempotency is intentionally NOT supported: calling twice will panic via // Register's duplicate check, surfacing accidental double-init. // @@ -232,6 +232,7 @@ func All() []Profile { func RegisterDefaults() { Register(claudeProfile()) Register(kiroProfile()) + Register(codexProfile()) } // EnsureDefaults is the concurrent-safe, idempotent counterpart to diff --git a/internal/cli/backend/profile_codex.go b/internal/cli/backend/profile_codex.go new file mode 100644 index 000000000..bebd52cd3 --- /dev/null +++ b/internal/cli/backend/profile_codex.go @@ -0,0 +1,63 @@ +package backend + +import ( + "strings" + + "github.com/naozhi/naozhi/internal/cli" +) + +// codexProfile returns the Profile describing OpenAI's codex CLI. It speaks +// the codex app-server JSON-RPC 2.0 protocol (NDJSON over stdio) and so it +// constructs a fresh CodexProtocol per session. ProtocolDeps is ignored — +// codex does not honor a claude-style settings override. +// +// RequiredNodeCaps lists "codex-app-server" so reverse-node routing rejects +// hosts that do not advertise codex support before attempting a codex session +// there (mirrors "acp" for kiro). RFC docs/rfc/codex-backend.md §5. +func codexProfile() Profile { + return Profile{ + ID: "codex", + DisplayName: "codex", + DefaultBinary: "codex", // npm @openai/codex installs as `codex` + DefaultTag: "cdx", // reply prefix; aligns with cc / kiro / gem + ChipColor: "#10a37f", // OpenAI brand green — distinct from claude purple, kiro orange + NewProtocol: func(_ ProtocolDeps) cli.Protocol { + // Seed BackendID so per-backend metric labels emitted by + // ReadEvent (RPC error) are populated (multi-backend RFC §10). + return &cli.CodexProtocol{BackendID: "codex"} + }, + // DetectInProc matches a codex process but excludes the non-app-server + // subcommands so a transient `codex login` / `codex exec` invocation is + // not mislabelled as a hosted naozhi session. We only host codex via + // `codex app-server`, so require that substring. + DetectInProc: func(cmdline string) bool { + return strings.Contains(cmdline, "codex") && strings.Contains(cmdline, "app-server") + }, + RequiredNodeCaps: []string{"codex-app-server"}, + // codex persists threads under ~/.codex/sessions/. Consumed by a + // future internal/history/codexjsonl source (RFC §4.1, phase1+). + // Stored with leading "~/" for doctor display, same as claude/kiro. + HistoryDir: "~/.codex/sessions/", + // codex app-server reports per-turn token usage via + // thread/tokenUsage/updated; there is no USD figure on the wire. + // Dashboard cost cells render unitless with a "tokens" suffix. + CostUnit: "tokens", + // RFC §5 phase1 conservative values (validated 2026-06-21): + // - askuser: requestUserInput reverse request not yet card-ified (phase2) + // - passthrough: turn/steer not yet wired to /urgent (phase2) + // - embedded_context: @file mention not yet plumbed (phase1) + // - image_input: codex responses accepts data: URL images (gpt-5.x path) + // - audio_input: no direct audio + // - mcp_http: codex supports HTTP MCP servers + // - mcp_sse: not supported + Features: map[string]bool{ + "askuser": false, + "passthrough": false, + "embedded_context": false, + "image_input": true, + "audio_input": false, + "mcp_http": true, + "mcp_sse": false, + }, + } +} diff --git a/internal/cli/backend/profile_test.go b/internal/cli/backend/profile_test.go index 95e11e4fb..0eebb0182 100644 --- a/internal/cli/backend/profile_test.go +++ b/internal/cli/backend/profile_test.go @@ -180,17 +180,20 @@ func TestRegisterDefaults_RegistersClaudeAndKiro(t *testing.T) { RegisterDefaults() all := All() - if len(all) != 2 { - t.Fatalf("RegisterDefaults: All() len = %d; want 2", len(all)) + if len(all) != 3 { + t.Fatalf("RegisterDefaults: All() len = %d; want 3", len(all)) } - // Registration order is claude then kiro per the function body. + // Registration order is claude, kiro, codex per the function body. if all[0].ID != "claude" { t.Errorf("first default profile = %q; want %q", all[0].ID, "claude") } if all[1].ID != "kiro" { t.Errorf("second default profile = %q; want %q", all[1].ID, "kiro") } + if all[2].ID != "codex" { + t.Errorf("third default profile = %q; want %q", all[2].ID, "codex") + } claude, ok := Get("claude") if !ok { @@ -225,6 +228,26 @@ func TestRegisterDefaults_RegistersClaudeAndKiro(t *testing.T) { if len(kiro.RequiredNodeCaps) != 1 || kiro.RequiredNodeCaps[0] != "acp" { t.Errorf("kiro RequiredNodeCaps = %v; want [\"acp\"]", kiro.RequiredNodeCaps) } + + codex, ok := Get("codex") + if !ok { + t.Fatal("Get(\"codex\") missing after RegisterDefaults") + } + if codex.DisplayName != "codex" { + t.Errorf("codex DisplayName = %q; want %q", codex.DisplayName, "codex") + } + if codex.DefaultBinary != "codex" { + t.Errorf("codex DefaultBinary = %q; want %q", codex.DefaultBinary, "codex") + } + if codex.DefaultTag != "cdx" { + t.Errorf("codex DefaultTag = %q; want %q", codex.DefaultTag, "cdx") + } + if codex.CostUnit != "tokens" { + t.Errorf("codex CostUnit = %q; want %q", codex.CostUnit, "tokens") + } + if len(codex.RequiredNodeCaps) != 1 || codex.RequiredNodeCaps[0] != "codex-app-server" { + t.Errorf("codex RequiredNodeCaps = %v; want [\"codex-app-server\"]", codex.RequiredNodeCaps) + } }) } @@ -248,8 +271,8 @@ func TestEnsureDefaults_IdempotentAndConcurrent(t *testing.T) { wg.Wait() all := All() - if len(all) != 2 { - t.Fatalf("after %d concurrent EnsureDefaults: All() len = %d; want 2", N, len(all)) + if len(all) != 3 { + t.Fatalf("after %d concurrent EnsureDefaults: All() len = %d; want 3", N, len(all)) } if _, ok := Get("claude"); !ok { t.Error("claude missing after concurrent EnsureDefaults") @@ -257,12 +280,15 @@ func TestEnsureDefaults_IdempotentAndConcurrent(t *testing.T) { if _, ok := Get("kiro"); !ok { t.Error("kiro missing after concurrent EnsureDefaults") } + if _, ok := Get("codex"); !ok { + t.Error("codex missing after concurrent EnsureDefaults") + } // Subsequent calls must be no-ops (no panic, no extra registrations). EnsureDefaults() EnsureDefaults() - if got := len(All()); got != 2 { - t.Errorf("repeat EnsureDefaults registered extra: All() len = %d; want 2", got) + if got := len(All()); got != 3 { + t.Errorf("repeat EnsureDefaults registered extra: All() len = %d; want 3", got) } }) } @@ -283,8 +309,8 @@ func TestEnsureDefaults_CleanRegistryReBootstraps(t *testing.T) { t.Fatalf("clean registry should start empty; All() len = %d", got) } EnsureDefaults() - if got := len(All()); got != 2 { - t.Fatalf("EnsureDefaults inside clean registry did not re-bootstrap: All() len = %d; want 2 (defaultsOnce reset missing?)", got) + if got := len(All()); got != 3 { + t.Fatalf("EnsureDefaults inside clean registry did not re-bootstrap: All() len = %d; want 3 (defaultsOnce reset missing?)", got) } if _, ok := Get("claude"); !ok { t.Error("claude missing after re-bootstrap") diff --git a/internal/cli/detect.go b/internal/cli/detect.go index ea497e4ad..b6529953f 100644 --- a/internal/cli/detect.go +++ b/internal/cli/detect.go @@ -73,6 +73,7 @@ type BackendInfo struct { var knownBackends = []BackendInfo{ {ID: "claude", DisplayName: "claude-code", Protocol: "stream-json", defaultBinary: "claude"}, {ID: "kiro", DisplayName: "kiro", Protocol: "acp", defaultBinary: "kiro-cli"}, + {ID: "codex", DisplayName: "codex", Protocol: "codex-app-server", defaultBinary: "codex"}, } // lookupBackend returns the knownBackends row for the given ID and whether it diff --git a/internal/cli/protocol_codex.go b/internal/cli/protocol_codex.go new file mode 100644 index 000000000..d885275d2 --- /dev/null +++ b/internal/cli/protocol_codex.go @@ -0,0 +1,663 @@ +package cli + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "os" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/naozhi/naozhi/internal/metrics" + "github.com/naozhi/naozhi/internal/osutil" + "github.com/naozhi/naozhi/internal/textutil" +) + +// codexHandshakeTimeout caps how long a handshake RPC (initialize / +// thread/start / thread/resume) waits for its response. Distinct constant +// from acpHandshakeTimeout so the two backends tune independently; codex +// model warmup can be slower than kiro's, so start a touch higher. +const codexHandshakeTimeout = 45 * time.Second + +// CodexProtocol implements Protocol for OpenAI's `codex app-server` — +// JSON-RPC 2.0 over stdio NDJSON, long-lived, bidirectional. It shares +// ACPProtocol's "long RPC + interleaved notifications" shape (turn/start is a +// request whose response only arrives after every item/* notification) but +// uses codex's own method names and payloads, so it is a SEPARATE +// implementation rather than a reuse of ACPProtocol (RFC docs/rfc/codex-backend.md §1.3). +// +// The wire contract was validated against codex-cli 0.141.0 on 2026-06-21; +// see docs/rfc/codex-backend-validation.md for the captured frames. +type CodexProtocol struct { + mu sync.Mutex + // nextID mirrors ACPProtocol.nextID — int64 to avoid sign flip, narrowed + // to int in allocID for RPCRequest.ID JSON compatibility (64-bit only). + nextID atomic.Int64 + // threadID is the codex thread this protocol instance drives. Written once + // by Init (thread/start result.thread.id), read concurrently by + // WriteMessage / WriteInterrupt / readLoop. atomic.Pointer so per-chunk + // textBuf writes under mu never contend with these reads (mirrors + // ACPProtocol.sessionID rationale). + threadID atomic.Pointer[string] + // textBuf accumulates item/agentMessage/delta text during a turn, flushed + // at turn/completed. Guarded by mu. + textBuf strings.Builder + // BackendID labels per-backend metric increments (RFC multi-backend §10). + BackendID string +} + +// ErrCodexRPC wraps a JSON-RPC error returned by codex app-server. +var ErrCodexRPC = errors.New("codex rpc error") + +// ErrCodexTimeout is returned when readUntilResponse gives up on a specific +// JSON-RPC id after codexHandshakeTimeout. Callers treat it as transient. +var ErrCodexTimeout = errors.New("codex response timeout") + +func (p *CodexProtocol) Name() string { return "codex" } + +// Clone returns a fresh instance retaining BackendID so the spawn pipeline's +// proto.Clone() preserves metric labelling. +func (p *CodexProtocol) Clone() Protocol { return &CodexProtocol{BackendID: p.BackendID} } + +func (p *CodexProtocol) storeThreadID(id string) { p.threadID.Store(&id) } + +func (p *CodexProtocol) loadThreadID() string { + if s := p.threadID.Load(); s != nil { + return *s + } + return "" +} + +func (p *CodexProtocol) allocID() int { return int(p.nextID.Add(1) - 1) } + +// BuildArgs launches `codex app-server`. Model / sandbox / approval flow over +// RPC and config (-c) rather than per-turn flags. approval_policy=never + +// sandbox_mode=workspace-write mirrors claude's --dangerously-skip-permissions +// stance; danger-full-access is intentionally NOT used because codex 0.141 +// rejects approval_policy=never with danger-full-access and silently falls +// back to read-only (validation §2.5). +func (p *CodexProtocol) BuildArgs(opts SpawnOptions) []string { + args := []string{"app-server"} + if opts.Model != "" { + args = append(args, "-c", "model="+opts.Model) + } + args = append(args, "-c", "approval_policy=never") + args = append(args, "-c", "sandbox_mode=workspace-write") + // Mirror ClaudeProtocol's ARG_MAX defence so codex extra args can't bypass it. + args = append(args, capExtraArgsBytes(opts.ExtraArgs)...) + return args +} + +// --- typed RPC param/result shapes (validated 2026-06-21) --- + +type codexInitParams struct { + ClientInfo codexClientInfo `json:"clientInfo"` +} + +type codexClientInfo struct { + Name string `json:"name"` + Version string `json:"version"` +} + +type codexThreadStartParams struct { + Cwd string `json:"cwd"` +} + +type codexThreadResumeParams struct { + ThreadID string `json:"threadId"` + Cwd string `json:"cwd"` +} + +// codexThreadStartResult decodes thread/start's response. threadId lives at +// result.thread.id (validation §3.1) — NOT in the thread/started notification. +type codexThreadStartResult struct { + Thread struct { + ID string `json:"id"` + } `json:"thread"` +} + +// codexUserInput is one entry of turn/start's input array. The input is a +// UserInput[] (validation §3.2), not a bare string — sending a string yields +// -32600 "expected a sequence". +type codexUserInput struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + ImageURL string `json:"image_url,omitempty"` +} + +type codexTurnStartParams struct { + ThreadID string `json:"threadId"` + Input []codexUserInput `json:"input"` +} + +type codexTurnInterruptParams struct { + ThreadID string `json:"threadId"` +} + +func (p *CodexProtocol) Init(rw *JSONRW, resumeID string, cwd string) (string, error) { + // Step 1: initialize handshake (request → result). + initReq := RPCRequest{ + JSONRPC: "2.0", ID: p.allocID(), Method: "initialize", + Params: codexInitParams{ClientInfo: codexClientInfo{Name: "naozhi", Version: "1.0.0"}}, + } + if _, err := p.sendAndWaitResponse(rw, initReq); err != nil { + return "", fmt.Errorf("codex initialize: %w", err) + } + + // Step 2: `initialized` notification (no id). Until this is sent the server + // rejects every other method with "Not initialized". + if err := p.writeNotification(rw.W, "initialized", nil); err != nil { + return "", fmt.Errorf("codex initialized notify: %w", err) + } + + // Step 3: thread/start or thread/resume. + if cwd == "" { + cwd = os.TempDir() + } + if resumeID != "" { + resumeReq := RPCRequest{ + JSONRPC: "2.0", ID: p.allocID(), Method: "thread/resume", + Params: codexThreadResumeParams{ThreadID: resumeID, Cwd: cwd}, + } + resp, err := p.sendAndWaitResponse(rw, resumeReq) + if err != nil { + return "", fmt.Errorf("codex thread/resume: %w", err) + } + tid := resumeID + if resp != nil && len(resp.Result) > 0 { + var r codexThreadStartResult + if json.Unmarshal(resp.Result, &r) == nil && r.Thread.ID != "" { + tid = r.Thread.ID + } + } + p.storeThreadID(tid) + return tid, nil + } + + startReq := RPCRequest{ + JSONRPC: "2.0", ID: p.allocID(), Method: "thread/start", + Params: codexThreadStartParams{Cwd: cwd}, + } + resp, err := p.sendAndWaitResponse(rw, startReq) + if err != nil { + return "", fmt.Errorf("codex thread/start: %w", err) + } + var result codexThreadStartResult + if resp == nil || json.Unmarshal(resp.Result, &result) != nil || result.Thread.ID == "" { + return "", fmt.Errorf("codex thread/start: missing thread id in result") + } + p.storeThreadID(result.Thread.ID) + return result.Thread.ID, nil +} + +func (p *CodexProtocol) WriteMessage(w io.Writer, text string, images []ImageData) error { + tid := p.loadThreadID() + p.mu.Lock() + p.textBuf.Reset() + p.mu.Unlock() + + input := make([]codexUserInput, 0, len(images)+1) + for _, img := range images { + // codex responses image input takes a data: URL (base64). image_input + // is advertised in the profile Features map for the gpt-5.x path; the + // gpt-oss Bedrock path does not accept images (validation §4). + input = append(input, codexUserInput{ + Type: "image", + ImageURL: "data:" + img.MimeType + ";base64," + encodeImageBase64(img.Data), + }) + } + if text != "" || len(input) == 0 { + input = append(input, codexUserInput{Type: "text", Text: text}) + } + + req := RPCRequest{ + JSONRPC: "2.0", ID: p.allocID(), Method: "turn/start", + Params: codexTurnStartParams{ThreadID: tid, Input: input}, + } + data, err := json.Marshal(req) + if err != nil { + return err + } + _, err = w.Write(append(data, '\n')) + return err +} + +// WriteInterrupt sends a turn/interrupt request to abort the in-flight turn. +// Unlike ACP's session/cancel (a notification), codex models interrupt as a +// request (validation: TurnInterruptParams has threadId). We fire-and-forget +// the request over stdin under the caller-held write lock and let the readLoop +// observe the resulting turn/completed; we do not block on the interrupt's own +// response. Returns ErrInterruptUnsupported before the handshake establishes a +// thread (nothing to interrupt yet), so callers fall back to SIGINT. +func (p *CodexProtocol) WriteInterrupt(w io.Writer, _ string) error { + tid := p.loadThreadID() + if tid == "" { + return ErrInterruptUnsupported + } + req := RPCRequest{ + JSONRPC: "2.0", ID: p.allocID(), Method: "turn/interrupt", + Params: codexTurnInterruptParams{ThreadID: tid}, + } + data, err := json.Marshal(req) + if err != nil { + return fmt.Errorf("codex marshal turn/interrupt: %w", err) + } + if _, err := w.Write(append(data, '\n')); err != nil { + return fmt.Errorf("codex write turn/interrupt: %w", err) + } + // Multi-backend RFC §10: record successful cancels per backend (mirrors + // ACPProtocol.WriteInterrupt). The pre-handshake early return above does + // not count — no real interrupt reached the agent. + metrics.RecordACPCancel(p.BackendID) + return nil +} + +// WriteUserMessageLocked ignores uuid/priority — codex has no replay/priority +// concept in phase1 (turn/steer is a phase2 optimisation, RFC §3). +func (p *CodexProtocol) WriteUserMessageLocked(w io.Writer, _, text string, images []ImageData, _ string) error { + return p.WriteMessage(w, text, images) +} + +func (p *CodexProtocol) SupportsPriority() bool { return false } +func (p *CodexProtocol) SupportsReplay() bool { return false } + +// Capabilities: codex has a clean soft-interrupt (turn/interrupt request) once +// a thread is established; pre-handshake WriteInterrupt still returns +// ErrInterruptUnsupported. Not stream-json. RFC §6. +func (p *CodexProtocol) Capabilities() Caps { + return Caps{Replay: false, Priority: false, SoftInterrupt: true, StreamJSON: false} +} + +// --- notification decode shapes --- + +type codexAgentMessageDelta struct { + ThreadID string `json:"threadId"` + ItemID string `json:"itemId"` + Delta string `json:"delta"` +} + +type codexItemNotif struct { + ThreadID string `json:"threadId"` + Item codexThreadItem `json:"item"` +} + +type codexThreadItem struct { + Type string `json:"type"` + ID string `json:"id"` + Text string `json:"text,omitempty"` + Status string `json:"status,omitempty"` + Title string `json:"title,omitempty"` +} + +type codexTurnCompleted struct { + ThreadID string `json:"threadId"` + Turn struct { + Status string `json:"status"` + Error *struct { + Message string `json:"message"` + } `json:"error,omitempty"` + } `json:"turn"` +} + +type codexTokenUsageNotif struct { + ThreadID string `json:"threadId"` + TokenUsage struct { + Last struct { + InputTokens int64 `json:"inputTokens"` + OutputTokens int64 `json:"outputTokens"` + TotalTokens int64 `json:"totalTokens"` + } `json:"last"` + ModelContextWindow *int64 `json:"modelContextWindow"` + } `json:"tokenUsage"` +} + +func (p *CodexProtocol) ReadEvent(line string) ([]Event, bool, error) { + var msg RPCMessage + if err := json.Unmarshal(stringToBytesUnsafe(line), &msg); err != nil { + return nil, false, err + } + + // Notification dispatch. + if msg.IsNotification() { + return p.handleNotification(msg) + } + + // Reverse request from server (approval / userInput / elicitation) — these + // carry an id AND a method. Surface as a synthetic permission_request so + // HandleEvent can auto-allow; round-trip the id. + if msg.IsRequest() { + if strings.HasSuffix(msg.Method, "/requestApproval") { + rid, _ := msg.IDAsString() + return []Event{{ + Type: "permission_request", + SubType: msg.Method, + RPCRequestID: rid, + RawParams: msg.Params, + }}, false, nil + } + // Other reverse requests (requestUserInput / elicitation) are not yet + // wired to interactive cards (phase2). Ignore so the turn does not hang + // on our side — codex applies its own default after a timeout. + slog.Debug("codex: unhandled reverse request", "method", msg.Method) + return nil, false, nil + } + + // id-bearing response. During Init these are consumed by readUntilResponse; + // on the main readLoop the only id-bearing response we see is the deferred + // turn/start reply. A success reply carries no extra visible payload (text + // already streamed via item/agentMessage/delta + flushed at turn/completed), + // but an ERROR reply must close the turn and be recorded — otherwise a + // post-handshake turn/start failure is silently swallowed and the session + // hangs in state=running (mirrors ACP protocol_acp.go:611 rationale). + if msg.IsResponse() && msg.Error != nil { + metrics.RecordProtocolRPCError(p.BackendID, "turn/start", strconv.Itoa(msg.Error.Code)) + // Flush any partial text so the dashboard sees what streamed before the + // failure, then surface a synthetic error so readLoop closes the turn. + p.mu.Lock() + p.textBuf.Reset() + p.mu.Unlock() + return nil, true, fmt.Errorf("%w %d: %s", ErrCodexRPC, + msg.Error.Code, osutil.SanitizeForLog(msg.Error.Message, 256)) + } + return nil, false, nil +} + +func (p *CodexProtocol) handleNotification(msg RPCMessage) ([]Event, bool, error) { + switch msg.Method { + case "item/agentMessage/delta": + var d codexAgentMessageDelta + if err := json.Unmarshal(msg.Params, &d); err != nil { + slog.Warn("codex: agentMessage delta unmarshal failed", "err", err) + return nil, false, nil + } + if d.Delta != "" { + p.mu.Lock() + if room := maxAssistantMessageContentBytes - p.textBuf.Len(); room > 0 { + if len(d.Delta) <= room { + p.textBuf.WriteString(d.Delta) + } else { + n := textutil.TruncateAtRuneBoundary(d.Delta, room) + p.textBuf.WriteString(d.Delta[:n]) + } + } + p.mu.Unlock() + } + return []Event{{Type: "assistant", SessionID: d.ThreadID}}, false, nil + + case "item/started", "item/completed": + var n codexItemNotif + if err := json.Unmarshal(msg.Params, &n); err != nil { + return nil, false, nil + } + switch n.Item.Type { + case "commandExecution", "fileChange", "mcpToolCall", "webSearch", "dynamicToolCall": + subType := "tool_use" + if msg.Method == "item/completed" { + subType = "tool_result" + } + return []Event{{ + Type: "assistant", + SubType: subType, + SessionID: n.ThreadID, + ToolUseID: n.Item.ID, + ToolCall: &ToolCall{ + ID: n.Item.ID, + Title: sanitizeToolCallLabel(n.Item.Title), + Kind: sanitizeToolCallLabel(n.Item.Type), + Status: sanitizeToolCallLabel(n.Item.Status), + }, + Message: &AssistantMessage{ + Content: []ContentBlock{{Type: "tool_use", Name: sanitizeToolCallLabel(n.Item.Type)}}, + }, + }}, false, nil + default: + // userMessage / agentMessage / reasoning / plan items: text is + // already streamed via item/agentMessage/delta and flushed at + // turn/completed, so the lifecycle markers themselves carry no + // extra visible payload. Skip. + return nil, false, nil + } + + case "thread/tokenUsage/updated": + var u codexTokenUsageNotif + if err := json.Unmarshal(msg.Params, &u); err != nil { + return nil, false, nil + } + meta := &EventMetadata{} + if u.TokenUsage.Last.TotalTokens > 0 { + meta.MeteringUsage = []MeteringEntry{{ + Value: float64(u.TokenUsage.Last.TotalTokens), + Unit: "token", + UnitPlural: "tokens", + }} + } + if u.TokenUsage.ModelContextWindow != nil && *u.TokenUsage.ModelContextWindow > 0 { + meta.ContextUsagePercent = normalizeContextUsage( + float64(u.TokenUsage.Last.TotalTokens) / float64(*u.TokenUsage.ModelContextWindow)) + } + return []Event{{Type: "metadata", SessionID: u.ThreadID, Metadata: meta}}, false, nil + + case "turn/completed": + var c codexTurnCompleted + _ = json.Unmarshal(msg.Params, &c) // best-effort; even on decode failure the turn is over + p.mu.Lock() + text := p.textBuf.String() + p.textBuf.Reset() + p.mu.Unlock() + + // Turn boundary emits up to TWO events, mirroring ACPProtocol.ReadEvent: + // + // 1. A synthesised assistant frame carrying the accumulated text as a + // single "text" content block. This is the ONLY place the visible + // reply materialises — item/agentMessage/delta notifications feed + // textBuf but never make it onto EventLog, so without this frame + // the dashboard has no bubble to render + // (process_event_format.go derives bubbles only from + // ev.Message.Content, never from a result event's Result field). + // 2. A pure result event carrying stopReason/threadId. Result is still + // populated so process_send.Send can plumb text into + // SendResult.Text for passthrough callers, but the EventLog + // converter treats result strictly as turn metadata. + var events []Event + if text != "" { + events = append(events, Event{ + Type: "assistant", + SessionID: c.ThreadID, + Message: &AssistantMessage{ + Content: []ContentBlock{{Type: "text", Text: text}}, + }, + }) + } + result := Event{Type: "result", SessionID: c.ThreadID, Result: text} + if c.Turn.Status == "failed" && c.Turn.Error != nil { + // Keep the accumulated text (if any) on the assistant frame above; + // the result carries the failure reason in SubType so partial + // streamed content is not lost (review #5). + result.SubType = "error" + result.Result = osutil.SanitizeForLog(c.Turn.Error.Message, 1024) + } + events = append(events, result) + return events, true, nil + + case "error": + // Stream-level error notification (e.g. provider 5xx). Surface as a + // system event; turn/completed with status:failed follows separately. + var e struct { + Error struct { + Message string `json:"message"` + } `json:"error"` + } + _ = json.Unmarshal(msg.Params, &e) + slog.Warn("codex: error notification", "msg", osutil.SanitizeForLog(e.Error.Message, 256)) + return nil, false, nil + + case "thread/started", "turn/started", "thread/status/changed", + "configWarning", "warning", "remoteControl/status/changed", + "item/reasoning/textDelta", "item/reasoning/summaryTextDelta", + "turn/plan/updated", "turn/diff/updated": + // Known-but-uninteresting lifecycle / reasoning / plan frames. Skip + // quietly so they do not pollute the event log. + return nil, false, nil + + default: + // Forward-compat: unknown methods are tolerated. + slog.Debug("codex: unknown notification", "method", msg.Method) + return nil, false, nil + } +} + +func (p *CodexProtocol) HandleEvent(w io.Writer, ev Event) bool { + if ev.Type != "permission_request" { + return false + } + // Auto-allow every approval request (validation §3.4: */requestApproval). + // Round-trip the id; codex approval responses take {decision: "approved"}. + // requestApproval ids may be numeric or string; reuse ACP HandleEvent's + // numeric-fast-path / string-marshal split (protocol_acp.go:809). If the + // source string parses as an integer it is already a valid JSON number + // literal — reuse it verbatim and skip a marshal; otherwise quote it. + idRaw := json.RawMessage(`null`) + if ev.RPCRequestID != "" { + if _, err := strconv.Atoi(ev.RPCRequestID); err == nil { + idRaw = json.RawMessage(ev.RPCRequestID) + } else if b, err := json.Marshal(ev.RPCRequestID); err == nil { + idRaw = b + } + } + resp := struct { + JSONRPC string `json:"jsonrpc"` + ID json.RawMessage `json:"id"` + Result struct { + Decision string `json:"decision"` + } `json:"result"` + }{JSONRPC: "2.0", ID: idRaw} + resp.Result.Decision = "approved" + data, err := json.Marshal(resp) + if err != nil { + slog.Warn("codex: marshal approval response failed", "err", err) + return true + } + if _, err := w.Write(append(data, '\n')); err != nil { + slog.Warn("codex: write approval response failed", "err", err) + } + return true +} + +// writeNotification emits a JSON-RPC notification (no id) on stdin. +func (p *CodexProtocol) writeNotification(w io.Writer, method string, params any) error { + msg := struct { + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + Params any `json:"params,omitempty"` + }{JSONRPC: "2.0", Method: method, Params: params} + data, err := json.Marshal(msg) + if err != nil { + return err + } + _, err = w.Write(append(data, '\n')) + return err +} + +// sendAndWaitResponse writes req then blocks until the matching response +// arrives, consuming interleaved notifications. Reuses ACP's readUntilResponse +// (shared JSONRW + RPCMessage matching). The handshake RPCs (initialize / +// thread/start / thread/resume) reply promptly; turn/start is NOT sent through +// here (its response is deferred until after all item/* notifications and is +// instead surfaced via ReadEvent on the readLoop). +func (p *CodexProtocol) sendAndWaitResponse(rw *JSONRW, req RPCRequest) (*RPCMessage, error) { + data, err := json.Marshal(req) + if err != nil { + return nil, err + } + if err := rw.WriteLine(data); err != nil { + return nil, err + } + resp, err := p.readUntilResponse(rw, req.ID) + if err != nil { + metrics.RecordProtocolRPCError(p.BackendID, req.Method, "") + } + return resp, err +} + +// readUntilResponse reads lines until a JSON-RPC response with the matching id +// arrives, consuming interleaved notifications. Modelled on +// ACPProtocol.readUntilResponse (same timeout + shim-deadline-pulse contract); +// kept separate so the two handshake state machines stay independent and a +// codex schema change cannot perturb the kiro path. Times out after +// codexHandshakeTimeout. +func (p *CodexProtocol) readUntilResponse(rw *JSONRW, expectedID int) (*RPCMessage, error) { + type readResult struct { + msg *RPCMessage + err error + } + ch := make(chan readResult, 1) + var done atomic.Bool + send := func(r readResult) { + if done.Load() { + return + } + select { + case ch <- r: + default: + } + } + go func() { + for { + line, eof, err := rw.R.ReadLine() + if err != nil { + send(readResult{nil, fmt.Errorf("read codex response: %w", err)}) + return + } + if eof { + send(readResult{nil, fmt.Errorf("unexpected EOF during codex init")}) + return + } + if len(line) == 0 { + continue + } + var msg RPCMessage + if err := json.Unmarshal(line, &msg); err != nil { + continue + } + gotID, gotOK := msg.IDAsInt() + if msg.IsResponse() && gotOK && gotID == expectedID { + if msg.Error != nil { + send(readResult{nil, fmt.Errorf("%w %d: %s", ErrCodexRPC, + msg.Error.Code, osutil.SanitizeForLog(msg.Error.Message, 256))}) + return + } + send(readResult{&msg, nil}) + return + } + if done.Load() { + return + } + } + }() + + timer := time.NewTimer(codexHandshakeTimeout) + defer timer.Stop() + select { + case r := <-ch: + done.Store(true) + return r.msg, r.err + case <-timer.C: + done.Store(true) + // Mirror ACP's shim read-deadline pulse so a reader parked inside + // bufio.ReadBytes unblocks instead of lingering for the connection's + // lifetime (R184-CONCUR-H1). + if sl, ok := rw.R.(*shimLineReader); ok && sl.proc != nil && sl.proc.shimConn != nil { + _ = sl.proc.shimConn.SetReadDeadline(time.Now()) + _ = sl.proc.shimConn.SetReadDeadline(time.Time{}) + } + // Non-shim readers (no SetReadDeadline hook) leak the goroutine until + // the underlying codex process pipe closes — same known limitation as + // ACP's R224-GO-2. + return nil, fmt.Errorf("%w (id=%d)", ErrCodexTimeout, expectedID) + } +} diff --git a/internal/cli/protocol_codex_test.go b/internal/cli/protocol_codex_test.go new file mode 100644 index 000000000..9dfc1a3ad --- /dev/null +++ b/internal/cli/protocol_codex_test.go @@ -0,0 +1,453 @@ +package cli + +import ( + "bytes" + "encoding/json" + "strings" + "testing" +) + +// All wire shapes here were captured against codex-cli 0.141.0 on 2026-06-21; +// see docs/rfc/codex-backend-validation.md. + +func TestCodexProtocol_Name_Clone(t *testing.T) { + t.Parallel() + p := &CodexProtocol{BackendID: "codex"} + if p.Name() != "codex" { + t.Errorf("Name() = %q; want codex", p.Name()) + } + clone, ok := p.Clone().(*CodexProtocol) + if !ok { + t.Fatalf("Clone() returned %T; want *CodexProtocol", p.Clone()) + } + if clone.BackendID != "codex" { + t.Errorf("Clone lost BackendID = %q; want codex", clone.BackendID) + } +} + +func TestCodexProtocol_BuildArgs(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + args := p.BuildArgs(SpawnOptions{Model: "openai.gpt-oss-120b"}) + if len(args) == 0 || args[0] != "app-server" { + t.Fatalf("BuildArgs[0] = %v; want app-server first", args) + } + joined := strings.Join(args, " ") + for _, want := range []string{ + "model=openai.gpt-oss-120b", + "approval_policy=never", + "sandbox_mode=workspace-write", + } { + if !strings.Contains(joined, want) { + t.Errorf("BuildArgs missing %q; got %v", want, args) + } + } + // danger-full-access must NOT be requested (codex 0.141 rejects it with + // approval_policy=never and falls back to read-only). + if strings.Contains(joined, "danger-full-access") { + t.Errorf("BuildArgs must not request danger-full-access; got %v", args) + } +} + +func TestCodexProtocol_BuildArgs_NoModel(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + args := p.BuildArgs(SpawnOptions{}) + if strings.Contains(strings.Join(args, " "), "model=") { + t.Errorf("empty Model should not emit model=; got %v", args) + } +} + +func TestCodexProtocol_Init_NewThread(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + var written bytes.Buffer + mock := &mockLineReader{lines: []string{ + // initialize (id=0) response + `{"jsonrpc":"2.0","id":0,"result":{"userAgent":"codex"}}`, + // thread/start (id=1) response — threadId at result.thread.id + `{"jsonrpc":"2.0","id":1,"result":{"thread":{"id":"019ee98d-thread"}}}`, + }} + rw := &JSONRW{W: &written, R: mock} + + tid, err := p.Init(rw, "", "/tmp/work") + if err != nil { + t.Fatalf("Init error: %v", err) + } + if tid != "019ee98d-thread" { + t.Errorf("threadID = %q; want 019ee98d-thread", tid) + } + // The handshake must send initialize, then an `initialized` notification + // (no id), then thread/start. + out := written.String() + if !strings.Contains(out, `"method":"initialize"`) { + t.Error("Init did not send initialize") + } + if !strings.Contains(out, `"method":"initialized"`) { + t.Error("Init did not send initialized notification") + } + if !strings.Contains(out, `"method":"thread/start"`) { + t.Error("Init did not send thread/start") + } + // cwd must be threaded into thread/start. + if !strings.Contains(out, `/tmp/work`) { + t.Error("Init did not pass cwd into thread/start") + } +} + +func TestCodexProtocol_Init_Resume(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + var written bytes.Buffer + mock := &mockLineReader{lines: []string{ + `{"jsonrpc":"2.0","id":0,"result":{}}`, + `{"jsonrpc":"2.0","id":1,"result":{"thread":{"id":"resumed-thread"}}}`, + }} + rw := &JSONRW{W: &written, R: mock} + + tid, err := p.Init(rw, "resumed-thread", "/tmp/work") + if err != nil { + t.Fatalf("Init error: %v", err) + } + if tid != "resumed-thread" { + t.Errorf("threadID = %q; want resumed-thread", tid) + } + if !strings.Contains(written.String(), `"method":"thread/resume"`) { + t.Error("Init(resume) did not send thread/resume") + } +} + +func TestCodexProtocol_Init_RPCError(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + var written bytes.Buffer + mock := &mockLineReader{lines: []string{ + `{"jsonrpc":"2.0","id":0,"error":{"code":-32600,"message":"Not initialized"}}`, + }} + rw := &JSONRW{W: &written, R: mock} + if _, err := p.Init(rw, "", ""); err == nil { + t.Fatal("expected error on initialize RPC error") + } +} + +func TestCodexProtocol_WriteMessage_TurnStart(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + p.storeThreadID("t-1") + var w bytes.Buffer + if err := p.WriteMessage(&w, "hello codex", nil); err != nil { + t.Fatalf("WriteMessage error: %v", err) + } + var req struct { + Method string `json:"method"` + Params struct { + ThreadID string `json:"threadId"` + Input []struct { + Type string `json:"type"` + Text string `json:"text"` + } `json:"input"` + } `json:"params"` + } + if err := json.Unmarshal(w.Bytes(), &req); err != nil { + t.Fatalf("turn/start not valid JSON: %v\n%s", err, w.String()) + } + if req.Method != "turn/start" { + t.Errorf("method = %q; want turn/start", req.Method) + } + if req.Params.ThreadID != "t-1" { + t.Errorf("threadId = %q; want t-1", req.Params.ThreadID) + } + // input MUST be an array of UserInput (validation §3.2) — a bare string + // gets rejected with -32600. + if len(req.Params.Input) != 1 || req.Params.Input[0].Type != "text" || + req.Params.Input[0].Text != "hello codex" { + t.Errorf("input = %+v; want [{text:hello codex}]", req.Params.Input) + } +} + +func TestCodexProtocol_WriteInterrupt(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + + // Pre-handshake: no thread → ErrInterruptUnsupported. + var w0 bytes.Buffer + if err := p.WriteInterrupt(&w0, "req-1"); err != ErrInterruptUnsupported { + t.Errorf("pre-handshake WriteInterrupt err = %v; want ErrInterruptUnsupported", err) + } + + // Post-handshake: emits turn/interrupt request with threadId. + p.storeThreadID("t-9") + var w bytes.Buffer + if err := p.WriteInterrupt(&w, "req-2"); err != nil { + t.Fatalf("WriteInterrupt error: %v", err) + } + if !strings.Contains(w.String(), `"method":"turn/interrupt"`) { + t.Errorf("did not emit turn/interrupt; got %s", w.String()) + } + if !strings.Contains(w.String(), `"threadId":"t-9"`) { + t.Errorf("turn/interrupt missing threadId; got %s", w.String()) + } +} + +func TestCodexProtocol_ReadEvent_AgentMessageDelta(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + // delta is a plain string (validation §3.3). + line := `{"jsonrpc":"2.0","method":"item/agentMessage/delta","params":{"threadId":"t","itemId":"i","turnId":"u","delta":"Hi"}}` + evs, done, err := p.ReadEvent(line) + if err != nil { + t.Fatalf("ReadEvent err: %v", err) + } + if done { + t.Error("delta should not end the turn") + } + if len(evs) != 1 || evs[0].Type != "assistant" { + t.Fatalf("evs = %+v; want one assistant event", evs) + } + // Accumulated text flushes on turn/completed. + line2 := `{"jsonrpc":"2.0","method":"item/agentMessage/delta","params":{"threadId":"t","itemId":"i","turnId":"u","delta":"!"}}` + if _, _, err := p.ReadEvent(line2); err != nil { + t.Fatalf("ReadEvent2 err: %v", err) + } + doneLine := `{"jsonrpc":"2.0","method":"turn/completed","params":{"threadId":"t","turn":{"status":"completed"}}}` + evs3, done3, err := p.ReadEvent(doneLine) + if err != nil { + t.Fatalf("ReadEvent(turn/completed) err: %v", err) + } + if !done3 { + t.Error("turn/completed should end the turn") + } + // Turn boundary emits a synthesised assistant frame (the only source of the + // dashboard bubble) followed by a result metadata event. + if len(evs3) != 2 { + t.Fatalf("turn/completed events = %+v; want 2 (assistant + result)", evs3) + } + if evs3[0].Type != "assistant" || evs3[0].Message == nil || + len(evs3[0].Message.Content) != 1 || evs3[0].Message.Content[0].Text != "Hi!" { + t.Errorf("first event = %+v; want assistant frame with text 'Hi!'", evs3[0]) + } + if evs3[1].Type != "result" || evs3[1].Result != "Hi!" { + t.Errorf("second event = %+v; want result with text 'Hi!'", evs3[1]) + } +} + +func TestCodexProtocol_ReadEvent_TurnCompletedNoText(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + // No deltas streamed → only a result event, no empty assistant bubble. + line := `{"jsonrpc":"2.0","method":"turn/completed","params":{"threadId":"t","turn":{"status":"completed"}}}` + evs, done, err := p.ReadEvent(line) + if err != nil { + t.Fatalf("ReadEvent err: %v", err) + } + if !done { + t.Error("turn/completed should end the turn") + } + if len(evs) != 1 || evs[0].Type != "result" { + t.Fatalf("evs = %+v; want single result event when no text streamed", evs) + } +} + +func TestCodexProtocol_ReadEvent_TurnFailed(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + line := `{"jsonrpc":"2.0","method":"turn/completed","params":{"threadId":"t","turn":{"status":"failed","error":{"message":"namespace tool rejected"}}}}` + evs, done, err := p.ReadEvent(line) + if err != nil { + t.Fatalf("ReadEvent err: %v", err) + } + if !done { + t.Error("failed turn should still end the turn") + } + // No text streamed → just the error result event. + if len(evs) != 1 || evs[len(evs)-1].SubType != "error" { + t.Fatalf("evs = %+v; want one error result", evs) + } + if !strings.Contains(evs[len(evs)-1].Result, "namespace tool rejected") { + t.Errorf("error text not surfaced: %q", evs[len(evs)-1].Result) + } +} + +func TestCodexProtocol_ReadEvent_TurnFailedKeepsPartialText(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + // Partial text streamed before a failure must NOT be lost (review #5). + if _, _, err := p.ReadEvent(`{"jsonrpc":"2.0","method":"item/agentMessage/delta","params":{"threadId":"t","itemId":"i","turnId":"u","delta":"partial"}}`); err != nil { + t.Fatalf("delta err: %v", err) + } + line := `{"jsonrpc":"2.0","method":"turn/completed","params":{"threadId":"t","turn":{"status":"failed","error":{"message":"boom"}}}}` + evs, _, err := p.ReadEvent(line) + if err != nil { + t.Fatalf("ReadEvent err: %v", err) + } + if len(evs) != 2 { + t.Fatalf("evs = %+v; want assistant(partial) + error result", evs) + } + if evs[0].Type != "assistant" || evs[0].Message == nil || evs[0].Message.Content[0].Text != "partial" { + t.Errorf("partial text lost: %+v", evs[0]) + } + if evs[1].SubType != "error" || !strings.Contains(evs[1].Result, "boom") { + t.Errorf("error result wrong: %+v", evs[1]) + } +} + +func TestCodexProtocol_ReadEvent_PostHandshakeErrorResponse(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + // A deferred turn/start error reply on the main readLoop must close the + // turn (done=true) and propagate an error, not be silently dropped. + line := `{"jsonrpc":"2.0","id":3,"error":{"code":-32001,"message":"Server overloaded"}}` + evs, done, err := p.ReadEvent(line) + if err == nil { + t.Fatal("expected error for error-bearing response") + } + if !done { + t.Error("error response should end the turn") + } + if len(evs) != 0 { + t.Errorf("evs = %+v; want none (error surfaced via err)", evs) + } +} + +func TestCodexProtocol_ReadEvent_ToolCall(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + line := `{"jsonrpc":"2.0","method":"item/started","params":{"threadId":"t","item":{"type":"commandExecution","id":"c1","status":"in_progress"}}}` + evs, done, err := p.ReadEvent(line) + if err != nil { + t.Fatalf("ReadEvent err: %v", err) + } + if done { + t.Error("tool start should not end the turn") + } + if len(evs) != 1 || evs[0].SubType != "tool_use" || evs[0].ToolCall == nil { + t.Fatalf("evs = %+v; want one tool_use event with ToolCall", evs) + } + if evs[0].ToolCall.ID != "c1" { + t.Errorf("ToolCall.ID = %q; want c1", evs[0].ToolCall.ID) + } +} + +func TestCodexProtocol_ReadEvent_TokenUsage(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + line := `{"jsonrpc":"2.0","method":"thread/tokenUsage/updated","params":{"threadId":"t","turnId":"u","tokenUsage":{"last":{"inputTokens":72,"outputTokens":20,"totalTokens":92},"total":{"totalTokens":92},"modelContextWindow":128000}}}` + evs, done, err := p.ReadEvent(line) + if err != nil { + t.Fatalf("ReadEvent err: %v", err) + } + if done { + t.Error("tokenUsage should not end the turn") + } + if len(evs) != 1 || evs[0].Type != "metadata" || evs[0].Metadata == nil { + t.Fatalf("evs = %+v; want one metadata event", evs) + } + if len(evs[0].Metadata.MeteringUsage) != 1 || evs[0].Metadata.MeteringUsage[0].Value != 92 { + t.Errorf("MeteringUsage = %+v; want one entry value 92", evs[0].Metadata.MeteringUsage) + } + if evs[0].Metadata.MeteringUsage[0].Unit != "token" { + t.Errorf("Unit = %q; want token", evs[0].Metadata.MeteringUsage[0].Unit) + } +} + +func TestCodexProtocol_ReadEvent_UnknownMethodTolerated(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + for _, line := range []string{ + `{"jsonrpc":"2.0","method":"thread/started","params":{}}`, + `{"jsonrpc":"2.0","method":"turn/started","params":{}}`, + `{"jsonrpc":"2.0","method":"some/brand/new/event","params":{"x":1}}`, + `{"jsonrpc":"2.0","method":"configWarning","params":{}}`, + } { + evs, done, err := p.ReadEvent(line) + if err != nil { + t.Errorf("ReadEvent(%s) err = %v; want nil", line, err) + } + if done { + t.Errorf("ReadEvent(%s) done = true; want false", line) + } + if len(evs) != 0 { + t.Errorf("ReadEvent(%s) evs = %+v; want none", line, evs) + } + } +} + +func TestCodexProtocol_ReadEvent_InvalidJSON(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + if _, _, err := p.ReadEvent(`{not json`); err == nil { + t.Error("ReadEvent should error on invalid JSON") + } +} + +func TestCodexProtocol_HandleEvent_AutoApprove(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + // A reverse approval request arrives via ReadEvent as a permission_request. + reqLine := `{"jsonrpc":"2.0","id":42,"method":"item/commandExecution/requestApproval","params":{"threadId":"t"}}` + evs, _, err := p.ReadEvent(reqLine) + if err != nil { + t.Fatalf("ReadEvent err: %v", err) + } + if len(evs) != 1 || evs[0].Type != "permission_request" { + t.Fatalf("evs = %+v; want one permission_request", evs) + } + if evs[0].RPCRequestID != "42" { + t.Errorf("RPCRequestID = %q; want 42", evs[0].RPCRequestID) + } + + var w bytes.Buffer + handled := p.HandleEvent(&w, evs[0]) + if !handled { + t.Error("HandleEvent should handle permission_request") + } + out := w.String() + if !strings.Contains(out, `"decision":"approved"`) { + t.Errorf("approval response not approved: %s", out) + } + // Numeric id must be echoed unquoted. + if !strings.Contains(out, `"id":42`) { + t.Errorf("approval response did not round-trip numeric id: %s", out) + } +} + +func TestCodexProtocol_HandleEvent_NonPermissionPassThrough(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + var w bytes.Buffer + if p.HandleEvent(&w, Event{Type: "assistant"}) { + t.Error("HandleEvent should not handle non-permission events") + } + if w.Len() != 0 { + t.Errorf("HandleEvent wrote %d bytes for non-permission event", w.Len()) + } +} + +func TestCodexProtocol_Capabilities(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + caps := p.Capabilities() + if caps.Replay || caps.Priority || caps.StreamJSON { + t.Errorf("caps = %+v; want Replay/Priority/StreamJSON all false", caps) + } + if !caps.SoftInterrupt { + t.Error("codex should advertise SoftInterrupt=true (turn/interrupt)") + } + if p.SupportsPriority() || p.SupportsReplay() { + t.Error("codex SupportsPriority/SupportsReplay must be false in phase1") + } +} + +func TestCodexProtocol_HandleEvent_StringID(t *testing.T) { + t.Parallel() + p := &CodexProtocol{} + // UUID-style ids must be quoted in the response. + ev := Event{Type: "permission_request", RPCRequestID: "abc-123-uuid"} + var w bytes.Buffer + if !p.HandleEvent(&w, ev) { + t.Fatal("HandleEvent should handle permission_request") + } + if !strings.Contains(w.String(), `"id":"abc-123-uuid"`) { + t.Errorf("string id not quoted: %s", w.String()) + } +} From c146c20c30717b44699a9605ecf957e28c015094 Mon Sep 17 00:00:00 2001 From: KevinZhao Date: Sun, 21 Jun 2026 12:08:53 +0000 Subject: [PATCH 2/6] fix(cli): fix CI + complete codex backend wiring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - internal/upstream/caps_test.go: derivedCaps() 现含 codex 的 RequiredNodeCaps "codex-app-server",更新断言为 ["acp","codex-app-server"] (修复 CI test/test-macos FAILURE:TestDerivedCaps_FromDefaultRegistry)。 - internal/node/caps.go: knownServerCaps 增 "codex-app-server",与 kiro 的 "acp" 对齐,消除 codex 子节点注册时的 spurious "unknown capabilities" WARN。 - docs/rfc: 二次实测纠正 —— Bedrock + gpt-5.5(us-east-1/2,内置 amazon-bedrock provider)支持 codex 完整 shell agentic(codex exec + app-server 实测通过), 是首选 Bedrock 部署路径;§7.1 约束 2(namespace 工具被拒)仅适用于 gpt-oss。 完整性审计(vs kiro 端到端接线):profile 注册 / detect / 节点 cap 派生+远程 门控 / config 校验 / doctor / dashboard chip+tag+feature+cost API / 本地 wrapper 全部经 backend.Profile 注册表泛型驱动,codex 零额外编辑生效。codexjsonl 历史 source 为 phase1+(当前 NoopHistorySource 优雅降级,不 panic)。 go build/vet/test ./internal/... ./cmd/... 全绿。 --- docs/rfc/codex-backend-validation.md | 45 ++++++++++++++++++++++++---- docs/rfc/codex-backend.md | 41 +++++++++++++++++-------- internal/node/caps.go | 11 +++---- internal/upstream/caps_test.go | 5 ++-- 4 files changed, 77 insertions(+), 25 deletions(-) diff --git a/docs/rfc/codex-backend-validation.md b/docs/rfc/codex-backend-validation.md index e6e080132..7efca9600 100644 --- a/docs/rfc/codex-backend-validation.md +++ b/docs/rfc/codex-backend-validation.md @@ -12,12 +12,13 @@ - ✅ **app-server JSON-RPC 协议完整跑通**:`initialize → initialized → thread/start → turn/start → turn/started → item/started → item/completed → turn/completed`,threadId 正确捕获。RFC §2.4 的事件流草图被实测**证实**。 - ✅ **method / notification 名经 `generate-json-schema` + 实测全部命中** RFC §2.3/§2.4 的假设(`thread/start`、`turn/start`、`turn/interrupt`、`turn/steer`、`thread/resume`、`item/agentMessage/delta`、`turn/completed` 等)。 - 🔥 **重大纠正**:RFC §7 / §9.1 称 "Codex 模型只能走 OpenAI 自家(不像 claude 走 Bedrock)" —— **此论断已过时**。codex 0.141 **内置 `amazon-bedrock` model provider**(openai/codex PR #18744,2026-04-20 合入)。实测 `bedrock-mantle.us-west-2.api.aws/v1/responses` + `openai.gpt-oss-120b` 返回真实回复。 -- ⚠️ **两个 Bedrock 约束**(影响 "codex on bedrock" 完整能力,详见 §4): - 1. codex 内置 `amazon-bedrock` provider 把请求打到 `/openai/v1/responses`(带 `/openai` 前缀),但 Bedrock 上的 gpt-oss 只在 `/v1/responses`(无前缀)服务 —— 内置 provider 直接报 `model does not support the '/openai/v1/responses' API`。需用**自定义 provider** 指向正确的 `…/v1` base_url 绕过。 - 2. Bedrock 的 gpt-oss responses 端点**拒绝 codex 内置 agentic 工具的 `namespace` tool 变体**(只接受 `function` / `mcp`),完整 shell-agentic turn 因此 `status: failed`。纯对话 / function-calling 可用,但 codex 招牌的本地 shell agentic 能力在 Bedrock gpt-oss 上**受限**。 -- ❌ gpt-5.x 系列在 us-west-2 Bedrock **不可用**,只有 `gpt-oss-*`(20b/120b/safeguard)`ACTIVE`。 +- ✅ **Bedrock + gpt-5.5 = 完整 agentic**(2026-06-21 二次实测,**首选 Bedrock 路径**):gpt-5.x 在 **us-east-1 / us-east-2** 的 `bedrock-mantle/openai/v1/responses` 可用,直接用 codex **内置 `amazon-bedrock` provider**(其默认路径正是 `/openai/v1/responses`)。`codex exec` + `app-server` 两路径验证:纯对话 + **shell agentic 工具调用全部跑通**(`cat probe.txt` 执行成功),不触发 gpt-oss 的 namespace 工具被拒。 +- ⚠️ **gpt-oss 是受限退路**(详见 §4): + 1. codex 内置 `amazon-bedrock` provider 打 `/openai/v1/responses`,但 gpt-oss 只在 `/v1/responses` 服务 → gpt-oss 需**自定义 provider** 指向无前缀 base_url。 + 2. Bedrock gpt-oss responses 端点**拒绝 codex 内置 agentic 工具的 `namespace` tool 变体**(只接受 `function`/`mcp`),shell-agentic turn `status:failed`。gpt-oss 仅纯对话 / function-calling。 +- 模型可用性:us-west-2 只有 `gpt-oss-*`;gpt-5.x 在 us-east-1/2(mantle-only,`list-foundation-models` 不列,须直接探端点)。 -**结论**:app-server 协议路线 100% 可行,按 RFC §1.3 实施。Bedrock 作为一个**已验证但 agentic 受限**的部署选项写入 RFC;面向用户的 codex 完整能力仍以 OpenAI 凭据 / ChatGPT 登录为主路径。 +**结论**:app-server 协议路线 100% 可行,按 RFC §1.3 实施。**Bedrock + gpt-5.5(us-east-1/2,内置 provider)是完整 agentic 的首选部署路径**;gpt-oss 是 agentic 受限的退路;OpenAI 直连凭据为第三选择。 --- @@ -171,7 +172,39 @@ ERROR: Failed to deserialize the JSON body into the target type: codex 内置的 agentic 工具集(shell/apply_patch 等)用 `type:"namespace"` 工具声明,Bedrock 的 gpt-oss responses 端点只认 `function`/`mcp`。手动用 `type:"function"` 工具直接打端点则通过 —— 说明这是 codex 工具声明形态与 Bedrock gpt-oss 子集不兼容,而非链路问题。 -**含义**:Bedrock gpt-oss 路径下 codex 可做纯对话 / 自定义 function-calling,但**无法**跑 codex 招牌的本地 shell agentic 工作流。完整 agentic 能力仍需 OpenAI 凭据(gpt-5.x-codex 系列)。 +**含义**:Bedrock **gpt-oss** 路径下 codex 可做纯对话 / 自定义 function-calling,但**无法**跑 codex 招牌的本地 shell agentic 工作流。完整 agentic 能力需走 gpt-5.x(§4.5)或 OpenAI 凭据。 + +### 4.5 gpt-5.5 完整 agentic(首选路径,2026-06-21 二次实测) + +gpt-5.x 与 gpt-oss 行为相反 —— 它走带前缀的 `/openai/v1/responses`,正好是内置 provider 的默认路径,且**没有** namespace 工具约束。 + +**区域可用性探测**(`list-foundation-models` 不列 mantle-only 模型,须直接探端点): + +| region / path | 结果 | +|---|---| +| us-west-2 `/v1/responses` openai.gpt-5.5 | ❌ 404 does not exist | +| us-east-2 / us-east-1 `/v1/responses` openai.gpt-5.5 | ⚠️ 400 "does not support the '/v1/responses' API"(存在,但走别的路径) | +| **us-east-2 / us-east-1 `/openai/v1/responses` openai.gpt-5.5** | ✅ **HTTP 200** 真实 response | +| us-east-2 `/openai/v1/chat/completions` openai.gpt-5.5 | ❌ 400 responses-only | + +**端到端(内置 `amazon-bedrock` provider,无需自定义)**: + +```toml +model_provider = "amazon-bedrock" +model = "openai.gpt-5.5" +[model_providers.amazon-bedrock.aws] +region = "us-east-2" +``` + +| 验证 | 结果 | +|---|---| +| `codex exec` 纯对话 | ✅ 返回 `NAOZHI_GPT55_OK`(7748 tokens) | +| `codex exec` **shell agentic** | ✅ codex 调用 shell 工具 `cat probe.txt` 成功执行并返回正确内容(15580 tokens),**无 namespace 被拒** | +| `app-server` 协议层(naozhi 实际路径) | ✅ `turn/completed` status=`completed`,事件流 `item/agentMessage/delta` + `thread/tokenUsage/updated` 完整 | + +本机坑:bubblewrap 在容器内报 `Unexpected capabilities but not setuid` → 测 agentic 用 `--dangerously-bypass-approvals-and-sandbox`,与模型/协议无关。 + +**结论**:**Bedrock + gpt-5.5(us-east-1/2,内置 provider)支持 codex 完整 shell agentic**,是 naozhi 接 codex 的首选 Bedrock 部署路径。RFC §7.1 约束 2 仅适用于 gpt-oss。 --- diff --git a/docs/rfc/codex-backend.md b/docs/rfc/codex-backend.md index 7be329eb5..19f477214 100644 --- a/docs/rfc/codex-backend.md +++ b/docs/rfc/codex-backend.md @@ -311,30 +311,47 @@ Codex CLI 鉴权三条路径: | 路径 | 适用 | naozhi 立场 | |---|---|---| -| **Amazon Bedrock**(`AWS_BEARER_TOKEN_BEDROCK` 或 AWS 凭据链) | 复用部署机已有 AWS 接入 | ✅ 已验证连通(见 §7.1 约束) | -| `CODEX_API_KEY` / `OPENAI_API_KEY` 环境变量 | 完整 agentic(gpt-5.x-codex) | 推荐,无 Bedrock 工具约束 | +| **Amazon Bedrock + gpt-5.5**(`AWS_BEARER_TOKEN_BEDROCK` 或 AWS 凭据链,us-east-1/2) | 复用部署机 AWS 接入 + 完整 agentic | ✅ **首选**,已验证完整 agentic(见 §7.1) | +| Amazon Bedrock + gpt-oss(us-west-2) | 纯对话 / function-calling | ⚠️ agentic 受限退路(见 §7.2) | +| `CODEX_API_KEY` / `OPENAI_API_KEY` 环境变量 | 完整 agentic | 直连 OpenAI,无区域限制 | | `codex login` 持久化 ChatGPT 登录态 | 桌面交互 | 不适合 headless naozhi | -### 7.1 Bedrock 部署(已验证 + 两个约束) +### 7.1 Bedrock + gpt-5.5(首选路径,2026-06-21 实测完整 agentic) -实测环境 `AWS_REGION=us-west-2`,`bedrock-mantle.us-west-2.api.aws/v1/responses` + `openai.gpt-oss-120b` 连通。配置(用**自定义 provider**,不用内置 `amazon-bedrock`): +gpt-5.x 在 **us-east-1 / us-east-2** 的 `bedrock-mantle/openai/v1/responses` 上可用(responses-only,不支持 chat completions)。这恰好是 codex **内置 `amazon-bedrock` provider** 的默认路径,所以**直接用内置 provider,无需自定义**: + +```toml +model_provider = "amazon-bedrock" +model = "openai.gpt-5.5" + +[model_providers.amazon-bedrock.aws] +region = "us-east-2" +# 凭据走 AWS_BEARER_TOKEN_BEDROCK(Bedrock API key)或标准 AWS 凭据链 +``` + +实测(codex 0.141.0,`codex exec` + `app-server` 两路径): +- ✅ 纯对话返回正确(`turn/completed` status=`completed`)。 +- ✅ **完整 agentic 跑通**:codex 成功调用 shell 工具(`cat probe.txt` 执行并返回正确内容),**不**触发 gpt-oss 的 `namespace` 工具被拒错误。 + +> 注意:`aws bedrock list-foundation-models` **不列** mantle-only 的 gpt-5.x —— 必须直接探端点确认(404=不存在;400 "does not support /v1/responses"=存在但走 `/openai/v1/responses`)。 + +### 7.2 Bedrock + gpt-oss(受限退路) + +实测 `bedrock-mantle.us-west-2.api.aws/v1/responses` + `openai.gpt-oss-120b` 连通,但有两个约束,需用**自定义 provider**: ```toml [model_providers.bedrockmantle] name = "Bedrock Mantle" -base_url = "https://bedrock-mantle..api.aws/v1" # 注意:无 /openai 前缀 +base_url = "https://bedrock-mantle.us-west-2.api.aws/v1" # 注意:无 /openai 前缀 wire_api = "responses" # chat 在 0.141 已废弃 env_key = "AWS_BEARER_TOKEN_BEDROCK" ``` -**约束 1(路径)**:codex 内置 `amazon-bedrock` provider 打到 `/openai/v1/responses`,但 Bedrock 上 gpt-oss 只在 `/v1/responses` 服务 → 必须用上面的自定义 provider 指向正确 base_url。 - -**约束 2(agentic 受限)**:Bedrock 的 gpt-oss responses 端点拒绝 codex 内置 agentic 工具的 `type:"namespace"` 声明(只认 `function`/`mcp`),完整 shell-agentic turn 会 `status:failed`。**Bedrock gpt-oss 路径仅支持纯对话 / function-calling,不支持 codex 招牌的本地 shell agentic**。完整 agentic 需 OpenAI 凭据 + gpt-5.x-codex。 - -**模型可用性**:us-west-2 Bedrock 只有 `gpt-oss-{20b,120b,safeguard}`,无 gpt-5.x。 +- **约束 1(路径)**:内置 `amazon-bedrock` provider 打 `/openai/v1/responses`,gpt-oss 只在 `/v1/responses` 服务 → 必须自定义 provider 指向无前缀 base_url。 +- **约束 2(agentic 受限)**:Bedrock gpt-oss responses 拒绝 codex 内置 agentic 工具的 `type:"namespace"` 声明(只认 `function`/`mcp`),完整 shell-agentic turn `status:failed`。**gpt-oss 路径仅纯对话 / function-calling**。 落地前提: -- 部署文档:启用 codex backend 时,二选一 —— ① Bedrock(`AWS_BEARER_TOKEN_BEDROCK` + 自定义 provider,agentic 受限);② `CODEX_API_KEY`/`codex login`(完整 agentic)。 +- 部署文档:启用 codex backend 时择一 —— ① **Bedrock + gpt-5.5**(内置 provider,us-east-1/2,完整 agentic,推荐);② Bedrock + gpt-oss(自定义 provider,us-west-2,agentic 受限);③ `CODEX_API_KEY`/`codex login`。 - `cmd/naozhi/doctor.go` 增 codex 健康检查:`codex --version` 能跑 + (`~/.codex/auth.json` / `CODEX_API_KEY` / `AWS_BEARER_TOKEN_BEDROCK` 三选一)。 不在本 RFC 范围:自建凭据池、企业网关代理、按 session 切 key、Bedrock Access Gateway(BAG)代理把 namespace 工具翻译给 gpt-oss。 @@ -391,7 +408,7 @@ env_key = "AWS_BEARER_TOKEN_BEDROCK" | `app-server` 仍是新接口(2025 末才推),可能有未发现的边缘 case | M | Phase 0 实测全跑过;订阅 codex GitHub release notes | | 多 thread 共享一个 app-server 进程的并发隔离质量未知(V8) | M | phase1 仍按 1 thread / 进程,跟 claude 对齐;V8 验证后 phase2 做"一进程多 thread"优化 | | `turn/steer` 与 naozhi `/urgent` 语义对接细节 | L | phase2 单独 RFC | -| ~~Codex 模型只能走 OpenAI 自家~~ → **已验证可走 Bedrock**,但 gpt-oss agentic 工具受限(§7.1 约束 2) | M | Bedrock 仅做纯对话/function-calling;完整 agentic 用 `CODEX_API_KEY` + gpt-5.x-codex | +| ~~Codex 模型只能走 OpenAI 自家~~ → **已验证可走 Bedrock 且完整 agentic**(gpt-5.5 @ us-east-1/2,§7.1);gpt-oss 路径 agentic 受限(§7.2) | L | 首选 Bedrock + gpt-5.5(内置 provider,完整 shell agentic 实测通过);gpt-oss 仅纯对话退路 | | Approval flow `requestUserInput` 转 AskUserQuestion 卡片的 schema 映射 | L | phase2,phase1 默认拒绝避免悬挂 | ### 9.2 未决 diff --git a/internal/node/caps.go b/internal/node/caps.go index 794568c38..c44fef08d 100644 --- a/internal/node/caps.go +++ b/internal/node/caps.go @@ -14,11 +14,12 @@ import ( // // R212-ARCH-402. var knownServerCaps = map[string]struct{}{ - "gemini": {}, - "acp": {}, - "askuser": {}, - "attach": {}, - "scratch": {}, + "gemini": {}, + "acp": {}, + "codex-app-server": {}, + "askuser": {}, + "attach": {}, + "scratch": {}, } // logUnknownCaps emits a WARN when `advertised` contains cap strings not diff --git a/internal/upstream/caps_test.go b/internal/upstream/caps_test.go index 228001a77..0a6ad7d03 100644 --- a/internal/upstream/caps_test.go +++ b/internal/upstream/caps_test.go @@ -26,7 +26,8 @@ func seedDefaultBackends(t *testing.T) { // TestDerivedCaps_FromDefaultRegistry asserts that the union over the // shipped Profiles produces the expected sorted slice. Today: claude -// has no caps, kiro has "acp"; the wire output is ["acp"]. +// has no caps, kiro has "acp", codex has "codex-app-server"; the wire +// output is the alpha-sorted ["acp", "codex-app-server"]. // // If we ever add a backend with a cap, the assertion will fail // loudly — by design — so the operator-facing register frame change @@ -35,7 +36,7 @@ func TestDerivedCaps_FromDefaultRegistry(t *testing.T) { seedDefaultBackends(t) got := derivedCaps() - want := []string{"acp"} + want := []string{"acp", "codex-app-server"} if !reflect.DeepEqual(got, want) { t.Fatalf("derivedCaps() = %v; want %v", got, want) } From 10f4330df85311886c15d8609ac5e926a912fa05 Mon Sep 17 00:00:00 2001 From: KevinZhao Date: Sun, 21 Jun 2026 12:31:18 +0000 Subject: [PATCH 3/6] =?UTF-8?q?feat(history):=20add=20codexjsonl=20source?= =?UTF-8?q?=20=E2=80=94=20history-panel=20parity=20with=20cc/kiro?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 补齐 codex backend 唯一的结构性抽象缺口:历史面板回填。codex 之前是 NoopHistorySource(优雅降级,历史会话显示空),现与 claude/kiro 对等。 - internal/history/codexjsonl/source.go — history.Source 实现,读 ~/.codex/sessions/YYYY/MM/DD/rollout--.jsonl。codex 的 rollout 格式比 kiro 更干净:event_msg 行(user_message/agent_message) 自带 ISO-8601 时间戳和已拼接纯文本,无需借时间戳。WalkDir 按 `-.jsonl` 后缀匹配日期分桶树;镜像 kirojsonl 的 path-traversal 防护 / 16MiB 上限 / ctx 取消 / 逐行容错降级。init() 注册 codex factory。 - HistoryWiring.CodexSessionsDir 字段贯通:cli/history.go + RouterConfig + router_core/lifecycle(codexSessionsDir) + main.go(~/.codex/sessions) + wireup/history_backends.go blank-import。 - 测试:source_test.go(glob/event_msg 映射/ISO 时间戳/limit/before 过滤/ path-traversal/缺文件/降级/坏行跳过/factory)+ 实测 codex 0.141 真实 rollout 文件解析正确(user prompt + agent reply 时间戳准确)。 go build/vet/test ./internal/... ./cmd/... 全绿。 说明:askuser / passthrough / embedded_context 三个 Feature 标志仍为 phase1 false——codex 协议有对应原语(item/tool/requestUserInput / turn/steer / mention UserInput),但需各自实现底层管线后再翻 true,否则 dashboard 会显示点了不工作的控件。属 RFC §10 Phase 2 范围。 --- cmd/naozhi/main.go | 7 +- internal/cli/history.go | 4 + internal/history/codexjsonl/source.go | 307 +++++++++++++++++++++ internal/history/codexjsonl/source_test.go | 213 ++++++++++++++ internal/session/router_core.go | 36 ++- internal/session/router_lifecycle.go | 7 +- internal/wireup/history_backends.go | 1 + 7 files changed, 559 insertions(+), 16 deletions(-) create mode 100644 internal/history/codexjsonl/source.go create mode 100644 internal/history/codexjsonl/source_test.go diff --git a/cmd/naozhi/main.go b/cmd/naozhi/main.go index 3f157529a..458422b13 100644 --- a/cmd/naozhi/main.go +++ b/cmd/naozhi/main.go @@ -222,7 +222,12 @@ func main() { // CLI's per-session jsonl after naozhi restart. Default path // is the kiro CLI's documented location; a config override is // a follow-up sprint. - KiroSessionsDir: osutil.ExpandHome("~/.kiro/sessions/cli"), + KiroSessionsDir: osutil.ExpandHome("~/.kiro/sessions/cli"), + // CodexSessionsDir feeds the codexjsonl history factory so "load + // earlier" pages can fall back to the codex CLI's date-bucketed + // rollout transcripts after a naozhi restart. Default path is the + // codex CLI's documented location. + CodexSessionsDir: osutil.ExpandHome("~/.codex/sessions"), EventLogDir: eventLogDir, EventLogGenerator: "naozhi", }) diff --git a/internal/cli/history.go b/internal/cli/history.go index 1ff5f1001..b099336e7 100644 --- a/internal/cli/history.go +++ b/internal/cli/history.go @@ -74,6 +74,10 @@ type HistoryWiring struct { // the kirojsonl factory reads per-session JSONL files from beneath this // directory. R228-CR-P3-6. KiroSessionsDir string + // CodexSessionsDir is ~/.codex/sessions. Wired from cmd/naozhi/main.go; + // the codexjsonl factory globs date-bucketed rollout files + // (YYYY/MM/DD/rollout-*-.jsonl) beneath this directory. + CodexSessionsDir string // EventLogDir is naozhi's per-session event log directory. Listed // here for symmetry; current backend factories don't read it // (naozhilog is the local tier in MergedSource and is wired diff --git a/internal/history/codexjsonl/source.go b/internal/history/codexjsonl/source.go new file mode 100644 index 000000000..d77554637 --- /dev/null +++ b/internal/history/codexjsonl/source.go @@ -0,0 +1,307 @@ +// Package codexjsonl implements history.Source on top of the codex CLI's +// per-session rollout transcripts under ~/.codex/sessions. +// +// Unlike claude (project-slug dirs) or kiro (flat .jsonl), codex +// persists each thread as a date-bucketed rollout file whose name embeds +// the thread UUID: +// +// ~/.codex/sessions/YYYY/MM/DD/rollout--.jsonl +// +// The threadId is the same UUID naozhi captures from thread/start, so the +// source globs for the suffix `-.jsonl` across the date tree +// rather than composing a single deterministic path. +// +// Each line is a self-describing record with a top-level "type" + ISO-8601 +// "timestamp". We consume the two `event_msg` payloads that map cleanly to +// a chat-history view (codex's transcript is friendlier than kiro's — these +// lines carry both a real timestamp AND already-joined plain text): +// +// {"timestamp":"2026-06-21T11:53:07.956Z","type":"event_msg", +// "payload":{"type":"user_message","message":"..."}} +// {"timestamp":"2026-06-21T11:53:11.127Z","type":"event_msg", +// "payload":{"type":"agent_message","message":"..."}} +// +// Other line types (session_meta, turn_context, response_item, +// token_count, task_started/complete) are silently skipped so the schema +// can evolve without breaking pagination. +package codexjsonl + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "os" + "path/filepath" + "sort" + "strings" + "time" + + "github.com/naozhi/naozhi/internal/cli" +) + +// SessionIDFunc returns the codex thread ID for the bound session, or "" +// when no thread has been negotiated yet. Re-evaluated on every LoadBefore +// call so a thread/resume transition is observed by the next page request. +type SessionIDFunc func() string + +// maxFileBytes caps how many bytes LoadBefore reads from a single rollout +// file. Mirrors kirojsonl's per-session safety limit so a runaway +// transcript can't OOM the dashboard. +const maxFileBytes = 16 << 20 // 16 MiB + +// ctxCheckEvery is how many parsed lines elapse between context.Done +// checks during parsing. Mirrors kirojsonl. +const ctxCheckEvery = 100 + +// Source is the codex rollout-JSONL-backed history.Source. +type Source struct { + rootDir string // ~/.codex/sessions — empty disables the source + sessionID SessionIDFunc // produces the current codex thread ID +} + +// New constructs a Source. If rootDir is empty or sessionIDFn is nil, the +// Source degrades to a zero-result implementation so misconfiguration never +// produces a nil-pointer panic. Callers always get a non-nil *Source and +// can rely on LoadBefore returning (nil, nil) in degraded states. +func New(rootDir string, sessionIDFn SessionIDFunc) *Source { + return &Source{rootDir: rootDir, sessionID: sessionIDFn} +} + +// init registers this backend's factory so any *cli.Wrapper constructed +// with BackendID="codex" picks up the codex rollout history source +// automatically when this package is blank-imported (wireup). +func init() { + cli.RegisterHistoryFactory("codex", factory) +} + +// factory is the cli.HistoryFactoryFn for codex. Returns +// cli.NoopHistorySource when the wiring lacks a CodexSessionsDir so a +// router-level misconfig still yields a non-nil source. +func factory(s cli.HistorySessionView, deps cli.HistoryWiring) cli.HistorySource { + if deps.CodexSessionsDir == "" { + return cli.NoopHistorySource{} + } + return New(deps.CodexSessionsDir, s.SessionID) +} + +// codexRecord is the on-disk line wrapper. Payload is held as RawMessage so +// only the event_msg lines we care about pay the second decode. +type codexRecord struct { + Timestamp string `json:"timestamp"` + Type string `json:"type"` + Payload json.RawMessage `json:"payload"` +} + +// codexEventMsg is the payload of an event_msg line. message is the +// already-joined plain text for user_message / agent_message. +type codexEventMsg struct { + Type string `json:"type"` + Message string `json:"message"` +} + +// LoadBefore returns up to `limit` entries strictly older than beforeMS for +// the bound codex thread, in chronological order (oldest → newest). When +// beforeMS <= 0 the upper bound is dropped and callers receive the newest +// `limit` entries. +// +// Errors are informational: the history.Source contract treats them as +// end-of-history, so an unreadable rollout falls through to MergedSource's +// non-fatal logging path rather than aborting pagination. +func (s *Source) LoadBefore(ctx context.Context, beforeMS int64, limit int) ([]cli.EventEntry, error) { + if limit <= 0 { + return nil, nil + } + if s == nil || s.rootDir == "" || s.sessionID == nil { + return nil, nil + } + sid := s.sessionID() + if sid == "" { + return nil, nil + } + + // Defence-in-depth: SessionIDFunc is exported, so reject a sid that + // could escape rootDir via the glob pattern. Treat a bad sid as "no + // session" rather than an error (matches kirojsonl). + if strings.ContainsAny(sid, `/\`) || strings.Contains(sid, "..") { + slog.Warn("codexjsonl: refusing sid containing path separator or '..'", + "sid_len", len(sid)) + return nil, nil + } + + path, err := s.findRollout(sid) + if err != nil || path == "" { + // No matching rollout yet (new thread, or codex hasn't flushed). + return nil, nil + } + + f, err := os.Open(path) // #nosec G304 -- path resolved from a glob rooted at rootDir; sid validated above + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil, nil + } + return nil, fmt.Errorf("codexjsonl: open %s: %w", path, err) + } + defer f.Close() + + entries := s.parseFile(ctx, f, beforeMS) + + sort.SliceStable(entries, func(i, j int) bool { + return entries[i].Time < entries[j].Time + }) + + if len(entries) > limit { + entries = entries[len(entries)-limit:] + } + return entries, nil +} + +// findRollout locates the rollout file whose name ends in +// `-.jsonl` under the date-bucketed tree. codex names files +// rollout--.jsonl inside YYYY/MM/DD/, so a recursive +// match on the suffix is the robust lookup (the leading timestamp is not +// known to naozhi). When multiple match (should not happen — threadId is a +// UUID), the lexicographically last is returned so a resumed/forked thread +// reading the freshest file wins. +func (s *Source) findRollout(sid string) (string, error) { + suffix := "-" + sid + ".jsonl" + var best string + walkErr := filepath.WalkDir(s.rootDir, func(p string, d os.DirEntry, err error) error { + if err != nil { + // Unreadable subdir: skip it, keep walking the rest. + if d != nil && d.IsDir() { + return filepath.SkipDir + } + return nil + } + if d.IsDir() { + return nil + } + name := d.Name() + if strings.HasPrefix(name, "rollout-") && strings.HasSuffix(name, suffix) { + if p > best { + best = p + } + } + return nil + }) + if walkErr != nil && best == "" { + return "", walkErr + } + return best, nil +} + +// parseFile streams the rollout file, decoding each event_msg line that +// satisfies the beforeMS upper bound. Blank lines, malformed JSON, unknown +// types, and unparseable timestamps are individually skipped so a bad line +// never poisons the rest of the file. Returns entries in arrival order +// (chronological per codex's append contract). +func (s *Source) parseFile(ctx context.Context, f *os.File, beforeMS int64) []cli.EventEntry { + limited := io.LimitReader(f, maxFileBytes) + scanner := bufio.NewScanner(limited) + // Allow 1 MiB lines — assistant messages can be long; the default 64 KiB + // would truncate token-rich replies. + scanner.Buffer(make([]byte, 0, 64*1024), 1<<20) + + out := make([]cli.EventEntry, 0, 16) + processed := 0 + for scanner.Scan() { + if processed%ctxCheckEvery == 0 { + select { + case <-ctx.Done(): + return out + default: + } + } + processed++ + + line := scanner.Bytes() + if len(line) == 0 { + continue + } + entry, ok := decodeLine(line) + if !ok { + continue + } + if beforeMS > 0 && entry.Time >= beforeMS { + continue + } + out = append(out, entry) + } + if err := scanner.Err(); err != nil { + slog.Debug("codexjsonl: scanner error treated as EOF", "err", err) + } + return out +} + +// decodeLine parses one rollout record into an EventEntry. Returns +// (EventEntry{}, false) when the line is not a renderable event_msg +// (user_message / agent_message), is malformed, has no parseable timestamp, +// or carries empty text. +func decodeLine(line []byte) (cli.EventEntry, bool) { + var rec codexRecord + if err := json.Unmarshal(line, &rec); err != nil { + slog.Debug("codexjsonl: skip malformed line", "err", err) + return cli.EventEntry{}, false + } + if rec.Type != "event_msg" || len(rec.Payload) == 0 { + return cli.EventEntry{}, false + } + + var ev codexEventMsg + if err := json.Unmarshal(rec.Payload, &ev); err != nil { + slog.Debug("codexjsonl: skip line with bad payload", "err", err) + return cli.EventEntry{}, false + } + + var entryType string + switch ev.Type { + case "user_message": + entryType = "user" + case "agent_message": + // "text" matches the cc dashboard contract — dashboard.js renders the + // markdown bubble on e.type === 'text'. Emitting "assistant" would + // fall through to the unknown-type card. + entryType = "text" + default: + // system / reasoning / token_count / task_* lines are not chat bubbles. + return cli.EventEntry{}, false + } + + if strings.TrimSpace(ev.Message) == "" { + return cli.EventEntry{}, false + } + + timeMS, ok := parseISOms(rec.Timestamp) + if !ok { + return cli.EventEntry{}, false + } + + return cli.EventEntry{ + Time: timeMS, + Type: entryType, + Summary: ev.Message, + }, true +} + +// parseISOms converts codex's ISO-8601 RFC3339 timestamp (e.g. +// "2026-06-21T11:53:07.956Z") to unix milliseconds. Returns (0, false) on +// an unparseable or non-positive value so the entry is dropped rather than +// collapsed to epoch (which would corrupt the strict-< pagination cursor). +func parseISOms(s string) (int64, bool) { + if s == "" { + return 0, false + } + t, err := time.Parse(time.RFC3339Nano, s) + if err != nil { + return 0, false + } + ms := t.UnixMilli() + if ms <= 0 { + return 0, false + } + return ms, true +} diff --git a/internal/history/codexjsonl/source_test.go b/internal/history/codexjsonl/source_test.go new file mode 100644 index 000000000..4073c8796 --- /dev/null +++ b/internal/history/codexjsonl/source_test.go @@ -0,0 +1,213 @@ +package codexjsonl + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/naozhi/naozhi/internal/cli" +) + +// writeRollout creates a date-bucketed rollout file for sid under root and +// returns nothing — codex names files YYYY/MM/DD/rollout--.jsonl. +func writeRollout(t *testing.T, root, sid string, lines []string) { + t.Helper() + dir := filepath.Join(root, "2026", "06", "21") + if err := os.MkdirAll(dir, 0o755); err != nil { + t.Fatalf("mkdir: %v", err) + } + name := "rollout-2026-06-21T09-35-20-" + sid + ".jsonl" + body := "" + for _, l := range lines { + body += l + "\n" + } + if err := os.WriteFile(filepath.Join(dir, name), []byte(body), 0o644); err != nil { + t.Fatalf("write rollout: %v", err) + } +} + +func TestSource_ImplementsInterface(t *testing.T) { + t.Parallel() + var _ cli.HistorySource = (*Source)(nil) +} + +func TestSource_LoadBefore_FullRoundTrip(t *testing.T) { + t.Parallel() + dir := t.TempDir() + sid := "019ee988-da7f-7821-b6d1-7b74a7db62d6" + writeRollout(t, dir, sid, []string{ + `{"timestamp":"2026-06-21T09:35:20.700Z","type":"session_meta","payload":{"id":"x"}}`, + `{"timestamp":"2026-06-21T09:35:21.000Z","type":"event_msg","payload":{"type":"user_message","message":"hello codex"}}`, + `{"timestamp":"2026-06-21T09:35:22.500Z","type":"event_msg","payload":{"type":"token_count","message":""}}`, + `{"timestamp":"2026-06-21T09:35:23.000Z","type":"event_msg","payload":{"type":"agent_message","message":"hi there"}}`, + `{"timestamp":"2026-06-21T09:35:24.000Z","type":"response_item","payload":{"type":"message"}}`, + }) + + src := New(dir, func() string { return sid }) + got, err := src.LoadBefore(context.Background(), 0, 10) + if err != nil { + t.Fatalf("LoadBefore: %v", err) + } + if len(got) != 2 { + t.Fatalf("got %d entries, want 2 (user + agent only): %+v", len(got), got) + } + if got[0].Type != "user" || got[0].Summary != "hello codex" { + t.Errorf("entry[0] = %+v; want user/hello codex", got[0]) + } + if got[1].Type != "text" || got[1].Summary != "hi there" { + t.Errorf("entry[1] = %+v; want text/hi there", got[1]) + } + // Chronological order via real ISO timestamps. + if !(got[0].Time < got[1].Time) { + t.Errorf("entries not chronological: %d >= %d", got[0].Time, got[1].Time) + } +} + +func TestSource_LoadBefore_Limit(t *testing.T) { + t.Parallel() + dir := t.TempDir() + sid := "lim-thread" + writeRollout(t, dir, sid, []string{ + `{"timestamp":"2026-06-21T09:35:21.000Z","type":"event_msg","payload":{"type":"user_message","message":"m1"}}`, + `{"timestamp":"2026-06-21T09:35:22.000Z","type":"event_msg","payload":{"type":"agent_message","message":"m2"}}`, + `{"timestamp":"2026-06-21T09:35:23.000Z","type":"event_msg","payload":{"type":"user_message","message":"m3"}}`, + }) + src := New(dir, func() string { return sid }) + got, err := src.LoadBefore(context.Background(), 0, 2) + if err != nil { + t.Fatalf("LoadBefore: %v", err) + } + if len(got) != 2 { + t.Fatalf("got %d, want 2 (newest tail)", len(got)) + } + // Newest two: m2, m3. + if got[0].Summary != "m2" || got[1].Summary != "m3" { + t.Errorf("got %q,%q; want m2,m3", got[0].Summary, got[1].Summary) + } +} + +func TestSource_LoadBefore_BeforeFiltering(t *testing.T) { + t.Parallel() + dir := t.TempDir() + sid := "before-thread" + writeRollout(t, dir, sid, []string{ + `{"timestamp":"2026-06-21T09:35:21.000Z","type":"event_msg","payload":{"type":"user_message","message":"old"}}`, + `{"timestamp":"2026-06-21T09:35:25.000Z","type":"event_msg","payload":{"type":"agent_message","message":"new"}}`, + }) + src := New(dir, func() string { return sid }) + // beforeMS just after the first entry → only "old" survives. + oldMS := int64(1782034521000 + 1) // approx; compute from actual below + // Recompute precisely: 2026-06-21T09:35:21Z. + _ = oldMS + full, _ := src.LoadBefore(context.Background(), 0, 10) + if len(full) != 2 { + t.Fatalf("precondition: want 2 entries, got %d", len(full)) + } + cut := full[1].Time // == "new" time; strictly-< drops it + got, err := src.LoadBefore(context.Background(), cut, 10) + if err != nil { + t.Fatalf("LoadBefore: %v", err) + } + if len(got) != 1 || got[0].Summary != "old" { + t.Fatalf("before-filter got %+v; want only 'old'", got) + } +} + +func TestSource_LoadBefore_RejectsPathTraversal(t *testing.T) { + t.Parallel() + dir := t.TempDir() + for _, sid := range []string{"../etc/passwd", "a/b", `a\b`, ".."} { + src := New(dir, func() string { return sid }) + got, err := src.LoadBefore(context.Background(), 0, 10) + if err != nil || got != nil { + t.Errorf("sid %q: got (%v,%v); want (nil,nil)", sid, got, err) + } + } +} + +func TestSource_LoadBefore_MissingFile(t *testing.T) { + t.Parallel() + dir := t.TempDir() + src := New(dir, func() string { return "no-such-thread" }) + got, err := src.LoadBefore(context.Background(), 0, 10) + if err != nil || got != nil { + t.Errorf("got (%v,%v); want (nil,nil) for missing rollout", got, err) + } +} + +func TestSource_LoadBefore_DegradedStates(t *testing.T) { + t.Parallel() + // nil sessionID fn / empty rootDir / empty sid all degrade to (nil,nil). + cases := []*Source{ + New("", func() string { return "x" }), + New("/tmp", nil), + New("/tmp", func() string { return "" }), + } + for i, src := range cases { + got, err := src.LoadBefore(context.Background(), 0, 10) + if err != nil || got != nil { + t.Errorf("case %d: got (%v,%v); want (nil,nil)", i, got, err) + } + } + // limit <= 0 short-circuits. + src := New("/tmp", func() string { return "x" }) + if got, err := src.LoadBefore(context.Background(), 0, 0); got != nil || err != nil { + t.Errorf("limit 0: got (%v,%v); want (nil,nil)", got, err) + } +} + +func TestSource_LoadBefore_MalformedLinesSkipped(t *testing.T) { + t.Parallel() + dir := t.TempDir() + sid := "malformed-thread" + writeRollout(t, dir, sid, []string{ + `{not json`, + ``, + `{"timestamp":"2026-06-21T09:35:21.000Z","type":"event_msg","payload":{"type":"user_message","message":"survivor"}}`, + `{"timestamp":"bad-ts","type":"event_msg","payload":{"type":"agent_message","message":"dropped-no-ts"}}`, + `{"timestamp":"2026-06-21T09:35:23.000Z","type":"event_msg","payload":{"type":"agent_message","message":" "}}`, + }) + src := New(dir, func() string { return sid }) + got, err := src.LoadBefore(context.Background(), 0, 10) + if err != nil { + t.Fatalf("LoadBefore: %v", err) + } + if len(got) != 1 || got[0].Summary != "survivor" { + t.Fatalf("got %+v; want only 'survivor' (bad ts + blank text dropped)", got) + } +} + +func TestFactory_DegradesWithoutDir(t *testing.T) { + t.Parallel() + got := factory(stubView{sid: "x"}, cli.HistoryWiring{}) // no CodexSessionsDir + if _, ok := got.(cli.NoopHistorySource); !ok { + t.Errorf("factory without CodexSessionsDir = %T; want NoopHistorySource", got) + } + got2 := factory(stubView{sid: "x"}, cli.HistoryWiring{CodexSessionsDir: "/tmp"}) + if _, ok := got2.(*Source); !ok { + t.Errorf("factory with CodexSessionsDir = %T; want *Source", got2) + } +} + +func TestParseISOms(t *testing.T) { + t.Parallel() + if _, ok := parseISOms(""); ok { + t.Error("empty ts should fail") + } + if _, ok := parseISOms("not-a-time"); ok { + t.Error("garbage ts should fail") + } + ms, ok := parseISOms("2026-06-21T09:35:21.000Z") + if !ok || ms <= 0 { + t.Errorf("valid ts parse failed: ms=%d ok=%v", ms, ok) + } +} + +// stubView is a minimal cli.HistorySessionView for factory tests. +type stubView struct{ sid string } + +func (s stubView) SessionKey() string { return "k" } +func (s stubView) Workspace() string { return "/tmp" } +func (s stubView) SessionID() string { return s.sid } +func (s stubView) SnapshotChainIDs() []string { return nil } diff --git a/internal/session/router_core.go b/internal/session/router_core.go index ba9342248..52883ad14 100644 --- a/internal/session/router_core.go +++ b/internal/session/router_core.go @@ -270,6 +270,12 @@ type Router struct { // 读写: core (init), lifecycle (attachHistorySource), discovery (attachHistorySource via Register* / Takeover) kiroSessionsDir string + // codexSessionsDir is the codex session-state root (~/.codex/sessions). + // Plumbed into cli.HistoryWiring at attachHistorySource time so the + // codexjsonl factory can glob per-thread rollout files. Wired from + // RouterConfig.CodexSessionsDir in cmd/naozhi/main.go. + codexSessionsDir string + // wsStore is the per-chat workspace-override facet (Router P1, #383): // the overrides map plus its dirty flag and mutation gen, extracted into // workspaceStore (router_workspace.go) which carries the verbatim @@ -692,6 +698,11 @@ type RouterConfig struct { // in this file's import block). Set by cmd/naozhi/main.go from // config. R228-CR-P3-4. KiroSessionsDir string + // CodexSessionsDir is the codex CLI's session-state root, typically + // ~/.codex/sessions. Empty disables codex history fallback; non-empty + // enables the codexjsonl factory (registered via blank import in + // wireup). Set by cmd/naozhi/main.go from config. + CodexSessionsDir string // EventLogDir is where naozhi's per-session event log files live. // When empty, event log persistence is DISABLED and the router // falls back to Claude CLI JSONL as the sole history source. When @@ -801,18 +812,19 @@ func NewRouter(cfg RouterConfig) *Router { } r := &Router{ - maxProcs: cfg.MaxProcs, - ttl: cfg.TTL, - pruneTTL: cfg.PruneTTL, - defaultCWD: cfg.Workspace, - claudeDir: cfg.ClaudeDir, - kiroSessionsDir: cfg.KiroSessionsDir, - storePath: cfg.StorePath, - noOutputTimeout: cfg.NoOutputTimeout, - totalTimeout: cfg.TotalTimeout, - eventLogDir: cfg.EventLogDir, - historyLoader: cfg.HistoryLoader, - resolver: cfg.Resolver, + maxProcs: cfg.MaxProcs, + ttl: cfg.TTL, + pruneTTL: cfg.PruneTTL, + defaultCWD: cfg.Workspace, + claudeDir: cfg.ClaudeDir, + kiroSessionsDir: cfg.KiroSessionsDir, + codexSessionsDir: cfg.CodexSessionsDir, + storePath: cfg.StorePath, + noOutputTimeout: cfg.NoOutputTimeout, + totalTimeout: cfg.TotalTimeout, + eventLogDir: cfg.EventLogDir, + historyLoader: cfg.HistoryLoader, + resolver: cfg.Resolver, } // ss is a value field (no lock of its own); its maps must be allocated // explicitly since composite-literal sub-struct field init is not used here diff --git a/internal/session/router_lifecycle.go b/internal/session/router_lifecycle.go index ea7c54c35..8bb50c49b 100644 --- a/internal/session/router_lifecycle.go +++ b/internal/session/router_lifecycle.go @@ -117,9 +117,10 @@ func (r *Router) attachHistorySource(s *ManagedSession) { } deps := cli.HistoryWiring{ - ClaudeDir: r.claudeDir, - KiroSessionsDir: r.kiroSessionsDir, - EventLogDir: r.eventLogDir, + ClaudeDir: r.claudeDir, + KiroSessionsDir: r.kiroSessionsDir, + CodexSessionsDir: r.codexSessionsDir, + EventLogDir: r.eventLogDir, } // Wrapper.NewHistorySource is nil-safe and never returns nil; the diff --git a/internal/wireup/history_backends.go b/internal/wireup/history_backends.go index 91d1942ef..6cca86037 100644 --- a/internal/wireup/history_backends.go +++ b/internal/wireup/history_backends.go @@ -23,5 +23,6 @@ import ( // surfaces accidental double-wireup at startup rather than at // runtime. _ "github.com/naozhi/naozhi/internal/history/claudejsonl" + _ "github.com/naozhi/naozhi/internal/history/codexjsonl" _ "github.com/naozhi/naozhi/internal/history/kirojsonl" ) From 5b142a6da0619e1318989c458e04a93c298639cd Mon Sep 17 00:00:00 2001 From: KevinZhao Date: Sun, 21 Jun 2026 12:48:47 +0000 Subject: [PATCH 4/6] fix(session): add router-field access annotation for codexSessionsDir MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit check-router-fields lint 要求每个 Router 字段声明 `// 读写:` 访问集; codexSessionsDir 与 kiroSessionsDir 同访问模式(core init / lifecycle attachHistorySource / discovery)。修复 lint-router CI 失败。 --- internal/session/router_core.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/session/router_core.go b/internal/session/router_core.go index 52883ad14..42a5628e1 100644 --- a/internal/session/router_core.go +++ b/internal/session/router_core.go @@ -274,6 +274,7 @@ type Router struct { // Plumbed into cli.HistoryWiring at attachHistorySource time so the // codexjsonl factory can glob per-thread rollout files. Wired from // RouterConfig.CodexSessionsDir in cmd/naozhi/main.go. + // 读写: core (init), lifecycle (attachHistorySource), discovery (attachHistorySource via Register* / Takeover) codexSessionsDir string // wsStore is the per-chat workspace-override facet (Router P1, #383): From 97cc7cc92d2aa57ffe9d4abde90bfde6a0160fee Mon Sep 17 00:00:00 2001 From: KevinZhao Date: Sun, 21 Jun 2026 13:00:33 +0000 Subject: [PATCH 5/6] =?UTF-8?q?docs(rfc):=20codex=20Phase-2=20feature=20fe?= =?UTF-8?q?asibility=20analysis=20(=E5=AE=9E=E6=B5=8B=E6=8E=A2=E9=92=88)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 三个 claude 专属 Feature 在 codex backend 的可行性评估,含 codex 0.141 live 探针结论: - embedded_context: EASY/MODERATE — @path 经 shell 工具读取(非原生内联), 倾向结构化 mention UserInput;推荐第一做(~0.5-1d)。 - passthrough: MODERATE — 实测确认 clientUserMessageId 逐字 round-trip 为 item.clientId + turn/steer mid-turn 成功,slot-matching 适配器可行(非重设计); 第二做(~3-4d)。 - askuser: HARD — 阻塞式 RPC 重引入 pending/TTL 表,且 requestUserInput 请求/响应 schema 未验证;推迟,先 live 捕获。 codex 与 kiro Feature map 逐位相同,三个 false 是所有非-replay 后端的 共同 Phase-2 空缺,非 codex 独有。 --- docs/rfc/codex-backend-phase2-feasibility.md | 95 ++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 docs/rfc/codex-backend-phase2-feasibility.md diff --git a/docs/rfc/codex-backend-phase2-feasibility.md b/docs/rfc/codex-backend-phase2-feasibility.md new file mode 100644 index 000000000..854b68629 --- /dev/null +++ b/docs/rfc/codex-backend-phase2-feasibility.md @@ -0,0 +1,95 @@ +# Codex Backend — Phase 2 Feature 可行性分析 + +> **状态**: 可行性分析(含 codex 0.141.0 实测探针,2026-06-21) +> **前置**: `codex-backend.md` v2 + `codex-backend-validation.md`(Phase 1 已交付 PR #2216) +> **范围**: 三个 claude 专属 UX Feature 在 codex backend 的落地评估。**本文不含实现代码。** + +--- + +## 0. 背景 + +Phase 1 后,codex 与 kiro 的 Feature map **逐位相同**(7 项),三个 `false` 是所有非-claude(JSON-RPC)后端的共同空缺,非 codex 独有: + +| Feature | claude | kiro | codex | 本文结论 | +|---|---|---|---|---| +| askuser | ✅ | ❌ | ❌ | **HARD**(阻塞式 RPC,需 pending 表)→ 推迟 | +| passthrough | ✅ | ❌ | ❌ | **MODERATE**(实测原语齐备)→ 第二做 | +| embedded_context | ✅ | ❌ | ❌ | **EASY/MODERATE**(structured mention)→ 第一做 | +| image_input / mcp_http | ✅ | ✅ | ✅ | 已支持 | +| audio_input / mcp_sse | ✅ | ❌ | ❌ | 跨后端共同空缺,不在本文 | + +翻 `true` 前必须实现底层管线,否则 dashboard 显示点了不工作的控件(`featureForCurrent(name)` 门控见 `internal/server/static/dashboard.js`)。 + +--- + +## 1. embedded_context(@file mention)— **EASY/MODERATE,第一做** + +### claude 参考 +- **引用式,naozhi 不读不内联文件**:`@path` token 原样混在 prompt 文本里传给 CLI,claude 自己解析。dashboard `dashboard.js:~3940` 仅是发送门控(不支持的后端 block),非 UI 控件。 + +### codex 机制(实测) +- **探针结果**:`codex exec "...@context.txt..."` —— codex **不原生内联** `@path`,而是**用 shell 工具**(`rg`/`sed`)agentic 地读文件后回答。答对了,但走 tool-use 路径,非 claude 式静态内联。 +- **结构化 mention 可用**:`UserInput` 有 `mention` 变体 `{type:"mention", name, path}`(schema stable union,非 UNSTABLE)。codex 服务端解析路径(变体无 content 字段 → 服务端内联)。 + +### 落地方案 +- **方案 A(引用式,零协议码)**:直接翻 `embedded_context=true`,靠 codex 的 shell 工具 agentic 读 `@path`。**风险**:仅在 agentic 模式(sandbox 允许读文件)下工作;纯对话/受限沙箱下 `@path` 无效 → 与 claude 的"任何模式都内联"语义不符。 +- **方案 B(结构化 mention,~30-60 LoC,推荐)**:`WriteMessage`/`WriteUserMessageLocked` 解析 `@path` token,额外发一个 `{type:"mention", name, path}` UserInput 条目,codex 服务端解析。naozhi 仍不读文件。需确认 `path` 是绝对还是 cwd-相对。 + +### 估时 / 风险 +- ~0.5–1 天(含一次实测确认 mention path 语义)。风险低,隔离在 `protocol_codex.go` WriteMessage + profile flag。 + +--- + +## 2. passthrough(/urgent 抢占 + 多消息并发)— **MODERATE,第二做** + +### claude 参考 +- 硬依赖 `SupportsReplay()`(`protocol.go:106`;`passthrough.go:55` 无则提前返回)。每个 `Send` 分配带 128-bit hex uuid 的 `sendSlot`(`passthrough.go:27`),claude 用 `--replay-user-messages` 回显 `{type:"user",isReplay:true,uuid}`,`handleReplayEventLocked`(`passthrough.go:356`)按 uuid 匹配 slot。`/urgent`→`priority:"now"`(`send.go:69`)。 +- codex 当前 `SupportsReplay/SupportsPriority` 均 false → 落 Collect 模式。 + +### codex 机制(**已实测确认**) +- ✅ **`clientUserMessageId` → `item.clientId` 完整 round-trip**(探针:发 `clientUserMessageId="naozhi-slot-ABC123"`,`item/started type=userMessage` 回 `"clientId":"naozhi-slot-ABC123"` 逐字一致)。这正是 naozhi slot-matching 的前提。 +- ✅ **`turn/steer` mid-turn 可用**(探针:turn 进行中发 `turn/steer` 带 `expectedTurnId`,返回成功 `{turnId}` 无错;`slot-1`+`slot-2` 两个 clientId 都回显)。 +- ✅ `turn/started` 携带 `turnId`,供 steer 的 `expectedTurnId` 前置条件。 + +### 落地方案(**适配器,非重设计**) +1. `WriteUserMessageLocked` 把 slot uuid 写进 `clientUserMessageId`。 +2. `ReadEvent` 拦截 `item/started type=userMessage`(当前被跳过),按 `item.clientId` 匹配 slot(类比 `handleReplayEventLocked`)。 +3. 追踪活跃 `turnId`(来自 `turn/started`);mid-turn 走 `turn/steer`,无活跃 turn 回退 `turn/start`。 +4. 翻 `SupportsReplay`/`Caps.Replay`=true。 + +### 估时 / 风险 +- ~3–4 天,新增 ~200-300 LoC + 测试。 +- **风险**:① `expectedTurnId` 竞态(读 turnId 与发 steer 之间 turn 可能完成 → steer 报错,需回退);② codex **无 `priority:"now"` 通道** → `/urgent` 语义变成"append 到当前 turn"而非 claude 的"抢占+重排",产品行为不完全一致,可能需要把 `/urgent` 与通用多消息 passthrough 分开门控;③ 多 steer 是否**合并成一个 result**(claude 的 merged-replay sweep 依赖此)未实测,落地前需补一次 capture。 + +--- + +## 3. askuser(AskUserQuestion 卡片)— **HARD,推迟** + +### claude 参考 +- **观察式/fire-and-forget**:claude `-p` 自动 inject `is_error:true` tool_result ~3ms 后 turn 正常结束(`protocol_claude.go:502`)。naozhi 从不写 tool_result。用户答案作为**下一轮 user 消息**回流。RFC `docs/rfc/askuser-question.md`。Event 形状 `cli.AskQuestion`(`clievent/types.go`)。 + +### codex 机制 +- `CodexProtocol.ReadEvent`(`protocol_codex.go:343`)当前**显式丢弃** `requestUserInput`(返回 nil 防 turn 挂起)。 +- RFC §2.5 记 0.141 运行时观察到 `item/tool/requestUserInput` 反向请求,但**这是阻塞式 server→client 请求**(server 等响应),与 claude 的 fire-and-forget 根本不同。 +- **未决**:`requestUserInput` 的请求+响应 schema 在当前 schema dump 中缺失(只见 `[UNSTABLE]` guardian + mcp-elicitation)。0.141 的方法名可能已 stale,落地前必须 live capture 真实请求体+响应体。 + +### 落地难点 +- 阻塞式 RPC vs "答案作为下一轮消息"语义错配,会**重新引入** claude RFC 删掉的 pending-request + TTL 状态表:捕获 `requestId` → 持有到用户答 → 响应**那个特定 RPC**。若用户不答,codex turn 挂起到自身超时(比 claude 自动解析差),需 TTL 自动 decline。 + +### 估时 / 风险 +- ~2–3 天**起**(若响应 schema 简单稳定),新增 ~150-250 LoC。**风险高**:schema 未验证 + 可能落到 `[UNSTABLE]` guardian 路径。**结论:推迟,先 live 捕获真实方法名+响应体再排期。** + +--- + +## 4. 推荐顺序 + +1. **embedded_context**(~0.5-1d,EASY)—— 隔离、低风险,方案 B 结构化 mention。 +2. **passthrough**(~3-4d,MODERATE)—— 实测原语齐备(clientId round-trip + steer),适配器复用现有 slot 机制;落地前补一次"多 steer 是否合并 result"的 capture。 +3. **askuser**(推迟,HARD/部分 BLOCKED)—— 阻塞式 RPC 重引入 pending/TTL 机制,且请求/响应 schema 未验证;先 live 捕获再排期。 + +> 每个特性建议独立 PR(独立可回滚)。passthrough/embedded_context 的底层若做通用,等于同时惠及 kiro(同为非-replay 后端)。 + +## 5. 实测探针留痕(2026-06-21,codex 0.141 + gpt-5.5 @ Bedrock) +- embedded_context:`@context.txt` 经 shell 工具读取(非原生内联)→ 倾向结构化 mention。 +- passthrough:`clientUserMessageId` 逐字 round-trip 为 `item.clientId`;`turn/steer` mid-turn 成功返回 `{turnId}`,双 slot clientId 均回显。 +- askuser:`requestUserInput` 响应 schema 未捕获(推迟前置任务)。 From 9af0fde67e20200a3e64df95be8506c761682adf Mon Sep 17 00:00:00 2001 From: cron-bot Date: Sun, 21 Jun 2026 13:02:11 +0000 Subject: [PATCH 6/6] fix(cli): close codex turn on post-handshake RPC error (readLoop swallowed ErrCodexRPC) [#2216] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CodexProtocol.ReadEvent returns (nil, true, %w ErrCodexRPC) for a deferred turn/start ERROR response on the main readLoop, intending to close the turn — but handleShimStdout only recognised ErrACPRPC, so errors.Is(codexErr, ErrACPRPC) was false and the error fell into the 'skip unparseable event' branch. No synthetic result event was dispatched, the turn never closed, and the codex session hung in state=running — exactly the failure mode the ReadEvent comment claims to prevent. Triggerable because turn/start is sent fire-and-forget via WriteMessage, so any error reply (-32001 overload, gpt-oss tool rejection) lands on this path. Fix: extract rpcErrorTurnEnd(err) which recognises BOTH ErrACPRPC and ErrCodexRPC and returns the backend tag for the synthesized result. The readLoop error branch calls it instead of an inline ACP-only errors.Is, so a new backend only needs to register its sentinel in one place. Regression test rpc_error_turn_end_test.go covers both sentinels, the wrapped+bare forms, a non-RPC parse error (must NOT synthesize), and a guard asserting ErrCodexRPC does not satisfy errors.Is(ErrACPRPC) — the exact distinction the old code missed. Full internal/cli green. Found via adversarial review of this PR. --- internal/cli/process_readloop.go | 49 ++++++++++++--- internal/cli/rpc_error_turn_end_test.go | 84 +++++++++++++++++++++++++ 2 files changed, 123 insertions(+), 10 deletions(-) create mode 100644 internal/cli/rpc_error_turn_end_test.go diff --git a/internal/cli/process_readloop.go b/internal/cli/process_readloop.go index 8f34a7e6f..8123f5f87 100644 --- a/internal/cli/process_readloop.go +++ b/internal/cli/process_readloop.go @@ -361,6 +361,27 @@ func (p *Process) handleShimMessage(msg shimMsg, log *slog.Logger) shimDispatchO return shimDispatchContinue } +// rpcErrorTurnEnd reports whether err is a protocol RPC-error sentinel that +// ReadEvent returns (with done=true) to signal "the backend rejected the +// request — close the turn". When it is, ok=true and tag is the short +// backend prefix for the synthesized result text. A non-RPC error (e.g. an +// unparseable frame) returns ok=false so the readLoop skips it. +// +// Every backend whose ReadEvent surfaces a post-handshake RPC error this way +// MUST register its sentinel here, else handleShimStdout drops the error and +// the session hangs in state=running. ACP/kiro: ErrACPRPC (session/prompt +// reject); codex: ErrCodexRPC (deferred turn/start reject, #2216). +func rpcErrorTurnEnd(err error) (tag string, ok bool) { + switch { + case errors.Is(err, ErrACPRPC): + return "[kiro] ", true + case errors.Is(err, ErrCodexRPC): + return "[codex] ", true + default: + return "", false + } +} + // handleShimStdout decodes a stdout frame into one or more protocol Events // and runs each through HandleEvent / dispatchProtocolEvent. Returns // shimDispatchReturn when dispatch reports killCh fired so the readLoop @@ -381,20 +402,28 @@ func (p *Process) handleShimStdout(msg shimMsg, log *slog.Logger) shimDispatchOu events, _, err = p.protocol.ReadEvent(msg.Line) } if err != nil { - // ACP RPC errors: kiro returned an error response to a request - // we sent (typically session/prompt). The turn is over from - // kiro's POV — done=true comes back from ReadEvent so we - // can synthesize a visible "result" event and let the active - // Send() unblock. Without this, state stays "running" forever - // (operator-visible as "kiro session never replies"; reproduced - // 2026-05-19 r3-cancel/r3-lifecycle stuck after restart). - if errors.Is(err, ErrACPRPC) { + // Protocol RPC errors: the backend returned an error response to a + // request we sent (ACP/kiro: session/prompt; codex: the deferred + // turn/start reply on the main readLoop). The turn is over from the + // backend's POV — done=true comes back from ReadEvent so we can + // synthesize a visible "result" event and let the active Send() + // unblock. Without this, state stays "running" forever (operator- + // visible as "session never replies"; ACP reproduced 2026-05-19 + // r3-cancel/r3-lifecycle stuck after restart; codex turn/start is + // sent fire-and-forget via WriteMessage so any error reply — e.g. + // -32001 overload — lands here and must close the turn). + // + // Both backends use distinct sentinels (ErrACPRPC / ErrCodexRPC); + // rpcErrorTurnEnd centralises the recognition so a new protocol that + // surfaces RPC errors this way only needs to register its sentinel + // there — otherwise its post-handshake failures are silently swallowed. + if tag, ok := rpcErrorTurnEnd(err); ok { events = []Event{{ Type: "result", SubType: "error", - Result: "[kiro] " + err.Error(), + Result: tag + err.Error(), }} - log.Warn("readLoop: kiro returned RPC error; surfacing as failed turn", + log.Warn("readLoop: backend returned RPC error; surfacing as failed turn", "err", err, "seq", msg.Seq) // Fall through into the normal turn-end dispatch path // below so the assistant bubble + state transition happen. diff --git a/internal/cli/rpc_error_turn_end_test.go b/internal/cli/rpc_error_turn_end_test.go new file mode 100644 index 000000000..e227a3d2d --- /dev/null +++ b/internal/cli/rpc_error_turn_end_test.go @@ -0,0 +1,84 @@ +package cli + +import ( + "errors" + "fmt" + "testing" +) + +// TestRPCErrorTurnEnd pins the readLoop's RPC-error-sentinel recognition +// (#2216). handleShimStdout only synthesizes a turn-closing result event when +// rpcErrorTurnEnd returns ok; before the fix it inlined errors.Is(err, +// ErrACPRPC) only, so a codex turn/start error (wrapping ErrCodexRPC) fell +// through to "skip unparseable event" and the session hung in state=running. +func TestRPCErrorTurnEnd(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + err error + wantOK bool + wantTag string + }{ + { + name: "ACP RPC error wraps ErrACPRPC", + err: fmt.Errorf("%w -32000: model overloaded", ErrACPRPC), + wantOK: true, + wantTag: "[kiro] ", + }, + { + name: "codex RPC error wraps ErrCodexRPC", + err: fmt.Errorf("%w -32001: Server overloaded", ErrCodexRPC), + wantOK: true, + wantTag: "[codex] ", + }, + { + name: "bare ErrCodexRPC sentinel", + err: ErrCodexRPC, + wantOK: true, wantTag: "[codex] ", + }, + { + name: "bare ErrACPRPC sentinel", + err: ErrACPRPC, + wantOK: true, wantTag: "[kiro] ", + }, + { + name: "unrelated parse error is not a turn-end", + err: errors.New("unexpected end of JSON input"), + wantOK: false, + }, + { + name: "nil error", + err: nil, + wantOK: false, + }, + } + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + tag, ok := rpcErrorTurnEnd(tc.err) + if ok != tc.wantOK { + t.Fatalf("rpcErrorTurnEnd(%v) ok = %v, want %v", tc.err, ok, tc.wantOK) + } + if ok && tag != tc.wantTag { + t.Errorf("tag = %q, want %q", tag, tc.wantTag) + } + }) + } +} + +// TestRPCErrorTurnEnd_CodexNotConfusedWithACP guards the exact bug: the codex +// sentinel must NOT require the ACP sentinel to be recognised. errors.Is +// across the two distinct sentinels is false, which is why the old +// ACP-only check dropped codex errors. +func TestRPCErrorTurnEnd_CodexNotConfusedWithACP(t *testing.T) { + t.Parallel() + codexErr := fmt.Errorf("%w 1: boom", ErrCodexRPC) + if errors.Is(codexErr, ErrACPRPC) { + t.Fatal("ErrCodexRPC must not satisfy errors.Is(ErrACPRPC) — the two are distinct sentinels") + } + if _, ok := rpcErrorTurnEnd(codexErr); !ok { + t.Fatal("codex RPC error must be recognised as a turn-end (regression #2216)") + } +}