diff --git a/@blaxel/core/src/agents/index.ts b/@blaxel/core/src/agents/index.ts index 200ea13f..0426bbaa 100644 --- a/@blaxel/core/src/agents/index.ts +++ b/@blaxel/core/src/agents/index.ts @@ -4,9 +4,8 @@ import { Agent, getAgent } from "../client/index.js"; import { getForcedUrl, getGlobalUniqueHash } from "../common/internal.js"; import { logger } from "../common/logger.js"; import { settings } from "../common/settings.js"; -import { startSpan } from '../telemetry/telemetry.js'; -class BlAgent { +export class BlAgent { agentName: string; constructor(agentName: string) { this.agentName = agentName; @@ -79,39 +78,22 @@ class BlAgent { ): Promise { logger.debug(`Agent Calling: ${this.agentName}`); - const span = startSpan(this.agentName, { - attributes: { - "agent.name": this.agentName, - "agent.args": JSON.stringify(input), - "span.type": "agent.run", - }, - isRoot: false, - }); - try { const response = await this.call(this.url, input, headers, params); - span.setAttribute("agent.run.result", await response.text()); return await response.text(); } catch (err: unknown) { if (err instanceof Error) { if (!this.fallbackUrl) { - span.setAttribute("agent.run.error", err.stack as string); throw err; } try { const response = await this.call(this.fallbackUrl, input, headers, params); - span.setAttribute("agent.run.result", await response.text()); return await response.text(); } catch (err: unknown) { - if (err instanceof Error) { - span.setAttribute("agent.run.error", err.stack as string); - } throw err; } } throw err; - } finally { - span.end(); } } } diff --git a/@blaxel/core/src/jobs/jobs.ts b/@blaxel/core/src/jobs/jobs.ts index 51067f98..78f6ca89 100644 --- a/@blaxel/core/src/jobs/jobs.ts +++ b/@blaxel/core/src/jobs/jobs.ts @@ -8,9 +8,8 @@ import { } from "../client/index.js"; import { logger } from "../common/logger.js"; import { settings } from "../common/settings.js"; -import { startSpan } from "../telemetry/telemetry.js"; -class BlJob { +export class BlJob { jobName: string; constructor(jobName: string) { this.jobName = jobName; @@ -26,32 +25,15 @@ class BlJob { ): Promise { logger.debug(`Job Calling: ${this.jobName}`); - const span = startSpan(this.jobName, { - attributes: { - "job.name": this.jobName, - "span.type": "job.run", - }, - isRoot: false, - }); + const request: CreateJobExecutionRequest = { + tasks, + ...(options?.env && { env: options.env }), + ...(options?.memory && { memory: options.memory }), + ...(options?.executionId && { executionId: options.executionId }), + }; - try { - const request: CreateJobExecutionRequest = { - tasks, - ...(options?.env && { env: options.env }), - ...(options?.memory && { memory: options.memory }), - ...(options?.executionId && { executionId: options.executionId }), - }; - - const executionId = await this.createExecution(request); - return executionId; - } catch (err: unknown) { - if (err instanceof Error) { - span.setAttribute("job.run.error", err.stack as string); - } - throw err; - } finally { - span.end(); - } + const executionId = await this.createExecution(request); + return executionId; } /** diff --git a/@blaxel/core/src/mcp/server.ts b/@blaxel/core/src/mcp/server.ts index 7a4cbebf..db22832b 100644 --- a/@blaxel/core/src/mcp/server.ts +++ b/@blaxel/core/src/mcp/server.ts @@ -3,8 +3,6 @@ import { v4 as uuidv4 } from "uuid"; import WebSocket, { WebSocketServer } from "ws"; import { env } from "../common/env.js"; import { logger } from "../common/logger.js"; -import { BlaxelSpan, startSpan } from "../telemetry/telemetry.js"; -const spans = new Map(); interface JSONRPCMessage { jsonrpc: "2.0"; @@ -20,7 +18,7 @@ export class BlaxelMcpServerTransport implements Transport { onclose?: () => void; onerror?: (err: Error) => void; - private messageHandler?: (msg: JSONRPCMessage, clientId: string) => void; + private messageHandler?: (msg: JSONRPCMessage, clientId: string) => void | Promise; onconnection?: (clientId: string) => void; ondisconnection?: (clientId: string) => void; @@ -52,27 +50,10 @@ export class BlaxelMcpServerTransport implements Transport { }); this.onconnection?.(clientId); - ws.on("message", (data: Buffer) => { - const span = startSpan("message", { - attributes: { - "mcp.client.id": clientId, - "span.type": "mcp.message", - }, - isRoot: false, - }); - + ws.on("message", async (data: Buffer) => { try { const msg = JSON.parse(data.toString()) as JSONRPCMessage; - this.messageHandler?.(msg, clientId); - if ("method" in msg && "id" in msg && "params" in msg) { - span.setAttributes({ - "mcp.message.parsed": true, - "mcp.method": msg.method as string, - "mcp.messageId": msg.id as string, - "mcp.toolName": msg.params?.name as string, - }); - spans.set(clientId + ":" + msg.id, span); - } + await this.messageHandler?.(msg, clientId); // Handle msg.id safely const msgId = msg.id ? String(msg.id) : ""; @@ -82,40 +63,19 @@ export class BlaxelMcpServerTransport implements Transport { // Use optional chaining for safe access const client = this.clients.get(cId ?? ""); if (client?.ws?.readyState === WebSocket.OPEN) { - const msgSpan = spans.get(cId + ":" + (msg.id ?? "")); - try { - client.ws.send(JSON.stringify(msg)); - if (msgSpan) { - msgSpan.setAttributes({ - "mcp.message.response_sent": true, - }); - } - } catch (err) { - if (msgSpan) { - msgSpan.setStatus("error"); // Error status - msgSpan.recordException(err as Error); - } - throw err; - } finally { - if (msgSpan) { - msgSpan.end(); - } - } + client.ws.send(JSON.stringify(msg)); } else { this.clients.delete(cId); this.ondisconnection?.(cId); } } catch (err: unknown) { if (err instanceof Error) { - span.setStatus("error"); // Error status - span.recordException(err); this.onerror?.(err); } else { this.onerror?.( new Error(`Failed to parse message: ${String(err)}`) ); } - span.end(); } }); @@ -141,26 +101,7 @@ export class BlaxelMcpServerTransport implements Transport { // Send to specific client const client = this.clients.get(cId); if (client?.ws?.readyState === WebSocket.OPEN) { - const msgSpan = spans.get(cId + ":" + msg.id); - - try { - client.ws.send(data); - if (msgSpan) { - msgSpan.setAttributes({ - "mcp.message.response_sent": true, - }); - } - } catch (err) { - if (msgSpan) { - msgSpan.setStatus("error"); // Error status - msgSpan.recordException(err as Error); - } - throw err; - } finally { - if (msgSpan) { - msgSpan.end(); - } - } + client.ws.send(data); } else { this.clients.delete(cId); this.ondisconnection?.(cId); diff --git a/@blaxel/core/src/tools/index.ts b/@blaxel/core/src/tools/index.ts index 2fa73d81..2f4d2a58 100644 --- a/@blaxel/core/src/tools/index.ts +++ b/@blaxel/core/src/tools/index.ts @@ -4,6 +4,7 @@ import { getForcedUrl } from "../common/internal.js"; import { getMcpTool, ToolOptions } from "./mcpTool.js"; import { Tool } from "./types.js"; +export { McpTool } from "./mcpTool.js"; export type { ToolOptions }; export const getTool = async (name: string, options?: number | ToolOptions): Promise => { diff --git a/@blaxel/core/src/tools/mcpTool.ts b/@blaxel/core/src/tools/mcpTool.ts index f47d6004..920b5bf2 100644 --- a/@blaxel/core/src/tools/mcpTool.ts +++ b/@blaxel/core/src/tools/mcpTool.ts @@ -6,7 +6,6 @@ import { logger } from "../common/logger.js"; import { settings } from "../common/settings.js"; import { authenticate, SandboxInstance } from "../index.js"; import { BlaxelMcpClientTransport } from "../mcp/client.js"; -import { startSpan } from "../telemetry/telemetry.js"; import { Tool } from "./types.js"; import { FunctionSchema, schemaToZodSchema } from "./zodSchema.js"; @@ -162,52 +161,31 @@ export class McpTool { } async listTools(): Promise { - const span = startSpan(this.name, { - attributes: { - "span.type": "tool.list", - }, - }); - try { - logger.debug(`MCP:${this.name}:Listing tools`); - await this.start(); - const { tools } = (await this.client.listTools()) as { - tools: Array<{ - name: string; - description: string; - inputSchema: FunctionSchema; - }>; + logger.debug(`MCP:${this.name}:Listing tools`); + await this.start(); + const { tools } = (await this.client.listTools()) as { + tools: Array<{ + name: string; + description: string; + inputSchema: FunctionSchema; + }>; + }; + await this.close(); + const result = tools.map((tool) => { + return { + name: tool.name, + description: tool.description, + inputSchema: schemaToZodSchema(tool.inputSchema), + originalSchema: tool.inputSchema, + call: (input: Record | undefined) => { + return this.call(tool.name, input); + }, }; - await this.close(); - const result = tools.map((tool) => { - return { - name: tool.name, - description: tool.description, - inputSchema: schemaToZodSchema(tool.inputSchema), - originalSchema: tool.inputSchema, - call: (input: Record | undefined) => { - return this.call(tool.name, input); - }, - }; - }); - span.setAttribute("tool.list.result", JSON.stringify(result)); - return result; - } catch (err) { - span.setStatus("error"); - span.recordException(err as Error); - throw err; - } finally { - span.end(); - } + }); + return result; } async call(toolName: string, args: Record | undefined): Promise { - const span = startSpan(this.name + "." + toolName, { - attributes: { - "span.type": "tool.call", - "tool.name": toolName, - "tool.args": JSON.stringify(args), - }, - }); try { logger.debug( `MCP:${this.name}:Tool calling`, @@ -230,7 +208,6 @@ export class McpTool { JSON.stringify(args), // result ); - span.setAttribute("tool.call.result", JSON.stringify(result)); return result; } catch (err: unknown) { if (err instanceof Error) { @@ -243,8 +220,6 @@ export class McpTool { }); } throw err; - } finally { - span.end(); } } diff --git a/@blaxel/telemetry/src/instrumentation/blaxel_core.ts b/@blaxel/telemetry/src/instrumentation/blaxel_core.ts new file mode 100644 index 00000000..3a264406 --- /dev/null +++ b/@blaxel/telemetry/src/instrumentation/blaxel_core.ts @@ -0,0 +1,231 @@ +import { + BlAgent, + BlaxelMcpServerTransport, + BlJob, + McpTool, + startSpan, +} from "@blaxel/core"; + +/** + * Monkey-patches BlAgent.prototype.run to wrap with telemetry spans. + */ +function patchBlAgent() { + const origRun = BlAgent.prototype.run; + + BlAgent.prototype.run = async function ( + input: Record | string | undefined, + headers: Record = {}, + params: Record = {} + ): Promise { + const span = startSpan(this.agentName, { + attributes: { + "agent.name": this.agentName, + "agent.args": JSON.stringify(input), + "span.type": "agent.run", + }, + isRoot: false, + }); + + try { + const result = await origRun.call(this, input, headers, params); + span.setAttribute("agent.run.result", result); + return result; + } catch (err: unknown) { + if (err instanceof Error) { + span.setAttribute("agent.run.error", err.stack as string); + } + throw err; + } finally { + span.end(); + } + }; +} + +/** + * Monkey-patches BlJob.prototype.run to wrap with telemetry spans. + */ +function patchBlJob() { + const origRun = BlJob.prototype.run; + + BlJob.prototype.run = async function ( + tasks: Record[], + options?: { + env?: Record; + memory?: number; + executionId?: string; + } + ): Promise { + const span = startSpan(this.jobName, { + attributes: { + "job.name": this.jobName, + "span.type": "job.run", + }, + isRoot: false, + }); + + try { + const result = await origRun.call(this, tasks, options); + return result; + } catch (err: unknown) { + if (err instanceof Error) { + span.setAttribute("job.run.error", err.stack as string); + } + throw err; + } finally { + span.end(); + } + }; +} + +/** + * Monkey-patches McpTool.prototype.listTools and McpTool.prototype.call + * to wrap with telemetry spans. + */ +function patchMcpTool() { + const origListTools = McpTool.prototype.listTools; + const origCall = McpTool.prototype.call; + + McpTool.prototype.listTools = async function () { + const span = startSpan((this as any).name, { + attributes: { + "span.type": "tool.list", + }, + }); + try { + const result = await origListTools.call(this); + span.setAttribute("tool.list.result", JSON.stringify(result)); + return result; + } catch (err) { + span.setStatus("error"); + span.recordException(err as Error); + throw err; + } finally { + span.end(); + } + }; + + McpTool.prototype.call = async function ( + toolName: string, + args: Record | undefined + ) { + const span = startSpan((this as any).name + "." + toolName, { + attributes: { + "span.type": "tool.call", + "tool.name": toolName, + "tool.args": JSON.stringify(args), + }, + }); + try { + const result = await origCall.call(this, toolName, args); + span.setAttribute("tool.call.result", JSON.stringify(result)); + return result; + } catch (err: unknown) { + span.setStatus("error"); + span.recordException(err as Error); + throw err; + } finally { + span.end(); + } + }; +} + +/** + * Monkey-patches BlaxelMcpServerTransport to add telemetry spans + * for incoming messages (via onmessage setter) and outgoing messages (via send). + */ +function patchMcpServer() { + // Patch the onmessage setter to wrap the handler with span tracking + const origDescriptor = Object.getOwnPropertyDescriptor( + BlaxelMcpServerTransport.prototype, + "onmessage" + ); + + if (origDescriptor?.set) { + const origSetter = origDescriptor.set; + + Object.defineProperty(BlaxelMcpServerTransport.prototype, "onmessage", { + ...origDescriptor, + set( + handler: ((message: any) => void) | undefined + ) { + if (handler) { + const tracedHandler = async (message: any) => { + const messageId = message.id ? String(message.id) : ""; + const [clientId] = messageId.includes(":") + ? messageId.split(":") + : [undefined]; + + const span = startSpan("mcp.message", { + attributes: { + "span.type": "mcp.message", + ...(clientId ? { "mcp.client.id": clientId } : {}), + ...((message.method as string) + ? { "mcp.method": message.method as string } + : {}), + ...((message.params as Record)?.name + ? { + "mcp.toolName": ( + message.params as Record + ).name as string, + } + : {}), + }, + isRoot: false, + }); + + try { + await Promise.resolve(handler(message)); + } catch (err) { + span.setStatus("error"); + span.recordException(err as Error); + throw err; + } finally { + span.end(); + } + }; + origSetter.call(this, tracedHandler); + } else { + origSetter.call(this, handler); + } + }, + }); + } + + // Patch the send method to wrap with span tracking + const origSend = BlaxelMcpServerTransport.prototype.send; + + BlaxelMcpServerTransport.prototype.send = async function ( + msg: any + ): Promise { + const span = startSpan("mcp.send", { + attributes: { + "span.type": "mcp.send", + }, + isRoot: false, + }); + + try { + await origSend.call(this, msg as never); + span.setAttributes({ + "mcp.message.response_sent": true, + }); + } catch (err) { + span.setStatus("error"); + span.recordException(err as Error); + throw err; + } finally { + span.end(); + } + }; +} + +/** + * Instruments all @blaxel/core classes with telemetry via monkey patching. + * This should be called during telemetry initialization. + */ +export function instrumentBlaxelCore() { + patchBlAgent(); + patchBlJob(); + patchMcpTool(); + patchMcpServer(); +} diff --git a/@blaxel/telemetry/src/telemetry.ts b/@blaxel/telemetry/src/telemetry.ts index 7af031b1..96fab723 100644 --- a/@blaxel/telemetry/src/telemetry.ts +++ b/@blaxel/telemetry/src/telemetry.ts @@ -28,6 +28,7 @@ import { createMetricExporter, createTraceExporter } from "./auth_refresh_exporters"; +import { instrumentBlaxelCore } from "./instrumentation/blaxel_core"; import { OtelTelemetryProvider } from "./telemetry_provider"; export class BlaxelResource implements Resource { @@ -244,6 +245,9 @@ class TelemetryManager { registerInstrumentations({ instrumentations: [httpInstrumentation], }); + + // Instrument @blaxel/core classes via monkey patching + instrumentBlaxelCore(); } setExporters() {