Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions jest.config.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
module.exports = {
preset: 'ts-jest/presets/default-esm',
testEnvironment: 'node',
testEnvironmentOptions: { env: { NODE_ENV: 'test' } },
roots: ['<rootDir>/tests'],
testPathIgnorePatterns: ['/node_modules/', '/agentweave/'],
extensionsToTreatAsEsm: ['.ts'],
Expand Down
294 changes: 162 additions & 132 deletions src/a2a-server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import express from "express";
import { Worker } from "worker_threads";
import { context, propagation } from "@opentelemetry/api";
import type { Agent } from "@mariozechner/pi-agent-core";
import type { AgentEvent } from "@mariozechner/pi-agent-core";
import { createTask, updateTaskStatus, getRecentTasks, getDb } from "./task-journal.js";
Expand All @@ -8,6 +9,8 @@ import { log } from "./logger.js";
import { saveSession } from "./session.js";
import { extractAssistantTextFromTurn } from "./response.js";
import type { WorkerProgressEvent } from "./worker.js";
import { relayTaskUpdateToTelegram, relayJobCompletionToTelegram } from "./telegram-notify.js";
import { receiveCallback } from "./tools/claude-subagent.js";

const A2A_PORT = parseInt(process.env.A2A_PORT || "8770", 10);
const A2A_SHARED_SECRET = process.env.A2A_SHARED_SECRET || "";
Expand All @@ -22,6 +25,7 @@ const AGENT_CARD = {
capabilities: {
streaming: true,
async_tasks: true,
callback_endpoint: true,
},
skills: [
"browser_control",
Expand All @@ -44,34 +48,6 @@ function authMiddleware(req: express.Request, res: express.Response, next: expre
next();
}

async function relayTaskUpdateToTelegram(taskId: string, message: string): Promise<void> {
const token = process.env.TELEGRAM_BOT_TOKEN;
const chatIds = (process.env.TELEGRAM_ALLOWED_USERS || "")
.split(",")
.map((x) => x.trim())
.filter(Boolean);

if (!token || chatIds.length === 0) return;

await Promise.all(
chatIds.map(async (chatId) => {
try {
await fetch(`https://api.telegram.org/bot${token}/sendMessage`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
chat_id: chatId,
text: `🧵 Task ${taskId}: ${message}`,
disable_notification: true,
}),
});
} catch {
// best effort
}
})
);
}

export function createA2AServer(agent: Agent): express.Express {
const app = express();
app.use(express.json());
Expand Down Expand Up @@ -101,6 +77,14 @@ export function createA2AServer(agent: Agent): express.Express {
const taskLabel = req.headers["x-agentweave-task-label"] as string | undefined;
const AGENTWEAVE_PROXY_TOKEN = process.env.AGENTWEAVE_PROXY_TOKEN;

// Extract W3C traceparent from incoming request (for Nix → Max delegations).
// Workers run in a separate thread — AsyncLocalStorage context doesn't cross
// thread boundaries. So we (a) run parent-thread work in this context, AND
// (b) serialize the traceparent headers into workerData for the worker to re-extract.
const incomingContext = propagation.extract(context.active(), req.headers);
const traceHeaders: Record<string, string> = {};
propagation.inject(incomingContext, traceHeaders);

try {
const { params } = req.body;
if (!params?.message?.parts?.[0]?.text) {
Expand All @@ -110,53 +94,86 @@ export function createA2AServer(agent: Agent): express.Express {

const text = params.message.parts[0].text;
const isSync = String(req.query.sync || "false").toLowerCase() === "true";
log("info", `A2A task from Nix (${isSync ? "sync" : "async"}): ${text.slice(0, 100)}`);
log("info", `A2A task from ${callerAgentId || "unknown"} (${isSync ? "sync" : "async"}): ${text.slice(0, 100)}`);

const task = createTask("a2a_task", "nix", { text, metadata: params.metadata });
taskId = task.id;
updateTaskStatus(task.id, "working");

if (!isSync) {
const worker = new Worker(new URL("./worker.js", import.meta.url), {
workerData: {
taskId: task.id,
text,
parentSessionId,
delegatedSessionId,
callerAgentId,
taskLabel,
},
});
const workerStartTime = Date.now();
const isSilent = params.metadata?.silent === true;

// Run worker within the extracted context (if present), so the worker's tracer
// can create child spans under the incoming trace parent.
const executeWorker = async () => {
const worker = new Worker(new URL("./worker.js", import.meta.url), {
workerData: {
taskId: task.id,
text,
parentSessionId,
delegatedSessionId,
callerAgentId,
taskLabel,
traceHeaders,
},
});

worker.on("message", (event: WorkerProgressEvent) => {
if (event.taskId !== task.id) return;
worker.on("message", (event: WorkerProgressEvent) => {
if (event.taskId !== task.id) return;

if (event.type === "progress") {
updateTaskStatus(task.id, "working", { progress: event.message || "Working" });
if (event.message) {
void relayTaskUpdateToTelegram(task.id, event.message);
}
} else if (event.type === "complete") {
updateTaskStatus(task.id, "completed", { response: event.result || "" });
if (!isSilent) {
void relayJobCompletionToTelegram({
taskLabel: taskLabel || task.id,
status: "completed",
durationMs: Date.now() - workerStartTime,
result: event.result || "",
});
}
worker.terminate().catch(() => {});
} else if (event.type === "error") {
updateTaskStatus(task.id, "failed", { error: event.error || "Unknown error" });
if (!isSilent) {
void relayJobCompletionToTelegram({
taskLabel: taskLabel || task.id,
status: "failed",
durationMs: Date.now() - workerStartTime,
error: event.error || "Unknown error",
});
}
worker.terminate().catch(() => {});
}
});

if (event.type === "progress") {
updateTaskStatus(task.id, "working", { progress: event.message || "Working" });
if (event.message) {
void relayTaskUpdateToTelegram(task.id, event.message);
worker.on("error", (err: any) => {
log("error", `Worker thread error for task ${task.id}: ${err.message}`);
updateTaskStatus(task.id, "failed", { error: err.message });
if (!isSilent) {
void relayJobCompletionToTelegram({
taskLabel: taskLabel || task.id,
status: "failed",
durationMs: Date.now() - workerStartTime,
error: err.message,
});
}
} else if (event.type === "complete") {
updateTaskStatus(task.id, "completed", { response: event.result || "" });
void relayTaskUpdateToTelegram(task.id, "Completed");
worker.terminate().catch(() => {});
} else if (event.type === "error") {
updateTaskStatus(task.id, "failed", { error: event.error || "Unknown error" });
void relayTaskUpdateToTelegram(task.id, `Failed: ${event.error || "Unknown error"}`);
worker.terminate().catch(() => {});
}
});
});

worker.on("error", (err: any) => {
log("error", `Worker thread error for task ${task.id}: ${err.message}`);
updateTaskStatus(task.id, "failed", { error: err.message });
});
worker.on("exit", (code) => {
if (code !== 0) {
log("warn", `Worker for task ${task.id} exited with code ${code}`);
}
});
};

worker.on("exit", (code) => {
if (code !== 0) {
log("warn", `Worker for task ${task.id} exited with code ${code}`);
}
});
// Execute the worker within the incoming trace context (if present)
await context.with(incomingContext, executeWorker);

res.status(202).json({
jsonrpc: "2.0",
Expand Down Expand Up @@ -197,37 +214,42 @@ export function createA2AServer(agent: Agent): express.Express {
}
}

let responseText = "";
const turnStartIndex = agent.state.messages.length;
const unsub = agent.subscribe((event: AgentEvent) => {
if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") {
responseText += event.assistantMessageEvent.delta;
}
});
// Execute sync task within the incoming trace context as well
const executeSyncTask = async () => {
let responseText = "";
const turnStartIndex = agent.state.messages.length;
const unsub = agent.subscribe((event: AgentEvent) => {
if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") {
responseText += event.assistantMessageEvent.delta;
}
});

await agent.prompt(text);
unsub();
await agent.prompt(text);
unsub();

if (!responseText) {
responseText = extractAssistantTextFromTurn(agent.state.messages as any, turnStartIndex);
}
if (!responseText) {
responseText = extractAssistantTextFromTurn(agent.state.messages as any, turnStartIndex);
}

updateTaskStatus(task.id, "completed", { response: responseText });
saveSession(agent);
updateTaskStatus(task.id, "completed", { response: responseText });
saveSession(agent);

res.json({
jsonrpc: "2.0",
id: req.body.id,
result: {
id: task.id,
status: { state: "completed" },
artifacts: [
{
parts: [{ type: "text", text: responseText }],
},
],
},
});
res.json({
jsonrpc: "2.0",
id: req.body.id,
result: {
id: task.id,
status: { state: "completed" },
artifacts: [
{
parts: [{ type: "text", text: responseText }],
},
],
},
});
};

await context.with(incomingContext, executeSyncTask);
} catch (e: any) {
agent.abort();
log("error", `A2A task error: ${e.message}`);
Expand Down Expand Up @@ -297,63 +319,29 @@ export function createA2AServer(agent: Agent): express.Express {
}
break;
case "tool_execution_start":
sendEvent("tool_start", {
toolCallId: event.toolCallId,
toolName: event.toolName,
args: event.args,
});
sendEvent("tool_start", { toolName: event.toolName });
break;
case "tool_execution_end":
sendEvent("tool_end", {
toolCallId: event.toolCallId,
toolName: event.toolName,
isError: event.isError,
});
break;
case "agent_end":
sendEvent("tool_end", { toolName: event.toolName });
break;
}
});

const turnStartIndex = agent.state.messages.length;
await agent.prompt(text);
unsub();

// Extract final response from this turn only
const responseText = extractAssistantTextFromTurn(agent.state.messages as any, turnStartIndex);

updateTaskStatus(task.id, "completed", { response: responseText });
saveSession(agent);
sendEvent("task_end", { taskId: task.id });

updateTaskStatus(task.id, "completed", { response: "(streamed)" });
sendEvent("task_end", { taskId: task.id, status: "completed" });
res.end();
} catch (e: any) {
agent.abort();
log("error", `A2A stream error: ${e.message}`);
try {
res.write(`event: error\ndata: ${JSON.stringify({ message: e.message })}\n\n`);
res.end();
} catch {
}
res.write(`event: error\ndata: ${JSON.stringify({ error: e.message })}\n\n`);
res.end();
}
});

app.get("/messages", authMiddleware, (_req, res) => {
const messages = agent.state.messages
.filter((m: any) => m.role === "user" || m.role === "assistant")
.map((m: any) => {
const text = typeof m.content === "string"
? m.content
: Array.isArray(m.content)
? m.content
.filter((c: any) => c.type === "text")
.map((c: any) => c.text)
.join("")
: "";
return { role: m.role, text };
});
res.json({ messages });
});

app.get("/tasks/:id", authMiddleware, (req, res) => {
const task = getDb().prepare("SELECT * FROM tasks WHERE id = ?").get(req.params.id);
if (!task) {
Expand All @@ -363,6 +351,48 @@ export function createA2AServer(agent: Agent): express.Express {
res.json(task);
});

/**
* POST /tasks/callback
* Called by Nix (or any external agent) when an async subagent job completes.
* Body: { jobId: string, status: 'completed' | 'failed' | 'timed_out', result?: string, error?: string }
*/
app.post("/tasks/callback", authMiddleware, (req, res) => {
const { jobId, status, result, error } = req.body as {
jobId?: string;
status?: string;
result?: string;
error?: string;
};

if (!jobId || typeof jobId !== "string") {
res.status(400).json({ error: "Missing or invalid jobId" });
return;
}

const validStatuses = ["completed", "failed", "timed_out"] as const;
type CallbackStatus = (typeof validStatuses)[number];
if (!status || !validStatuses.includes(status as CallbackStatus)) {
res.status(400).json({ error: `Invalid status. Must be one of: ${validStatuses.join(", ")}` });
return;
}

let found = false;
try {
found = receiveCallback(jobId, status as CallbackStatus, result, error);
} catch (e: any) {
log("error", `POST /tasks/callback error for job ${jobId}: ${e.message}`);
res.status(500).json({ error: "Internal error processing callback" });
return;
}

if (!found) {
res.status(404).json({ error: `Unknown jobId: ${jobId}` });
return;
}

res.status(200).json({ ok: true, jobId, status });
});

return app;
}

Expand Down
Loading
Loading