Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 1 addition & 19 deletions @blaxel/core/src/agents/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import { Agent, getAgent } from "../client/index.js";
import { getForcedUrl, getGlobalUniqueHash } from "../common/internal.js";
import { logger } from "../common/logger.js";
import { settings } from "../common/settings.js";
import { startSpan } from '../telemetry/telemetry.js';

class BlAgent {
export class BlAgent {
agentName: string;
constructor(agentName: string) {
this.agentName = agentName;
Expand Down Expand Up @@ -79,39 +78,22 @@ class BlAgent {
): Promise<string> {
logger.debug(`Agent Calling: ${this.agentName}`);

const span = startSpan(this.agentName, {
attributes: {
"agent.name": this.agentName,
"agent.args": JSON.stringify(input),
"span.type": "agent.run",
},
isRoot: false,
});

try {
const response = await this.call(this.url, input, headers, params);
span.setAttribute("agent.run.result", await response.text());
return await response.text();
} catch (err: unknown) {
if (err instanceof Error) {
if (!this.fallbackUrl) {
span.setAttribute("agent.run.error", err.stack as string);
throw err;
}
try {
const response = await this.call(this.fallbackUrl, input, headers, params);
span.setAttribute("agent.run.result", await response.text());
return await response.text();
} catch (err: unknown) {
if (err instanceof Error) {
span.setAttribute("agent.run.error", err.stack as string);
}
throw err;
}
Comment thread
mendral-app[bot] marked this conversation as resolved.
}
throw err;
} finally {
span.end();
}
}
}
Expand Down
36 changes: 9 additions & 27 deletions @blaxel/core/src/jobs/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import {
} from "../client/index.js";
import { logger } from "../common/logger.js";
import { settings } from "../common/settings.js";
import { startSpan } from "../telemetry/telemetry.js";

class BlJob {
export class BlJob {
jobName: string;
constructor(jobName: string) {
this.jobName = jobName;
Expand All @@ -26,32 +25,15 @@ class BlJob {
): Promise<string> {
logger.debug(`Job Calling: ${this.jobName}`);

const span = startSpan(this.jobName, {
attributes: {
"job.name": this.jobName,
"span.type": "job.run",
},
isRoot: false,
});
const request: CreateJobExecutionRequest = {
tasks,
...(options?.env && { env: options.env }),
...(options?.memory && { memory: options.memory }),
...(options?.executionId && { executionId: options.executionId }),
};

try {
const request: CreateJobExecutionRequest = {
tasks,
...(options?.env && { env: options.env }),
...(options?.memory && { memory: options.memory }),
...(options?.executionId && { executionId: options.executionId }),
};

const executionId = await this.createExecution(request);
return executionId;
} catch (err: unknown) {
if (err instanceof Error) {
span.setAttribute("job.run.error", err.stack as string);
}
throw err;
} finally {
span.end();
}
const executionId = await this.createExecution(request);
return executionId;
}

/**
Expand Down
69 changes: 5 additions & 64 deletions @blaxel/core/src/mcp/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import { v4 as uuidv4 } from "uuid";
import WebSocket, { WebSocketServer } from "ws";
import { env } from "../common/env.js";
import { logger } from "../common/logger.js";
import { BlaxelSpan, startSpan } from "../telemetry/telemetry.js";
const spans = new Map<string, BlaxelSpan>();

interface JSONRPCMessage {
jsonrpc: "2.0";
Expand All @@ -20,7 +18,7 @@ export class BlaxelMcpServerTransport implements Transport {

onclose?: () => void;
onerror?: (err: Error) => void;
private messageHandler?: (msg: JSONRPCMessage, clientId: string) => void;
private messageHandler?: (msg: JSONRPCMessage, clientId: string) => void | Promise<void>;
onconnection?: (clientId: string) => void;
ondisconnection?: (clientId: string) => void;

Expand Down Expand Up @@ -52,27 +50,10 @@ export class BlaxelMcpServerTransport implements Transport {
});
this.onconnection?.(clientId);

ws.on("message", (data: Buffer) => {
const span = startSpan("message", {
attributes: {
"mcp.client.id": clientId,
"span.type": "mcp.message",
},
isRoot: false,
});

ws.on("message", async (data: Buffer) => {
try {
const msg = JSON.parse(data.toString()) as JSONRPCMessage;
this.messageHandler?.(msg, clientId);
if ("method" in msg && "id" in msg && "params" in msg) {
span.setAttributes({
"mcp.message.parsed": true,
"mcp.method": msg.method as string,
"mcp.messageId": msg.id as string,
"mcp.toolName": msg.params?.name as string,
});
spans.set(clientId + ":" + msg.id, span);
}
await this.messageHandler?.(msg, clientId);

// Handle msg.id safely
const msgId = msg.id ? String(msg.id) : "";
Expand All @@ -82,40 +63,19 @@ export class BlaxelMcpServerTransport implements Transport {
// Use optional chaining for safe access
const client = this.clients.get(cId ?? "");
if (client?.ws?.readyState === WebSocket.OPEN) {
const msgSpan = spans.get(cId + ":" + (msg.id ?? ""));
try {
client.ws.send(JSON.stringify(msg));
if (msgSpan) {
msgSpan.setAttributes({
"mcp.message.response_sent": true,
});
}
} catch (err) {
if (msgSpan) {
msgSpan.setStatus("error"); // Error status
msgSpan.recordException(err as Error);
}
throw err;
} finally {
if (msgSpan) {
msgSpan.end();
}
}
client.ws.send(JSON.stringify(msg));
} else {
this.clients.delete(cId);
this.ondisconnection?.(cId);
}
} catch (err: unknown) {
if (err instanceof Error) {
span.setStatus("error"); // Error status
span.recordException(err);
this.onerror?.(err);
} else {
this.onerror?.(
new Error(`Failed to parse message: ${String(err)}`)
);
}
span.end();
}
});

Expand All @@ -141,26 +101,7 @@ export class BlaxelMcpServerTransport implements Transport {
// Send to specific client
const client = this.clients.get(cId);
if (client?.ws?.readyState === WebSocket.OPEN) {
const msgSpan = spans.get(cId + ":" + msg.id);

try {
client.ws.send(data);
if (msgSpan) {
msgSpan.setAttributes({
"mcp.message.response_sent": true,
});
}
} catch (err) {
if (msgSpan) {
msgSpan.setStatus("error"); // Error status
msgSpan.recordException(err as Error);
}
throw err;
} finally {
if (msgSpan) {
msgSpan.end();
}
}
client.ws.send(data);
} else {
this.clients.delete(cId);
this.ondisconnection?.(cId);
Expand Down
1 change: 1 addition & 0 deletions @blaxel/core/src/tools/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { getForcedUrl } from "../common/internal.js";
import { getMcpTool, ToolOptions } from "./mcpTool.js";
import { Tool } from "./types.js";

export { McpTool } from "./mcpTool.js";
export type { ToolOptions };

export const getTool = async (name: string, options?: number | ToolOptions): Promise<Tool[]> => {
Expand Down
67 changes: 21 additions & 46 deletions @blaxel/core/src/tools/mcpTool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { logger } from "../common/logger.js";
import { settings } from "../common/settings.js";
import { authenticate, SandboxInstance } from "../index.js";
import { BlaxelMcpClientTransport } from "../mcp/client.js";
import { startSpan } from "../telemetry/telemetry.js";
import { Tool } from "./types.js";
import { FunctionSchema, schemaToZodSchema } from "./zodSchema.js";

Expand Down Expand Up @@ -162,52 +161,31 @@ export class McpTool {
}

async listTools(): Promise<Tool[]> {
const span = startSpan(this.name, {
attributes: {
"span.type": "tool.list",
},
});
try {
logger.debug(`MCP:${this.name}:Listing tools`);
await this.start();
const { tools } = (await this.client.listTools()) as {
tools: Array<{
name: string;
description: string;
inputSchema: FunctionSchema;
}>;
logger.debug(`MCP:${this.name}:Listing tools`);
await this.start();
const { tools } = (await this.client.listTools()) as {
tools: Array<{
name: string;
description: string;
inputSchema: FunctionSchema;
}>;
};
await this.close();
const result = tools.map((tool) => {
return {
name: tool.name,
description: tool.description,
inputSchema: schemaToZodSchema(tool.inputSchema),
originalSchema: tool.inputSchema,
call: (input: Record<string, unknown> | undefined) => {
return this.call(tool.name, input);
},
};
await this.close();
const result = tools.map((tool) => {
return {
name: tool.name,
description: tool.description,
inputSchema: schemaToZodSchema(tool.inputSchema),
originalSchema: tool.inputSchema,
call: (input: Record<string, unknown> | undefined) => {
return this.call(tool.name, input);
},
};
});
span.setAttribute("tool.list.result", JSON.stringify(result));
return result;
} catch (err) {
span.setStatus("error");
span.recordException(err as Error);
throw err;
} finally {
span.end();
}
});
return result;
}

async call(toolName: string, args: Record<string, unknown> | undefined): Promise<unknown> {
const span = startSpan(this.name + "." + toolName, {
attributes: {
"span.type": "tool.call",
"tool.name": toolName,
"tool.args": JSON.stringify(args),
},
});
try {
logger.debug(
`MCP:${this.name}:Tool calling`,
Expand All @@ -230,7 +208,6 @@ export class McpTool {
JSON.stringify(args),
// result
);
span.setAttribute("tool.call.result", JSON.stringify(result));
return result;
} catch (err: unknown) {
if (err instanceof Error) {
Expand All @@ -243,8 +220,6 @@ export class McpTool {
});
}
throw err;
} finally {
span.end();
}
}

Expand Down
Loading
Loading