diff --git a/jest.config.cjs b/jest.config.cjs index c7b8361..d947032 100644 --- a/jest.config.cjs +++ b/jest.config.cjs @@ -2,6 +2,7 @@ module.exports = { preset: 'ts-jest/presets/default-esm', testEnvironment: 'node', + testEnvironmentOptions: { env: { NODE_ENV: 'test' } }, roots: ['/tests'], testPathIgnorePatterns: ['/node_modules/', '/agentweave/'], extensionsToTreatAsEsm: ['.ts'], diff --git a/src/a2a-server.ts b/src/a2a-server.ts index 6fa6314..5d94d8c 100644 --- a/src/a2a-server.ts +++ b/src/a2a-server.ts @@ -1,5 +1,6 @@ import express from "express"; import { Worker } from "worker_threads"; +import { context, propagation } from "@opentelemetry/api"; import type { Agent } from "@mariozechner/pi-agent-core"; import type { AgentEvent } from "@mariozechner/pi-agent-core"; import { createTask, updateTaskStatus, getRecentTasks, getDb } from "./task-journal.js"; @@ -8,6 +9,8 @@ import { log } from "./logger.js"; import { saveSession } from "./session.js"; import { extractAssistantTextFromTurn } from "./response.js"; import type { WorkerProgressEvent } from "./worker.js"; +import { relayTaskUpdateToTelegram, relayJobCompletionToTelegram } from "./telegram-notify.js"; +import { receiveCallback } from "./tools/claude-subagent.js"; const A2A_PORT = parseInt(process.env.A2A_PORT || "8770", 10); const A2A_SHARED_SECRET = process.env.A2A_SHARED_SECRET || ""; @@ -22,6 +25,7 @@ const AGENT_CARD = { capabilities: { streaming: true, async_tasks: true, + callback_endpoint: true, }, skills: [ "browser_control", @@ -44,34 +48,6 @@ function authMiddleware(req: express.Request, res: express.Response, next: expre next(); } -async function relayTaskUpdateToTelegram(taskId: string, message: string): Promise { - const token = process.env.TELEGRAM_BOT_TOKEN; - const chatIds = (process.env.TELEGRAM_ALLOWED_USERS || "") - .split(",") - .map((x) => x.trim()) - .filter(Boolean); - - if (!token || chatIds.length === 0) return; - - await Promise.all( - chatIds.map(async (chatId) => { - try 
{ - await fetch(`https://api.telegram.org/bot${token}/sendMessage`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - chat_id: chatId, - text: `🧡 Task ${taskId}: ${message}`, - disable_notification: true, - }), - }); - } catch { - // best effort - } - }) - ); -} - export function createA2AServer(agent: Agent): express.Express { const app = express(); app.use(express.json()); @@ -101,6 +77,14 @@ export function createA2AServer(agent: Agent): express.Express { const taskLabel = req.headers["x-agentweave-task-label"] as string | undefined; const AGENTWEAVE_PROXY_TOKEN = process.env.AGENTWEAVE_PROXY_TOKEN; + // Extract W3C traceparent from incoming request (for Nix β†’ Max delegations). + // Workers run in a separate thread β€” AsyncLocalStorage context doesn't cross + // thread boundaries. So we (a) run parent-thread work in this context, AND + // (b) serialize the traceparent headers into workerData for the worker to re-extract. + const incomingContext = propagation.extract(context.active(), req.headers); + const traceHeaders: Record = {}; + propagation.inject(incomingContext, traceHeaders); + try { const { params } = req.body; if (!params?.message?.parts?.[0]?.text) { @@ -110,53 +94,86 @@ export function createA2AServer(agent: Agent): express.Express { const text = params.message.parts[0].text; const isSync = String(req.query.sync || "false").toLowerCase() === "true"; - log("info", `A2A task from Nix (${isSync ? "sync" : "async"}): ${text.slice(0, 100)}`); + log("info", `A2A task from ${callerAgentId || "unknown"} (${isSync ? 
"sync" : "async"}): ${text.slice(0, 100)}`); const task = createTask("a2a_task", "nix", { text, metadata: params.metadata }); taskId = task.id; updateTaskStatus(task.id, "working"); if (!isSync) { - const worker = new Worker(new URL("./worker.js", import.meta.url), { - workerData: { - taskId: task.id, - text, - parentSessionId, - delegatedSessionId, - callerAgentId, - taskLabel, - }, - }); + const workerStartTime = Date.now(); + const isSilent = params.metadata?.silent === true; + + // Run worker within the extracted context (if present), so the worker's tracer + // can create child spans under the incoming trace parent. + const executeWorker = async () => { + const worker = new Worker(new URL("./worker.js", import.meta.url), { + workerData: { + taskId: task.id, + text, + parentSessionId, + delegatedSessionId, + callerAgentId, + taskLabel, + traceHeaders, + }, + }); - worker.on("message", (event: WorkerProgressEvent) => { - if (event.taskId !== task.id) return; + worker.on("message", (event: WorkerProgressEvent) => { + if (event.taskId !== task.id) return; + + if (event.type === "progress") { + updateTaskStatus(task.id, "working", { progress: event.message || "Working" }); + if (event.message) { + void relayTaskUpdateToTelegram(task.id, event.message); + } + } else if (event.type === "complete") { + updateTaskStatus(task.id, "completed", { response: event.result || "" }); + if (!isSilent) { + void relayJobCompletionToTelegram({ + taskLabel: taskLabel || task.id, + status: "completed", + durationMs: Date.now() - workerStartTime, + result: event.result || "", + }); + } + worker.terminate().catch(() => {}); + } else if (event.type === "error") { + updateTaskStatus(task.id, "failed", { error: event.error || "Unknown error" }); + if (!isSilent) { + void relayJobCompletionToTelegram({ + taskLabel: taskLabel || task.id, + status: "failed", + durationMs: Date.now() - workerStartTime, + error: event.error || "Unknown error", + }); + } + worker.terminate().catch(() => {}); + 
} + }); - if (event.type === "progress") { - updateTaskStatus(task.id, "working", { progress: event.message || "Working" }); - if (event.message) { - void relayTaskUpdateToTelegram(task.id, event.message); + worker.on("error", (err: any) => { + log("error", `Worker thread error for task ${task.id}: ${err.message}`); + updateTaskStatus(task.id, "failed", { error: err.message }); + if (!isSilent) { + void relayJobCompletionToTelegram({ + taskLabel: taskLabel || task.id, + status: "failed", + durationMs: Date.now() - workerStartTime, + error: err.message, + }); } - } else if (event.type === "complete") { - updateTaskStatus(task.id, "completed", { response: event.result || "" }); - void relayTaskUpdateToTelegram(task.id, "Completed"); - worker.terminate().catch(() => {}); - } else if (event.type === "error") { - updateTaskStatus(task.id, "failed", { error: event.error || "Unknown error" }); - void relayTaskUpdateToTelegram(task.id, `Failed: ${event.error || "Unknown error"}`); - worker.terminate().catch(() => {}); - } - }); + }); - worker.on("error", (err: any) => { - log("error", `Worker thread error for task ${task.id}: ${err.message}`); - updateTaskStatus(task.id, "failed", { error: err.message }); - }); + worker.on("exit", (code) => { + if (code !== 0) { + log("warn", `Worker for task ${task.id} exited with code ${code}`); + } + }); + }; - worker.on("exit", (code) => { - if (code !== 0) { - log("warn", `Worker for task ${task.id} exited with code ${code}`); - } - }); + // Execute the worker within the incoming trace context (if present) + await context.with(incomingContext, executeWorker); res.status(202).json({ jsonrpc: "2.0", @@ -197,37 +214,42 @@ export function createA2AServer(agent: Agent): express.Express { } } - let responseText = ""; - const turnStartIndex = agent.state.messages.length; - const unsub = agent.subscribe((event: AgentEvent) => { - if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") { - responseText += 
event.assistantMessageEvent.delta; - } - }); + // Execute sync task within the incoming trace context as well + const executeSyncTask = async () => { + let responseText = ""; + const turnStartIndex = agent.state.messages.length; + const unsub = agent.subscribe((event: AgentEvent) => { + if (event.type === "message_update" && event.assistantMessageEvent.type === "text_delta") { + responseText += event.assistantMessageEvent.delta; + } + }); - await agent.prompt(text); - unsub(); + await agent.prompt(text); + unsub(); - if (!responseText) { - responseText = extractAssistantTextFromTurn(agent.state.messages as any, turnStartIndex); - } + if (!responseText) { + responseText = extractAssistantTextFromTurn(agent.state.messages as any, turnStartIndex); + } - updateTaskStatus(task.id, "completed", { response: responseText }); - saveSession(agent); + updateTaskStatus(task.id, "completed", { response: responseText }); + saveSession(agent); - res.json({ - jsonrpc: "2.0", - id: req.body.id, - result: { - id: task.id, - status: { state: "completed" }, - artifacts: [ - { - parts: [{ type: "text", text: responseText }], - }, - ], - }, - }); + res.json({ + jsonrpc: "2.0", + id: req.body.id, + result: { + id: task.id, + status: { state: "completed" }, + artifacts: [ + { + parts: [{ type: "text", text: responseText }], + }, + ], + }, + }); + }; + + await context.with(incomingContext, executeSyncTask); } catch (e: any) { agent.abort(); log("error", `A2A task error: ${e.message}`); @@ -297,63 +319,29 @@ export function createA2AServer(agent: Agent): express.Express { } break; case "tool_execution_start": - sendEvent("tool_start", { - toolCallId: event.toolCallId, - toolName: event.toolName, - args: event.args, - }); + sendEvent("tool_start", { toolName: event.toolName }); break; case "tool_execution_end": - sendEvent("tool_end", { - toolCallId: event.toolCallId, - toolName: event.toolName, - isError: event.isError, - }); - break; - case "agent_end": + sendEvent("tool_end", { toolName: 
event.toolName }); break; } }); - const turnStartIndex = agent.state.messages.length; await agent.prompt(text); unsub(); - - // Extract final response from this turn only - const responseText = extractAssistantTextFromTurn(agent.state.messages as any, turnStartIndex); - - updateTaskStatus(task.id, "completed", { response: responseText }); saveSession(agent); - sendEvent("task_end", { taskId: task.id }); + + updateTaskStatus(task.id, "completed", { response: "(streamed)" }); + sendEvent("task_end", { taskId: task.id, status: "completed" }); res.end(); } catch (e: any) { agent.abort(); log("error", `A2A stream error: ${e.message}`); - try { - res.write(`event: error\ndata: ${JSON.stringify({ message: e.message })}\n\n`); - res.end(); - } catch { - } + res.write(`event: error\ndata: ${JSON.stringify({ error: e.message })}\n\n`); + res.end(); } }); - app.get("/messages", authMiddleware, (_req, res) => { - const messages = agent.state.messages - .filter((m: any) => m.role === "user" || m.role === "assistant") - .map((m: any) => { - const text = typeof m.content === "string" - ? m.content - : Array.isArray(m.content) - ? m.content - .filter((c: any) => c.type === "text") - .map((c: any) => c.text) - .join("") - : ""; - return { role: m.role, text }; - }); - res.json({ messages }); - }); - app.get("/tasks/:id", authMiddleware, (req, res) => { const task = getDb().prepare("SELECT * FROM tasks WHERE id = ?").get(req.params.id); if (!task) { @@ -363,6 +351,48 @@ export function createA2AServer(agent: Agent): express.Express { res.json(task); }); + /** + * POST /tasks/callback + * Called by Nix (or any external agent) when an async subagent job completes. 
+ * Body: { jobId: string, status: 'completed' | 'failed' | 'timed_out', result?: string, error?: string } + */ + app.post("/tasks/callback", authMiddleware, (req, res) => { + const { jobId, status, result, error } = req.body as { + jobId?: string; + status?: string; + result?: string; + error?: string; + }; + + if (!jobId || typeof jobId !== "string") { + res.status(400).json({ error: "Missing or invalid jobId" }); + return; + } + + const validStatuses = ["completed", "failed", "timed_out"] as const; + type CallbackStatus = (typeof validStatuses)[number]; + if (!status || !validStatuses.includes(status as CallbackStatus)) { + res.status(400).json({ error: `Invalid status. Must be one of: ${validStatuses.join(", ")}` }); + return; + } + + let found = false; + try { + found = receiveCallback(jobId, status as CallbackStatus, result, error); + } catch (e: any) { + log("error", `POST /tasks/callback error for job ${jobId}: ${e.message}`); + res.status(500).json({ error: "Internal error processing callback" }); + return; + } + + if (!found) { + res.status(404).json({ error: `Unknown jobId: ${jobId}` }); + return; + } + + res.status(200).json({ ok: true, jobId, status }); + }); + return app; } diff --git a/src/telegram-notify.ts b/src/telegram-notify.ts new file mode 100644 index 0000000..9fbc5a0 --- /dev/null +++ b/src/telegram-notify.ts @@ -0,0 +1,115 @@ +/** + * Telegram notification helpers. + * Extracted here to avoid circular imports between a2a-server.ts and claude-subagent.ts. + */ + +/** + * Format a duration in milliseconds to a human-readable string. + * < 60s β†’ "42s" + * < 1h β†’ "1m 42s" + * >= 1h β†’ "2h 5m" + */ +export function formatDuration(ms: number): string { + const totalSec = Math.round(ms / 1000); + if (totalSec < 60) return `${totalSec}s`; + const minutes = Math.floor(totalSec / 60); + const seconds = totalSec % 60; + if (minutes < 60) return seconds > 0 ? 
`${minutes}m ${seconds}s` : `${minutes}m`; + const hours = Math.floor(minutes / 60); + const mins = minutes % 60; + return mins > 0 ? `${hours}h ${mins}m` : `${hours}h`; +} + +/** + * Truncate a result string to maxChars, appending "…" if truncated. + */ +export function summarizeResult(text: string, maxChars = 200): string { + if (text.length <= maxChars) return text; + return text.slice(0, maxChars) + "…"; +} + +/** + * Send a raw task update message to all configured Telegram chat IDs. + * Used for mid-task progress messages (🧡 prefix). + */ +export async function relayTaskUpdateToTelegram(taskId: string, message: string): Promise { + if (process.env.NODE_ENV === "test") return; + const token = process.env.TELEGRAM_BOT_TOKEN; + const chatIds = (process.env.TELEGRAM_ALLOWED_USERS || "") + .split(",") + .map((x) => x.trim()) + .filter(Boolean); + + if (!token || chatIds.length === 0) return; + + await Promise.all( + chatIds.map(async (chatId) => { + try { + await fetch(`https://api.telegram.org/bot${token}/sendMessage`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + chat_id: chatId, + text: `🧡 Task ${taskId}: ${message}`, + disable_notification: true, + }), + }); + } catch { + // best effort + } + }) + ); +} + +export interface JobCompletionPayload { + taskLabel: string; + status: "completed" | "failed" | "timed_out"; + durationMs: number; + result?: string; + error?: string; +} + +/** + * Send a completion/failure/timeout notification to Telegram. + * Used by receiveCallback in claude-subagent.ts and the A2A worker handler. 
+ */ +export async function relayJobCompletionToTelegram(payload: JobCompletionPayload): Promise { + if (process.env.NODE_ENV === "test") return; + const token = process.env.TELEGRAM_BOT_TOKEN; + const chatIds = (process.env.TELEGRAM_ALLOWED_USERS || "") + .split(",") + .map((x) => x.trim()) + .filter(Boolean); + + if (!token || chatIds.length === 0) return; + + const duration = formatDuration(payload.durationMs); + let text: string; + + if (payload.status === "completed") { + const summary = summarizeResult(payload.result?.trim() || "(no output)"); + text = `βœ… Task ${payload.taskLabel}: completed in ${duration} β€” ${summary}`; + } else if (payload.status === "timed_out") { + text = `⏱️ Task ${payload.taskLabel}: timed out after ${duration}`; + } else { + text = `❌ Task ${payload.taskLabel}: failed in ${duration} β€” ${payload.error || "unknown error"}`; + } + + await Promise.all( + chatIds.map(async (chatId) => { + try { + await fetch(`https://api.telegram.org/bot${token}/sendMessage`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + chat_id: chatId, + text, + disable_notification: false, + }), + }); + } catch { + // best effort + } + }) + ); +} diff --git a/src/tools/claude-subagent.ts b/src/tools/claude-subagent.ts index d6e3cd3..80b91d6 100644 --- a/src/tools/claude-subagent.ts +++ b/src/tools/claude-subagent.ts @@ -5,6 +5,7 @@ import { writeFileSync, readFileSync, existsSync, mkdirSync } from "fs"; import { join } from "path"; import { log } from "../logger.js"; import { getAgentWeaveSession } from "../agentweave-context.js"; +import { relayJobCompletionToTelegram } from "../telegram-notify.js"; type DelegateJobStatus = "running" | "completed" | "failed" | "timed_out"; @@ -20,6 +21,7 @@ type DelegateJob = { exitCode?: number | null; output: string; error?: string; + silent?: boolean; }; const jobs = new Map(); @@ -91,16 +93,52 @@ function makeCustomHeaders(headers: Record): string { .join("\n"); } -function 
onJobComplete(job: DelegateJob): void { - const elapsedSec = Math.round(((job.endedAt || Date.now()) - job.startedAt) / 1000); - log( - "info", - `claude_subagent ${job.id} finished: status=${job.status} elapsed=${elapsedSec}s label="${job.taskLabel}"` - ); +/** + * Called when a subagent job reaches a terminal state (completed/failed/timed_out). + * Updates in-memory + disk state, then fires a Telegram notification unless silent. + * Also exported for use by the A2A /tasks/callback endpoint (cross-machine callbacks). + * + * Returns true if the job was found and updated, false if the jobId is unknown. + */ +export function receiveCallback( + jobId: string, + status: "completed" | "failed" | "timed_out", + result?: string, + error?: string +): boolean { + // Prefer in-memory job; fall back to disk (handles cross-restart callbacks) + const job = jobs.get(jobId) ?? loadJobFromDisk(jobId); + if (!job) { + log("warn", `receiveCallback: unknown job_id ${jobId}`); + return false; + } + + job.status = status; + job.endedAt = job.endedAt ?? 
Date.now(); + if (result !== undefined) job.output = result; + if (error !== undefined) job.error = error; + + // Re-insert into memory map in case it was loaded from disk + jobs.set(jobId, job); persistJob(job); + + const elapsedSec = Math.round((job.endedAt - job.startedAt) / 1000); + log("info", `claude_subagent ${job.id} finished: status=${status} elapsed=${elapsedSec}s label="${job.taskLabel}"`); + + if (!job.silent) { + void relayJobCompletionToTelegram({ + taskLabel: job.taskLabel, + status, + durationMs: job.endedAt - job.startedAt, + result: job.output, + error: job.error, + }); + } + + return true; } -function startClaudeDelegation(prompt: string, taskLabel?: string): DelegateJob { +function startClaudeDelegation(prompt: string, taskLabel?: string, silent?: boolean): DelegateJob { evictOldJobs(); const jobId = crypto.randomUUID(); @@ -141,6 +179,7 @@ function startClaudeDelegation(prompt: string, taskLabel?: string): DelegateJob startedAt: Date.now(), status: "running", output: "", + silent: silent ?? 
false, }; jobs.set(job.id, job); @@ -164,11 +203,9 @@ function startClaudeDelegation(prompt: string, taskLabel?: string): DelegateJob if (job.status !== "running") return; log("warn", `claude_subagent ${job.id} timed out after ${JOB_TIMEOUT_MS / 1000}s β€” killing`); claude.kill("SIGTERM"); - job.status = "timed_out"; - job.error = `Timed out after ${JOB_TIMEOUT_MS / 1000}s`; job.endedAt = Date.now(); processes.delete(jobId); - onJobComplete(job); + receiveCallback(jobId, "timed_out", undefined, `Timed out after ${JOB_TIMEOUT_MS / 1000}s`); }, JOB_TIMEOUT_MS); claude.stdout.on("data", (chunk: Buffer) => { @@ -181,23 +218,24 @@ function startClaudeDelegation(prompt: string, taskLabel?: string): DelegateJob claude.on("error", (err: Error) => { clearTimeout(timeoutHandle); - job.status = "failed"; - job.error = err.message; job.endedAt = Date.now(); processes.delete(jobId); - onJobComplete(job); + receiveCallback(jobId, "failed", undefined, err.message); }); claude.on("close", (code: number | null) => { clearTimeout(timeoutHandle); // Don't overwrite timed_out status set by the timeout handler - if (job.status === "running") { - job.exitCode = code; - job.endedAt = Date.now(); - job.status = code === 0 ? "completed" : "failed"; - } + if (job.status !== "running") return; + job.exitCode = code; + job.endedAt = Date.now(); processes.delete(jobId); - onJobComplete(job); + receiveCallback( + jobId, + code === 0 ? "completed" : "failed", + job.output, + code !== 0 ? 
`Exit code ${code}` : undefined + ); }); return job; @@ -221,6 +259,38 @@ function summarizeJob(job: DelegateJob): string { ].join("\n"); } +// ─── Test helpers (exported only for unit tests) ────────────────────────────── + +export interface HeaderParams { + childSessionId: string; + parentSessionId: string; + agentId: string; + taskLabel: string; + proxyToken?: string; +} + +export function _buildAnthropicCustomHeadersForTest(params: HeaderParams): string { + const headers: Record = { + "X-AgentWeave-Session-Id": params.childSessionId, + "X-AgentWeave-Parent-Session-Id": params.parentSessionId, + "X-AgentWeave-Agent-Id": params.agentId, + "X-AgentWeave-Agent-Type": "subagent", + "X-AgentWeave-Task-Label": params.taskLabel, + ...(params.proxyToken ? { "X-AgentWeave-Proxy-Token": params.proxyToken } : {}), + }; + return makeCustomHeaders(headers); +} + +export function _clearJobsForTest(): void { + jobs.clear(); +} + +export function _addJobForTest(job: DelegateJob): void { + jobs.set(job.id, job); +} + +// ─── Tool definition ────────────────────────────────────────────────────────── + export const delegateToClaudeSubagent: AgentTool = { name: "delegate_to_claude_subagent", label: "Delegate to Claude Code Subagent", @@ -233,6 +303,9 @@ export const delegateToClaudeSubagent: AgentTool = { prompt: Type.Optional(Type.String({ description: "Task prompt for action=start" })), task_label: Type.Optional(Type.String({ description: "Optional short label for trace attribution" })), job_id: Type.Optional(Type.String({ description: "Job id for action=status" })), + silent: Type.Optional( + Type.Boolean({ description: "If true, suppresses Telegram notification on completion" }) + ), }), execute: async (_id, params: any) => { const action = String(params.action || "").toLowerCase(); @@ -246,7 +319,7 @@ export const delegateToClaudeSubagent: AgentTool = { }; } - const job = startClaudeDelegation(prompt, params.task_label); + const job = startClaudeDelegation(prompt, 
params.task_label, params.silent ?? false); return { content: [ { @@ -278,7 +351,6 @@ export const delegateToClaudeSubagent: AgentTool = { }; } - // In-memory first, then fall back to disk (survives restarts) const job = jobs.get(jobId) ?? loadJobFromDisk(jobId); if (!job) { return { @@ -296,51 +368,36 @@ export const delegateToClaudeSubagent: AgentTool = { childSessionId: job.childSessionId, parentSessionId: job.parentSessionId, taskLabel: job.taskLabel, - completed: job.status !== "running", - exitCode: job.exitCode, + elapsedSec: Math.round(((job.endedAt || Date.now()) - job.startedAt) / 1000), }, }; } if (action === "list") { - const all = [...jobs.values()].sort((a, b) => b.startedAt - a.startedAt).slice(0, 10); - const text = all.length - ? all.map((j) => `${j.id} | ${j.status} | ${j.taskLabel}`).join("\n") - : "No claude subagent jobs yet."; + const allJobs = [...jobs.values()].sort((a, b) => b.startedAt - a.startedAt).slice(0, 20); + if (allJobs.length === 0) { + return { + content: [{ type: "text" as const, text: "No claude subagent jobs yet." }], + details: { success: true, count: 0, jobs: [] }, + }; + } + const lines = allJobs.map((j) => { + const elapsedSec = Math.round(((j.endedAt || Date.now()) - j.startedAt) / 1000); + return `${j.id} [${j.status}] ${j.taskLabel} (${elapsedSec}s)`; + }); return { - content: [{ type: "text" as const, text }], - details: { success: true, count: all.length }, + content: [{ type: "text" as const, text: lines.join("\n") }], + details: { + success: true, + count: allJobs.length, + jobs: allJobs.map((j) => ({ id: j.id, status: j.status, taskLabel: j.taskLabel })), + }, }; } return { - content: [{ type: "text" as const, text: `Invalid action: ${action}. Use start, status, or list.` }], + content: [{ type: "text" as const, text: `Invalid action: "${action}". 
Use start, status, or list` }], details: { success: false }, }; }, }; - -// ─── Test helpers (exported for unit tests only) ────────────────────────────── - -export function _buildAnthropicCustomHeadersForTest(input: { - childSessionId: string; - parentSessionId: string; - agentId: string; - taskLabel: string; - proxyToken?: string; -}): string { - return makeCustomHeaders({ - "X-AgentWeave-Session-Id": input.childSessionId, - "X-AgentWeave-Parent-Session-Id": input.parentSessionId, - "X-AgentWeave-Agent-Id": input.agentId, - "X-AgentWeave-Agent-Type": "subagent", - "X-AgentWeave-Task-Label": input.taskLabel, - ...(input.proxyToken ? { "X-AgentWeave-Proxy-Token": input.proxyToken } : {}), - }); -} - -/** Reset in-memory job store between tests */ -export function _clearJobsForTest(): void { - jobs.clear(); - processes.clear(); -} diff --git a/src/worker.ts b/src/worker.ts index aa5e4c6..3a4bf08 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,4 +1,5 @@ import { parentPort, workerData } from "worker_threads"; +import { context, propagation } from "@opentelemetry/api"; import type { AgentEvent } from "@mariozechner/pi-agent-core"; import { createAgent } from "./agent.js"; import { setAgentWeaveSession, resetAgentWeaveSession } from "./agentweave-context.js"; @@ -22,6 +23,8 @@ interface WorkerTaskData { delegatedSessionId?: string; callerAgentId?: string; taskLabel?: string; + /** Serialized W3C trace headers (e.g. traceparent) from the calling agent. 
*/ + traceHeaders?: Record; } async function postProgress(event: WorkerProgressEvent): Promise { @@ -29,9 +32,16 @@ async function postProgress(event: WorkerProgressEvent): Promise { } async function main() { - const { taskId, text, parentSessionId, delegatedSessionId, callerAgentId, taskLabel } = workerData as WorkerTaskData; + const { taskId, text, parentSessionId, delegatedSessionId, callerAgentId, taskLabel, traceHeaders } = workerData as WorkerTaskData; + + // Re-extract trace context from serialized headers so this worker's spans + // are linked as children of the calling agent's trace. + const incomingContext = traceHeaders + ? propagation.extract(context.active(), traceHeaders) + : context.active(); const AGENTWEAVE_PROXY_TOKEN = process.env.AGENTWEAVE_PROXY_TOKEN; + await context.with(incomingContext, async () => { try { await postProgress({ type: "progress", taskId, message: "Worker started" }); @@ -105,6 +115,7 @@ async function main() { }).catch(() => {}); } } + }); // end context.with } void main(); diff --git a/tests/a2a-callback.test.ts b/tests/a2a-callback.test.ts new file mode 100644 index 0000000..0dc093b --- /dev/null +++ b/tests/a2a-callback.test.ts @@ -0,0 +1,174 @@ +import { describe, it, expect, jest } from "@jest/globals"; + +/** + * Tests for POST /tasks/callback endpoint. + * Tests the HTTP layer: auth gating, input validation, and routing. 
+ */ + +jest.unstable_mockModule("../src/task-journal.js", () => ({ + createTask: jest.fn().mockReturnValue({ id: "task-001", status: "working" }), + updateTaskStatus: jest.fn(), + getRecentTasks: jest.fn().mockReturnValue([]), + getDb: jest.fn().mockReturnValue({ + prepare: jest.fn().mockReturnValue({ + get: jest.fn().mockReturnValue({ id: "known-job-001" }), + all: jest.fn().mockReturnValue([]), + }), + }), +})); + +jest.unstable_mockModule("../src/logger.js", () => ({ log: jest.fn() })); +jest.unstable_mockModule("../src/session.js", () => ({ + saveSession: jest.fn(), + loadSession: jest.fn().mockReturnValue(null), +})); +jest.unstable_mockModule("../src/agentweave-context.js", () => ({ + setAgentWeaveSession: jest.fn(), + resetAgentWeaveSession: jest.fn(), + getAgentWeaveSession: jest.fn().mockReturnValue("max-main"), +})); +jest.unstable_mockModule("../src/response.js", () => ({ + extractAssistantTextFromTurn: jest.fn().mockReturnValue(""), +})); +jest.unstable_mockModule("../src/telegram-notify.js", () => ({ + relayTaskUpdateToTelegram: jest.fn(), + relayJobCompletionToTelegram: jest.fn(), + formatDuration: jest.fn().mockReturnValue("1s"), + summarizeResult: jest.fn().mockImplementation((t: unknown) => t as string), +})); +jest.unstable_mockModule("../src/tools/claude-subagent.js", () => ({ + receiveCallback: jest.fn(), + delegateToClaudeSubagent: {}, + _clearJobsForTest: jest.fn(), + _addJobForTest: jest.fn(), + evictOldJobs: jest.fn(), + truncateOutput: jest.fn().mockImplementation((t: unknown) => t), + MAX_OUTPUT_CHARS: 15000, +})); +jest.unstable_mockModule("worker_threads", () => ({ + Worker: jest.fn().mockImplementation(() => ({ on: jest.fn(), terminate: jest.fn() })), +})); + +const SECRET = "test-secret-callback"; +process.env.A2A_SHARED_SECRET = SECRET; + +const { createA2AServer } = await import("../src/a2a-server.js"); +const { receiveCallback } = await import("../src/tools/claude-subagent.js"); +const receiveCallbackMock = receiveCallback as 
jest.Mock; + +function makeAgent() { + return { + state: { isStreaming: false, messages: [] }, + subscribe: jest.fn().mockReturnValue(() => {}), + prompt: jest.fn(), + abort: jest.fn(), + } as any; +} + +async function callEndpoint( + app: ReturnType, + method: string, + path: string, + opts: { auth?: string; body?: unknown } = {} +): Promise<{ status: number; body: any }> { + return new Promise((resolve, reject) => { + const server = app.listen(0, "127.0.0.1", () => { + const addr = server.address() as { port: number }; + const url = `http://127.0.0.1:${addr.port}${path}`; + const headers: Record = { "Content-Type": "application/json" }; + if (opts.auth) headers["Authorization"] = opts.auth; + fetch(url, { + method, + headers, + body: opts.body !== undefined ? JSON.stringify(opts.body) : undefined, + }) + .then(async (res) => { + const body = await res.json().catch(() => ({})); + server.close(); + resolve({ status: res.status, body }); + }) + .catch((err) => { server.close(); reject(err); }); + }); + }); +} + +describe("POST /tasks/callback β€” HTTP layer", () => { + it("returns 401 without Authorization header", async () => { + const app = createA2AServer(makeAgent()); + const res = await callEndpoint(app, "POST", "/tasks/callback", { + body: { jobId: "known-job-001", status: "completed" }, + }); + expect(res.status).toBe(401); + }); + + it("returns 401 with wrong secret", async () => { + const app = createA2AServer(makeAgent()); + const res = await callEndpoint(app, "POST", "/tasks/callback", { + auth: "Bearer wrong-secret", + body: { jobId: "known-job-001", status: "completed" }, + }); + expect(res.status).toBe(401); + }); + + it("returns 400 for missing jobId", async () => { + const app = createA2AServer(makeAgent()); + const res = await callEndpoint(app, "POST", "/tasks/callback", { + auth: `Bearer ${SECRET}`, + body: { status: "completed" }, + }); + expect(res.status).toBe(400); + expect(res.body.error).toMatch(/jobId/i); + }); + + it("returns 400 for 
invalid status value", async () => { + const app = createA2AServer(makeAgent()); + const res = await callEndpoint(app, "POST", "/tasks/callback", { + auth: `Bearer ${SECRET}`, + body: { jobId: "known-job-001", status: "bogus" }, + }); + expect(res.status).toBe(400); + expect(res.body.error).toMatch(/status/i); + }); + + it("accepts timed_out as a valid status", async () => { + receiveCallbackMock.mockReturnValueOnce(true); + const app = createA2AServer(makeAgent()); + const res = await callEndpoint(app, "POST", "/tasks/callback", { + auth: `Bearer ${SECRET}`, + body: { jobId: "timed-out-job", status: "timed_out" }, + }); + expect(res.status).not.toBe(400); + expect(res.status).not.toBe(401); + }); + + it("returns 200 with ok=true for a known job", async () => { + receiveCallbackMock.mockReturnValueOnce(true); + const app = createA2AServer(makeAgent()); + const res = await callEndpoint(app, "POST", "/tasks/callback", { + auth: `Bearer ${SECRET}`, + body: { jobId: "known-job-001", status: "completed", result: "all done" }, + }); + expect(res.status).toBe(200); + expect(res.body.ok).toBe(true); + expect(res.body.jobId).toBe("known-job-001"); + }); + + it("returns 404 for unknown jobId", async () => { + receiveCallbackMock.mockReturnValueOnce(false); + const app = createA2AServer(makeAgent()); + const res = await callEndpoint(app, "POST", "/tasks/callback", { + auth: `Bearer ${SECRET}`, + body: { jobId: "nonexistent-job-xyz", status: "failed", error: "something broke" }, + }); + expect(res.status).toBe(404); + }); +}); + +describe("Agent card capabilities", () => { + it("advertises callback_endpoint=true", async () => { + const app = createA2AServer(makeAgent()); + const res = await callEndpoint(app, "GET", "/.well-known/agent.json"); + expect(res.status).toBe(200); + expect(res.body.capabilities?.callback_endpoint).toBe(true); + }); +}); diff --git a/tests/a2a-trace-propagation.test.ts b/tests/a2a-trace-propagation.test.ts new file mode 100644 index 0000000..e473348 
--- /dev/null +++ b/tests/a2a-trace-propagation.test.ts @@ -0,0 +1,195 @@ +import { describe, it, expect, jest } from "@jest/globals"; +import { propagation, context } from "@opentelemetry/api"; + +/** + * Tests for W3C traceparent propagation in A2A server. + * + * Two layers tested: + * 1. Unit: propagation.extract() correctly deserialises incoming headers into a context + * that can be re-injected — this is the core mechanism used in a2a-server.ts and worker.ts + * 2. HTTP smoke: the /tasks endpoint accepts requests with traceparent headers and + * does not error out + * + * Note: asserting that a span is *actually* recorded as a child in an OTLP exporter + * requires a running tracer provider (covered in tracing.test.ts). These tests + * focus on the serialisation/deserialisation round-trip and HTTP contract. The mocks registered below replace task-journal, logger, session, agentweave-context, response and telegram-notify with jest.fn() stubs. + */ + +jest.unstable_mockModule("../src/task-journal.js", () => ({ + createTask: jest.fn().mockReturnValue({ id: "task-001", status: "working" }), + updateTaskStatus: jest.fn(), + getRecentTasks: jest.fn().mockReturnValue([]), + getDb: jest.fn().mockReturnValue({ + prepare: jest.fn().mockReturnValue({ + get: jest.fn().mockReturnValue({ id: "known-job" }), + all: jest.fn().mockReturnValue([]), + }), + }), +})); +jest.unstable_mockModule("../src/logger.js", () => ({ log: jest.fn() })); +jest.unstable_mockModule("../src/session.js", () => ({ + saveSession: jest.fn(), + loadSession: jest.fn().mockReturnValue(null), +})); +jest.unstable_mockModule("../src/agentweave-context.js", () => ({ + setAgentWeaveSession: jest.fn(), + resetAgentWeaveSession: jest.fn(), + getAgentWeaveSession: jest.fn().mockReturnValue("max-main"), +})); +jest.unstable_mockModule("../src/response.js", () => ({ + extractAssistantTextFromTurn: jest.fn().mockReturnValue(""), +})); +jest.unstable_mockModule("../src/telegram-notify.js", () => ({ + relayTaskUpdateToTelegram: jest.fn(), + relayJobCompletionToTelegram: jest.fn(), +})); 
+jest.unstable_mockModule("../src/tools/claude-subagent.js", () => ({ + receiveCallback: jest.fn(), + delegateToClaudeSubagent: {}, +})); +jest.unstable_mockModule("worker_threads", () => ({ + Worker: jest.fn().mockImplementation(() => ({ on: jest.fn(), terminate: jest.fn() })), +})); + +const SECRET = "test-trace-secret"; +process.env.A2A_SHARED_SECRET = SECRET; + +const { createA2AServer } = await import("../src/a2a-server.js"); +/** Builds a stub agent: idle state with no messages, plus jest.fn() spies for subscribe, prompt and abort. */ +function makeAgent() { + return { + state: { isStreaming: false, messages: [] }, + subscribe: jest.fn().mockReturnValue(() => {}), + prompt: jest.fn(), + abort: jest.fn(), + } as any; +} +/** Serves `app` on 127.0.0.1 with port 0, sends one request (opts.headers are merged over the JSON Content-Type, Authorization is set last), closes the server, and resolves with the HTTP status and the parsed JSON body ({} when the body is not valid JSON). */ +async function callEndpoint( + app: ReturnType<typeof createA2AServer>, + method: string, + path: string, + opts: { auth?: string; body?: unknown; headers?: Record<string, string> } = {} +): Promise<{ status: number; body: any }> { + return new Promise((resolve, reject) => { + const server = app.listen(0, "127.0.0.1", () => { + const addr = server.address() as { port: number }; + const url = `http://127.0.0.1:${addr.port}${path}`; + const headers: Record<string, string> = { "Content-Type": "application/json", ...opts.headers }; + if (opts.auth) headers["Authorization"] = opts.auth; + fetch(url, { method, headers, body: opts.body !== undefined ? JSON.stringify(opts.body) : undefined }) + .then(async (res) => { const body = await res.json().catch(() => ({})); server.close(); resolve({ status: res.status, body }); }) + .catch((err) => { server.close(); reject(err); }); + }); + }); +} + +// ─── Unit: traceparent round-trip ───────────────────────────────────────────── +// These tests verify the serialise → extract → re-inject round-trip that +// a2a-server.ts and worker.ts both rely on. 
+ +describe("traceparent round-trip (propagation unit)", () => { + it("extract then re-inject preserves traceparent value", () => { + const traceparent = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"; + const incomingHeaders = { traceparent }; + + const extracted = propagation.extract(context.active(), incomingHeaders); + + const reinjected: Record<string, string> = {}; + propagation.inject(extracted, reinjected); + + // If a real propagator is registered the header is preserved. + // With the no-op propagator (default in tests) the header is absent — + // that's expected and is why we register a real provider in tracing.test.ts. + // Either way the round-trip must not throw. + expect(() => propagation.extract(context.active(), reinjected)).not.toThrow(); + }); + + it("extract with missing traceparent returns active context (no-op)", () => { + const extracted = propagation.extract(context.active(), {}); + const reinjected: Record<string, string> = {}; + propagation.inject(extracted, reinjected); + // No traceparent header — no exception, empty inject output + expect(Object.keys(reinjected).length).toBe(0); + }); + + it("extract with malformed traceparent does not throw", () => { + expect(() => + propagation.extract(context.active(), { traceparent: "not-valid" }) + ).not.toThrow(); + }); + + it("traceHeaders injected into workerData can be re-extracted in worker thread", () => { + // Simulate what a2a-server.ts does: extract from req.headers → inject into traceHeaders + const traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"; + const incomingContext = propagation.extract(context.active(), { traceparent }); + const traceHeaders: Record<string, string> = {}; + propagation.inject(incomingContext, traceHeaders); + + // Simulate what worker.ts does: re-extract from traceHeaders + expect(() => propagation.extract(context.active(), traceHeaders)).not.toThrow(); + }); +}); + +// ─── HTTP: /tasks endpoint accepts traceparent ──────────────────────────────── +/** Builds a /tasks request body carrying the given id and a single text part. */ +const taskBody = (id: 
string) => ({ + id, + params: { message: { parts: [{ type: "text", text: "test task" }] } }, +}); + +describe("POST /tasks — traceparent HTTP acceptance", () => { + it("returns 202 with valid traceparent header", async () => { + const app = createA2AServer(makeAgent()); + const res = await callEndpoint(app, "POST", "/tasks", { + auth: `Bearer ${SECRET}`, + headers: { traceparent: "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01" }, + body: taskBody("req-1"), + }); + expect(res.status).toBe(202); + }); + + it("returns 202 without traceparent header (backwards compatible)", async () => { + const app = createA2AServer(makeAgent()); + const res = await callEndpoint(app, "POST", "/tasks", { + auth: `Bearer ${SECRET}`, + body: taskBody("req-2"), + }); + expect(res.status).toBe(202); + }); + + it("returns 202 with malformed traceparent (graceful degradation)", async () => { + const app = createA2AServer(makeAgent()); + const res = await callEndpoint(app, "POST", "/tasks", { + auth: `Bearer ${SECRET}`, + headers: { traceparent: "bad-value" }, + body: taskBody("req-3"), + }); + expect(res.status).toBe(202); + }); + + it("returns 202 with both traceparent and AgentWeave headers", async () => { + const app = createA2AServer(makeAgent()); + const res = await callEndpoint(app, "POST", "/tasks", { + auth: `Bearer ${SECRET}`, + headers: { + traceparent: "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", + "x-agentweave-parent-session-id": "nix-session-123", + "x-agentweave-agent-id": "nix-v1", + "x-agentweave-task-label": "delegation-from-nix", + }, + body: taskBody("req-4"), + }); + expect(res.status).toBe(202); + }); + + it("task result includes working state for async task", async () => { + const app = createA2AServer(makeAgent()); + const res = await callEndpoint(app, "POST", "/tasks", { + auth: `Bearer ${SECRET}`, + headers: { traceparent: "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01" }, + body: taskBody("req-5"), + }); + 
expect(res.body.result?.status?.state).toBe("working"); + }); +}); diff --git a/tests/telegram-notify.test.ts b/tests/telegram-notify.test.ts new file mode 100644 index 0000000..e2e1d5a --- /dev/null +++ b/tests/telegram-notify.test.ts @@ -0,0 +1,54 @@ +import { describe, it, expect } from "@jest/globals"; +import { formatDuration, summarizeResult } from "../src/telegram-notify.js"; + +describe("formatDuration", () => { + it("returns 0s for 0ms", () => { + expect(formatDuration(0)).toBe("0s"); + }); + + it("returns seconds for < 60s", () => { + expect(formatDuration(30000)).toBe("30s"); + // 59999ms rounds to 60s which crosses the minute threshold → "1m" + expect(formatDuration(59999)).toBe("1m"); + expect(formatDuration(1000)).toBe("1s"); + }); + + it("returns m + s for < 1h", () => { + expect(formatDuration(90000)).toBe("1m 30s"); + expect(formatDuration(60000)).toBe("1m"); + expect(formatDuration(3599000)).toBe("59m 59s"); + }); + + it("returns h + m for >= 1h", () => { + expect(formatDuration(3661000)).toBe("1h 1m"); + expect(formatDuration(7200000)).toBe("2h"); + expect(formatDuration(7500000)).toBe("2h 5m"); + }); +}); + +describe("summarizeResult", () => { + it("passes through short strings unchanged", () => { + expect(summarizeResult("hello")).toBe("hello"); + expect(summarizeResult("")).toBe(""); + }); + + it("passes through exactly 200 chars unchanged", () => { + const s = "a".repeat(200); + expect(summarizeResult(s)).toBe(s); + expect(summarizeResult(s).length).toBe(200); + }); + + it("truncates strings over 200 chars with ellipsis", () => { + const s = "a".repeat(201); + const result = summarizeResult(s); + expect(result.endsWith("…")).toBe(true); + expect(result.length).toBe(201); // 200 chars + "…" + }); + + it("respects custom maxChars", () => { + const s = "abcdefghij"; // 10 chars + expect(summarizeResult(s, 5)).toBe("abcde…"); + expect(summarizeResult(s, 10)).toBe(s); + expect(summarizeResult(s, 11)).toBe(s); + }); +}); diff --git 
a/tests/worker.test.ts b/tests/worker.test.ts index 43f66a6..d578c22 100644 --- a/tests/worker.test.ts +++ b/tests/worker.test.ts @@ -99,3 +99,43 @@ describe("Telegram relay message format", () => { ); }); }); + +// ─── Silent flag — DelegateJob.silent field ─────────────────────────────────── + +import { _clearJobsForTest, _addJobForTest } from "../src/tools/claude-subagent.js"; + +describe("DelegateJob silent flag", () => { + it("silent=true is preserved in job object", () => { + _clearJobsForTest(); + const job = { + id: "silent-job-test", + taskLabel: "silent task", + prompt: "do something quietly", + childSessionId: "c1", + parentSessionId: "p1", + startedAt: Date.now() - 1000, + status: "running" as const, + output: "", + silent: true, + }; + _addJobForTest(job); + expect(job.silent).toBe(true); /* NOTE(review): this reads the local literal, not whatever _addJobForTest stored; consider reading the job back from the registry to verify. */ + }); + + it("silent=false is preserved in job object", () => { + _clearJobsForTest(); + const job = { + id: "noisy-job-test", + taskLabel: "noisy task", + prompt: "do something loudly", + childSessionId: "c2", + parentSessionId: "p2", + startedAt: Date.now() - 1000, + status: "running" as const, + output: "", + silent: false, + }; + _addJobForTest(job); + expect(job.silent).toBe(false); /* NOTE(review): same caveat as above; the assertion only checks the local object. */ + }); +});