Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 29 additions & 15 deletions src/a2a-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { createTask, updateTaskStatus, getRecentTasks, getDb } from "./task-jour
import { setAgentWeaveSession, resetAgentWeaveSession } from "./agentweave-context.js";
import { log } from "./logger.js";
import { saveSession } from "./session.js";
import { extractAssistantTextFromTurn } from "./response.js";
import { extractAssistantTextFromTurn, extractErrorFromTurn } from "./response.js";
import type { WorkerProgressEvent } from "./worker.js";
import { relayTaskUpdateToTelegram, relayJobCompletionToTelegram } from "./telegram-notify.js";
import { receiveCallback } from "./tools/claude-subagent.js";
Expand Down Expand Up @@ -231,22 +231,36 @@ export function createA2AServer(agent: Agent): express.Express {
responseText = extractAssistantTextFromTurn(agent.state.messages as any, turnStartIndex);
}

updateTaskStatus(task.id, "completed", { response: responseText });
const llmError = !responseText
? extractErrorFromTurn(agent.state.messages as any, turnStartIndex)
: null;

saveSession(agent);

res.json({
jsonrpc: "2.0",
id: req.body.id,
result: {
id: task.id,
status: { state: "completed" },
artifacts: [
{
parts: [{ type: "text", text: responseText }],
},
],
},
});
if (llmError) {
log("warn", `LLM error during A2A sync task ${task.id}: ${llmError}`);
updateTaskStatus(task.id, "failed", { error: llmError });
res.status(500).json({
jsonrpc: "2.0",
id: req.body.id,
error: { code: -32000, message: `LLM error: ${llmError}` },
});
} else {
updateTaskStatus(task.id, "completed", { response: responseText });
res.json({
jsonrpc: "2.0",
id: req.body.id,
result: {
id: task.id,
status: { state: "completed" },
artifacts: [
{
parts: [{ type: "text", text: responseText }],
},
],
},
});
}
};

await context.with(incomingContext, executeSyncTask);
Expand Down
23 changes: 23 additions & 0 deletions src/response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,26 @@ export function extractAssistantTextFromTurn(messages: AgentMessage[], startInde

return "";
}

/**
 * Scan the current turn for an LLM-level failure.
 *
 * The pi-agent-core framework does NOT throw when the provider call fails
 * (rate limits, timeouts, 500s). Instead it appends an AssistantMessage
 * carrying `stopReason: "error"` plus an `errorMessage` string.
 *
 * Walks the turn's messages newest-first; returns the first (i.e. most
 * recent) error message found, or null when no assistant message in the
 * turn errored. Assistant messages with `stopReason: "error"` but no
 * `errorMessage` are ignored.
 */
export function extractErrorFromTurn(messages: AgentMessage[], startIndex: number): string | null {
  // Clamp so a negative startIndex scans the whole history, matching slice().
  const turnStart = Math.max(0, startIndex);

  for (let i = messages.length - 1; i >= turnStart; i--) {
    const candidate: any = messages[i];
    if (candidate?.role === "assistant" && candidate.stopReason === "error" && candidate.errorMessage) {
      return candidate.errorMessage;
    }
  }

  return null;
}
18 changes: 12 additions & 6 deletions src/telegram-bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { createTask, updateTaskStatus, getRecentTasks } from "./task-journal.js"
import { log } from "./logger.js";
import { traceAgentTurn } from "./tracing.js";
import { saveSession, clearSession } from "./session.js";
import { extractAssistantTextFromTurn } from "./response.js";
import { extractAssistantTextFromTurn, extractErrorFromTurn } from "./response.js";

// ── Deduplication — Issue #2 ─────────────────────────────────────────────────
const processedUpdateIds = new Set<number>();
Expand Down Expand Up @@ -473,13 +473,19 @@ export function createTelegramBot(agent: Agent): Bot {
}

if (!responseText) {
const msgCount = agent.state.messages.length;
const lastRole = msgCount > 0 ? (agent.state.messages[msgCount - 1] as any).role : "none";
log("warn", `Empty response after prompt. Messages: ${msgCount}, last role: ${lastRole}`);
const llmError = extractErrorFromTurn(agent.state.messages as any, turnStartIndex);
if (llmError) {
log("warn", `LLM error during prompt: ${llmError}`);
updateTaskStatus(task.id, "failed", { error: llmError });
responseText = `LLM error: ${llmError}`;
} else {
const msgCount = agent.state.messages.length;
const lastRole = msgCount > 0 ? (agent.state.messages[msgCount - 1] as any).role : "none";
log("warn", `Empty response after prompt. Messages: ${msgCount}, last role: ${lastRole}`);
responseText = "(no response)";
}
}

responseText = responseText || "(no response)";

// Final edit — finalize the current (possibly split) message
if (responseText.length <= MAX_MSG_LEN) {
await editWithFormat(bot, chatId, currentMsgId, responseText);
Expand Down
11 changes: 11 additions & 0 deletions src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ async function main() {
}
}

// Check for LLM errors (rate limits, timeouts, 500s) surfaced as
// AssistantMessage with stopReason: "error" and errorMessage field.
if (!responseText) {
const lastMsg: any = agent.state.messages[agent.state.messages.length - 1];
if (lastMsg?.role === "assistant" && lastMsg.stopReason === "error" && lastMsg.errorMessage) {
saveSession(agent);
await postProgress({ type: "error", taskId, error: `LLM error: ${lastMsg.errorMessage}` });
return;
}
}

saveSession(agent);
await postProgress({ type: "complete", taskId, message: "Worker completed", result: responseText });
} catch (e: any) {
Expand Down
1 change: 1 addition & 0 deletions tests/a2a-callback.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jest.unstable_mockModule("../src/agentweave-context.js", () => ({
}));
jest.unstable_mockModule("../src/response.js", () => ({
extractAssistantTextFromTurn: jest.fn().mockReturnValue(""),
extractErrorFromTurn: jest.fn().mockReturnValue(null),
}));
jest.unstable_mockModule("../src/telegram-notify.js", () => ({
relayTaskUpdateToTelegram: jest.fn(),
Expand Down
1 change: 1 addition & 0 deletions tests/a2a-trace-propagation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ jest.unstable_mockModule("../src/agentweave-context.js", () => ({
}));
jest.unstable_mockModule("../src/response.js", () => ({
extractAssistantTextFromTurn: jest.fn().mockReturnValue(""),
extractErrorFromTurn: jest.fn().mockReturnValue(null),
}));
jest.unstable_mockModule("../src/telegram-notify.js", () => ({
relayTaskUpdateToTelegram: jest.fn(),
Expand Down
83 changes: 82 additions & 1 deletion tests/response.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, it, expect } from "@jest/globals";
import { extractAssistantTextFromTurn } from "../src/response.js";
import { extractAssistantTextFromTurn, extractErrorFromTurn } from "../src/response.js";

describe("extractAssistantTextFromTurn", () => {
it("returns assistant text from current turn", () => {
Expand All @@ -25,3 +25,84 @@ describe("extractAssistantTextFromTurn", () => {
expect(extractAssistantTextFromTurn(messages as any, 2)).toBe("");
});
});

describe("extractErrorFromTurn", () => {
  it("returns errorMessage when assistant message has stopReason error", () => {
    // Expected string hoisted so the fixture and assertion can't drift apart.
    const rateLimitError = "429 rate_limit_error: too many requests";
    const turn: any[] = [
      { role: "user", content: [{ type: "text", text: "hello" }] },
      { role: "assistant", content: [{ type: "text", text: "" }], stopReason: "error", errorMessage: rateLimitError },
    ];
    expect(extractErrorFromTurn(turn, 0)).toBe(rateLimitError);
  });

  it("returns null when assistant message completed normally", () => {
    const turn: any[] = [
      { role: "user", content: [{ type: "text", text: "hello" }] },
      { role: "assistant", content: [{ type: "text", text: "hi there" }], stopReason: "stop" },
    ];
    expect(extractErrorFromTurn(turn, 0)).toBeNull();
  });

  it("returns null when no assistant messages exist in the turn", () => {
    const turn: any[] = [{ role: "user", content: [{ type: "text", text: "hello" }] }];
    expect(extractErrorFromTurn(turn, 0)).toBeNull();
  });

  it("only checks messages from startIndex onwards", () => {
    // Index 0 holds a stale error from the previous turn; startIndex=1 must skip it.
    const turn: any[] = [
      { role: "assistant", content: [], stopReason: "error", errorMessage: "old error from previous turn" },
      { role: "user", content: [{ type: "text", text: "retry" }] },
      { role: "assistant", content: [{ type: "text", text: "success" }], stopReason: "stop" },
    ];
    expect(extractErrorFromTurn(turn, 1)).toBeNull();
  });

  it("returns the most recent error when multiple exist", () => {
    const turn: any[] = [
      { role: "assistant", content: [], stopReason: "error", errorMessage: "first error" },
      { role: "assistant", content: [], stopReason: "error", errorMessage: "second error" },
    ];
    expect(extractErrorFromTurn(turn, 0)).toBe("second error");
  });

  it("ignores stopReason error when errorMessage is missing", () => {
    const turn: any[] = [{ role: "assistant", content: [], stopReason: "error" }];
    expect(extractErrorFromTurn(turn, 0)).toBeNull();
  });
});
Loading