Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions src/browser/features/Tools/WorkflowRunToolCall.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,62 @@ describe("WorkflowRunToolCall", () => {
});
});

test("shows invocation arguments and definition source before workflow events", () => {
const runningRun = {
id: "wfr_event_priority",
workspaceId: TEST_WORKSPACE_ID,
definition: {
name: "deep-research",
description: "Deep research",
scope: "built-in" as const,
executable: true,
},
definitionSource: "export default function workflow() { return null; }",
definitionHash: "sha256:event-priority",
args: { topic: "workflow cards" },
status: "running" as const,
createdAt: "2026-05-29T00:00:00.000Z",
updatedAt: "2026-05-29T00:00:02.000Z",
events: [
{
sequence: 1,
type: "action" as const,
at: "2026-05-29T00:00:01.000Z",
stepId: "collect-sources",
name: "github.issue.get",
status: "completed" as const,
effect: "read" as const,
details: { issue: 149 },
},
],
steps: [],
};

const view = renderWithStickyToolProviders(
<WorkflowRunToolCall
args={{
name: "deep-research",
args: { topic: "workflow cards" },
run_in_background: false,
}}
status="executing"
result={{
status: "running",
runId: runningRun.id,
result: null,
run: runningRun,
}}
/>
);

const argumentsTitle = view.getByText("Arguments");
const definitionSourceTitle = view.getByText("Definition source");
const eventsTitle = view.getByText("Workflow events (1)");
expect(Boolean(argumentsTitle.compareDocumentPosition(eventsTitle) & 4)).toBe(true);
expect(Boolean(definitionSourceTitle.compareDocumentPosition(eventsTitle) & 4)).toBe(true);
expect(view.getByText("collect-sources / github.issue.get / completed")).toBeTruthy();
});

test("renders executing foreground workflow status before the durable run is discovered", () => {
const view = render(
<ThemeProvider forcedTheme="dark">
Expand Down
23 changes: 23 additions & 0 deletions src/node/services/aiService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import type {
RuntimeStatusEvent,
StreamAbortEvent,
StreamEndEvent,
WorkflowRunAttachedEvent,
} from "@/common/types/stream";
import { log } from "./log";
import type { SessionUsageService } from "./sessionUsageService";
Expand Down Expand Up @@ -680,6 +681,28 @@ describe("AIService.setupStreamEventForwarding", () => {
expect(internals.pendingDevToolsRunMetadataByMessageId.has("message-1")).toBe(true);
});

it("forwards workflow-run-attached events", async () => {
using harness = createForwardingHarness("ai-service-workflow-run-attached-forwarding");
const { service, internals } = harness;
const event: WorkflowRunAttachedEvent = {
type: "workflow-run-attached",
workspaceId: "workspace-1",
messageId: "message-1",
toolCallId: "workflow-call-1",
runId: "wfr_forwarded",
timestamp: Date.now(),
};

const forwardedPromise = new Promise<WorkflowRunAttachedEvent>((resolve) => {
service.once("workflow-run-attached", (forwarded) =>
resolve(forwarded as WorkflowRunAttachedEvent)
);
});
internals.streamManager.emit("workflow-run-attached", event);

expect(await forwardedPromise).toEqual(event);
});

it.each([
{
name: "stream error",
Expand Down
1 change: 1 addition & 0 deletions src/node/services/aiService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@ export class AIService extends EventEmitter {
"tool-call-end",
"reasoning-delta",
"reasoning-end",
"workflow-run-attached",
"usage-delta",
] as const) {
this.streamManager.on(event, (data) => this.emit(event, data));
Expand Down
79 changes: 79 additions & 0 deletions src/node/services/streamManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as fs from "node:fs/promises";

import { KNOWN_MODELS } from "@/common/constants/knownModels";
import { StreamEndEventSchema } from "@/common/orpc/schemas/stream";
import type { CompletedMessagePart, WorkflowRunAttachedEvent } from "@/common/types/stream";
import { Ok, Err } from "@/common/types/result";
import type { ToolPolicy } from "@/common/utils/tools/toolPolicy";
import {
Expand Down Expand Up @@ -200,6 +201,7 @@ function createStreamInfoForTests(
startTime: now,
lastPartTimestamp: now,
toolCompletionTimestamps: new Map<string, number>(),
pendingWorkflowRunAttachments: new Map<string, unknown>(),
model,
metadataModel: overrides.metadataModel ?? model,
historySequence: 1,
Expand Down Expand Up @@ -232,6 +234,7 @@ describe("StreamManager - workflow run attachments", () => {
const streamInfo = createStreamInfoForTests({
messageId,
lastPartialWriteTime: timestamp,
pendingWorkflowRunAttachments: undefined,
parts: [
{
type: "dynamic-tool",
Expand Down Expand Up @@ -266,6 +269,82 @@ describe("StreamManager - workflow run attachments", () => {
timestamp: timestamp + 1,
});
});

test("persists workflow attachments that arrive before the tool part", async () => {
const streamManager = new StreamManager(historyService);
const workspaceId = "workflow-attachment-race-workspace";
const messageId = "workflow-attachment-race-message";
const timestamp = Date.now();
const streamInfo = createStreamInfoForTests({
messageId,
lastPartialWriteTime: timestamp,
parts: [],
});

getWorkspaceStreamsForTests(streamManager).set(workspaceId, streamInfo);

const attached = await streamManager.attachWorkflowRunToToolCall({
type: "workflow-run-attached",
workspaceId,
messageId,
toolCallId: "workflow-call-race",
runId: "wfr_race",
timestamp: timestamp + 1,
});

expect(attached).toBe(true);
expect(await historyService.readPartial(workspaceId)).toBeNull();

const appendPartAndEmit = getPrivateMethodForTests<
(
workspaceId: string,
streamInfo: Record<string, unknown>,
part: CompletedMessagePart,
schedulePartialWrite?: boolean
) => Promise<void>
>(streamManager, "appendPartAndEmit");

const replayedAttachments: WorkflowRunAttachedEvent[] = [];
streamManager.on("workflow-run-attached", (event: WorkflowRunAttachedEvent) => {
replayedAttachments.push(event);
});

await appendPartAndEmit.call(
streamManager,
workspaceId,
streamInfo,
{
type: "dynamic-tool",
toolCallId: "workflow-call-race",
toolName: "workflow_run",
input: { name: "deep-research", args: {} },
state: "input-available",
timestamp: timestamp + 2,
},
false
);

expect(replayedAttachments).toEqual([
{
type: "workflow-run-attached",
workspaceId,
messageId,
toolCallId: "workflow-call-race",
runId: "wfr_race",
timestamp: timestamp + 1,
},
]);

const partial = await historyService.readPartial(workspaceId);
const part = partial?.parts[0];
if (part?.type !== "dynamic-tool") {
throw new Error("Expected workflow tool part in persisted partial");
}
expect(part.workflowRun).toEqual({
runId: "wfr_race",
timestamp: timestamp + 1,
});
});
});

describe("StreamManager - createTempDirForStream", () => {
Expand Down
87 changes: 79 additions & 8 deletions src/node/services/streamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ function hasIncompleteToolCallPart(parts: CompletedMessagePart[]): boolean {
return parts.some((part) => part.type === "dynamic-tool" && part.state !== "output-available");
}

type DynamicToolCompletedMessagePart = Extract<CompletedMessagePart, { type: "dynamic-tool" }>;
type WorkflowRunToolAttachment = NonNullable<DynamicToolCompletedMessagePart["workflowRun"]>;

// Comprehensive stream info
interface WorkspaceStreamInfo {
state: StreamState;
Expand All @@ -484,6 +487,10 @@ interface WorkspaceStreamInfo {
// original start timestamp even after they gain output.
toolCompletionTimestamps: Map<string, number>;

// Workflow tools can create the durable run before their stream part is stored. Keep the exact
// attachment and apply it as soon as the matching dynamic-tool part lands.
pendingWorkflowRunAttachments: Map<string, WorkflowRunToolAttachment>;

model: string;
/** Metadata model resolved from provider mapping for cost/token metadata lookups. */
metadataModel: string;
Expand Down Expand Up @@ -646,6 +653,43 @@ export class StreamManager extends EventEmitter {
streamInfo.toolModelUsages.push(clonePersistedToolModelUsage(event));
}

private getWorkflowRunAttachment(event: WorkflowRunAttachedEvent): WorkflowRunToolAttachment {
return {
runId: event.runId,
...(event.run != null ? { run: event.run } : {}),
timestamp: event.timestamp,
};
}

private takePendingWorkflowRunAttachment(
streamInfo: WorkspaceStreamInfo,
toolCallId: string
): WorkflowRunToolAttachment | undefined {
const pendingAttachments = (streamInfo.pendingWorkflowRunAttachments ??= new Map());
const attachment = pendingAttachments.get(toolCallId);
if (attachment != null) {
pendingAttachments.delete(toolCallId);
}
return attachment;
}

private emitWorkflowRunAttachedFromAttachment(input: {
workspaceId: WorkspaceId;
messageId: string;
toolCallId: string;
attachment: WorkflowRunToolAttachment;
}): void {
this.emit("workflow-run-attached", {
type: "workflow-run-attached",
workspaceId: input.workspaceId as string,
messageId: input.messageId,
toolCallId: input.toolCallId,
runId: input.attachment.runId,
...(input.attachment.run != null ? { run: input.attachment.run } : {}),
timestamp: input.attachment.timestamp,
} satisfies WorkflowRunAttachedEvent);
}

async attachWorkflowRunToToolCall(event: WorkflowRunAttachedEvent): Promise<boolean> {
const workspaceId = event.workspaceId as WorkspaceId;
const streamInfo = this.workspaceStreams.get(workspaceId);
Expand All @@ -656,25 +700,24 @@ export class StreamManager extends EventEmitter {
return false;
}

const attachment = this.getWorkflowRunAttachment(event);
const partIndex = streamInfo.parts.findIndex(
(part) => part.type === "dynamic-tool" && part.toolCallId === event.toolCallId
);
if (partIndex === -1) {
return false;
(streamInfo.pendingWorkflowRunAttachments ??= new Map()).set(event.toolCallId, attachment);
return true;
}

const part = streamInfo.parts[partIndex];
if (part.type !== "dynamic-tool") {
return false;
}

Comment thread
ThomasK33 marked this conversation as resolved.
streamInfo.pendingWorkflowRunAttachments?.delete(event.toolCallId);
streamInfo.parts[partIndex] = {
...part,
workflowRun: {
runId: event.runId,
...(event.run != null ? { run: event.run } : {}),
timestamp: event.timestamp,
},
workflowRun: attachment,
};

await this.flushPartialWrite(workspaceId, streamInfo);
Expand Down Expand Up @@ -1164,8 +1207,24 @@ export class StreamManager extends EventEmitter {
await this.emitPartAsEvent(workspaceId, streamInfo.messageId, part);
} finally {
// Always persist the part in-memory (and to partial.json, if enabled), even if emit fails.
streamInfo.parts.push(part);
if (schedulePartialWrite) {
let partToPersist = part;
let pendingAttachment: WorkflowRunToolAttachment | undefined;
if (part.type === "dynamic-tool") {
pendingAttachment = this.takePendingWorkflowRunAttachment(streamInfo, part.toolCallId);
if (pendingAttachment != null) {
partToPersist = { ...part, workflowRun: pendingAttachment };
}
}
streamInfo.parts.push(partToPersist);
if (pendingAttachment != null && part.type === "dynamic-tool") {
await this.flushPartialWrite(workspaceId, streamInfo);
this.emitWorkflowRunAttachedFromAttachment({
Comment thread
ThomasK33 marked this conversation as resolved.
workspaceId,
messageId: streamInfo.messageId,
toolCallId: part.toolCallId,
attachment: pendingAttachment,
});
} else if (schedulePartialWrite) {
void this.schedulePartialWrite(workspaceId, streamInfo);
}
}
Expand Down Expand Up @@ -1563,6 +1622,7 @@ export class StreamManager extends EventEmitter {
startTime,
lastPartTimestamp: startTime,
toolCompletionTimestamps: new Map(),
pendingWorkflowRunAttachments: new Map(),
model: modelString,
metadataModel,
thinkingLevel,
Expand Down Expand Up @@ -1622,12 +1682,14 @@ export class StreamManager extends EventEmitter {
const existingPartIndex = streamInfo.parts.findIndex(
(p) => p.type === "dynamic-tool" && p.toolCallId === toolCallId
);
const pendingAttachment = this.takePendingWorkflowRunAttachment(streamInfo, toolCallId);

if (existingPartIndex !== -1) {
const existingPart = streamInfo.parts[existingPartIndex];
if (existingPart.type === "dynamic-tool") {
streamInfo.parts[existingPartIndex] = {
...existingPart,
...(pendingAttachment != null ? { workflowRun: pendingAttachment } : {}),
state: "output-available" as const,
output,
};
Expand All @@ -1644,6 +1706,7 @@ export class StreamManager extends EventEmitter {
input: toolCall?.input ?? null,
output,
timestamp: nextPartTimestamp(streamInfo),
...(pendingAttachment != null ? { workflowRun: pendingAttachment } : {}),
});
}

Expand All @@ -1652,6 +1715,14 @@ export class StreamManager extends EventEmitter {
// read partial.json via commitPartial. Without this await, there's a race condition
// where the partial is read before the tool result is written, causing "amnesia".
await this.flushPartialWrite(workspaceId, streamInfo);
if (pendingAttachment != null) {
this.emitWorkflowRunAttachedFromAttachment({
workspaceId,
messageId: streamInfo.messageId,
toolCallId,
attachment: pendingAttachment,
});
}

// Emit tool-call-end event (listeners can now safely read partial)
const completionTimestamp = nextPartTimestamp(streamInfo);
Expand Down
Loading