diff --git a/src/a2a-server.ts b/src/a2a-server.ts index 5d94d8c..d5d54ae 100644 --- a/src/a2a-server.ts +++ b/src/a2a-server.ts @@ -7,7 +7,7 @@ import { createTask, updateTaskStatus, getRecentTasks, getDb } from "./task-jour import { setAgentWeaveSession, resetAgentWeaveSession } from "./agentweave-context.js"; import { log } from "./logger.js"; import { saveSession } from "./session.js"; -import { extractAssistantTextFromTurn } from "./response.js"; +import { extractAssistantTextFromTurn, extractErrorFromTurn } from "./response.js"; import type { WorkerProgressEvent } from "./worker.js"; import { relayTaskUpdateToTelegram, relayJobCompletionToTelegram } from "./telegram-notify.js"; import { receiveCallback } from "./tools/claude-subagent.js"; @@ -231,22 +231,36 @@ export function createA2AServer(agent: Agent): express.Express { responseText = extractAssistantTextFromTurn(agent.state.messages as any, turnStartIndex); } - updateTaskStatus(task.id, "completed", { response: responseText }); + const llmError = !responseText + ? 
extractErrorFromTurn(agent.state.messages as any, turnStartIndex) + : null; + saveSession(agent); - res.json({ - jsonrpc: "2.0", - id: req.body.id, - result: { - id: task.id, - status: { state: "completed" }, - artifacts: [ - { - parts: [{ type: "text", text: responseText }], - }, - ], - }, - }); + if (llmError) { + log("warn", `LLM error during A2A sync task ${task.id}: ${llmError}`); + updateTaskStatus(task.id, "failed", { error: llmError }); + res.status(500).json({ + jsonrpc: "2.0", + id: req.body.id, + error: { code: -32000, message: `LLM error: ${llmError}` }, + }); + } else { + updateTaskStatus(task.id, "completed", { response: responseText }); + res.json({ + jsonrpc: "2.0", + id: req.body.id, + result: { + id: task.id, + status: { state: "completed" }, + artifacts: [ + { + parts: [{ type: "text", text: responseText }], + }, + ], + }, + }); + } }; await context.with(incomingContext, executeSyncTask); diff --git a/src/response.ts b/src/response.ts index 670b0e0..85a5453 100644 --- a/src/response.ts +++ b/src/response.ts @@ -24,3 +24,26 @@ export function extractAssistantTextFromTurn(messages: AgentMessage[], startInde return ""; } + +/** + * Check if the most recent assistant message in a turn is an LLM error. + * + * The pi-agent-core framework does NOT throw on LLM errors (rate limits, + * timeouts, 500s). Instead it creates an AssistantMessage with + * `stopReason: "error"` and an `errorMessage` field. + * + * Returns the error message string, or null if no error occurred. 
/**
 * Report the LLM error, if any, that ended the current turn.
 *
 * LLM failures (rate limits, timeouts, 500s) are expected to surface as an
 * assistant message carrying `stopReason: "error"` and an `errorMessage`
 * field rather than as a thrown exception, so callers have to inspect the
 * message history to notice them.
 *
 * Only messages at or after `startIndex` are examined, and only the most
 * recent assistant message among them decides the outcome. An earlier error
 * in the same turn that was followed by a later, non-error assistant message
 * is not reported.
 *
 * @param messages - Full conversation history.
 * @param startIndex - Index of the first message belonging to the current
 *   turn; negative values are clamped to 0.
 * @returns The `errorMessage` of the most recent assistant message when that
 *   message has `stopReason === "error"` and a truthy `errorMessage`;
 *   otherwise `null` (including when the turn has no assistant message).
 */
export function extractErrorFromTurn(messages: AgentMessage[], startIndex: number): string | null {
  const turnMessages = messages.slice(Math.max(0, startIndex));

  for (let i = turnMessages.length - 1; i >= 0; i--) {
    const msg: any = turnMessages[i];
    if (msg?.role !== "assistant") continue;

    // Scanning backwards, the first assistant message we meet is the most
    // recent one. It alone decides the result: looking further back would
    // resurrect an error that a later assistant message already superseded.
    if (msg.stopReason === "error" && msg.errorMessage) {
      return msg.errorMessage;
    }
    return null;
  }

  return null;
}
Messages: ${msgCount}, last role: ${lastRole}`); + responseText = "(no response)"; + } } - responseText = responseText || "(no response)"; - // Final edit — finalize the current (possibly split) message if (responseText.length <= MAX_MSG_LEN) { await editWithFormat(bot, chatId, currentMsgId, responseText); diff --git a/src/worker.ts b/src/worker.ts index 3a4bf08..87bf4a9 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -92,6 +92,17 @@ async function main() { } } + // Check for LLM errors (rate limits, timeouts, 500s) surfaced as + // AssistantMessage with stopReason: "error" and errorMessage field. + if (!responseText) { + const lastMsg: any = agent.state.messages[agent.state.messages.length - 1]; + if (lastMsg?.role === "assistant" && lastMsg.stopReason === "error" && lastMsg.errorMessage) { + saveSession(agent); + await postProgress({ type: "error", taskId, error: `LLM error: ${lastMsg.errorMessage}` }); + return; + } + } + saveSession(agent); await postProgress({ type: "complete", taskId, message: "Worker completed", result: responseText }); } catch (e: any) { diff --git a/tests/a2a-callback.test.ts b/tests/a2a-callback.test.ts index 0dc093b..9699909 100644 --- a/tests/a2a-callback.test.ts +++ b/tests/a2a-callback.test.ts @@ -29,6 +29,7 @@ jest.unstable_mockModule("../src/agentweave-context.js", () => ({ })); jest.unstable_mockModule("../src/response.js", () => ({ extractAssistantTextFromTurn: jest.fn().mockReturnValue(""), + extractErrorFromTurn: jest.fn().mockReturnValue(null), })); jest.unstable_mockModule("../src/telegram-notify.js", () => ({ relayTaskUpdateToTelegram: jest.fn(), diff --git a/tests/a2a-trace-propagation.test.ts b/tests/a2a-trace-propagation.test.ts index e473348..07ec691 100644 --- a/tests/a2a-trace-propagation.test.ts +++ b/tests/a2a-trace-propagation.test.ts @@ -38,6 +38,7 @@ jest.unstable_mockModule("../src/agentweave-context.js", () => ({ })); jest.unstable_mockModule("../src/response.js", () => ({ extractAssistantTextFromTurn: 
// Suite for extractErrorFromTurn: each case builds a small message history
// with the fixture builders below and checks the value returned for a given
// turn start index.
describe("extractErrorFromTurn", () => {
  // Single text content part.
  const textPart = (text: string) => ({ type: "text", text });

  // User message containing one text part.
  const userSays = (text: string): any => ({ role: "user", content: [textPart(text)] });

  // Assistant message that finished normally with the given text.
  const assistantStopped = (text: string): any => ({
    role: "assistant",
    content: [textPart(text)],
    stopReason: "stop",
  });

  // Assistant message with stopReason "error"; errorMessage is left off
  // entirely when not supplied so the "missing field" case stays faithful.
  const assistantFailed = (errorMessage?: string, content: any[] = []): any => {
    const failed: any = { role: "assistant", content, stopReason: "error" };
    if (errorMessage !== undefined) {
      failed.errorMessage = errorMessage;
    }
    return failed;
  };

  it("returns errorMessage when assistant message has stopReason error", () => {
    const rateLimit = "429 rate_limit_error: too many requests";
    const history: any[] = [userSays("hello"), assistantFailed(rateLimit, [textPart("")])];

    expect(extractErrorFromTurn(history, 0)).toBe("429 rate_limit_error: too many requests");
  });

  it("returns null when assistant message completed normally", () => {
    const history: any[] = [userSays("hello"), assistantStopped("hi there")];

    expect(extractErrorFromTurn(history, 0)).toBeNull();
  });

  it("returns null when no assistant messages exist in the turn", () => {
    const history: any[] = [userSays("hello")];

    expect(extractErrorFromTurn(history, 0)).toBeNull();
  });

  it("only checks messages from startIndex onwards", () => {
    const history: any[] = [
      assistantFailed("old error from previous turn"),
      userSays("retry"),
      assistantStopped("success"),
    ];

    // The turn starts at index 1, so the stale error at index 0 is out of scope.
    expect(extractErrorFromTurn(history, 1)).toBeNull();
  });

  it("returns the most recent error when multiple exist", () => {
    const history: any[] = [assistantFailed("first error"), assistantFailed("second error")];

    expect(extractErrorFromTurn(history, 0)).toBe("second error");
  });

  it("ignores stopReason error when errorMessage is missing", () => {
    const history: any[] = [assistantFailed()];

    expect(extractErrorFromTurn(history, 0)).toBeNull();
  });
});