diff --git a/src/browser/features/Tools/WorkflowRunToolCall.test.tsx b/src/browser/features/Tools/WorkflowRunToolCall.test.tsx index 9278b4201e..016f2aaeff 100644 --- a/src/browser/features/Tools/WorkflowRunToolCall.test.tsx +++ b/src/browser/features/Tools/WorkflowRunToolCall.test.tsx @@ -1197,6 +1197,62 @@ describe("WorkflowRunToolCall", () => { }); }); + test("shows invocation arguments and definition source before workflow events", () => { + const runningRun = { + id: "wfr_event_priority", + workspaceId: TEST_WORKSPACE_ID, + definition: { + name: "deep-research", + description: "Deep research", + scope: "built-in" as const, + executable: true, + }, + definitionSource: "export default function workflow() { return null; }", + definitionHash: "sha256:event-priority", + args: { topic: "workflow cards" }, + status: "running" as const, + createdAt: "2026-05-29T00:00:00.000Z", + updatedAt: "2026-05-29T00:00:02.000Z", + events: [ + { + sequence: 1, + type: "action" as const, + at: "2026-05-29T00:00:01.000Z", + stepId: "collect-sources", + name: "github.issue.get", + status: "completed" as const, + effect: "read" as const, + details: { issue: 149 }, + }, + ], + steps: [], + }; + + const view = renderWithStickyToolProviders( + + ); + + const argumentsTitle = view.getByText("Arguments"); + const definitionSourceTitle = view.getByText("Definition source"); + const eventsTitle = view.getByText("Workflow events (1)"); + expect(Boolean(argumentsTitle.compareDocumentPosition(eventsTitle) & 4)).toBe(true); + expect(Boolean(definitionSourceTitle.compareDocumentPosition(eventsTitle) & 4)).toBe(true); + expect(view.getByText("collect-sources / github.issue.get / completed")).toBeTruthy(); + }); + test("renders executing foreground workflow status before the durable run is discovered", () => { const view = render( diff --git a/src/node/services/aiService.test.ts b/src/node/services/aiService.test.ts index 2a1647b145..b27152af4e 100644 --- a/src/node/services/aiService.test.ts +++ b/src/node/services/aiService.test.ts @@ -52,6 +52,7 @@ import type { RuntimeStatusEvent, StreamAbortEvent, StreamEndEvent, + WorkflowRunAttachedEvent, } from "@/common/types/stream"; import { log } from "./log"; import type { SessionUsageService } from "./sessionUsageService"; @@ -680,6 +681,28 @@ describe("AIService.setupStreamEventForwarding", () => { expect(internals.pendingDevToolsRunMetadataByMessageId.has("message-1")).toBe(true); }); + it("forwards workflow-run-attached events", async () => { + using harness = createForwardingHarness("ai-service-workflow-run-attached-forwarding"); + const { service, internals } = harness; + const event: WorkflowRunAttachedEvent = { + type: "workflow-run-attached", + workspaceId: "workspace-1", + messageId: "message-1", + toolCallId: "workflow-call-1", + runId: "wfr_forwarded", + timestamp: Date.now(), + }; + + const forwardedPromise = new Promise((resolve) => { + service.once("workflow-run-attached", (forwarded) => + resolve(forwarded as WorkflowRunAttachedEvent) + ); + }); + internals.streamManager.emit("workflow-run-attached", event); + + expect(await forwardedPromise).toEqual(event); + }); + it.each([ { name: "stream error", diff --git a/src/node/services/aiService.ts b/src/node/services/aiService.ts index 01bf973c21..eb9dd54fc2 100644 --- a/src/node/services/aiService.ts +++ b/src/node/services/aiService.ts @@ -631,6 +631,7 @@ export class AIService extends EventEmitter { "tool-call-end", "reasoning-delta", "reasoning-end", + "workflow-run-attached", "usage-delta", ] as const) { this.streamManager.on(event, (data) => this.emit(event, data)); diff --git a/src/node/services/streamManager.test.ts b/src/node/services/streamManager.test.ts index 60310cee13..c0ef40d381 100644 --- a/src/node/services/streamManager.test.ts +++ b/src/node/services/streamManager.test.ts @@ -3,6 +3,7 @@ import * as fs from "node:fs/promises"; import { KNOWN_MODELS } from "@/common/constants/knownModels"; import { StreamEndEventSchema } from "@/common/orpc/schemas/stream"; +import type { CompletedMessagePart, WorkflowRunAttachedEvent } from "@/common/types/stream"; import { Ok, Err } from "@/common/types/result"; import type { ToolPolicy } from "@/common/utils/tools/toolPolicy"; import { @@ -200,6 +201,7 @@ function createStreamInfoForTests( startTime: now, lastPartTimestamp: now, toolCompletionTimestamps: new Map(), + pendingWorkflowRunAttachments: new Map(), model, metadataModel: overrides.metadataModel ?? model, historySequence: 1, @@ -232,6 +234,7 @@ describe("StreamManager - workflow run attachments", () => { const streamInfo = createStreamInfoForTests({ messageId, lastPartialWriteTime: timestamp, + pendingWorkflowRunAttachments: undefined, parts: [ { type: "dynamic-tool", @@ -266,6 +269,82 @@ describe("StreamManager - workflow run attachments", () => { timestamp: timestamp + 1, }); }); + + test("persists workflow attachments that arrive before the tool part", async () => { + const streamManager = new StreamManager(historyService); + const workspaceId = "workflow-attachment-race-workspace"; + const messageId = "workflow-attachment-race-message"; + const timestamp = Date.now(); + const streamInfo = createStreamInfoForTests({ + messageId, + lastPartialWriteTime: timestamp, + parts: [], + }); + + getWorkspaceStreamsForTests(streamManager).set(workspaceId, streamInfo); + + const attached = await streamManager.attachWorkflowRunToToolCall({ + type: "workflow-run-attached", + workspaceId, + messageId, + toolCallId: "workflow-call-race", + runId: "wfr_race", + timestamp: timestamp + 1, + }); + + expect(attached).toBe(true); + expect(await historyService.readPartial(workspaceId)).toBeNull(); + + const appendPartAndEmit = getPrivateMethodForTests< + ( + workspaceId: string, + streamInfo: Record, + part: CompletedMessagePart, + schedulePartialWrite?: boolean + ) => Promise + >(streamManager, "appendPartAndEmit"); + + const replayedAttachments: WorkflowRunAttachedEvent[] = []; + streamManager.on("workflow-run-attached", (event: WorkflowRunAttachedEvent) => { + replayedAttachments.push(event); + }); + + await appendPartAndEmit.call( + streamManager, + workspaceId, + streamInfo, + { + type: "dynamic-tool", + toolCallId: "workflow-call-race", + toolName: "workflow_run", + input: { name: "deep-research", args: {} }, + state: "input-available", + timestamp: timestamp + 2, + }, + false + ); + + expect(replayedAttachments).toEqual([ + { + type: "workflow-run-attached", + workspaceId, + messageId, + toolCallId: "workflow-call-race", + runId: "wfr_race", + timestamp: timestamp + 1, + }, + ]); + + const partial = await historyService.readPartial(workspaceId); + const part = partial?.parts[0]; + if (part?.type !== "dynamic-tool") { + throw new Error("Expected workflow tool part in persisted partial"); + } + expect(part.workflowRun).toEqual({ + runId: "wfr_race", + timestamp: timestamp + 1, + }); + }); }); describe("StreamManager - createTempDirForStream", () => { diff --git a/src/node/services/streamManager.ts b/src/node/services/streamManager.ts index 0e101bd825..238e1d427b 100644 --- a/src/node/services/streamManager.ts +++ b/src/node/services/streamManager.ts @@ -463,6 +463,9 @@ function hasIncompleteToolCallPart(parts: CompletedMessagePart[]): boolean { return parts.some((part) => part.type === "dynamic-tool" && part.state !== "output-available"); } +type DynamicToolCompletedMessagePart = Extract; +type WorkflowRunToolAttachment = NonNullable; + // Comprehensive stream info interface WorkspaceStreamInfo { state: StreamState; @@ -484,6 +487,10 @@ interface WorkspaceStreamInfo { // original start timestamp even after they gain output. toolCompletionTimestamps: Map; + // Workflow tools can create the durable run before their stream part is stored. Keep the exact + // attachment and apply it as soon as the matching dynamic-tool part lands. + pendingWorkflowRunAttachments: Map; + model: string; /** Metadata model resolved from provider mapping for cost/token metadata lookups. */ metadataModel: string; @@ -646,6 +653,43 @@ export class StreamManager extends EventEmitter { streamInfo.toolModelUsages.push(clonePersistedToolModelUsage(event)); } + private getWorkflowRunAttachment(event: WorkflowRunAttachedEvent): WorkflowRunToolAttachment { + return { + runId: event.runId, + ...(event.run != null ? { run: event.run } : {}), + timestamp: event.timestamp, + }; + } + + private takePendingWorkflowRunAttachment( + streamInfo: WorkspaceStreamInfo, + toolCallId: string + ): WorkflowRunToolAttachment | undefined { + const pendingAttachments = (streamInfo.pendingWorkflowRunAttachments ??= new Map()); + const attachment = pendingAttachments.get(toolCallId); + if (attachment != null) { + pendingAttachments.delete(toolCallId); + } + return attachment; + } + + private emitWorkflowRunAttachedFromAttachment(input: { + workspaceId: WorkspaceId; + messageId: string; + toolCallId: string; + attachment: WorkflowRunToolAttachment; + }): void { + this.emit("workflow-run-attached", { + type: "workflow-run-attached", + workspaceId: input.workspaceId as string, + messageId: input.messageId, + toolCallId: input.toolCallId, + runId: input.attachment.runId, + ...(input.attachment.run != null ? { run: input.attachment.run } : {}), + timestamp: input.attachment.timestamp, + } satisfies WorkflowRunAttachedEvent); + } + async attachWorkflowRunToToolCall(event: WorkflowRunAttachedEvent): Promise { const workspaceId = event.workspaceId as WorkspaceId; const streamInfo = this.workspaceStreams.get(workspaceId); @@ -656,11 +700,13 @@ export class StreamManager extends EventEmitter { return false; } + const attachment = this.getWorkflowRunAttachment(event); const partIndex = streamInfo.parts.findIndex( (part) => part.type === "dynamic-tool" && part.toolCallId === event.toolCallId ); if (partIndex === -1) { - return false; + (streamInfo.pendingWorkflowRunAttachments ??= new Map()).set(event.toolCallId, attachment); + return true; } const part = streamInfo.parts[partIndex]; @@ -668,13 +714,10 @@ export class StreamManager extends EventEmitter { return false; } + streamInfo.pendingWorkflowRunAttachments?.delete(event.toolCallId); streamInfo.parts[partIndex] = { ...part, - workflowRun: { - runId: event.runId, - ...(event.run != null ? { run: event.run } : {}), - timestamp: event.timestamp, - }, + workflowRun: attachment, }; await this.flushPartialWrite(workspaceId, streamInfo); @@ -1164,8 +1207,24 @@ export class StreamManager extends EventEmitter { await this.emitPartAsEvent(workspaceId, streamInfo.messageId, part); } finally { // Always persist the part in-memory (and to partial.json, if enabled), even if emit fails. - streamInfo.parts.push(part); - if (schedulePartialWrite) { + let partToPersist = part; + let pendingAttachment: WorkflowRunToolAttachment | undefined; + if (part.type === "dynamic-tool") { + pendingAttachment = this.takePendingWorkflowRunAttachment(streamInfo, part.toolCallId); + if (pendingAttachment != null) { + partToPersist = { ...part, workflowRun: pendingAttachment }; + } + } + streamInfo.parts.push(partToPersist); + if (pendingAttachment != null && part.type === "dynamic-tool") { + await this.flushPartialWrite(workspaceId, streamInfo); + this.emitWorkflowRunAttachedFromAttachment({ + workspaceId, + messageId: streamInfo.messageId, + toolCallId: part.toolCallId, + attachment: pendingAttachment, + }); + } else if (schedulePartialWrite) { void this.schedulePartialWrite(workspaceId, streamInfo); } } @@ -1563,6 +1622,7 @@ export class StreamManager extends EventEmitter { startTime, lastPartTimestamp: startTime, toolCompletionTimestamps: new Map(), + pendingWorkflowRunAttachments: new Map(), model: modelString, metadataModel, thinkingLevel, @@ -1622,12 +1682,14 @@ export class StreamManager extends EventEmitter { const existingPartIndex = streamInfo.parts.findIndex( (p) => p.type === "dynamic-tool" && p.toolCallId === toolCallId ); + const pendingAttachment = this.takePendingWorkflowRunAttachment(streamInfo, toolCallId); if (existingPartIndex !== -1) { const existingPart = streamInfo.parts[existingPartIndex]; if (existingPart.type === "dynamic-tool") { streamInfo.parts[existingPartIndex] = { ...existingPart, + ...(pendingAttachment != null ? { workflowRun: pendingAttachment } : {}), state: "output-available" as const, output, }; @@ -1644,6 +1706,7 @@ export class StreamManager extends EventEmitter { input: toolCall?.input ?? null, output, timestamp: nextPartTimestamp(streamInfo), + ...(pendingAttachment != null ? { workflowRun: pendingAttachment } : {}), }); } @@ -1652,6 +1715,14 @@ export class StreamManager extends EventEmitter { // read partial.json via commitPartial. Without this await, there's a race condition // where the partial is read before the tool result is written, causing "amnesia". await this.flushPartialWrite(workspaceId, streamInfo); + if (pendingAttachment != null) { + this.emitWorkflowRunAttachedFromAttachment({ + workspaceId, + messageId: streamInfo.messageId, + toolCallId, + attachment: pendingAttachment, + }); + } // Emit tool-call-end event (listeners can now safely read partial) const completionTimestamp = nextPartTimestamp(streamInfo);