Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/agent/agent-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,11 @@ export class AgentImpl
return this.messageMgr.processGroupMessages(chatJid);
}

/** @internal — delegates to MessageProcessor. Used by tests. */
resumePartialContent(jid: string): void {
this.messageMgr.resumePartialContent(jid);
}

/** @internal — delegates to ChannelManager. */
async sendOutboundMessage(
jid: string,
Expand Down
121 changes: 120 additions & 1 deletion src/agent/message-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ export class MessageProcessor {
private messageLoopRunning = false;
private _messageLoopPromise: Promise<void> | null = null;
private _wakeLoop: (() => void) | null = null;
/** jids that currently have an in-flight streaming turn. */
private streamingJids = new Set<string>();

/** Per-jid ring buffer for reconnection replay (last 2 KB of text deltas). */
private streamingRingBuffer = new Map<
string,
{ deltas: string[]; totalBytes: number; evicted: boolean }
>();
private static readonly RING_BUFFER_SIZE_BYTES = 2048;

constructor(
private readonly ctx: AgentContext,
Expand All @@ -51,6 +60,64 @@ export class MessageProcessor {
private readonly taskMgr: TaskManager,
) {}

/** Emit run.partial_content.interrupted for all in-flight jids and clear the set. */
private drainStreamingJids(reason: string, now: string): void {
for (const jid of this.streamingJids) {
this.ctx.emit('run.partial_content.interrupted', {
agentId: this.ctx.id,
jid,
reason,
timestamp: now,
});
}
this.streamingJids.clear();
}

/** Append a text delta to the per-jid ring buffer, trimming oldest entries to stay within 2 KB. */
private appendToRingBuffer(jid: string, delta: string): void {
let buf = this.streamingRingBuffer.get(jid);
if (!buf) {
buf = { deltas: [], totalBytes: 0, evicted: false };
this.streamingRingBuffer.set(jid, buf);
}
buf.deltas.push(delta);
buf.totalBytes += delta.length;
if (buf.totalBytes > MessageProcessor.RING_BUFFER_SIZE_BYTES) {
buf.evicted = true;
while (buf.totalBytes > MessageProcessor.RING_BUFFER_SIZE_BYTES && buf.deltas.length > 0) {
buf.totalBytes -= buf.deltas[0].length;
buf.deltas.shift();
}
}
}

/**
* Replay buffered deltas for a jid that reconnected mid-stream.
* Emits run.partial_content for each buffered delta, or run.partial_content.interrupted
* (reason: 'buffer_evicted') if the buffer was overflowed or the stream already completed.
*/
resumePartialContent(jid: string): void {
const buf = this.streamingRingBuffer.get(jid);
const now = new Date().toISOString();
if (!buf || buf.evicted || buf.deltas.length === 0) {
this.ctx.emit('run.partial_content.interrupted', {
agentId: this.ctx.id,
jid,
reason: 'buffer_evicted',
timestamp: now,
});
return;
}
for (const delta of buf.deltas) {
this.ctx.emit('run.partial_content', {
agentId: this.ctx.id,
jid,
delta,
timestamp: now,
});
}
}

/** Start the message polling loop. Returns promise that resolves when stopped. */
start(): Promise<void> {
this._messageLoopPromise = this.startMessageLoop().catch((err) => {
Expand Down Expand Up @@ -165,17 +232,22 @@ export class MessageProcessor {
async (event) => {
// ── Container lifecycle events ────────────────────────
if (event.type === 'state') {
const stateNow = new Date().toISOString();
this.ctx.emit('run.state', {
agentId: this.ctx.id,
jid: chatJid,
name: group.name,
folder: group.folder,
state: event.state,
timestamp: new Date().toISOString(),
timestamp: stateNow,
reason: event.reason,
exitCode: event.exitCode,
});
if (event.state === 'idle') this.ctx.queue.notifyIdle(chatJid);
// Drain any in-flight streaming turns on container exit (covers SIGKILL/ungraceful exits)
if (event.state === 'stopped' && this.streamingJids.size > 0) {
this.drainStreamingJids('container_exit', stateNow);
}
return;
}

Expand Down Expand Up @@ -204,6 +276,9 @@ export class MessageProcessor {

if (event.type === 'error') {
hadError = true;
if (this.streamingJids.size > 0) {
this.drainStreamingJids('container_error', new Date().toISOString());
}
return;
}

Expand All @@ -222,6 +297,50 @@ export class MessageProcessor {
timestamp: now,
});

// Extract partial content events from stream_event SDK messages
if (event.sdkType === 'stream_event') {
const streamEvent = msg?.event; // BetaRawMessageStreamEvent — correct field
if (streamEvent?.type === 'content_block_delta') {
if (streamEvent.delta?.type === 'text_delta') {
this.ctx.emit('run.partial_content', {
agentId: this.ctx.id,
jid: chatJid,
delta: streamEvent.delta.text,
...(streamEvent.index !== undefined && {
contentBlockIndex: streamEvent.index,
}),
timestamp: now,
});
this.appendToRingBuffer(chatJid, streamEvent.delta.text);
this.streamingJids.add(chatJid);
} else if (streamEvent.delta?.type === 'input_json_delta') {
this.ctx.emit('run.partial_tool_call', {
agentId: this.ctx.id,
jid: chatJid,
jsonDelta: streamEvent.delta.partial_json,
...(streamEvent.index !== undefined && {
contentBlockIndex: streamEvent.index,
}),
timestamp: now,
});
this.appendToRingBuffer(chatJid, streamEvent.delta.partial_json);
this.streamingJids.add(chatJid);
}
}
if (streamEvent?.type === 'message_stop') {
if (this.streamingJids.has(chatJid)) {
this.ctx.emit('run.partial_content.done', {
agentId: this.ctx.id,
jid: chatJid,
timestamp: now,
});
this.streamingJids.delete(chatJid);
}
// Clear ring buffer — stream completed normally; no replay possible
this.streamingRingBuffer.delete(chatJid);
}
}

// Derive curated convenience events from SDK messages
if (event.sdkType === 'assistant' && msg?.message?.content) {
for (const block of msg.message.content) {
Expand Down
43 changes: 43 additions & 0 deletions src/api/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ export interface AgentEvents extends Record<string, any[]> {
'message.out': [payload: MessageOutEvent];
'run.state': [payload: RunStateEvent];
'run.sdk_message': [payload: RunSdkMessageEvent];
'run.partial_content': [payload: AgentPartialContentEvent];
'run.partial_tool_call': [payload: AgentPartialToolCallEvent];
'run.partial_content.done': [payload: AgentPartialContentDoneEvent];
'run.partial_content.interrupted': [payload: AgentPartialContentInterruptedEvent];
'run.tool': [payload: RunToolEvent];
'run.tool_progress': [payload: RunToolProgressEvent];
'run.subagent': [payload: RunSubagentEvent];
Expand Down Expand Up @@ -291,6 +295,45 @@ export interface TaskRunFailedEvent {
timestamp: string;
}

// ── Partial content streaming events ────────────────────────────

/** Emitted for each text token delta during streaming. */
export interface AgentPartialContentEvent {
agentId: string;
jid: string;
/** Incremental text delta (not cumulative). */
delta: string;
/** Index of the content block being streamed (undefined if unavailable). */
contentBlockIndex?: number;
timestamp: string;
}

/** Emitted for each chunk of tool-call argument JSON being built. */
export interface AgentPartialToolCallEvent {
agentId: string;
jid: string;
/** Incremental JSON delta for the tool call arguments. */
jsonDelta: string;
/** Index of the tool-use content block (undefined if unavailable). */
contentBlockIndex?: number;
timestamp: string;
}

/** Emitted when a streaming turn completes normally (message_stop received). */
export interface AgentPartialContentDoneEvent {
agentId: string;
jid: string;
timestamp: string;
}

/** Emitted when a streaming turn is interrupted (container crash, timeout, SIGKILL). */
export interface AgentPartialContentInterruptedEvent {
agentId: string;
jid: string;
reason: string;
timestamp: string;
}

/** A due fire was dropped without executing. */
export interface TaskRunSkippedEvent {
agentId: string;
Expand Down
Loading
Loading