From 7ac86e3c05eed7a86ed4585cddf8a1e05e1723f7 Mon Sep 17 00:00:00 2001 From: lewis617 Date: Thu, 4 Jun 2026 11:06:38 +0800 Subject: [PATCH 1/2] fix: replay conversation history via session/update on session/load loadSession() now replays the full conversation history via user_message_chunk, agent_message_chunk, agent_thought_chunk, and tool_call/tool_call_update notifications before returning, as required by the ACP v1 spec. Uses agent.getFullMessageThread() for complete history including compacted sessions, with fallback to agent.messages. Fixes #1185 --- packages/code/examples/acp/acp-load-replay.ts | 250 ++++++++++++++++++ packages/code/src/acp/agent.ts | 136 ++++++++++ packages/code/tests/acp/agent.test.ts | 169 ++++++++++++ 3 files changed, 555 insertions(+) create mode 100644 packages/code/examples/acp/acp-load-replay.ts diff --git a/packages/code/examples/acp/acp-load-replay.ts b/packages/code/examples/acp/acp-load-replay.ts new file mode 100644 index 00000000..b22c122d --- /dev/null +++ b/packages/code/examples/acp/acp-load-replay.ts @@ -0,0 +1,250 @@ +import { spawn } from "node:child_process"; +import { Readable, Writable } from "node:stream"; +import { + ClientSideConnection, + ndJsonStream, + type Client, + type SessionNotification, +} from "@agentclientprotocol/sdk"; +import path from "node:path"; +import os from "node:os"; +import fs from "node:fs"; + +/** + * Test session/load conversation history replay. + * + * 1. Create a session, send a prompt, get a response + * 2. Stop the agent process + * 3. Start a new agent process and load the same session + * 4. Verify the conversation history is replayed via session/update notifications + */ +async function runLoadReplayExample() { + const agentCommand = [ + "tsx", + "--tsconfig", + path.join(process.cwd(), "tsconfig.dev.json"), + path.join(process.cwd(), "src/index.ts"), + "--acp", + ]; + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "acp-load-replay-")); + console.log(`Using temporary directory: ${tmpDir}`); + + let sessionId: string | undefined; + + // --- Phase 1: Create session and send a prompt --- + console.log("\n=== Phase 1: Create session and send prompt ===\n"); + { + const collectedUpdates: string[] = []; + const client: Client = { + requestPermission: async () => ({ outcome: "approved" }), + sessionUpdate: async (notification: SessionNotification) => { + const u = notification.update; + if ( + u.sessionUpdate === "agent_message_chunk" && + u.content.type === "text" + ) { + collectedUpdates.push(u.content.text); + } + }, + }; + + const { process: agentProcess, connection } = await startAgent( + agentCommand, + client, + ); + + try { + await connection.initialize({ + protocolVersion: 1, + clientInfo: { name: "load-replay-client", version: "1.0.0" }, + }); + + const session = await connection.newSession({ + cwd: tmpDir, + mcpServers: [], + }); + sessionId = session.sessionId; + console.log("Created session:", sessionId); + + console.log('Sending prompt: "say hello in one sentence"'); + const result = await connection.prompt({ + sessionId, + prompt: [{ type: "text", text: "say hello in one sentence" }], + }); + console.log("Prompt result:", result.stopReason); + console.log("Agent replied:", collectedUpdates.join("").slice(0, 100)); + } finally { + agentProcess.kill(); + } + } + + if (!sessionId) { + throw new Error("No session ID from phase 1"); + } + + // --- Phase 2: Load the session and verify history replay --- + console.log("\n=== Phase 2: Load session and verify replay ===\n"); + { + const replayLog: Array<{ + type: string; + text?: string; + toolCallId?: string; + status?: string; + }> = []; + + const client: Client = { + requestPermission: async () => ({ outcome: "approved" }), + sessionUpdate: async (notification: SessionNotification) => { + const u = notification.update; + switch (u.sessionUpdate) { + case "user_message_chunk": + replayLog.push({ + type: "user_message_chunk", + text: u.content.type === "text" ? u.content.text : undefined, + }); + break; + case "agent_message_chunk": + replayLog.push({ + type: "agent_message_chunk", + text: u.content.type === "text" ? u.content.text : undefined, + }); + break; + case "agent_thought_chunk": + replayLog.push({ + type: "agent_thought_chunk", + text: u.content.type === "text" ? u.content.text : undefined, + }); + break; + case "tool_call": + replayLog.push({ + type: "tool_call", + toolCallId: u.toolCallId, + status: u.status, + }); + break; + case "tool_call_update": + replayLog.push({ + type: "tool_call_update", + toolCallId: u.toolCallId, + status: u.status ?? undefined, + }); + break; + // Ignore other update types (available_commands_update, etc.) + } + }, + }; + + const { process: agentProcess, connection } = await startAgent( + agentCommand, + client, + ); + + try { + await connection.initialize({ + protocolVersion: 1, + clientInfo: { name: "load-replay-client", version: "1.0.0" }, + }); + + console.log("Loading session:", sessionId); + const loadResult = await connection.loadSession({ + sessionId, + cwd: tmpDir, + mcpServers: [], + }); + console.log("Load result modes:", loadResult.modes?.currentModeId); + + // Print the replayed history + console.log("\n--- Replayed conversation history ---"); + for (const entry of replayLog) { + if (entry.type === "user_message_chunk") { + console.log(`[USER] ${entry.text}`); + } else if (entry.type === "agent_message_chunk") { + console.log(`[AGENT] ${entry.text}`); + } else if (entry.type === "agent_thought_chunk") { + console.log(`[THINK] ${entry.text}`); + } else if (entry.type === "tool_call") { + console.log( + `[TOOL_START] id=${entry.toolCallId} status=${entry.status}`, + ); + } else if (entry.type === "tool_call_update") { + console.log( + `[TOOL_UPDATE] id=${entry.toolCallId} status=${entry.status}`, + ); + } + } + console.log("--- End of replay ---\n"); + + // Verify we got user_message_chunk and agent_message_chunk + const userChunks = replayLog.filter( + (e) => e.type === "user_message_chunk", + ); + const agentChunks = replayLog.filter( + (e) => e.type === "agent_message_chunk", + ); + + if (userChunks.length === 0) { + throw new Error( + "No user_message_chunk replayed — conversation history replay failed!", + ); + } + if (agentChunks.length === 0) { + throw new Error( + "No agent_message_chunk replayed — conversation history replay failed!", + ); + } + + console.log( + `Replay verified: ${userChunks.length} user chunks, ${agentChunks.length} agent chunks`, + ); + + // Also verify we can continue the conversation after loading + console.log('\nSending follow-up prompt: "what did I just ask you?"'); + const followUpResult = await connection.prompt({ + sessionId, + prompt: [{ type: "text", text: "what did I just ask you?" }], + }); + console.log("Follow-up result:", followUpResult.stopReason); + + console.log("\nLoad-replay example completed successfully!"); + } finally { + agentProcess.kill(); + } + } + + // Cleanup + if (fs.existsSync(tmpDir)) { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } +} + +function startAgent( + command: string[], + client: Client, +): Promise<{ + process: ReturnType; + connection: ClientSideConnection; +}> { + return new Promise((resolve, reject) => { + const agentProcess = spawn(command[0], [...command.slice(1)], { + stdio: ["pipe", "pipe", "inherit"], + env: { ...process.env, NODE_ENV: "integration-test" }, + }); + + const stdin = Writable.toWeb( + agentProcess.stdin!, + ) as WritableStream; + const stdout = Readable.toWeb( + agentProcess.stdout!, + ) as ReadableStream; + + const stream = ndJsonStream(stdin, stdout); + const connection = new ClientSideConnection(() => client, stream); + + agentProcess.on("error", reject); + + // Give the process a moment to start + setTimeout(() => resolve({ process: agentProcess, connection }), 500); + }); +} + +runLoadReplayExample().catch(console.error); diff --git a/packages/code/src/acp/agent.ts b/packages/code/src/acp/agent.ts index 38688842..eccdb283 100644 --- a/packages/code/src/acp/agent.ts +++ b/packages/code/src/acp/agent.ts @@ -17,6 +17,10 @@ import { ASK_USER_QUESTION_TOOL_NAME, AskUserQuestion, AskUserQuestionOption, + type Message, + type TextBlock, + type ReasoningBlock, + type ToolBlock, } from "wave-agent-sdk"; import { logger } from "../utils/logger.js"; import { @@ -262,6 +266,9 @@ export class WaveAcpAgent implements AcpAgent { logger.info(`Loading session: ${sessionId} in ${cwd}`); const agent = await this.createAgent(sessionId, cwd, mcpServers); + // Replay conversation history via session/update notifications per ACP spec + await this.replayConversationHistory(agent); + return { modes: this.getSessionModeState(agent), configOptions: this.getSessionConfigOptions(agent), @@ -799,6 +806,135 @@ export class WaveAcpAgent implements AcpAgent { } } + private async replayConversationHistory(agent: WaveAgent): Promise { + const sessionId = agent.sessionId as AcpSessionId; + + let history: Message[]; + try { + const thread = await agent.getFullMessageThread(); + history = thread.messages; + } catch { + // Fallback to in-memory messages if full thread fails + history = agent.messages; + } + + for (const message of history) { + if (message.isMeta) continue; + + const messageId = message.id; + + for (const block of message.blocks) { + if (block.type === "text") { + const textBlock = block as TextBlock; + const update = + message.role === "user" + ? { + sessionUpdate: "user_message_chunk" as const, + content: { type: "text" as const, text: textBlock.content }, + messageId, + } + : { + sessionUpdate: "agent_message_chunk" as const, + content: { type: "text" as const, text: textBlock.content }, + messageId, + }; + this.connection.sessionUpdate({ sessionId, update }); + } else if (block.type === "reasoning") { + const reasoningBlock = block as ReasoningBlock; + this.connection.sessionUpdate({ + sessionId, + update: { + sessionUpdate: "agent_thought_chunk", + content: { type: "text", text: reasoningBlock.content }, + messageId, + }, + }); + } else if (block.type === "tool") { + const toolBlock = block as ToolBlock; + const toolCallId = + toolBlock.id || + "replay-" + Math.random().toString(36).substring(2, 9); + const effectiveName = toolBlock.name || "Tool"; + const effectiveCompactParams = toolBlock.compactParams; + + const displayTitle = + effectiveName && effectiveCompactParams + ? `${effectiveName}: ${effectiveCompactParams}` + : effectiveName; + + let parsedParameters: Record | undefined; + if (toolBlock.parameters) { + try { + const parsed = JSON.parse(toolBlock.parameters); + parsedParameters = Array.isArray(parsed) + ? { args: parsed } + : parsed; + } catch { + // Ignore parse errors + } + } + + const content = + effectiveName && (parsedParameters || toolBlock.shortResult) + ? this.getToolContent( + effectiveName, + parsedParameters, + toolBlock.shortResult, + ) + : undefined; + const locations = + effectiveName && parsedParameters + ? this.getToolLocations(effectiveName, parsedParameters) + : undefined; + const kind = effectiveName + ? this.getToolKind(effectiveName) + : undefined; + + // Emit tool_call (creation) + this.connection.sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call", + toolCallId, + title: displayTitle, + status: "pending", + content, + locations, + kind, + rawInput: parsedParameters, + }, + }); + + // Emit tool_call_update with final status + const status: ToolCallStatus = + toolBlock.stage === "end" + ? toolBlock.success + ? "completed" + : "failed" + : toolBlock.stage === "running" + ? "in_progress" + : "pending"; + + this.connection.sessionUpdate({ + sessionId, + update: { + sessionUpdate: "tool_call_update", + toolCallId, + status, + title: displayTitle, + rawOutput: toolBlock.result || toolBlock.error, + content, + locations, + kind, + rawInput: parsedParameters, + }, + }); + } + // Skip: image, bang, compact, error, file_history, task_notification blocks + } + } + } + private createCallbacks(sessionId: string): AgentOptions["callbacks"] { const getAgent = () => this.agents.get(sessionId); const toolStates = new Map< diff --git a/packages/code/tests/acp/agent.test.ts b/packages/code/tests/acp/agent.test.ts index c98b4c27..0a41f2ed 100644 --- a/packages/code/tests/acp/agent.test.ts +++ b/packages/code/tests/acp/agent.test.ts @@ -134,6 +134,7 @@ describe("WaveAcpAgent", () => { handler: vi.fn(), }, ]), + getFullMessageThread: vi.fn().mockResolvedValue({ messages: [] }), }; vi.mocked(WaveAgent.create).mockResolvedValue( mockWaveAgent as unknown as WaveAgent, @@ -164,6 +165,174 @@ describe("WaveAcpAgent", () => { }); }); + it("should replay conversation history when loading a session", async () => { + const mockWaveAgent = { + sessionId: "replay-session-id", + destroy: vi.fn(), + getPermissionMode: vi.fn().mockReturnValue("default"), + getConfiguredModels: vi.fn().mockReturnValue(["test-model"]), + getModelConfig: vi.fn().mockReturnValue({ model: "test-model" }), + getSlashCommands: vi.fn().mockReturnValue([]), + getFullMessageThread: vi.fn().mockResolvedValue({ + messages: [ + { + id: "msg-1", + role: "user", + blocks: [{ type: "text", content: "Hello" }], + timestamp: new Date().toISOString(), + }, + { + id: "msg-2", + role: "assistant", + blocks: [ + { type: "reasoning", content: "Thinking..." }, + { type: "text", content: "Hi there!" }, + { + type: "tool", + id: "tool-1", + name: "Read", + compactParams: "file.txt", + stage: "end", + success: true, + parameters: JSON.stringify({ file_path: "/test/file.txt" }), + result: "file contents", + shortResult: "file contents", + }, + ], + timestamp: new Date().toISOString(), + }, + { + id: "msg-3", + role: "user", + blocks: [{ type: "text", content: "Thanks" }], + timestamp: new Date().toISOString(), + isMeta: true, + }, + ], + }), + }; + vi.mocked(WaveAgent.create).mockResolvedValue( + mockWaveAgent as unknown as WaveAgent, + ); + + await agent.loadSession({ + sessionId: "replay-session-id", + cwd: "/test", + mcpServers: [], + }); + + // Should emit user_message_chunk for user message + expect(mockConnection.sessionUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "replay-session-id", + update: expect.objectContaining({ + sessionUpdate: "user_message_chunk", + content: { type: "text", text: "Hello" }, + messageId: "msg-1", + }), + }), + ); + + // Should emit agent_thought_chunk for reasoning block + expect(mockConnection.sessionUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "replay-session-id", + update: expect.objectContaining({ + sessionUpdate: "agent_thought_chunk", + content: { type: "text", text: "Thinking..." }, + messageId: "msg-2", + }), + }), + ); + + // Should emit agent_message_chunk for text block + expect(mockConnection.sessionUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "replay-session-id", + update: expect.objectContaining({ + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "Hi there!" }, + messageId: "msg-2", + }), + }), + ); + + // Should emit tool_call then tool_call_update for tool block + expect(mockConnection.sessionUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "replay-session-id", + update: expect.objectContaining({ + sessionUpdate: "tool_call", + toolCallId: "tool-1", + title: "Read: file.txt", + status: "pending", + }), + }), + ); + expect(mockConnection.sessionUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "replay-session-id", + update: expect.objectContaining({ + sessionUpdate: "tool_call_update", + toolCallId: "tool-1", + title: "Read: file.txt", + status: "completed", + }), + }), + ); + + // Should NOT emit anything for isMeta messages + const allCalls = vi.mocked(mockConnection.sessionUpdate).mock.calls; + for (const call of allCalls) { + if ( + "messageId" in (call[0] as { update: { messageId?: string } }).update + ) { + expect( + (call[0] as { update: { messageId?: string } }).update.messageId, + ).not.toBe("msg-3"); + } + } + }); + + it("should fallback to agent.messages when getFullMessageThread fails", async () => { + const mockWaveAgent = { + sessionId: "fallback-session-id", + destroy: vi.fn(), + getPermissionMode: vi.fn().mockReturnValue("default"), + getConfiguredModels: vi.fn().mockReturnValue(["test-model"]), + getModelConfig: vi.fn().mockReturnValue({ model: "test-model" }), + getSlashCommands: vi.fn().mockReturnValue([]), + getFullMessageThread: vi.fn().mockRejectedValue(new Error("disk error")), + messages: [ + { + id: "msg-1", + role: "user", + blocks: [{ type: "text", content: "Fallback msg" }], + timestamp: new Date().toISOString(), + }, + ], + }; + vi.mocked(WaveAgent.create).mockResolvedValue( + mockWaveAgent as unknown as WaveAgent, + ); + + await agent.loadSession({ + sessionId: "fallback-session-id", + cwd: "/test", + mcpServers: [], + }); + + expect(mockConnection.sessionUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "fallback-session-id", + update: expect.objectContaining({ + sessionUpdate: "user_message_chunk", + content: { type: "text", text: "Fallback msg" }, + }), + }), + ); + }); + it("should list all sessions when cwd is not provided", async () => { const { listAllSessions: listAllWaveSessions, From 0688ee2f169ca087dbe42ea5cc1fd75e2478a034 Mon Sep 17 00:00:00 2001 From: lewis617 Date: Thu, 4 Jun 2026 11:30:44 +0800 Subject: [PATCH 2/2] test: add edge-case tests for replayConversationHistory to fix coverage Covers: tool without id, failed tool, tool without name, unparseable parameters, running stage, and skipped block types. --- packages/code/tests/acp/agent.test.ts | 165 ++++++++++++++++++++++++++ 1 file changed, 165 insertions(+) diff --git a/packages/code/tests/acp/agent.test.ts b/packages/code/tests/acp/agent.test.ts index 0a41f2ed..36c14a5f 100644 --- a/packages/code/tests/acp/agent.test.ts +++ b/packages/code/tests/acp/agent.test.ts @@ -333,6 +333,171 @@ describe("WaveAcpAgent", () => { ); }); + it("should replay tool blocks with various edge cases", async () => { + const mockWaveAgent = { + sessionId: "edge-case-session", + destroy: vi.fn(), + getPermissionMode: vi.fn().mockReturnValue("default"), + getConfiguredModels: vi.fn().mockReturnValue(["test-model"]), + getModelConfig: vi.fn().mockReturnValue({ model: "test-model" }), + getSlashCommands: vi.fn().mockReturnValue([]), + getFullMessageThread: vi.fn().mockResolvedValue({ + messages: [ + { + id: "msg-tool-edges", + role: "assistant", + blocks: [ + // Tool without id — should generate a replay- prefixed id + { + type: "tool", + name: "Bash", + stage: "running", + parameters: JSON.stringify({ command: "ls" }), + }, + // Tool with failed stage + { + type: "tool", + id: "tool-fail", + name: "Edit", + compactParams: "file.ts", + stage: "end", + success: false, + error: "edit failed", + parameters: JSON.stringify({ + file_path: "/test/file.ts", + old_string: "a", + new_string: "b", + }), + }, + // Tool without name — should use "Tool" fallback + { + type: "tool", + id: "tool-no-name", + stage: "end", + success: true, + result: "ok", + }, + // Tool with unparseable parameters + { + type: "tool", + id: "tool-bad-json", + name: "Bash", + stage: "end", + success: true, + parameters: "not-json", + result: "done", + }, + // Skipped block types (image, bang, compact, error, file_history, task_notification) + { type: "image", imageUrls: ["file.png"] }, + { + type: "bang", + command: "ls", + output: "", + stage: "end" as const, + exitCode: 0, + }, + { type: "compact", content: "summary", sessionId: "old" }, + { type: "error", content: "err" }, + ], + timestamp: new Date().toISOString(), + }, + ], + }), + }; + vi.mocked(WaveAgent.create).mockResolvedValue( + mockWaveAgent as unknown as WaveAgent, + ); + + await agent.loadSession({ + sessionId: "edge-case-session", + cwd: "/test", + mcpServers: [], + }); + + // Tool without id — generated toolCallId starts with "replay-" + expect(mockConnection.sessionUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + update: expect.objectContaining({ + sessionUpdate: "tool_call", + toolCallId: expect.stringMatching(/^replay-/), + title: "Bash", + status: "pending", + }), + }), + ); + // Running stage → in_progress in update + expect(mockConnection.sessionUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + update: expect.objectContaining({ + sessionUpdate: "tool_call_update", + toolCallId: expect.stringMatching(/^replay-/), + status: "in_progress", + }), + }), + ); + + // Failed tool + expect(mockConnection.sessionUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + update: expect.objectContaining({ + sessionUpdate: "tool_call", + toolCallId: "tool-fail", + title: "Edit: file.ts", + status: "pending", + }), + }), + ); + expect(mockConnection.sessionUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + update: expect.objectContaining({ + sessionUpdate: "tool_call_update", + toolCallId: "tool-fail", + status: "failed", + rawOutput: "edit failed", + }), + }), + ); + + // Tool without name — uses "Tool" fallback, no compactParams → title is just "Tool" + expect(mockConnection.sessionUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + update: expect.objectContaining({ + sessionUpdate: "tool_call", + toolCallId: "tool-no-name", + title: "Tool", + }), + }), + ); + + // Tool with bad JSON parameters — parse error caught, parsedParameters undefined + expect(mockConnection.sessionUpdate).toHaveBeenCalledWith( + expect.objectContaining({ + update: expect.objectContaining({ + sessionUpdate: "tool_call", + toolCallId: "tool-bad-json", + rawInput: undefined, + }), + }), + ); + + // Skipped block types should not produce any session updates with these ids + const allCalls = vi.mocked(mockConnection.sessionUpdate).mock.calls; + const allUpdates = allCalls.map( + (c) => (c[0] as { update: { sessionUpdate: string } }).update, + ); + // No extra tool_call or message_chunk for skipped types + const skippedTypes = allUpdates.filter( + (u) => + u.sessionUpdate !== "tool_call" && + u.sessionUpdate !== "tool_call_update" && + u.sessionUpdate !== "user_message_chunk" && + u.sessionUpdate !== "agent_message_chunk" && + u.sessionUpdate !== "agent_thought_chunk" && + u.sessionUpdate !== "available_commands_update", + ); + expect(skippedTypes).toHaveLength(0); + }); + it("should list all sessions when cwd is not provided", async () => { const { listAllSessions: listAllWaveSessions,