diff --git a/container/agent-runner/src/index.ts b/container/agent-runner/src/index.ts index 11908d2eedc..0eabfefe25f 100644 --- a/container/agent-runner/src/index.ts +++ b/container/agent-runner/src/index.ts @@ -23,6 +23,12 @@ import { } from '@anthropic-ai/claude-agent-sdk'; import { fileURLToPath } from 'url'; +import { + getRetryDescriptor, + normalizeMaxRetries, + withRetry, +} from './retry.js'; + interface McpServerRuntimeConfig { command: string; args?: string[]; @@ -37,6 +43,7 @@ interface ContainerInput { isMain: boolean; isScheduledTask?: boolean; assistantName?: string; + maxRetries?: number; /** Custom MCP server runtime configs. Sources are at /workspace/agent/mcp/{name}/. */ mcpServers?: Record | null; /** Host-side actions HTTP server coordinates for the built-in MCP shim. */ @@ -62,11 +69,17 @@ interface ContainerErrorOutput { type: 'error'; error: string; newSessionId?: string; + kind?: 'rate_limit' | 'transient' | 'runtime'; + statusCode?: number; + retryable?: boolean; + exhaustedRetries?: boolean; + retriesAttempted?: number; + maxRetries?: number; } // ── Raw SDK message passthrough ────────────────────────────────── -/** Every SDK message forwarded as-is. The container is a dumb pipe. */ +/** Every SDK message is forwarded as-is, plus synthetic runtime notices. */ interface ContainerSdkMessageOutput { type: 'sdk_message'; /** Top-level SDK message type (e.g. 'assistant', 'result', 'system', 'tool_progress', 'stream_event'). 
*/ @@ -143,6 +156,31 @@ class MessageStream { } } +class QueryAttemptError extends Error { + readonly newSessionId?: string; + readonly lastAssistantUuid?: string; + readonly closedDuringQuery: boolean; + readonly resultCount: number; + + constructor( + message: string, + opts: { + cause: unknown; + newSessionId?: string; + lastAssistantUuid?: string; + closedDuringQuery: boolean; + resultCount: number; + }, + ) { + super(message, { cause: opts.cause }); + this.name = 'QueryAttemptError'; + this.newSessionId = opts.newSessionId; + this.lastAssistantUuid = opts.lastAssistantUuid; + this.closedDuringQuery = opts.closedDuringQuery; + this.resultCount = opts.resultCount; + } +} + async function readStdin(): Promise { return new Promise((resolve, reject) => { let data = ''; @@ -427,12 +465,12 @@ function waitForIpcMessage(): Promise { } /** - * Run a single query and stream results via writeOutput. + * Run a single query attempt and stream results via writeOutput. * Uses MessageStream (AsyncIterable) to keep isSingleUserTurn=false, * allowing agent teams subagents to run to completion. * Also pipes IPC messages into the stream during the query. */ -async function runQuery( +async function runQueryAttempt( prompt: string, sessionId: string | undefined, mcpServerPath: string, @@ -446,12 +484,6 @@ async function runQuery( }> { const stream = new MessageStream(); stream.push(prompt); - writeOutput({ - type: 'state', - state: 'active', - newSessionId: sessionId, - reason: 'query_started', - }); // Poll IPC for follow-up messages and _close sentinel during the query let ipcPolling = true; @@ -515,121 +547,135 @@ async function runQuery( log(`Additional directories: ${extraDirs.join(', ')}`); } - for await (const message of query({ - prompt: stream, - options: { - includePartialMessages: true, - cwd: '/workspace/group', - additionalDirectories: extraDirs.length > 0 ? 
extraDirs : undefined, - resume: sessionId, - resumeSessionAt: resumeAt, - systemPrompt: appendedSystemPrompt - ? { - type: 'preset' as const, - preset: 'claude_code' as const, - append: appendedSystemPrompt, - } - : undefined, - allowedTools: [ - 'Bash', - 'Read', - 'Write', - 'Edit', - 'Glob', - 'Grep', - 'WebSearch', - 'WebFetch', - 'Task', - 'TaskOutput', - 'TaskStop', - 'TeamCreate', - 'TeamDelete', - 'SendMessage', - 'TodoWrite', - 'ToolSearch', - 'Skill', - 'NotebookEdit', - 'mcp__agentlite__*', - // Allow tools from all custom MCP servers - ...Object.keys(containerInput.mcpServers ?? {}).map( - (name) => `mcp__${name}__*`, - ), - ], - env: sdkEnv, - permissionMode: 'bypassPermissions', - allowDangerouslySkipPermissions: true, - settingSources: ['project', 'user'], - mcpServers: { - agentlite: { - command: 'node', - args: [mcpServerPath], - env: { - AGENTLITE_CHAT_JID: containerInput.chatJid, - AGENTLITE_GROUP_FOLDER: containerInput.groupFolder, - AGENTLITE_IS_MAIN: containerInput.isMain ? '1' : '0', - ...(containerInput.actionsAuth - ? { - AGENTLITE_ACTIONS_URL: containerInput.actionsAuth.url, - AGENTLITE_ACTIONS_TOKEN: containerInput.actionsAuth.token, - } - : {}), - }, - }, - ...Object.fromEntries( - Object.entries(containerInput.mcpServers ?? {}).map( - ([name, cfg]) => [name, cfg], + try { + for await (const message of query({ + prompt: stream, + options: { + includePartialMessages: true, + cwd: '/workspace/group', + additionalDirectories: extraDirs.length > 0 ? extraDirs : undefined, + resume: sessionId, + resumeSessionAt: resumeAt, + systemPrompt: appendedSystemPrompt + ? 
{ + type: 'preset' as const, + preset: 'claude_code' as const, + append: appendedSystemPrompt, + } + : undefined, + allowedTools: [ + 'Bash', + 'Read', + 'Write', + 'Edit', + 'Glob', + 'Grep', + 'WebSearch', + 'WebFetch', + 'Task', + 'TaskOutput', + 'TaskStop', + 'TeamCreate', + 'TeamDelete', + 'SendMessage', + 'TodoWrite', + 'ToolSearch', + 'Skill', + 'NotebookEdit', + 'mcp__agentlite__*', + // Allow tools from all custom MCP servers + ...Object.keys(containerInput.mcpServers ?? {}).map( + (name) => `mcp__${name}__*`, ), - ), - }, - hooks: { - PreCompact: [ - { hooks: [createPreCompactHook(containerInput.assistantName)] }, ], + env: sdkEnv, + permissionMode: 'bypassPermissions', + allowDangerouslySkipPermissions: true, + settingSources: ['project', 'user'], + mcpServers: { + agentlite: { + command: 'node', + args: [mcpServerPath], + env: { + AGENTLITE_CHAT_JID: containerInput.chatJid, + AGENTLITE_GROUP_FOLDER: containerInput.groupFolder, + AGENTLITE_IS_MAIN: containerInput.isMain ? '1' : '0', + ...(containerInput.actionsAuth + ? { + AGENTLITE_ACTIONS_URL: containerInput.actionsAuth.url, + AGENTLITE_ACTIONS_TOKEN: containerInput.actionsAuth.token, + } + : {}), + }, + }, + ...Object.fromEntries( + Object.entries(containerInput.mcpServers ?? {}).map( + ([name, cfg]) => [name, cfg], + ), + ), + }, + hooks: { + PreCompact: [ + { hooks: [createPreCompactHook(containerInput.assistantName)] }, + ], + }, }, - }, - })) { - messageCount++; - const msgType = - message.type === 'system' - ? 
`system/${(message as { subtype?: string }).subtype}` - : message.type; - log(`[msg #${messageCount}] type=${msgType}`); - - // ── Internal bookkeeping (not forwarded) ──────────────────── - if (message.type === 'assistant' && 'uuid' in message) { - lastAssistantUuid = (message as { uuid: string }).uuid; - } - if (message.type === 'system' && message.subtype === 'init') { - newSessionId = message.session_id; - log(`Session initialized: ${newSessionId}`); - } - - // ── Forward every SDK message raw ───────────────────────── - const sdkSubtype = - message.type === 'system' - ? (message as { subtype?: string }).subtype - : undefined; - writeOutput({ - type: 'sdk_message', - sdkType: message.type, - sdkSubtype, - message, - }); + })) { + messageCount++; + const msgType = + message.type === 'system' + ? `system/${(message as { subtype?: string }).subtype}` + : message.type; + log(`[msg #${messageCount}] type=${msgType}`); + + // ── Internal bookkeeping (not forwarded) ──────────────────── + if (message.type === 'assistant' && 'uuid' in message) { + lastAssistantUuid = (message as { uuid: string }).uuid; + } + if (message.type === 'system' && message.subtype === 'init') { + newSessionId = message.session_id; + log(`Session initialized: ${newSessionId}`); + } - // ── Backward-compat: emit result for host message delivery ─ - if (message.type === 'result') { - resultCount++; - const textResult = - 'result' in message ? (message as { result?: string }).result : null; - log( - `Result #${resultCount}: subtype=${message.subtype}${textResult ? ` text=${textResult.slice(0, 200)}` : ''}`, - ); + // ── Forward every SDK message raw ───────────────────────── + const sdkSubtype = + message.type === 'system' + ? 
(message as { subtype?: string }).subtype + : undefined; writeOutput({ - type: 'result', - result: textResult || null, - newSessionId, + type: 'sdk_message', + sdkType: message.type, + sdkSubtype, + message, }); + + // ── Backward-compat: emit result for host message delivery ─ + if (message.type === 'result') { + resultCount++; + const textResult = + 'result' in message ? (message as { result?: string }).result : null; + log( + `Result #${resultCount}: subtype=${message.subtype}${textResult ? ` text=${textResult.slice(0, 200)}` : ''}`, + ); + writeOutput({ + type: 'result', + result: textResult || null, + newSessionId, + }); + } } + } catch (err) { + ipcPolling = false; + throw new QueryAttemptError( + err instanceof Error ? err.message : String(err), + { + cause: err, + newSessionId, + lastAssistantUuid, + closedDuringQuery, + resultCount, + }, + ); } ipcPolling = false; @@ -639,6 +685,75 @@ async function runQuery( return { newSessionId, lastAssistantUuid, closedDuringQuery }; } +async function runQuery( + prompt: string, + sessionId: string | undefined, + mcpServerPath: string, + containerInput: ContainerInput, + sdkEnv: Record, + resumeAt?: string, +): Promise<{ + newSessionId?: string; + lastAssistantUuid?: string; + closedDuringQuery: boolean; +}> { + let currentSessionId = sessionId; + let currentResumeAt = resumeAt; + + writeOutput({ + type: 'state', + state: 'active', + newSessionId: currentSessionId, + reason: 'query_started', + }); + + return withRetry( + async () => { + try { + const result = await runQueryAttempt( + prompt, + currentSessionId, + mcpServerPath, + containerInput, + sdkEnv, + currentResumeAt, + ); + if (result.newSessionId) { + currentSessionId = result.newSessionId; + } + return result; + } catch (err) { + if (err instanceof QueryAttemptError && err.newSessionId) { + currentSessionId = err.newSessionId; + } + if (err instanceof QueryAttemptError && err.lastAssistantUuid) { + currentResumeAt = err.lastAssistantUuid; + } + throw err; + } 
+ }, + { + maxRetries: containerInput.maxRetries, + onRetry: async (retry) => { + if (retry.kind !== 'rate_limit') { + return; + } + + writeOutput({ + type: 'sdk_message', + sdkType: 'rate_limit_retry', + message: { + attempt: retry.attempt, + maxRetries: retry.maxRetries, + retryAfterMs: retry.retryAfterMs, + statusCode: retry.statusCode, + }, + }); + }, + }, + ); +} + async function main(): Promise { let containerInput: ContainerInput; @@ -751,12 +866,23 @@ async function main(): Promise { prompt = nextMessage; } } catch (err) { + const retry = getRetryDescriptor(err); + const maxRetries = normalizeMaxRetries(containerInput.maxRetries); const errorMessage = err instanceof Error ? err.message : String(err); + if (err instanceof QueryAttemptError && err.newSessionId) { + sessionId = err.newSessionId; + } log(`Agent error: ${errorMessage}`); writeOutput({ type: 'error', newSessionId: sessionId, error: errorMessage, + kind: retry?.kind ?? 'runtime', + statusCode: retry?.statusCode, + retryable: retry !== null, + exhaustedRetries: retry !== null, + retriesAttempted: retry ? maxRetries : undefined, + maxRetries: retry ? 
maxRetries : undefined, }); process.exit(1); } diff --git a/container/agent-runner/src/retry.test.ts b/container/agent-runner/src/retry.test.ts new file mode 100644 index 00000000000..a0c7c734b9b --- /dev/null +++ b/container/agent-runner/src/retry.test.ts @@ -0,0 +1,233 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { + DEFAULT_MAX_RETRIES, + getRetryDelayMs, + getRetryDescriptor, + normalizeMaxRetries, + withRetry, +} from './retry.js'; + +describe('retry helpers', () => { + it('detects rate-limit errors from HTTP 429 responses', () => { + const retry = getRetryDescriptor({ + status: 429, + message: 'Too many requests', + }); + + expect(retry).toEqual({ + kind: 'rate_limit', + message: 'Too many requests', + statusCode: 429, + }); + }); + + it('detects transient errors from nested response status codes', () => { + const retry = getRetryDescriptor({ + message: 'upstream failed', + cause: { response: { status: 503 } }, + }); + + expect(retry).toEqual({ + kind: 'transient', + message: 'upstream failed', + statusCode: 503, + }); + }); + + it('does not retry non-retryable errors', () => { + expect( + getRetryDescriptor({ + statusCode: 400, + message: 'Bad request', + }), + ).toBeNull(); + }); + + it('computes exponential backoff with 0-30% jitter on the capped base', () => { + expect(getRetryDelayMs(0, () => 0)).toBe(1_000); + expect(getRetryDelayMs(3, () => 0.5)).toBe(9_200); + expect(getRetryDelayMs(10, () => 1)).toBe(78_000); + }); + + it('keeps jitter within the capped base and 30% upper bound', () => { + const attempt = 5; + const cappedBase = 32_000; + const maxDelay = Math.floor(cappedBase * 1.3); + + expect(getRetryDelayMs(attempt, () => 0)).toBe(cappedBase); + expect(getRetryDelayMs(attempt, () => 1)).toBe(maxDelay); + + const actual = getRetryDelayMs(attempt, () => 0.42); + expect(actual).toBeGreaterThanOrEqual(cappedBase); + expect(actual).toBeLessThanOrEqual(maxDelay); + }); + + it('normalizes invalid retry counts to safe integers', () => 
{ + expect(normalizeMaxRetries(undefined)).toBe(DEFAULT_MAX_RETRIES); + expect(normalizeMaxRetries(-3)).toBe(0); + expect(normalizeMaxRetries(3.9)).toBe(3); + }); + + it('retries retryable errors and reports the backoff delay', async () => { + const sleep = vi.fn(async (_ms: number) => {}); + const onRetry = vi.fn(async () => {}); + let attempts = 0; + + const result = await withRetry( + async () => { + attempts += 1; + if (attempts < 3) { + throw { + status: 429, + message: `try ${attempts}`, + }; + } + return 'ok'; + }, + { + maxRetries: 5, + random: () => 0, + sleep, + onRetry, + }, + ); + + expect(result).toBe('ok'); + expect(attempts).toBe(3); + expect(sleep).toHaveBeenNthCalledWith(1, 1_000); + expect(sleep).toHaveBeenNthCalledWith(2, 2_000); + expect(onRetry).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + attempt: 1, + maxRetries: 5, + retryAfterMs: 1_000, + kind: 'rate_limit', + }), + ); + expect(onRetry).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + attempt: 2, + maxRetries: 5, + retryAfterMs: 2_000, + kind: 'rate_limit', + }), + ); + }); + + it('retries HTTP 500 errors the same way as 429s', async () => { + const sleep = vi.fn(async (_ms: number) => {}); + const onRetry = vi.fn(async () => {}); + let attempts = 0; + + const result = await withRetry( + async () => { + attempts += 1; + if (attempts === 1) { + throw { + status: 500, + message: 'Internal server error', + }; + } + return 'ok'; + }, + { + maxRetries: 1, + random: () => 0, + sleep, + onRetry, + }, + ); + + expect(result).toBe('ok'); + expect(attempts).toBe(2); + expect(sleep).toHaveBeenCalledOnce(); + expect(sleep).toHaveBeenCalledWith(1_000); + expect(onRetry).toHaveBeenCalledWith( + expect.objectContaining({ + attempt: 1, + kind: 'transient', + retryAfterMs: 1_000, + statusCode: 500, + }), + ); + }); + + it('throws immediately when maxRetries is 0', async () => { + const sleep = vi.fn(async (_ms: number) => {}); + const onRetry = vi.fn(async () => {}); + let attempts 
/** Number of retries (after the first attempt) used when no usable count is supplied. */
export const DEFAULT_MAX_RETRIES = 5;
/** Backoff before the first retry, prior to jitter; doubles for each later retry. */
export const RETRY_BASE_DELAY_MS = 1_000;
/** Ceiling applied to the exponential backoff before jitter is added. */
export const RETRY_MAX_DELAY_MS = 60_000;

/** Failure categories that `withRetry` is willing to retry. */
export type RetryKind = 'rate_limit' | 'transient';

/** Classification of a retryable failure, as produced by `getRetryDescriptor`. */
export interface RetryDescriptor {
  kind: RetryKind;
  message: string;
  statusCode?: number;
}

/** Details handed to `onRetry` immediately before the backoff sleep. */
export interface RetryAttempt {
  /** 1-based number of the retry that is about to be made. */
  attempt: number;
  /** Normalized retry budget for the whole operation. */
  maxRetries: number;
  /** Delay, in milliseconds, that will be slept before the retry. */
  retryAfterMs: number;
  kind: RetryKind;
  message: string;
  statusCode?: number;
}

/** Returns `value` viewed as a string-keyed record, or `null` when it is not a non-null object. */
function asObject(value: unknown): Record<string, unknown> | null {
  return typeof value === 'object' && value !== null
    ? (value as Record<string, unknown>)
    : null;
}

/** Returns `value` when it is a finite number, otherwise `undefined`. */
function readNumber(value: unknown): number | undefined {
  return typeof value === 'number' && Number.isFinite(value)
    ? value
    : undefined;
}

/**
 * Searches an error, breadth-first, for a numeric HTTP-style status.
 *
 * On every visited object the fields `status`, `statusCode` and `status_code`
 * are checked in that order; the first finite number wins. The search then
 * descends into `response`, `error`, `body` and `cause`. Visited objects are
 * remembered so cyclic error graphs terminate.
 */
function extractStatusCode(error: unknown): number | undefined {
  const seen = new Set<unknown>();
  const queue: unknown[] = [error];

  while (queue.length > 0) {
    const current = queue.shift();
    if (!current || seen.has(current)) continue;
    seen.add(current);

    const record = asObject(current);
    if (!record) continue;

    for (const key of ['status', 'statusCode', 'status_code']) {
      const code = readNumber(record[key]);
      if (code !== undefined) return code;
    }

    queue.push(record.response, record.error, record.body, record.cause);
  }

  return undefined;
}

/**
 * Produces a human-readable message for any thrown value.
 *
 * Order of preference: `Error#message`, the value itself when it is a string,
 * a string `message` property, the JSON serialization, and finally
 * `String(error)`.
 *
 * @returns Always a string, including for values such as `undefined`,
 *   functions and symbols, for which `JSON.stringify` returns `undefined`.
 */
export function getErrorMessage(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  if (typeof error === 'string') {
    return error;
  }
  const record = asObject(error);
  if (typeof record?.message === 'string') {
    return record.message;
  }
  try {
    // JSON.stringify yields undefined (not a string) for undefined, functions
    // and symbols, so the result has to be checked before it is returned.
    const serialized: string | undefined = JSON.stringify(error);
    if (serialized !== undefined) {
      return serialized;
    }
  } catch {
    // Circular structures and BigInt values throw; fall through to String().
  }
  return String(error);
}

/**
 * Classifies a failure as retryable or not.
 *
 * A failure is never retried when it carries `resultCount > 0` or
 * `closedDuringQuery === true` (presumably because output was already
 * delivered or the query was closed on purpose; confirm against the caller).
 * Otherwise status 429 maps to `rate_limit`, 500 and 503 map to `transient`,
 * and anything else is not retryable.
 *
 * @returns The retry classification, or `null` when the error must not be retried.
 */
export function getRetryDescriptor(error: unknown): RetryDescriptor | null {
  const record = asObject(error);
  if (
    (readNumber(record?.resultCount) ?? 0) > 0 ||
    record?.closedDuringQuery === true
  ) {
    return null;
  }

  const statusCode = extractStatusCode(error);
  const message = getErrorMessage(error);

  if (statusCode === 429) {
    return { kind: 'rate_limit', message, statusCode };
  }

  if (statusCode === 500 || statusCode === 503) {
    return { kind: 'transient', message, statusCode };
  }

  return null;
}

/**
 * Turns a caller-supplied retry count into a non-negative integer.
 *
 * `undefined` and non-finite values fall back to `DEFAULT_MAX_RETRIES`;
 * negative values clamp to 0 and fractions are floored.
 */
export function normalizeMaxRetries(maxRetries: number | undefined): number {
  if (maxRetries === undefined) return DEFAULT_MAX_RETRIES;
  if (!Number.isFinite(maxRetries)) return DEFAULT_MAX_RETRIES;
  return Math.max(0, Math.floor(maxRetries));
}

/**
 * Computes the backoff delay for a retry.
 *
 * The base is `RETRY_BASE_DELAY_MS * 2 ** attempt`, capped at
 * `RETRY_MAX_DELAY_MS`, and then stretched by 0-30% of jitter.
 *
 * @param attempt 0-based index of the attempt that just failed; negative or
 *   `NaN` values are treated as 0.
 * @param random Source of jitter; samples are clamped to `[0, 1]`, and a `NaN`
 *   sample counts as 0 so the delay is always a finite number.
 * @returns Delay in whole milliseconds.
 */
export function getRetryDelayMs(
  attempt: number,
  random: () => number = Math.random,
): number {
  const exponent = Number.isNaN(attempt) ? 0 : Math.max(0, attempt);
  const cappedBase = Math.min(
    RETRY_MAX_DELAY_MS,
    RETRY_BASE_DELAY_MS * 2 ** exponent,
  );
  const sample = random();
  const jitterFactor = Number.isNaN(sample)
    ? 0
    : Math.min(1, Math.max(0, sample));
  return Math.floor(cappedBase * (1 + jitterFactor * 0.3));
}

/** Resolves after `ms` milliseconds. */
export async function sleepMs(ms: number): Promise<void> {
  await new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}

/** Builds the prefix that marks an error as "retry budget exhausted". */
function getRetriesExhaustedMessage(
  retry: RetryDescriptor,
  maxRetries: number,
): string {
  const statusSuffix =
    retry.statusCode !== undefined ? ` (status ${retry.statusCode})` : '';
  return `Rate limit / transient error — retries exhausted after ${maxRetries} retries${statusSuffix}: `;
}

/**
 * Converts the final failure into the error that `withRetry` rejects with.
 *
 * `Error` instances are re-labelled in place so their class and extra fields
 * survive for `instanceof` checks upstream. When the message cannot be
 * rewritten (frozen error, read-only `message`), or the thrown value is not an
 * `Error`, a new `Error` is returned with the original attached as `cause`.
 */
function toRetriesExhaustedError(
  error: unknown,
  retry: RetryDescriptor,
  maxRetries: number,
): Error {
  const message = `${getRetriesExhaustedMessage(retry, maxRetries)}${getErrorMessage(error)}`;

  if (error instanceof Error) {
    try {
      error.message = message;
    } catch {
      // Read-only message in strict mode; handled by the check below.
    }
    // In sloppy mode a failed assignment is silent, so verify it took effect.
    if (error.message === message) {
      return error;
    }
  }

  const wrapped = new Error(message);
  // Attached explicitly (same shape the ES2022 constructor option produces) so
  // the status lookup through `cause` keeps working on older lib targets.
  Object.defineProperty(wrapped, 'cause', {
    value: error,
    writable: true,
    enumerable: false,
    configurable: true,
  });
  return wrapped;
}

/**
 * Runs `operation`, retrying with exponential backoff while it fails with a
 * retryable error (see `getRetryDescriptor`).
 *
 * @param operation Called with the 0-based attempt index.
 * @param opts.maxRetries Retries allowed after the first attempt; normalized
 *   with `normalizeMaxRetries`.
 * @param opts.random Jitter source, forwarded to `getRetryDelayMs`.
 * @param opts.sleep Replacement for the real timer (useful in tests).
 * @param opts.onRetry Awaited before each backoff sleep; if it throws, that
 *   error propagates and no further attempt is made.
 * @returns The first successful result of `operation`.
 * @throws The original error when it is not retryable, or the
 *   "retries exhausted" error once the budget is spent.
 */
export async function withRetry<T>(
  operation: (attempt: number) => Promise<T>,
  opts?: {
    maxRetries?: number;
    random?: () => number;
    sleep?: (ms: number) => Promise<void>;
    onRetry?: (attempt: RetryAttempt) => void | Promise<void>;
  },
): Promise<T> {
  const maxRetries = normalizeMaxRetries(opts?.maxRetries);
  const random = opts?.random ?? Math.random;
  const sleep = opts?.sleep ?? sleepMs;

  for (let attempt = 0; ; attempt += 1) {
    try {
      return await operation(attempt);
    } catch (error) {
      const retry = getRetryDescriptor(error);
      if (!retry) {
        throw error;
      }
      if (attempt >= maxRetries) {
        throw toRetriesExhaustedError(error, retry, maxRetries);
      }

      const retryAfterMs = getRetryDelayMs(attempt, random);
      await opts?.onRetry?.({
        ...retry,
        attempt: attempt + 1,
        maxRetries,
        retryAfterMs,
      });
      await sleep(retryAfterMs);
    }
  }
}
'runtime', + error: event.error, + retryable: event.retryable, + exhaustedRetries: event.exhaustedRetries, + retriesAttempted: event.retriesAttempted, + maxRetries: event.maxRetries, + statusCode: event.statusCode, + timestamp: new Date().toISOString(), + }); return; } @@ -223,6 +235,18 @@ export class MessageProcessor { }); // Derive curated convenience events from SDK messages + if (event.sdkType === 'rate_limit_retry') { + this.ctx.emit('run.rate_limited', { + agentId: this.ctx.id, + jid: chatJid, + attempt: msg.attempt, + maxRetries: msg.maxRetries, + retryAfterMs: msg.retryAfterMs, + statusCode: msg.statusCode, + timestamp: now, + }); + } + if (event.sdkType === 'assistant' && msg?.message?.content) { for (const block of msg.message.content) { if (block.type === 'tool_use' && block.name && block.id) { @@ -378,6 +402,7 @@ export class MessageProcessor { chatJid, isMain, assistantName: this.ctx.config.assistantName, + maxRetries: group.containerConfig?.maxRetries, agentId: this.ctx.id, groupsDir: this.ctx.config.groupsDir, dataDir: this.ctx.config.dataDir, @@ -408,6 +433,13 @@ export class MessageProcessor { } return 'success'; } catch (err) { + const errorMessage = err instanceof Error ? 
err.message : String(err); + await wrappedOnOutput?.({ + type: 'error', + error: errorMessage, + kind: 'runtime', + retryable: false, + }); logger.error({ group: group.name, err }, 'Agent error'); return 'error'; } diff --git a/src/api/events.ts b/src/api/events.ts index 701b7e8dd87..259db7f6caf 100644 --- a/src/api/events.ts +++ b/src/api/events.ts @@ -9,6 +9,8 @@ export interface AgentEvents extends Record { 'message.in': [payload: MessageInEvent]; 'message.out': [payload: MessageOutEvent]; 'run.state': [payload: RunStateEvent]; + 'run.rate_limited': [payload: RunRateLimitedEvent]; + 'run.error': [payload: RunErrorEvent]; 'run.sdk_message': [payload: RunSdkMessageEvent]; 'run.tool': [payload: RunToolEvent]; 'run.tool_progress': [payload: RunToolProgressEvent]; @@ -85,13 +87,56 @@ export interface RunStateEvent { exitCode?: number; } +/** Claude API retry scheduled after an upstream rate limit. */ +export interface RunRateLimitedEvent { + /** Stable agent identifier. */ + agentId: string; + /** Group/chat identifier. */ + jid: string; + /** Retry attempt number, starting at 1. */ + attempt: number; + /** Maximum retries configured for the group. */ + maxRetries: number; + /** Backoff delay in milliseconds. */ + retryAfterMs: number; + /** Upstream HTTP status code. */ + statusCode: number; + /** ISO timestamp. */ + timestamp: string; +} + +/** Runtime error surfaced by the container or host. */ +export interface RunErrorEvent { + /** Stable agent identifier. */ + agentId: string; + /** Group/chat identifier. */ + jid: string; + /** Error category. */ + kind: 'rate_limit' | 'transient' | 'runtime'; + /** Error message. */ + error: string; + /** Whether the runtime considered this retryable. */ + retryable?: boolean; + /** Whether the configured retries were exhausted. */ + exhaustedRetries?: boolean; + /** Number of retries already attempted. */ + retriesAttempted?: number; + /** Maximum retries configured for the group. 
*/ + maxRetries?: number; + /** Best-effort upstream status code, when available. */ + statusCode?: number; + /** ISO timestamp. */ + timestamp: string; +} + /** * Raw SDK message from the agent runtime. - * Exposes all 21 SDK message types — consumers can filter by sdkType/sdkSubtype. + * Exposes all SDK messages plus synthetic retry notices emitted through the + * same envelope — consumers can filter by sdkType/sdkSubtype. * * Common sdkType values: 'assistant', 'result', 'system', 'stream_event', * 'tool_progress', 'tool_use_summary', 'auth_status', 'rate_limit_event', - * 'prompt_suggestion'. + * 'prompt_suggestion', 'rate_limit_retry'. * * Common sdkSubtype values (when sdkType='system'): 'init', 'status', * 'task_started', 'task_progress', 'task_notification', 'compact_boundary', diff --git a/src/api/group.ts b/src/api/group.ts index bed52b318da..a3e2a6416d7 100644 --- a/src/api/group.ts +++ b/src/api/group.ts @@ -18,6 +18,8 @@ export interface ContainerConfig { additionalMounts?: AdditionalMount[]; /** Container timeout in milliseconds. Default: 300000 (5 minutes). */ timeout?: number; + /** Claude API retry count after the initial attempt. Default: 5. */ + maxRetries?: number; } /** A registered group returned by the Agent API. */ diff --git a/src/container-runner.ts b/src/container-runner.ts index e381005a6d7..f3823307bc5 100644 --- a/src/container-runner.ts +++ b/src/container-runner.ts @@ -43,6 +43,7 @@ export interface ContainerInput { isMain: boolean; isScheduledTask?: boolean; assistantName?: string; + maxRetries?: number; /** Agent id used to scope runtime box names. */ agentId?: string; workDir?: string; @@ -94,12 +95,18 @@ export interface ContainerErrorEvent { type: 'error'; error: string; newSessionId?: string; + kind?: 'rate_limit' | 'transient' | 'runtime'; + statusCode?: number; + retryable?: boolean; + exhaustedRetries?: boolean; + retriesAttempted?: number; + maxRetries?: number; } /** * Raw SDK message forwarded from the container. 
- * The container is a dumb pipe — every SDK message is forwarded as-is. - * All 21 SDK message types flow through this single event. + * The container forwards every SDK message as-is and may also emit + * synthetic runtime notices through the same envelope. */ export interface ContainerSdkMessageEvent { type: 'sdk_message'; @@ -525,6 +532,26 @@ export async function runContainerAgent( const logsDir = path.join(groupDir, 'logs'); fs.mkdirSync(logsDir, { recursive: true }); + let newSessionId: string | undefined; + let outputChain = Promise.resolve(); + let lastLifecycleState: ContainerState | null = null; + let hasStructuredError = false; + + const forwardEvent = (event: ContainerEvent): void => { + if ('newSessionId' in event && event.newSessionId) { + newSessionId = event.newSessionId; + } + if (event.type === 'state') { + lastLifecycleState = event.state; + } + if (event.type === 'error') { + hasStructuredError = true; + } + if (onOutput) { + outputChain = outputChain.then(() => onOutput(event)); + } + }; + // Create box, run entrypoint, write stdin const spawnResult = await spawnBox( group.name, @@ -535,16 +562,24 @@ export async function runContainerAgent( JSON.stringify(input), rc, ); - if ('status' in spawnResult) return spawnResult; // error + if ('status' in spawnResult) { + if (spawnResult.status === 'error') { + forwardEvent({ + type: 'error', + error: spawnResult.error, + kind: 'runtime', + retryable: false, + }); + await outputChain; + } + return spawnResult; + } const { box, execution } = spawnResult; onProcess(containerName, containerName); // Stream stdout and stderr, parse output markers let parseBuffer = ''; - let newSessionId: string | undefined; - let outputChain = Promise.resolve(); - let lastLifecycleState: ContainerState | null = null; let stdout = ''; let stderr = ''; let stdoutTruncated = false; @@ -615,21 +650,9 @@ export async function runContainerAgent( try { const parsed: ContainerEvent = JSON.parse(jsonStr); - if ( - (parsed.type === 
'state' || - parsed.type === 'result' || - parsed.type === 'error') && - parsed.newSessionId - ) { - newSessionId = parsed.newSessionId; - } - if (parsed.type === 'state') { - lastLifecycleState = parsed.state; - } // Activity detected — reset the hard timeout resetTimeout(); - // Call onOutput for all structured stream events. - outputChain = outputChain.then(() => onOutput(parsed)); + forwardEvent(parsed); } catch (err) { logger.warn( { group: group.name, error: err }, @@ -693,17 +716,14 @@ export async function runContainerAgent( const code = execResult.exitCode; if (timedOut) { - if (onOutput) { - outputChain = outputChain.then(() => - onOutput({ - type: 'state', - state: 'stopped', - newSessionId, - reason: lastLifecycleState === 'idle' ? 'idle_timeout' : 'timeout', - exitCode: code, - }), - ); - } + const timedOutFromIdle = lastLifecycleState === 'idle'; + forwardEvent({ + type: 'state', + state: 'stopped', + newSessionId, + reason: timedOutFromIdle ? 'idle_timeout' : 'timeout', + exitCode: code, + }); const ts = new Date().toISOString().replace(/[:.]/g, '-'); const timeoutLog = path.join(logsDir, `container-${ts}.log`); @@ -721,7 +741,7 @@ export async function runContainerAgent( ); // Timeout after an explicit idle signal = idle cleanup, not failure. 
- if (lastLifecycleState === 'idle') { + if (timedOutFromIdle) { logger.info( { group: group.name, containerName, duration, code }, 'Box timed out after idle (idle cleanup)', @@ -734,6 +754,15 @@ export async function runContainerAgent( { group: group.name, containerName, duration, code }, 'Box timed out before reaching idle', ); + if (!hasStructuredError) { + forwardEvent({ + type: 'error', + error: `Box timed out after ${configTimeout}ms`, + newSessionId, + kind: 'runtime', + retryable: false, + }); + } await outputChain; return { status: 'error', @@ -810,19 +839,25 @@ export async function runContainerAgent( fs.writeFileSync(logFile, logLines.join('\n')); logger.debug({ logFile, verbose: isVerbose }, 'Box log written'); - if (onOutput) { - outputChain = outputChain.then(() => - onOutput({ - type: 'state', - state: 'stopped', - newSessionId, - reason: code === 0 ? 'exit' : 'error_exit', - exitCode: code, - }), - ); - } + forwardEvent({ + type: 'state', + state: 'stopped', + newSessionId, + reason: code === 0 ? 
'exit' : 'error_exit', + exitCode: code, + }); if (code !== 0) { + const errorMessage = `Box exited with code ${code}: ${stderr.slice(-200)}`; + if (!hasStructuredError) { + forwardEvent({ + type: 'error', + error: errorMessage, + newSessionId, + kind: 'runtime', + retryable: false, + }); + } await outputChain; logger.error( { @@ -839,7 +874,7 @@ export async function runContainerAgent( return { status: 'error', result: null, - error: `Box exited with code ${code}: ${stderr.slice(-200)}`, + error: errorMessage, }; } @@ -871,12 +906,7 @@ export async function runContainerAgent( const event = JSON.parse(jsonLine) as ContainerEvent; events.push(event); - if ( - (event.type === 'state' || - event.type === 'result' || - event.type === 'error') && - event.newSessionId - ) { + if ('newSessionId' in event && event.newSessionId) { newSessionId = event.newSessionId; } } diff --git a/src/run-stream-events.test.ts b/src/run-stream-events.test.ts index 967246ffbf3..f477a875e5a 100644 --- a/src/run-stream-events.test.ts +++ b/src/run-stream-events.test.ts @@ -28,11 +28,13 @@ import { _initTestDatabase, AgentDb } from './db.js'; import { buildRuntimeConfig } from './runtime-config.js'; import { runContainerAgent } from './container-runner.js'; import type { + RunErrorEvent, + RunRateLimitedEvent, RunSdkMessageEvent, + RunStatusEvent, + RunSubagentEvent, RunToolEvent, RunToolProgressEvent, - RunSubagentEvent, - RunStatusEvent, } from './api/events.js'; import type { Channel, RegisteredGroup } from './types.js'; @@ -603,6 +605,116 @@ describe('run.status (derived from sdk_message)', () => { }); }); +describe('run.rate_limited (derived from rate_limit_retry sdk_message)', () => { + beforeEach(() => { + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'agentlite-stream-')); + db = _initTestDatabase(); + vi.mocked(runContainerAgent).mockReset(); + }); + + afterEach(() => { + try { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } catch { + /* ignore */ + } + }); + + it('emits 
retry details for backoff UI state', async () => { + const agent = setupAgent(); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _rc, _onProcess, onOutput) => { + await onOutput?.( + sdkMsg('rate_limit_retry', { + attempt: 2, + maxRetries: 5, + retryAfterMs: 2_500, + statusCode: 429, + }), + ); + await onOutput?.({ + type: 'state', + state: 'stopped', + reason: 'exit', + exitCode: 0, + }); + return { status: 'success', result: null }; + }, + ); + + const events: RunRateLimitedEvent[] = []; + agent.on('run.rate_limited', (evt) => events.push(evt)); + + await agent.processGroupMessages('mock:stream'); + + expect(events).toHaveLength(1); + expect(events[0]).toMatchObject({ + agentId: agent.id, + jid: 'mock:stream', + attempt: 2, + maxRetries: 5, + retryAfterMs: 2_500, + statusCode: 429, + }); + }); +}); + +describe('run.error (derived from container error events)', () => { + beforeEach(() => { + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'agentlite-stream-')); + db = _initTestDatabase(); + vi.mocked(runContainerAgent).mockReset(); + }); + + afterEach(() => { + try { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } catch { + /* ignore */ + } + }); + + it('emits exhausted retry details when the runtime gives up', async () => { + const agent = setupAgent(); + + vi.mocked(runContainerAgent).mockImplementation( + async (_group, _input, _rc, _onProcess, onOutput) => { + await onOutput?.({ + type: 'error', + error: '429 rate limit', + kind: 'rate_limit', + statusCode: 429, + retryable: true, + exhaustedRetries: true, + retriesAttempted: 5, + maxRetries: 5, + }); + return { status: 'error', result: null, error: '429 rate limit' }; + }, + ); + + const events: RunErrorEvent[] = []; + agent.on('run.error', (evt) => events.push(evt)); + + const processed = await agent.processGroupMessages('mock:stream'); + + expect(processed).toBe(false); + expect(events).toHaveLength(1); + expect(events[0]).toMatchObject({ + agentId: agent.id, + jid: 
'mock:stream', + kind: 'rate_limit', + error: '429 rate limit', + retryable: true, + exhaustedRetries: true, + retriesAttempted: 5, + maxRetries: 5, + statusCode: 429, + }); + }); +}); + describe('mixed streaming events', () => { beforeEach(() => { tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'agentlite-stream-')); diff --git a/src/types.ts b/src/types.ts index 39af2dfa6d5..920a9db825e 100644 --- a/src/types.ts +++ b/src/types.ts @@ -30,6 +30,7 @@ export interface AllowedRoot { export interface ContainerConfig { additionalMounts?: AdditionalMount[]; timeout?: number; // Default: 300000 (5 minutes) + maxRetries?: number; // Claude API retry count after the initial attempt. Default: 5 } export interface RegisteredGroup { diff --git a/vitest.config.ts b/vitest.config.ts index a456d1cc3df..d5683466a01 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -2,6 +2,10 @@ import { defineConfig } from 'vitest/config'; export default defineConfig({ test: { - include: ['src/**/*.test.ts', 'setup/**/*.test.ts'], + include: [ + 'src/**/*.test.ts', + 'setup/**/*.test.ts', + 'container/agent-runner/src/**/*.test.ts', + ], }, });