From edb4ecf7e429aa6befc0c411bef8f955e004a289 Mon Sep 17 00:00:00 2001 From: dorianzheng Date: Fri, 24 Apr 2026 13:07:04 +0800 Subject: [PATCH 1/3] feat: add streaming partial content events for token-by-token output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds four new AgentLite events that enable the Dune UI to render agent responses token-by-token as they arrive from the Claude API: - run.partial_content — text token delta (content_block_delta/text_delta) - run.partial_tool_call — tool arg JSON delta (input_json_delta) - run.partial_content.done — stream completed normally (message_stop) - run.partial_content.interrupted — stream cut short (container crash/exit) includePartialMessages:true was already set; this extracts the stream_event SDK messages and surfaces them as typed events. Tracks in-flight jids via streamingJids set; drains all on container error or stopped state. Co-Authored-By: Claude Sonnet 4.6 --- src/agent/message-processor.ts | 63 +++++++++++++++++++++++++++++++++- src/api/events.ts | 43 +++++++++++++++++++++++ 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/src/agent/message-processor.ts b/src/agent/message-processor.ts index bf9ceb8ca04..c0747d53c78 100644 --- a/src/agent/message-processor.ts +++ b/src/agent/message-processor.ts @@ -43,6 +43,8 @@ export class MessageProcessor { private messageLoopRunning = false; private _messageLoopPromise: Promise | null = null; private _wakeLoop: (() => void) | null = null; + /** jids that currently have an in-flight streaming turn. */ + private streamingJids = new Set(); constructor( private readonly ctx: AgentContext, @@ -51,6 +53,19 @@ export class MessageProcessor { private readonly taskMgr: TaskManager, ) {} + /** Emit run.partial_content.interrupted for all in-flight jids and clear the set. */ + private drainStreamingJids(reason: string, now: string): void { + for (const jid of this.streamingJids) { + this.ctx.emit('run.partial_content.interrupted', { + agentId: this.ctx.id, + jid, + reason, + timestamp: now, + }); + } + this.streamingJids.clear(); + } + /** Start the message polling loop. Returns promise that resolves when stopped. */ start(): Promise { this._messageLoopPromise = this.startMessageLoop().catch((err) => { @@ -165,17 +180,22 @@ export class MessageProcessor { async (event) => { // ── Container lifecycle events ──────────────────────── if (event.type === 'state') { + const stateNow = new Date().toISOString(); this.ctx.emit('run.state', { agentId: this.ctx.id, jid: chatJid, name: group.name, folder: group.folder, state: event.state, - timestamp: new Date().toISOString(), + timestamp: stateNow, reason: event.reason, exitCode: event.exitCode, }); if (event.state === 'idle') this.ctx.queue.notifyIdle(chatJid); + // Drain any in-flight streaming turns on container exit (covers SIGKILL/ungraceful exits) + if (event.state === 'stopped' && this.streamingJids.size > 0) { + this.drainStreamingJids('container_exit', stateNow); + } return; } @@ -204,6 +224,9 @@ export class MessageProcessor { if (event.type === 'error') { hadError = true; + if (this.streamingJids.size > 0) { + this.drainStreamingJids('container_error', new Date().toISOString()); + } return; } @@ -222,6 +245,44 @@ export class MessageProcessor { timestamp: now, }); + // Extract partial content events from stream_event SDK messages + if (event.sdkType === 'stream_event') { + const streamEvent = msg?.event; // BetaRawMessageStreamEvent — correct field + if (streamEvent?.type === 'content_block_delta') { + if (streamEvent.delta?.type === 'text_delta') { + this.ctx.emit('run.partial_content', { + agentId: this.ctx.id, + jid: chatJid, + delta: streamEvent.delta.text, + ...(streamEvent.index !== undefined && { + contentBlockIndex: streamEvent.index, + }), + timestamp: now, + }); + this.streamingJids.add(chatJid); + } else if (streamEvent.delta?.type === 'input_json_delta') { + this.ctx.emit('run.partial_tool_call', { + agentId: this.ctx.id, + jid: chatJid, + jsonDelta: streamEvent.delta.partial_json, + ...(streamEvent.index !== undefined && { + contentBlockIndex: streamEvent.index, + }), + timestamp: now, + }); + this.streamingJids.add(chatJid); + } + } + if (streamEvent?.type === 'message_stop') { + this.ctx.emit('run.partial_content.done', { + agentId: this.ctx.id, + jid: chatJid, + timestamp: now, + }); + this.streamingJids.delete(chatJid); + } + } + // Derive curated convenience events from SDK messages if (event.sdkType === 'assistant' && msg?.message?.content) { for (const block of msg.message.content) { diff --git a/src/api/events.ts b/src/api/events.ts index 701b7e8dd87..786d1be6e09 100644 --- a/src/api/events.ts +++ b/src/api/events.ts @@ -10,6 +10,10 @@ export interface AgentEvents extends Record { 'message.out': [payload: MessageOutEvent]; 'run.state': [payload: RunStateEvent]; 'run.sdk_message': [payload: RunSdkMessageEvent]; + 'run.partial_content': [payload: AgentPartialContentEvent]; + 'run.partial_tool_call': [payload: AgentPartialToolCallEvent]; + 'run.partial_content.done': [payload: AgentPartialContentDoneEvent]; + 'run.partial_content.interrupted': [payload: AgentPartialContentInterruptedEvent]; 'run.tool': [payload: RunToolEvent]; 'run.tool_progress': [payload: RunToolProgressEvent]; 'run.subagent': [payload: RunSubagentEvent]; @@ -291,6 +295,45 @@ export interface TaskRunFailedEvent { timestamp: string; } +// ── Partial content streaming events ──────────────────────────── + +/** Emitted for each text token delta during streaming. */ +export interface AgentPartialContentEvent { + agentId: string; + jid: string; + /** Incremental text delta (not cumulative). */ + delta: string; + /** Index of the content block being streamed (undefined if unavailable). */ + contentBlockIndex?: number; + timestamp: string; +} + +/** Emitted for each chunk of tool-call argument JSON being built. */ +export interface AgentPartialToolCallEvent { + agentId: string; + jid: string; + /** Incremental JSON delta for the tool call arguments. */ + jsonDelta: string; + /** Index of the tool-use content block (undefined if unavailable). */ + contentBlockIndex?: number; + timestamp: string; +} + +/** Emitted when a streaming turn completes normally (message_stop received). */ +export interface AgentPartialContentDoneEvent { + agentId: string; + jid: string; + timestamp: string; +} + +/** Emitted when a streaming turn is interrupted (container crash, timeout, SIGKILL). */ +export interface AgentPartialContentInterruptedEvent { + agentId: string; + jid: string; + reason: string; + timestamp: string; +} + /** A due fire was dropped without executing. */ export interface TaskRunSkippedEvent { agentId: string; From 2438c71a7773a71157fc925bd480e2b01ca44c42 Mon Sep 17 00:00:00 2001 From: dorianzheng Date: Fri, 24 Apr 2026 13:11:55 +0800 Subject: [PATCH 2/3] test: add streaming-events.test.ts with 8 test cases for partial content events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Covers all four new event types (run.partial_content, run.partial_tool_call, run.partial_content.done, run.partial_content.interrupted): 1. text_delta → run.partial_content with delta, contentBlockIndex, agentId, jid, timestamp 2. input_json_delta → run.partial_tool_call with jsonDelta 3. non-delta stream events (message_start) → no partial events emitted 4. message_stop → run.partial_content.done 5. container error during streaming → run.partial_content.interrupted (container_error) 6. container stopped during streaming → run.partial_content.interrupted (container_exit) 7. contentBlockIndex omitted when streamEvent.index is undefined 8. multiple sequential text_deltas → one event per delta + one done event Co-Authored-By: Claude Sonnet 4.6 --- src/streaming-events.test.ts | 379 +++++++++++++++++++++++++++++++++++ 1 file changed, 379 insertions(+) create mode 100644 src/streaming-events.test.ts diff --git a/src/streaming-events.test.ts b/src/streaming-events.test.ts new file mode 100644 index 00000000000..bbc570a6e2d --- /dev/null +++ b/src/streaming-events.test.ts @@ -0,0 +1,379 @@ +/** + * Tests for run.partial_content streaming events. + * These events are derived from stream_event SDK messages and enable + * the Dune UI to render agent responses token-by-token as they arrive. + */ +import fs from 'fs'; +import os from 'os'; +import path from 'path'; + +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +vi.mock('./container-runner.js', async () => { + const actual = await vi.importActual( + './container-runner.js', + ); + return { + ...actual, + runContainerAgent: vi.fn(), + }; +}); + +import { AgentImpl } from './agent/agent-impl.js'; +import { + buildAgentConfig, + resolveSerializableAgentSettings, +} from './agent/config.js'; +import { _initTestDatabase, AgentDb } from './db.js'; +import { buildRuntimeConfig } from './runtime-config.js'; +import { runContainerAgent } from './container-runner.js'; +import type { + AgentPartialContentEvent, + AgentPartialToolCallEvent, + AgentPartialContentDoneEvent, + AgentPartialContentInterruptedEvent, +} from './api/events.js'; +import type { Channel, RegisteredGroup } from './types.js'; + +const runtimeConfig = buildRuntimeConfig( + { timezone: 'UTC' }, + '/tmp/agentlite-streaming-test-pkg', +); + +const MAIN_GROUP: RegisteredGroup = { + name: 'Main', + folder: 'main', + trigger: 'always', + added_at: '2024-01-01T00:00:00.000Z', + isMain: true, +}; + +let tmpDir: string; +let db: AgentDb; + +function createAgent(name: string): AgentImpl { + const config = buildAgentConfig({ + agentId: `${name}00000000`.slice(0, 8), + ...resolveSerializableAgentSettings( + name, + { workdir: path.join(tmpDir, 'agents', name) }, + tmpDir, + ), + }); + return new AgentImpl(config, runtimeConfig); +} + +function createMockChannel(): Channel { + return { + name: 'mock', + async connect(): Promise {}, + async disconnect(): Promise {}, + async sendMessage(): Promise {}, + isConnected(): boolean { + return true; + }, + ownsJid(jid: string): boolean { + return jid === 'mock:stream'; + }, + async setTyping(): Promise {}, + }; +} + +function setupAgent(): AgentImpl { + const agent = createAgent('partial-test'); + agent._setDbForTests(db); + agent._setRegisteredGroups({ 'mock:stream': MAIN_GROUP }); + (agent as unknown as { _started: boolean })._started = true; + const channel = createMockChannel(); + (agent as unknown as { channels: Map }).channels.set( + 'mock', + channel, + ); + + db.storeChatMetadata( + 'mock:stream', + '2026-04-13T00:00:00.000Z', + 'Partial Content Test Chat', + ); + db.storeMessage({ + id: 'msg-1', + chat_jid: 'mock:stream', + sender: 'user1', + sender_name: 'User 1', + content: 'stream something', + timestamp: '2026-04-13T00:00:01.000Z', + is_from_me: false, + }); + + return agent; +} + +function streamEvent(event: unknown) { + return { type: 'sdk_message' as const, sdkType: 'stream_event', message: { event } }; +} + +function stoppedState() { + return { type: 'state' as const, state: 'stopped' as const, reason: 'exit', exitCode: 0 }; +} + +describe('run.partial_content (text token deltas)', () => { + beforeEach(() => { + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'agentlite-partial-')); + db = _initTestDatabase(); + vi.mocked(runContainerAgent).mockReset(); + }); + + afterEach(() => { + try { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } catch { + /* ignore */ + } + }); + + it('emits run.partial_content for text_delta with correct payload', async () => { + const agent = setupAgent(); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _rc, _onProcess, onOutput) => { + await onOutput?.( + streamEvent({ + type: 'content_block_delta', + index: 2, + delta: { type: 'text_delta', text: 'Hello' }, + }), + ); + await onOutput?.(stoppedState()); + return { status: 'success', result: null }; + }, + ); + + const events: AgentPartialContentEvent[] = []; + agent.on('run.partial_content', (evt) => events.push(evt)); + + await agent.processGroupMessages('mock:stream'); + + expect(events).toHaveLength(1); + expect(events[0]).toMatchObject({ + agentId: agent.id, + jid: 'mock:stream', + delta: 'Hello', + contentBlockIndex: 2, + }); + expect(typeof events[0].timestamp).toBe('string'); + }); + + it('emits run.partial_tool_call for input_json_delta with correct payload', async () => { + const agent = setupAgent(); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _rc, _onProcess, onOutput) => { + await onOutput?.( + streamEvent({ + type: 'content_block_delta', + index: 1, + delta: { type: 'input_json_delta', partial_json: '{"cmd":' }, + }), + ); + await onOutput?.(stoppedState()); + return { status: 'success', result: null }; + }, + ); + + const events: AgentPartialToolCallEvent[] = []; + agent.on('run.partial_tool_call', (evt) => events.push(evt)); + + await agent.processGroupMessages('mock:stream'); + + expect(events).toHaveLength(1); + expect(events[0]).toMatchObject({ + agentId: agent.id, + jid: 'mock:stream', + jsonDelta: '{"cmd":', + contentBlockIndex: 1, + }); + }); + + it('does not emit run.partial_content for non-delta stream events (message_start)', async () => { + const agent = setupAgent(); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _rc, _onProcess, onOutput) => { + await onOutput?.( + streamEvent({ type: 'message_start', message: { id: 'msg-x' } }), + ); + await onOutput?.( + streamEvent({ type: 'content_block_start', index: 0, content_block: { type: 'text', text: '' } }), + ); + await onOutput?.(stoppedState()); + return { status: 'success', result: null }; + }, + ); + + const partialEvents: AgentPartialContentEvent[] = []; + const toolCallEvents: AgentPartialToolCallEvent[] = []; + agent.on('run.partial_content', (evt) => partialEvents.push(evt)); + agent.on('run.partial_tool_call', (evt) => toolCallEvents.push(evt)); + + await agent.processGroupMessages('mock:stream'); + + expect(partialEvents).toHaveLength(0); + expect(toolCallEvents).toHaveLength(0); + }); + + it('emits run.partial_content.done on message_stop', async () => { + const agent = setupAgent(); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _rc, _onProcess, onOutput) => { + await onOutput?.( + streamEvent({ + type: 'content_block_delta', + index: 0, + delta: { type: 'text_delta', text: 'token' }, + }), + ); + await onOutput?.(streamEvent({ type: 'message_stop' })); + await onOutput?.(stoppedState()); + return { status: 'success', result: null }; + }, + ); + + const doneEvents: AgentPartialContentDoneEvent[] = []; + agent.on('run.partial_content.done', (evt) => doneEvents.push(evt)); + + await agent.processGroupMessages('mock:stream'); + + expect(doneEvents).toHaveLength(1); + expect(doneEvents[0]).toMatchObject({ + agentId: agent.id, + jid: 'mock:stream', + }); + expect(typeof doneEvents[0].timestamp).toBe('string'); + }); + + it('emits run.partial_content.interrupted with container_error reason on container error', async () => { + const agent = setupAgent(); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _rc, _onProcess, onOutput) => { + await onOutput?.( + streamEvent({ + type: 'content_block_delta', + index: 0, + delta: { type: 'text_delta', text: 'partial' }, + }), + ); + await onOutput?.({ type: 'error', error: 'container crashed' }); + await onOutput?.(stoppedState()); + return { status: 'success', result: null }; + }, + ); + + const interruptedEvents: AgentPartialContentInterruptedEvent[] = []; + agent.on('run.partial_content.interrupted', (evt) => + interruptedEvents.push(evt), + ); + + await agent.processGroupMessages('mock:stream'); + + expect(interruptedEvents).toHaveLength(1); + expect(interruptedEvents[0]).toMatchObject({ + agentId: agent.id, + jid: 'mock:stream', + reason: 'container_error', + }); + }); + + it('emits run.partial_content.interrupted with container_exit reason on stopped state while streaming', async () => { + const agent = setupAgent(); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _rc, _onProcess, onOutput) => { + await onOutput?.( + streamEvent({ + type: 'content_block_delta', + index: 0, + delta: { type: 'text_delta', text: 'mid-stream' }, + }), + ); + // Container stops abruptly without message_stop (SIGKILL / ungraceful exit) + await onOutput?.(stoppedState()); + return { status: 'success', result: null }; + }, + ); + + const interruptedEvents: AgentPartialContentInterruptedEvent[] = []; + agent.on('run.partial_content.interrupted', (evt) => + interruptedEvents.push(evt), + ); + + await agent.processGroupMessages('mock:stream'); + + expect(interruptedEvents).toHaveLength(1); + expect(interruptedEvents[0]).toMatchObject({ + agentId: agent.id, + jid: 'mock:stream', + reason: 'container_exit', + }); + }); + + it('omits contentBlockIndex when streamEvent.index is undefined', async () => { + const agent = setupAgent(); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _rc, _onProcess, onOutput) => { + // delta event without an index field + await onOutput?.( + streamEvent({ + type: 'content_block_delta', + delta: { type: 'text_delta', text: 'no-index' }, + }), + ); + await onOutput?.(stoppedState()); + return { status: 'success', result: null }; + }, + ); + + const events: AgentPartialContentEvent[] = []; + agent.on('run.partial_content', (evt) => events.push(evt)); + + await agent.processGroupMessages('mock:stream'); + + expect(events).toHaveLength(1); + expect(events[0].delta).toBe('no-index'); + expect(Object.prototype.hasOwnProperty.call(events[0], 'contentBlockIndex')).toBe(false); + }); + + it('emits one run.partial_content per text_delta (multiple sequential deltas)', async () => { + const agent = setupAgent(); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _rc, _onProcess, onOutput) => { + for (const text of ['Hello', ' ', 'world', '!']) { + await onOutput?.( + streamEvent({ + type: 'content_block_delta', + index: 0, + delta: { type: 'text_delta', text }, + }), + ); + } + await onOutput?.(streamEvent({ type: 'message_stop' })); + await onOutput?.(stoppedState()); + return { status: 'success', result: null }; + }, + ); + + const partialEvents: AgentPartialContentEvent[] = []; + const doneEvents: AgentPartialContentDoneEvent[] = []; + agent.on('run.partial_content', (evt) => partialEvents.push(evt)); + agent.on('run.partial_content.done', (evt) => doneEvents.push(evt)); + + await agent.processGroupMessages('mock:stream'); + + expect(partialEvents).toHaveLength(4); + expect(partialEvents.map((e) => e.delta)).toEqual(['Hello', ' ', 'world', '!']); + expect(doneEvents).toHaveLength(1); + }); +}); From bb2d5578ad07cf1d28b780a8c5146bdc3dcd7f13 Mon Sep 17 00:00:00 2001 From: dorianzheng Date: Fri, 24 Apr 2026 14:01:17 +0800 Subject: [PATCH 3/3] =?UTF-8?q?feat:=20add=20ring-buffer=20reconnection=20?= =?UTF-8?q?(design=20=C2=A74)=20+=20fix=20done=20guard=20+=20tests=206-8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add per-jid in-memory ring buffer (last 2 KB) in MessageProcessor - appendToRingBuffer: appends text/tool deltas; marks evicted when >2 KB - resumePartialContent: replays buffered deltas on reconnect, or emits run.partial_content.interrupted (reason: buffer_evicted) when stream completed normally or buffer overflowed - Fix run.partial_content.done guard: only emit if streamingJids.has(chatJid) - Clear ring buffer on message_stop (stream completed normally) - Expose resumePartialContent via AgentImpl.resumePartialContent - Add test 6: ring-buffer replay replays deltas in order on reconnect - Add test 7: buffer_evicted emitted when stream completed before reconnect - Add test 8: two concurrent jids each get own events; drain interrupts both Co-Authored-By: Claude Sonnet 4.6 --- src/agent/agent-impl.ts | 5 + src/agent/message-processor.ts | 70 +++++++++++-- src/streaming-events.test.ts | 184 +++++++++++++++++++++++++++++++++ 3 files changed, 253 insertions(+), 6 deletions(-) diff --git a/src/agent/agent-impl.ts b/src/agent/agent-impl.ts index 00c40ae67f2..f6db968f89c 100644 --- a/src/agent/agent-impl.ts +++ b/src/agent/agent-impl.ts @@ -588,6 +588,11 @@ export class AgentImpl return this.messageMgr.processGroupMessages(chatJid); } + /** @internal — delegates to MessageProcessor. Used by tests. */ + resumePartialContent(jid: string): void { + this.messageMgr.resumePartialContent(jid); + } + /** @internal — delegates to ChannelManager. */ async sendOutboundMessage( jid: string, diff --git a/src/agent/message-processor.ts b/src/agent/message-processor.ts index c0747d53c78..8dd88abfc4d 100644 --- a/src/agent/message-processor.ts +++ b/src/agent/message-processor.ts @@ -46,6 +46,13 @@ export class MessageProcessor { /** jids that currently have an in-flight streaming turn. */ private streamingJids = new Set(); + /** Per-jid ring buffer for reconnection replay (last 2 KB of text deltas). */ + private streamingRingBuffer = new Map< + string, + { deltas: string[]; totalBytes: number; evicted: boolean } + >(); + private static readonly RING_BUFFER_SIZE_BYTES = 2048; + constructor( private readonly ctx: AgentContext, private readonly channelMgr: ChannelManager, @@ -66,6 +73,51 @@ export class MessageProcessor { this.streamingJids.clear(); } + /** Append a text delta to the per-jid ring buffer, trimming oldest entries to stay within 2 KB. */ + private appendToRingBuffer(jid: string, delta: string): void { + let buf = this.streamingRingBuffer.get(jid); + if (!buf) { + buf = { deltas: [], totalBytes: 0, evicted: false }; + this.streamingRingBuffer.set(jid, buf); + } + buf.deltas.push(delta); + buf.totalBytes += delta.length; + if (buf.totalBytes > MessageProcessor.RING_BUFFER_SIZE_BYTES) { + buf.evicted = true; + while (buf.totalBytes > MessageProcessor.RING_BUFFER_SIZE_BYTES && buf.deltas.length > 0) { + buf.totalBytes -= buf.deltas[0].length; + buf.deltas.shift(); + } + } + } + + /** + * Replay buffered deltas for a jid that reconnected mid-stream. + * Emits run.partial_content for each buffered delta, or run.partial_content.interrupted + * (reason: 'buffer_evicted') if the buffer was overflowed or the stream already completed. + */ + resumePartialContent(jid: string): void { + const buf = this.streamingRingBuffer.get(jid); + const now = new Date().toISOString(); + if (!buf || buf.evicted || buf.deltas.length === 0) { + this.ctx.emit('run.partial_content.interrupted', { + agentId: this.ctx.id, + jid, + reason: 'buffer_evicted', + timestamp: now, + }); + return; + } + for (const delta of buf.deltas) { + this.ctx.emit('run.partial_content', { + agentId: this.ctx.id, + jid, + delta, + timestamp: now, + }); + } + } + /** Start the message polling loop. Returns promise that resolves when stopped. */ start(): Promise { this._messageLoopPromise = this.startMessageLoop().catch((err) => { @@ -259,6 +311,7 @@ export class MessageProcessor { }), timestamp: now, }); + this.appendToRingBuffer(chatJid, streamEvent.delta.text); this.streamingJids.add(chatJid); } else if (streamEvent.delta?.type === 'input_json_delta') { this.ctx.emit('run.partial_tool_call', { @@ -270,16 +323,21 @@ export class MessageProcessor { }), timestamp: now, }); + this.appendToRingBuffer(chatJid, streamEvent.delta.partial_json); this.streamingJids.add(chatJid); } } if (streamEvent?.type === 'message_stop') { - this.ctx.emit('run.partial_content.done', { - agentId: this.ctx.id, - jid: chatJid, - timestamp: now, - }); - this.streamingJids.delete(chatJid); + if (this.streamingJids.has(chatJid)) { + this.ctx.emit('run.partial_content.done', { + agentId: this.ctx.id, + jid: chatJid, + timestamp: now, + }); + this.streamingJids.delete(chatJid); + } + // Clear ring buffer — stream completed normally; no replay possible + this.streamingRingBuffer.delete(chatJid); } } diff --git a/src/streaming-events.test.ts b/src/streaming-events.test.ts index bbc570a6e2d..c8f6bab6262 100644 --- a/src/streaming-events.test.ts +++ b/src/streaming-events.test.ts @@ -376,4 +376,188 @@ describe('run.partial_content (text token deltas)', () => { expect(partialEvents.map((e) => e.delta)).toEqual(['Hello', ' ', 'world', '!']); expect(doneEvents).toHaveLength(1); }); + + // ── Reconnection tests (design §4, tests 6 & 7) ─────────────── + + it('replays buffered deltas in order on reconnect (ring-buffer replay, design test 6)', async () => { + const agent = setupAgent(); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _rc, _onProcess, onOutput) => { + await onOutput?.( + streamEvent({ + type: 'content_block_delta', + index: 0, + delta: { type: 'text_delta', text: 'chunk1' }, + }), + ); + await onOutput?.( + streamEvent({ + type: 'content_block_delta', + index: 0, + delta: { type: 'text_delta', text: 'chunk2' }, + }), + ); + // Container exits without message_stop (mid-stream crash) + await onOutput?.(stoppedState()); + return { status: 'success', result: null }; + }, + ); + + // Collect events during initial streaming + const streamingEvents: AgentPartialContentEvent[] = []; + agent.on('run.partial_content', (evt) => streamingEvents.push(evt)); + + await agent.processGroupMessages('mock:stream'); + + // Verify initial streaming events + expect(streamingEvents).toHaveLength(2); + + // Simulate UI reconnect: clear streaming events and call resume + streamingEvents.length = 0; + agent.resumePartialContent('mock:stream'); + + // Ring buffer should replay the two buffered deltas in order + expect(streamingEvents).toHaveLength(2); + expect(streamingEvents[0].delta).toBe('chunk1'); + expect(streamingEvents[1].delta).toBe('chunk2'); + expect(streamingEvents[0].jid).toBe('mock:stream'); + }); + + it('emits buffer_evicted on reconnect after stream completed normally (design test 7)', async () => { + const agent = setupAgent(); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _rc, _onProcess, onOutput) => { + await onOutput?.( + streamEvent({ + type: 'content_block_delta', + index: 0, + delta: { type: 'text_delta', text: 'complete response' }, + }), + ); + // Stream completes normally — ring buffer is cleared on message_stop + await onOutput?.(streamEvent({ type: 'message_stop' })); + await onOutput?.(stoppedState()); + return { status: 'success', result: null }; + }, + ); + + await agent.processGroupMessages('mock:stream'); + + // Stream is done; buffer was cleared on message_stop + const interruptedEvents: AgentPartialContentInterruptedEvent[] = []; + agent.on('run.partial_content.interrupted', (evt) => interruptedEvents.push(evt)); + + // Reconnect after stream completed → buffer_evicted + agent.resumePartialContent('mock:stream'); + + expect(interruptedEvents).toHaveLength(1); + expect(interruptedEvents[0]).toMatchObject({ + jid: 'mock:stream', + reason: 'buffer_evicted', + }); + expect(typeof interruptedEvents[0].timestamp).toBe('string'); + }); + + // ── Concurrent streams (design test 8) ──────────────────────── + + it('two concurrent streams: each jid receives its own events and both get interrupted on drain', async () => { + const jidA = 'mock:stream-a'; + const jidB = 'mock:stream-b'; + + // Set up agent with two registered jids and matching channel + const agent = createAgent('concurrent-test'); + agent._setDbForTests(db); + const twoJidGroup: RegisteredGroup = { + ...MAIN_GROUP, + trigger: 'always', + }; + const groupB: RegisteredGroup = { + name: 'GroupB', + folder: 'group-b', + trigger: 'always', + added_at: '2024-01-01T00:00:00.000Z', + isMain: false, + requiresTrigger: false, + }; + agent._setRegisteredGroups({ [jidA]: twoJidGroup, [jidB]: groupB }); + (agent as unknown as { _started: boolean })._started = true; + + const twoJidChannel = { + name: 'mock-two', + async connect(): Promise {}, + async disconnect(): Promise {}, + async sendMessage(): Promise {}, + isConnected(): boolean { return true; }, + ownsJid(jid: string): boolean { return jid === jidA || jid === jidB; }, + async setTyping(): Promise {}, + }; + (agent as unknown as { channels: Map }).channels.set('mock-two', twoJidChannel); + + // Seed messages for both jids + db.storeChatMetadata(jidA, '2026-04-13T00:00:00.000Z', 'Chat A'); + db.storeMessage({ id: 'msg-a', chat_jid: jidA, sender: 'u1', sender_name: 'U1', content: 'go', timestamp: '2026-04-13T00:00:01.000Z', is_from_me: false }); + db.storeChatMetadata(jidB, '2026-04-13T00:00:00.000Z', 'Chat B'); + db.storeMessage({ id: 'msg-b', chat_jid: jidB, sender: 'u2', sender_name: 'U2', content: 'go', timestamp: '2026-04-13T00:00:01.000Z', is_from_me: false }); + + // Coordination: jidA waits until jidB has also emitted its delta before exiting + let resolveJidADeltaSent: () => void; + let resolveJidBDeltaSent: () => void; + const jidADeltaSent = new Promise((r) => { resolveJidADeltaSent = r; }); + const jidBDeltaSent = new Promise((r) => { resolveJidBDeltaSent = r; }); + + vi.mocked(runContainerAgent) + .mockImplementationOnce(async (_g, _i, _r, _op, onOutput) => { + // jidA emits its delta, signals jidB, then waits for jidB before stopping + await onOutput?.(streamEvent({ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'tokenA' } })); + resolveJidADeltaSent!(); + await jidBDeltaSent; // wait for jidB to have added itself to streamingJids + await onOutput?.(stoppedState()); + return { status: 'success', result: null }; + }) + .mockImplementationOnce(async (_g, _i, _r, _op, onOutput) => { + // jidB waits for jidA's delta, then emits its own, then stops + await jidADeltaSent; + await onOutput?.(streamEvent({ type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'tokenB' } })); + resolveJidBDeltaSent!(); + await onOutput?.(stoppedState()); + return { status: 'success', result: null }; + }); + + // Track events per jid + const partialA: AgentPartialContentEvent[] = []; + const partialB: AgentPartialContentEvent[] = []; + const interruptedA: AgentPartialContentInterruptedEvent[] = []; + const interruptedB: AgentPartialContentInterruptedEvent[] = []; + + agent.on('run.partial_content', (evt) => { + if (evt.jid === jidA) partialA.push(evt); + else if (evt.jid === jidB) partialB.push(evt); + }); + agent.on('run.partial_content.interrupted', (evt) => { + if (evt.jid === jidA) interruptedA.push(evt); + else if (evt.jid === jidB) interruptedB.push(evt); + }); + + // Run both concurrently + await Promise.all([ + agent.processGroupMessages(jidA), + agent.processGroupMessages(jidB), + ]); + + // Each jid received only its own partial_content event + expect(partialA).toHaveLength(1); + expect(partialA[0].delta).toBe('tokenA'); + expect(partialA[0].jid).toBe(jidA); + expect(partialB).toHaveLength(1); + expect(partialB[0].delta).toBe('tokenB'); + expect(partialB[0].jid).toBe(jidB); + + // drainStreamingJids must have emitted interrupted for both jids + expect(interruptedA).toHaveLength(1); + expect(interruptedA[0].reason).toBe('container_exit'); + expect(interruptedB).toHaveLength(1); + expect(interruptedB[0].reason).toBe('container_exit'); + }); });