From c61cdeb69f6c91085137c159b661b4c1fd0ba872 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Mon, 15 Jun 2026 16:46:30 +0000 Subject: [PATCH 1/6] =?UTF-8?q?=F0=9F=A4=96=20feat:=20add=20nested=20workf?= =?UTF-8?q?low=20runs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement first-class nested workflow starts via the built-in action.workflows.start primitive, including deterministic child run IDs, parent/child metadata, run-store idempotency, nested workflow UI rows, tool discovery filtering, docs, and regression coverage. --- _Generated with `mux` • Model: `openai:gpt-5.5` • Thinking: `xhigh` • Cost: `1754016{MUX_COSTS_USD:-unknown}`_ --- .../Tools/WorkflowRunToolCall.test.tsx | 68 +++++ .../features/Tools/WorkflowRunToolCall.tsx | 46 ++- src/common/orpc/schemas.ts | 1 + src/common/orpc/schemas/workflow.ts | 20 ++ src/common/types/workflow.ts | 6 + src/common/utils/workflowRunMessages.test.ts | 58 ++++ src/node/builtinSkills/workflow-authoring.md | 25 ++ .../builtInSkillContent.generated.ts | 25 ++ src/node/services/tools/task_await.ts | 12 +- src/node/services/tools/task_list.ts | 7 +- .../services/workflows/WorkflowRunStore.ts | 79 +++++ src/node/services/workflows/WorkflowRunner.ts | 227 +++++++++++++- .../workflows/WorkflowService.test.ts | 286 ++++++++++++++++++ .../services/workflows/WorkflowService.ts | 150 ++++++++- .../workflows/builtInWorkflowActions.ts | 23 ++ .../services/workflows/nestedWorkflowRuns.ts | 31 ++ 16 files changed, 1057 insertions(+), 7 deletions(-) create mode 100644 src/common/utils/workflowRunMessages.test.ts create mode 100644 src/node/services/workflows/nestedWorkflowRuns.ts diff --git a/src/browser/features/Tools/WorkflowRunToolCall.test.tsx b/src/browser/features/Tools/WorkflowRunToolCall.test.tsx index b33aa90052..d56b4799df 100644 --- a/src/browser/features/Tools/WorkflowRunToolCall.test.tsx +++ b/src/browser/features/Tools/WorkflowRunToolCall.test.tsx @@ -626,6 +626,74 @@ describe("WorkflowRunToolCall", () => { expect(view.container.textContent).toContain("durationMs"); }); + test("coalesces nested workflow start and completion events into one row with child run details", () => { + const view = render( + + + + + + ); + + fireEvent.click(getWorkflowHeader(view)); + + expect(view.getByText("Workflow events (1)")).toBeTruthy(); + expect(view.getByText("child-simple / child-simple / wfr_child_abc / completed")).toBeTruthy(); + expect(view.queryByText("#2")).toBeNull(); + }); + test("coalesces patch start and applied events into one row with combined details", () => { const view = render( diff --git a/src/browser/features/Tools/WorkflowRunToolCall.tsx b/src/browser/features/Tools/WorkflowRunToolCall.tsx index ba4c00d5bd..abaf358a73 100644 --- a/src/browser/features/Tools/WorkflowRunToolCall.tsx +++ b/src/browser/features/Tools/WorkflowRunToolCall.tsx @@ -209,16 +209,19 @@ function getStructuredOutput(value: unknown): unknown { type WorkflowTaskEvent = Extract; type WorkflowActionEvent = Extract; +type WorkflowChildEvent = Extract; type WorkflowPatchEvent = Extract; type WorkflowDisplayRow = | { kind: "event"; event: WorkflowRunEvent } | { kind: "task"; firstEvent: WorkflowTaskEvent; latestEvent: WorkflowTaskEvent } | { kind: "action"; firstEvent: WorkflowActionEvent; latestEvent: WorkflowActionEvent } + | { kind: "workflow"; firstEvent: WorkflowChildEvent; latestEvent: WorkflowChildEvent } | { kind: "patch"; firstEvent: WorkflowPatchEvent; latestEvent: WorkflowPatchEvent }; type WorkflowTaskRow = Extract; type WorkflowActionRow = Extract; +type WorkflowChildRow = Extract; type WorkflowPatchRow = Extract; interface PendingWorkflowActionRows { rows: WorkflowActionRow[]; @@ -229,6 +232,10 @@ function getTaskEventKey(event: WorkflowTaskEvent): string { return `task:${event.stepId}:${event.taskId}`; } +function getWorkflowChildEventKey(event: WorkflowChildEvent): string { + return `workflow:${event.stepId}:${event.runId}`; +} + function getPatchEventKey(event: WorkflowPatchEvent): string { return `patch:${event.stepId}:${event.sourceTaskId}`; } @@ -273,6 +280,7 @@ function getWorkflowDisplayRows(events: readonly WorkflowRunEvent[]): WorkflowDi const rows: WorkflowDisplayRow[] = []; const taskRows = new Map(); const actionRows = new Map(); + const workflowRows = new Map(); const patchRows = new Map(); for (const event of events) { @@ -297,6 +305,24 @@ function getWorkflowDisplayRows(events: readonly WorkflowRunEvent[]): WorkflowDi continue; } + if (event.type === "workflow") { + const key = getWorkflowChildEventKey(event); + const existingRow = workflowRows.get(key); + if (existingRow != null) { + existingRow.latestEvent = event; + continue; + } + + const row: WorkflowChildRow = { + kind: "workflow", + firstEvent: event, + latestEvent: event, + }; + workflowRows.set(key, row); + rows.push(row); + continue; + } + if (event.type === "patch") { // A patch step emits started → applied/conflict/failed for the same // stepId+sourceTaskId; collapse them into one row (latest status wins), @@ -377,6 +403,9 @@ function getDisplayRowKey(row: WorkflowDisplayRow): string { if (row.kind === "action") { return `${getActionEventKey(row.firstEvent)}:${row.firstEvent.sequence}`; } + if (row.kind === "workflow") { + return getWorkflowChildEventKey(row.firstEvent); + } if (row.kind === "patch") { return getPatchEventKey(row.firstEvent); } @@ -396,6 +425,8 @@ function getWorkflowEventLabel(event: WorkflowRunEvent): string { // Prefer the human-readable sub-agent title (matches the spawned // workspace title); fall back to stepId for legacy events without one. return `${event.title ?? event.stepId} / ${event.taskId} / ${event.status}`; + case "workflow": + return `${event.stepId} / ${event.name} / ${event.runId} / ${event.status}`; case "patch": return `${event.stepId} / ${event.sourceTaskId} / ${event.status}`; case "action": @@ -423,6 +454,8 @@ function getWorkflowEventDetail(event: WorkflowRunEvent): unknown { return event.data; case "result": return event.result; + case "workflow": + return event.details; case "patch": return event.details; case "action": @@ -459,6 +492,13 @@ function getEventTone(event: WorkflowRunEvent): "normal" | "success" | "warning" } return event.status === "failed" ? "warning" : "normal"; } + if (event.type === "workflow") { + return event.status === "completed" + ? "success" + : event.status === "failed" || event.status === "interrupted" + ? "warning" + : "normal"; + } if (event.type === "result") { return "success"; } @@ -551,7 +591,9 @@ function WorkflowEventTooltip(props: { ); } -function getWorkflowMergedRowDetail(row: WorkflowActionRow | WorkflowPatchRow): unknown { +function getWorkflowMergedRowDetail( + row: WorkflowActionRow | WorkflowChildRow | WorkflowPatchRow +): unknown { const firstDetail = getWorkflowEventDetail(row.firstEvent); const latestDetail = getWorkflowEventDetail(row.latestEvent); if (row.firstEvent === row.latestEvent || row.latestEvent.status === "started") { @@ -1544,7 +1586,7 @@ export const WorkflowRunToolCall: React.FC = ({ /> ); } - if (row.kind === "action" || row.kind === "patch") { + if (row.kind === "action" || row.kind === "workflow" || row.kind === "patch") { return ( ; export type StructuredTaskOutput = z.infer; export type WorkflowRunEvent = z.infer; export type WorkflowStepRecord = z.infer; +export type WorkflowRunParent = z.infer; export type WorkflowRunRecord = z.infer; +export function isNestedWorkflowRun(run: WorkflowRunRecord): boolean { + return run.parentWorkflow != null; +} + export function assertWorkflowRunStatusTransition( from: WorkflowRunStatus, to: WorkflowRunStatus diff --git a/src/common/utils/workflowRunMessages.test.ts b/src/common/utils/workflowRunMessages.test.ts new file mode 100644 index 0000000000..2f1d395c75 --- /dev/null +++ b/src/common/utils/workflowRunMessages.test.ts @@ -0,0 +1,58 @@ +import { describe, expect, test } from "bun:test"; + +import type { WorkflowRunRecord } from "@/common/types/workflow"; +import { buildWorkflowRunToolPart, stripWorkflowRunRecordForModel } from "./workflowRunMessages"; + +const run: WorkflowRunRecord = { + id: "wfr_test", + workspaceId: "workspace-1", + definition: { + name: "nested-parent-simple", + description: "Nested parent", + scope: "scratch", + executable: true, + }, + definitionSource: "export default function workflow() { return { reportMarkdown: 'done' }; }", + definitionHash: "sha256:test", + args: {}, + status: "completed", + createdAt: "2026-05-29T00:00:00.000Z", + updatedAt: "2026-05-29T00:00:01.000Z", + events: [], + steps: [], +}; + +describe("workflowRunMessages", () => { + test("strips large run snapshots from model-bound workflow tool outputs", () => { + const stripped = stripWorkflowRunRecordForModel("workflow_run", { + status: "completed", + runId: run.id, + result: { reportMarkdown: "done" }, + run, + }); + + expect(stripped).toEqual({ + status: "completed", + runId: run.id, + result: { reportMarkdown: "done" }, + }); + }); + + test("preserves run snapshots in UI workflow card tool parts", () => { + const part = buildWorkflowRunToolPart( + { name: "nested-parent-simple", args: {} }, + { runId: run.id, status: "completed", result: { reportMarkdown: "done" }, run }, + 1_000 + ); + + expect(part.state).toBe("output-available"); + if (part.state !== "output-available") { + throw new Error("Expected workflow run tool part to include output"); + } + expect(part.output).toMatchObject({ + status: "completed", + runId: run.id, + run: { id: run.id }, + }); + }); +}); diff --git a/src/node/builtinSkills/workflow-authoring.md b/src/node/builtinSkills/workflow-authoring.md index 4ec88798d7..4bc4d119d6 100644 --- a/src/node/builtinSkills/workflow-authoring.md +++ b/src/node/builtinSkills/workflow-authoring.md @@ -203,6 +203,31 @@ export async function execute(input, ctx) { } ``` +### `action.workflows.start(spec)` + +Starts another workflow in the same workspace and waits for its terminal result. This is a special built-in workflow action: project/global actions can still override `workflows.start`, but the built-in version is coordinated by the durable workflow runner rather than a normal action process. + +```js +const child = action.workflows.start({ + id: "research", + input: { + name: "deep-research", + args: { input: args.topic }, + }, +}); + +return { reportMarkdown: "Child said:\n\n" + child.reportMarkdown }; +``` + +Replay semantics: + +- Parent `id` + child `{ name, args }` maps to exactly one deterministic child run ID. +- Child workflow step IDs stay local to the child run; they do not collide with parent step IDs. +- Parent replay/resume reuses the linked child run, even if the child workflow source has changed since the child run was created. +- Child completion returns `{ runId, status, name, reportMarkdown, structuredOutput? }` to the parent. +- Child failure fails the parent step; parent interruption cascades to active child workflow runs. +- V1 is same-workspace and wait-to-terminal only; fire-and-forget child workflows are intentionally rejected. + ### `applyPatch(spec)` Applies a workflow-owned child task's git patch artifact to the current parent workspace. The host always dry-runs first in a temporary worktree and only performs the real apply when the dry-run succeeds. The conductor never receives raw patch text and cannot apply arbitrary patches. diff --git a/src/node/services/agentSkills/builtInSkillContent.generated.ts b/src/node/services/agentSkills/builtInSkillContent.generated.ts index cd34ae9339..189d9378b4 100644 --- a/src/node/services/agentSkills/builtInSkillContent.generated.ts +++ b/src/node/services/agentSkills/builtInSkillContent.generated.ts @@ -7147,6 +7147,31 @@ export const BUILTIN_SKILL_FILES: Record> = { "}", "```", "", + "### `action.workflows.start(spec)`", + "", + "Starts another workflow in the same workspace and waits for its terminal result. This is a special built-in workflow action: project/global actions can still override `workflows.start`, but the built-in version is coordinated by the durable workflow runner rather than a normal action process.", + "", + "```js", + "const child = action.workflows.start({", + ' id: "research",', + " input: {", + ' name: "deep-research",', + " args: { input: args.topic },", + " },", + "});", + "", + 'return { reportMarkdown: "Child said:\\n\\n" + child.reportMarkdown };', + "```", + "", + "Replay semantics:", + "", + "- Parent `id` + child `{ name, args }` maps to exactly one deterministic child run ID.", + "- Child workflow step IDs stay local to the child run; they do not collide with parent step IDs.", + "- Parent replay/resume reuses the linked child run, even if the child workflow source has changed since the child run was created.", + "- Child completion returns `{ runId, status, name, reportMarkdown, structuredOutput? }` to the parent.", + "- Child failure fails the parent step; parent interruption cascades to active child workflow runs.", + "- V1 is same-workspace and wait-to-terminal only; fire-and-forget child workflows are intentionally rejected.", + "", "### `applyPatch(spec)`", "", "Applies a workflow-owned child task's git patch artifact to the current parent workspace. The host always dry-runs first in a temporary worktree and only performs the real apply when the dry-run succeeds. The conductor never receives raw patch text and cannot apply arbitrary patches.", diff --git a/src/node/services/tools/task_await.ts b/src/node/services/tools/task_await.ts index 910889d0fa..abb7883b07 100644 --- a/src/node/services/tools/task_await.ts +++ b/src/node/services/tools/task_await.ts @@ -9,7 +9,11 @@ import { TOOL_DEFINITIONS, } from "@/common/utils/tools/toolDefinitions"; import { canRetryWorkflowFromCheckpoint } from "@/common/utils/workflowRetryEligibility"; -import type { WorkflowRunRecord, WorkflowRunStatus } from "@/common/types/workflow"; +import { + isNestedWorkflowRun, + type WorkflowRunRecord, + type WorkflowRunStatus, +} from "@/common/types/workflow"; import { fromBashTaskId, isWorkflowRunTaskId, toBashTaskId } from "./taskId"; import { formatBashOutputReport } from "./bashTaskReport"; @@ -280,7 +284,11 @@ export const createTaskAwaitTool: ToolFactory = (config: ToolConfiguration) => { const runs = await config.workflowService.listRuns({ workspaceId }); for (const rawRun of runs) { const parsed = WorkflowRunRecordSchema.safeParse(rawRun); - if (!parsed.success || !isWorkflowRunAwaitableStatus(parsed.data.status)) { + if ( + !parsed.success || + !isWorkflowRunAwaitableStatus(parsed.data.status) || + isNestedWorkflowRun(parsed.data) + ) { continue; } workflowRunIds.push(parsed.data.id); diff --git a/src/node/services/tools/task_list.ts b/src/node/services/tools/task_list.ts index ae47f8e20a..359edd8715 100644 --- a/src/node/services/tools/task_list.ts +++ b/src/node/services/tools/task_list.ts @@ -5,6 +5,7 @@ import type { ToolConfiguration, ToolFactory } from "@/common/utils/tools/tools" import { WorkflowRunRecordSchema } from "@/common/orpc/schemas"; import { TaskListToolResultSchema, TOOL_DEFINITIONS } from "@/common/utils/tools/toolDefinitions"; +import { isNestedWorkflowRun } from "@/common/types/workflow"; import type { AgentTaskStatus } from "@/node/services/taskService"; import { toBashTaskId } from "./taskId"; @@ -63,7 +64,11 @@ export const createTaskListTool: ToolFactory = (config: ToolConfiguration) => { const runs = await config.workflowService.listRuns({ workspaceId }); for (const rawRun of runs) { const parsed = WorkflowRunRecordSchema.safeParse(rawRun); - if (!parsed.success || !statuses.includes(parsed.data.status)) { + if ( + !parsed.success || + !statuses.includes(parsed.data.status) || + isNestedWorkflowRun(parsed.data) + ) { continue; } tasks.push({ diff --git a/src/node/services/workflows/WorkflowRunStore.ts b/src/node/services/workflows/WorkflowRunStore.ts index 4699b54fc5..cae36c708c 100644 --- a/src/node/services/workflows/WorkflowRunStore.ts +++ b/src/node/services/workflows/WorkflowRunStore.ts @@ -16,6 +16,7 @@ import type { StructuredTaskOutput, WorkflowDefinitionDescriptor, WorkflowRunEvent, + WorkflowRunParent, WorkflowRunRecord, WorkflowRunStatus, WorkflowStepRecord, @@ -36,6 +37,7 @@ export interface CreateWorkflowRunInput { definitionSource: string; args: unknown; defaultActionCwd?: string; + parentWorkflow?: WorkflowRunParent; now: string; } @@ -99,6 +101,7 @@ export class WorkflowRunStore { definitionHash: hashSource(input.definitionSource), args: input.args, ...(input.defaultActionCwd != null ? { defaultActionCwd: input.defaultActionCwd } : {}), + ...(input.parentWorkflow != null ? { parentWorkflow: input.parentWorkflow } : {}), status: "pending", createdAt: input.now, updatedAt: input.now, @@ -110,6 +113,59 @@ export class WorkflowRunStore { return run; } + async createRunIfAbsent(input: CreateWorkflowRunInput): Promise { + assert(input.id.length > 0, "WorkflowRunStore.createRunIfAbsent: id is required"); + assert( + input.workspaceId.length > 0, + "WorkflowRunStore.createRunIfAbsent: workspaceId is required" + ); + assert( + input.definitionSource.length > 0, + "WorkflowRunStore.createRunIfAbsent: definitionSource is required" + ); + + const runDir = this.runDir(input.id); + await fs.mkdir(this.workflowsDir(), { recursive: true }); + try { + await fs.mkdir(runDir, { recursive: false }); + } catch (error) { + if (!isAlreadyExistsError(error)) { + throw error; + } + const existing = await this.getRun(input.id); + assertSameWorkflowRunIdentity(existing, input); + return existing; + } + + try { + await fs.writeFile(path.join(runDir, "definition.js"), input.definitionSource, "utf-8"); + await fs.writeFile(path.join(runDir, "events.jsonl"), "", { flag: "a" }); + await fs.writeFile(path.join(runDir, "steps.jsonl"), "", { flag: "a" }); + + const run = WorkflowRunRecordSchema.parse({ + id: input.id, + workspaceId: input.workspaceId, + definition: input.definition, + definitionSource: input.definitionSource, + definitionHash: hashSource(input.definitionSource), + args: input.args, + ...(input.defaultActionCwd != null ? { defaultActionCwd: input.defaultActionCwd } : {}), + ...(input.parentWorkflow != null ? { parentWorkflow: input.parentWorkflow } : {}), + status: "pending", + createdAt: input.now, + updatedAt: input.now, + events: [], + steps: [], + }); + + await this.writeRunFile(input.id, run); + return run; + } catch (error) { + await fs.rm(runDir, { recursive: true, force: true }); + throw error; + } + } + async getRun(runId: string): Promise { // UI polling must not wait behind the writer lock; while a mutation is in progress, // fall back to the last atomic run.json snapshot instead of reading half-updated journals. @@ -852,6 +908,29 @@ function assertValidWorkflowRunId(runId: string): void { ); } +function isAlreadyExistsError(error: unknown): boolean { + return error instanceof Error && "code" in error && error.code === "EEXIST"; +} + +function assertSameWorkflowRunIdentity( + run: WorkflowRunRecord, + input: CreateWorkflowRunInput +): void { + const expectedDefinitionHash = hashSource(input.definitionSource); + const sameIdentity = + run.id === input.id && + run.workspaceId === input.workspaceId && + run.definition.name === input.definition.name && + run.definition.scope === input.definition.scope && + run.definitionHash === expectedDefinitionHash && + JSON.stringify(run.args) === JSON.stringify(input.args) && + JSON.stringify(run.parentWorkflow ?? null) === JSON.stringify(input.parentWorkflow ?? null); + assert( + sameIdentity, + `WorkflowRunStore.createRunIfAbsent: existing run identity does not match requested run ${input.id}` + ); +} + function getWorkflowStepKey(step: WorkflowStepLookup): string { return `${step.stepId}\0${step.inputHash}`; } diff --git a/src/node/services/workflows/WorkflowRunner.ts b/src/node/services/workflows/WorkflowRunner.ts index 2d56b2d77d..fb61d8948a 100644 --- a/src/node/services/workflows/WorkflowRunner.ts +++ b/src/node/services/workflows/WorkflowRunner.ts @@ -8,6 +8,7 @@ import type { WorkflowActionMetadata, WorkflowResult, WorkflowRunEvent, + WorkflowRunStatus, WorkflowStepRecord, } from "@/common/types/workflow"; import assert from "@/common/utils/assert"; @@ -24,6 +25,7 @@ import { type WorkflowActionExecutionResult, } from "./WorkflowActionRunner"; import type { AppendWorkflowRunEventOptions, WorkflowRunStore } from "./WorkflowRunStore"; +import { deriveChildWorkflowRunId } from "./nestedWorkflowRuns"; import { assertWorkflowStepId, hashWorkflowStepInput } from "./workflowReplayKey"; export class WorkflowRunBackgroundedError extends Error { @@ -98,6 +100,17 @@ export interface WorkflowActionSpec { cache?: boolean; } +export interface WorkflowStartInput { + name: string; + args: unknown; +} + +export type WorkflowNestedResult = WorkflowResult & { + runId: string; + status: WorkflowRunStatus; + name: string; +}; + export interface WorkflowActionResult { output: unknown; stdout: string; @@ -137,6 +150,19 @@ export interface WorkflowTaskAdapter { onRunEnded?(): Promise | void; } +export interface WorkflowChildRunAdapter { + runChildWorkflowToTerminal(input: { + parentRunId: string; + stepId: string; + inputHash: string; + childRunId: string; + name: string; + args: unknown; + backgroundOnMessageQueued?: boolean; + abortSignal?: AbortSignal; + }): Promise<{ runId: string; status: WorkflowRunStatus; result: unknown }>; +} + export interface WorkflowRunnerRunOptions { onLeaseAcquired?: () => void; abortSignal?: AbortSignal; @@ -159,6 +185,7 @@ export interface WorkflowRunnerOptions { runtimeFactory: IJSRuntimeFactory; taskAdapter: WorkflowTaskAdapter; actionRegistry?: WorkflowActionRegistry; + childRunAdapter?: WorkflowChildRunAdapter; actionRunner?: WorkflowActionRunner; getProjectTrusted?: () => boolean | Promise; projectTrusted?: boolean; @@ -281,6 +308,7 @@ export class WorkflowRunner { private readonly runtimeFactory: IJSRuntimeFactory; private readonly taskAdapter: WorkflowTaskAdapter; private readonly actionRegistry?: WorkflowActionRegistry; + private readonly childRunAdapter?: WorkflowChildRunAdapter; private readonly actionRunner: WorkflowActionRunner; private readonly getProjectTrusted: () => boolean | Promise; private readonly defaultActionCwd?: string; @@ -294,6 +322,7 @@ export class WorkflowRunner { this.runtimeFactory = options.runtimeFactory; this.taskAdapter = options.taskAdapter; this.actionRegistry = options.actionRegistry; + this.childRunAdapter = options.childRunAdapter; this.actionRunner = options.actionRunner ?? new WorkflowActionRunner(); this.getProjectTrusted = options.getProjectTrusted ?? (() => options.projectTrusted ?? false); this.defaultActionCwd = options.defaultActionCwd; @@ -486,6 +515,7 @@ export class WorkflowRunner { try { return await this.runActionStep(runId, sequence, rawName, rawSpec, { abortSignal: setupRuntime.getAbortSignal(), + backgroundOnMessageQueued: options?.backgroundOnMessageQueued ?? true, leaseGuard, }); } catch (error) { @@ -687,9 +717,10 @@ export class WorkflowRunner { rawSpec: unknown, options: { abortSignal?: AbortSignal; + backgroundOnMessageQueued?: boolean; leaseGuard: WorkflowRunnerLeaseGuard; } - ): Promise { + ): Promise { assert(typeof rawName === "string" && rawName.length > 0, "action requires a name"); const spec = parseWorkflowActionSpec(rawSpec); assertWorkflowStepId(spec.id, "action"); @@ -700,6 +731,13 @@ export class WorkflowRunner { projectTrusted: await this.getProjectTrusted(), builtInOnly: spec.builtInOnly, }); + if (isBuiltInWorkflowsStartAction(action)) { + return await this.runNestedWorkflowStep(runId, sequence, spec, { + abortSignal: options.abortSignal, + backgroundOnMessageQueued: options.backgroundOnMessageQueued, + leaseGuard: options.leaseGuard, + }); + } const cwd = getWorkflowActionCwd(spec, action, await this.getDefaultActionCwd(runId)); const inputHash = hashWorkflowStepInput( spec.id, @@ -786,6 +824,139 @@ export class WorkflowRunner { }); } + private async runNestedWorkflowStep( + runId: string, + sequence: WorkflowEventSequence, + spec: WorkflowActionSpec, + options: { + abortSignal?: AbortSignal; + backgroundOnMessageQueued?: boolean; + leaseGuard: WorkflowRunnerLeaseGuard; + } + ): Promise { + const childInput = parseWorkflowStartInput(spec.input); + const inputHash = hashWorkflowStepInput(spec.id, buildWorkflowStartReplayInput(childInput)); + const childRunId = deriveChildWorkflowRunId({ + parentRunId: runId, + stepId: spec.id, + inputHash, + }); + const existingStep = await this.runStore.getStep(runId, spec.id, inputHash); + if (existingStep?.status === "completed" && existingStep.result?.structuredOutput != null) { + return normalizeWorkflowNestedResult(existingStep.result.structuredOutput); + } + + const run = await this.runStore.getRun(runId); + const driftedStep = run.steps.find( + (step) => + step.stepId === spec.id && step.inputHash !== inputHash && step.taskId?.startsWith("wfr_") + ); + if (driftedStep != null) { + throw new Error( + `Workflow child step ${spec.id} has a prior replay identity and cannot start a different child workflow` + ); + } + + const childRunAdapter = this.childRunAdapter; + assert(childRunAdapter != null, "Nested workflows are not configured for this workflow runner"); + const startedAt = existingStep?.startedAt ?? this.clock.nowIso(); + options.leaseGuard.throwIfLost(); + await this.recordStepStarted(runId, { + stepId: spec.id, + inputHash, + taskId: childRunId, + startedAt, + }); + await this.appendEvent(runId, { + sequence: sequence.next(), + type: "workflow", + at: this.clock.nowIso(), + stepId: spec.id, + runId: childRunId, + name: childInput.name, + status: "started", + }); + + const child = await childRunAdapter.runChildWorkflowToTerminal({ + parentRunId: runId, + stepId: spec.id, + inputHash, + childRunId, + name: childInput.name, + args: childInput.args, + backgroundOnMessageQueued: options.backgroundOnMessageQueued, + abortSignal: options.abortSignal, + }); + if (child.status === "backgrounded" && options.backgroundOnMessageQueued !== false) { + await this.appendEvent(runId, { + sequence: sequence.next(), + type: "workflow", + at: this.clock.nowIso(), + stepId: spec.id, + runId: childRunId, + name: childInput.name, + status: "backgrounded", + }); + throw createForegroundWaitBackgroundedError(); + } + if (child.status !== "completed") { + const message = `Child workflow ${childInput.name} finished with status ${child.status}`; + await this.recordStepFailed(runId, { + stepId: spec.id, + inputHash, + taskId: childRunId, + error: message, + startedAt, + completedAt: this.clock.nowIso(), + }); + const eventStatus = child.status === "pending" ? "running" : child.status; + await this.appendEvent(runId, { + sequence: sequence.next(), + type: "workflow", + at: this.clock.nowIso(), + stepId: spec.id, + runId: childRunId, + name: childInput.name, + status: eventStatus, + details: { error: message }, + }); + throw new Error(message); + } + + const childResult = normalizeWorkflowResultForEvent(child.result); + const nestedResult: WorkflowNestedResult = { + reportMarkdown: childResult.reportMarkdown, + ...(childResult.structuredOutput !== undefined + ? { structuredOutput: childResult.structuredOutput } + : {}), + runId: child.runId, + status: child.status, + name: childInput.name, + }; + await this.recordStepCompleted(runId, { + stepId: spec.id, + inputHash, + taskId: childRunId, + result: { + reportMarkdown: childResult.reportMarkdown, + structuredOutput: nestedResult, + }, + startedAt, + completedAt: this.clock.nowIso(), + }); + await this.appendEvent(runId, { + sequence: sequence.next(), + type: "workflow", + at: this.clock.nowIso(), + stepId: spec.id, + runId: childRunId, + name: childInput.name, + status: "completed", + details: nestedResult, + }); + return nestedResult; + } + private async getCachedWorkflowActionEffect( runId: string, stepId: string, @@ -1893,6 +2064,35 @@ function parseWorkflowActionSpec(rawSpec: unknown): WorkflowActionSpec { return parsed; } +function parseWorkflowStartInput(rawInput: unknown): WorkflowStartInput { + assert( + rawInput != null && typeof rawInput === "object" && !Array.isArray(rawInput), + "workflows.start input must be an object" + ); + const input = rawInput as Record; + assert( + typeof input.name === "string" && input.name.length > 0, + "workflows.start input.name is required" + ); + assert( + input.wait !== false && input.runInBackground !== true && input.run_in_background !== true, + "workflows.start currently waits for the child workflow to reach a terminal result" + ); + return { name: input.name, args: input.args ?? {} }; +} + +function buildWorkflowStartReplayInput(input: WorkflowStartInput): unknown { + return { + primitive: "workflows.start", + name: input.name, + args: input.args, + }; +} + +function isBuiltInWorkflowsStartAction(action: ResolvedWorkflowAction): boolean { + return action.scope === "built-in" && action.name === "workflows.start"; +} + function buildWorkflowActionReplayInput( action: ResolvedWorkflowAction, spec: WorkflowActionSpec, @@ -2014,6 +2214,31 @@ function workflowActionErrorDetails(error: unknown): Record { return { error: getErrorMessage(error) }; } +function normalizeWorkflowNestedResult(rawResult: unknown): WorkflowNestedResult { + const result = normalizeWorkflowResultForEvent(rawResult); + assert(rawResult != null && typeof rawResult === "object", "workflow result must be an object"); + const record = rawResult as Record; + assert( + typeof record.runId === "string" && record.runId.length > 0, + "workflow result requires runId" + ); + assert( + typeof record.name === "string" && record.name.length > 0, + "workflow result requires name" + ); + assert( + typeof record.status === "string" && record.status.length > 0, + "workflow result requires status" + ); + return { + reportMarkdown: result.reportMarkdown, + ...(result.structuredOutput !== undefined ? { structuredOutput: result.structuredOutput } : {}), + runId: record.runId, + name: record.name, + status: record.status as WorkflowRunStatus, + }; +} + function normalizeWorkflowActionResult(rawResult: unknown): WorkflowActionResult { assert(rawResult != null && typeof rawResult === "object", "action result must be an object"); const record = rawResult as Record; diff --git a/src/node/services/workflows/WorkflowService.test.ts b/src/node/services/workflows/WorkflowService.test.ts index 6b2d66e68c..f9c85da499 100644 --- a/src/node/services/workflows/WorkflowService.test.ts +++ b/src/node/services/workflows/WorkflowService.test.ts @@ -270,6 +270,105 @@ export default function workflow({ args, agent }) { expect(run.definition.scope).toBe("global"); }); + test("runs a nested workflow through action.workflows.start and reuses the child on replay", async () => { + using tmp = new DisposableTempDir("workflow-service-nested"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + globalRoot, + "child-simple", + "// description: Child workflow\nexport default function workflow({ args }) { return { reportMarkdown: 'Child: ' + args.topic }; }\n" + ); + await writeWorkflow( + globalRoot, + "parent-simple", + `// description: Parent workflow + export default function workflow({ args, action }) { + const child = action.workflows.start({ + id: "child-simple", + input: { name: "child-simple", args: { topic: args.topic } }, + }); + return { reportMarkdown: "Parent saw " + child.reportMarkdown, structuredOutput: { childRunId: child.runId } }; + }\n` + ); + + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + const service = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent() { + throw new Error("agent should not run"); + }, + }, + generateRunId: () => "wfr_parent_simple", + runnerId: "runner-a", + clock: { + nowIso: () => "2026-05-29T00:00:00.000Z", + nowMs: () => 1_000, + }, + }); + + const result = await service.startNamedWorkflow({ + name: "parent-simple", + workspaceId: "workspace-1", + projectTrusted: true, + args: { topic: "nested smoke" }, + }); + const parentRun = await runStore.getRun("wfr_parent_simple"); + const childStep = parentRun.steps.find((step) => step.stepId === "child-simple"); + expect(childStep?.taskId).toMatch(/^wfr_child_/); + const childRunId = childStep?.taskId; + if (childRunId == null) { + throw new Error("Expected nested workflow step to record a child run id"); + } + const childRun = await runStore.getRun(childRunId); + + expect(result).toEqual({ + runId: "wfr_parent_simple", + status: "completed", + result: { + reportMarkdown: "Parent saw Child: nested smoke", + structuredOutput: { childRunId }, + }, + }); + expect(childRun.id).toBe(childRunId); + expect(childRun.workspaceId).toBe("workspace-1"); + expect(childRun.definition.name).toBe("child-simple"); + expect(childRun.parentWorkflow?.runId).toBe("wfr_parent_simple"); + expect(childRun.parentWorkflow?.stepId).toBe("child-simple"); + expect(childRun.status).toBe("completed"); + expect(parentRun.events).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + type: "workflow", + stepId: "child-simple", + runId: childRunId, + status: "started", + }), + expect.objectContaining({ + type: "workflow", + stepId: "child-simple", + runId: childRunId, + status: "completed", + }), + ]) + ); + + const runsAfterCompletion = await runStore.listRuns(); + expect( + runsAfterCompletion.filter((run) => run.parentWorkflow?.runId === "wfr_parent_simple") + ).toHaveLength(1); + }); + test("runs workspace scratch workflow definitions authored as files", async () => { using tmp = new DisposableTempDir("workflow-service"); const workspaceRoot = path.join(tmp.path, "project"); @@ -610,6 +709,193 @@ export default function workflow({ args, agent }) { expect(runError instanceof Error ? runError.message : "").toMatch(/interrupted|aborted/i); }); + test("interruptRun cascades to an active nested child workflow", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-interrupt"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + globalRoot, + "child-interruptible", + "// description: Child interruptible\nexport default function workflow({ agent }) { return agent({ id: 'slow-child', prompt: 'slow' }); }\n" + ); + await writeWorkflow( + globalRoot, + "parent-interruptible", + `// description: Parent interruptible +export default function workflow({ action }) { + return action.workflows.start({ + id: "child-interruptible", + input: { name: "child-interruptible", args: {} }, + }); +}\n` + ); + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + let agentWaitStarted = false; + let agentAbortObserved = false; + const starterService = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent(_spec, _lifecycle, waitOptions) { + agentWaitStarted = true; + return await new Promise((_, reject) => { + waitOptions?.abortSignal?.addEventListener( + "abort", + () => { + agentAbortObserved = true; + reject(new Error("Task interrupted")); + }, + { once: true } + ); + }); + }, + }, + generateRunId: () => "wfr_nested_interrupt_parent", + runnerId: "runner-a", + }); + let interruptCalls = 0; + const interruptService = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent() { + throw new Error("interrupt service runAgent should not be called"); + }, + async interruptRun() { + interruptCalls += 1; + }, + }, + runnerId: "runner-b", + }); + + const runPromise = starterService.startNamedWorkflow({ + name: "parent-interruptible", + workspaceId: "workspace-1", + projectTrusted: true, + args: {}, + }); + const runErrorPromise = runPromise.then( + () => null, + (error: unknown) => error + ); + await waitForCondition("nested child agent to start", () => agentWaitStarted); + + const interrupted = await interruptService.interruptRun({ + workspaceId: "workspace-1", + runId: "wfr_nested_interrupt_parent", + }); + const parentRun = await runStore.getRun("wfr_nested_interrupt_parent"); + const childRunId = parentRun.steps.find( + (step) => step.stepId === "child-interruptible" + )?.taskId; + if (childRunId == null) { + throw new Error("Expected nested interrupt workflow to record a child run id"); + } + const childRun = await runStore.getRun(childRunId); + const runError = await runErrorPromise; + + expect(interrupted.status).toBe("interrupted"); + expect(childRun.status).toBe("interrupted"); + expect(agentAbortObserved).toBe(true); + expect(interruptCalls).toBeGreaterThanOrEqual(1); + expect(runError).toBeInstanceOf(Error); + expect(runError instanceof Error ? runError.message : "").toMatch(/interrupted|aborted/i); + }); + + test("backgrounds and resumes parent workflows when a nested child workflow backgrounds", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-background"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + globalRoot, + "child-backgroundable", + "// description: Child backgroundable\nexport default function workflow({ agent }) { const result = agent({ id: 'slow-child', prompt: 'slow' }); return { reportMarkdown: 'Child ' + result.reportMarkdown }; }\n" + ); + await writeWorkflow( + globalRoot, + "parent-backgroundable", + `// description: Parent backgroundable +export default function workflow({ action }) { + const child = action.workflows.start({ + id: "child-backgroundable", + input: { name: "child-backgroundable", args: {} }, + }); + return { reportMarkdown: "Parent " + child.reportMarkdown }; +}\n` + ); + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + let calls = 0; + const backgroundFlags: Array = []; + const service = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent(_spec, _lifecycle, waitOptions) { + calls += 1; + backgroundFlags.push(waitOptions?.backgroundOnMessageQueued); + if (calls === 1) { + throw new ForegroundWaitBackgroundedError(); + } + return { taskId: "task_slow_child", reportMarkdown: "done" }; + }, + }, + generateRunId: () => "wfr_nested_background_parent", + runnerId: "runner-a", + }); + + const result = await service.startNamedWorkflow({ + name: "parent-backgroundable", + workspaceId: "workspace-1", + projectTrusted: true, + args: {}, + }); + + expect(result).toEqual({ + runId: "wfr_nested_background_parent", + status: "backgrounded", + result: null, + }); + await waitForWorkflowStatus(runStore, "wfr_nested_background_parent", "completed"); + const parentRun = await runStore.getRun("wfr_nested_background_parent"); + const childRunId = parentRun.steps.find( + (step) => step.stepId === "child-backgroundable" + )?.taskId; + if (childRunId == null) { + throw new Error("Expected nested background workflow to record a child run id"); + } + const childRun = await runStore.getRun(childRunId); + + expect(childRun.status).toBe("completed"); + expect(parentRun.events).toEqual( + expect.arrayContaining([ + expect.objectContaining({ type: "workflow", status: "backgrounded", runId: childRunId }), + expect.objectContaining({ type: "workflow", status: "completed", runId: childRunId }), + ]) + ); + expect(backgroundFlags).toEqual([true, false]); + }); + test("moves foreground workflow runs to background when child waits are backgrounded", async () => { using tmp = new DisposableTempDir("workflow-service"); const projectRoot = path.join(tmp.path, "project", ".mux", "workflows"); diff --git a/src/node/services/workflows/WorkflowService.ts b/src/node/services/workflows/WorkflowService.ts index c60a422294..df95a2df1d 100644 --- a/src/node/services/workflows/WorkflowService.ts +++ b/src/node/services/workflows/WorkflowService.ts @@ -22,10 +22,12 @@ import type { WorkflowRunStore } from "./WorkflowRunStore"; import { WorkflowRunBackgroundedError, WorkflowRunner, + type WorkflowChildRunAdapter, type WorkflowRunnerClock, type WorkflowRunnerRunOptions, type WorkflowTaskAdapter, } from "./WorkflowRunner"; +import { MAX_NESTED_WORKFLOW_DEPTH } from "./nestedWorkflowRuns"; export interface WorkflowBackgroundRunTerminalEvent { runId: string; @@ -199,7 +201,9 @@ export class WorkflowService { async listRuns(input: { workspaceId: string }): Promise { assert(input.workspaceId.length > 0, "WorkflowService.listRuns: workspaceId is required"); const runs = await this.runStore.listRuns(); - return runs.filter((run) => run.workspaceId === input.workspaceId); + return runs.filter( + (run) => run.workspaceId === input.workspaceId && run.parentWorkflow == null + ); } async resumeCrashedRuns(input: { @@ -251,6 +255,7 @@ export class WorkflowService { "interrupted", this.clock?.nowIso() ?? new Date().toISOString() ); + await this.interruptChildWorkflowRuns(input.runId, input.workspaceId, new Set([input.runId])); await (this.taskAdapterFactory?.(input.runId) ?? this.requireTaskAdapter()).interruptRun?.(); return interrupted; } @@ -763,6 +768,141 @@ export class WorkflowService { } } + private async interruptChildWorkflowRuns( + parentRunId: string, + workspaceId: string, + seenRunIds: Set + ): Promise { + const runs = await this.runStore.listRuns(); + const activeChildren = runs.filter( + (run) => + run.workspaceId === workspaceId && + run.parentWorkflow?.runId === parentRunId && + (run.status === "pending" || run.status === "running" || run.status === "backgrounded") + ); + for (const child of activeChildren) { + if (seenRunIds.has(child.id)) { + continue; + } + seenRunIds.add(child.id); + this.abortActiveRunner(child.id); + await this.runStore.appendStatus( + child.id, + "interrupted", + this.clock?.nowIso() ?? new Date().toISOString() + ); + await this.interruptChildWorkflowRuns(child.id, workspaceId, seenRunIds); + await (this.taskAdapterFactory?.(child.id) ?? this.requireTaskAdapter()).interruptRun?.(); + } + } + + private createChildRunAdapter(projectTrusted: boolean): WorkflowChildRunAdapter { + return { + runChildWorkflowToTerminal: async (input) => { + await this.ensureChildWorkflowRun({ ...input, projectTrusted }); + const run = await this.runStore.getRun(input.childRunId); + if (run.status === "completed") { + return { + runId: input.childRunId, + status: "completed", + result: run.events.findLast((event) => event.type === "result")?.result ?? null, + }; + } + if (run.status === "failed") { + return { runId: input.childRunId, status: "failed", result: null }; + } + + const runner = await this.createRunner(input.childRunId, projectTrusted); + try { + const result = await runner.run(input.childRunId, { + abortSignal: input.abortSignal, + backgroundOnMessageQueued: input.backgroundOnMessageQueued ?? false, + allowResumeFromInterrupted: run.status === "interrupted", + }); + return { runId: input.childRunId, status: "completed", result }; + } catch (error) { + if (error instanceof WorkflowRunBackgroundedError) { + const currentRun = await this.runStore.getRun(input.childRunId); + return { runId: input.childRunId, status: currentRun.status, result: null }; + } + throw error; + } + }, + }; + } + + private async ensureChildWorkflowRun(input: { + parentRunId: string; + stepId: string; + inputHash: string; + childRunId: string; + name: string; + args: unknown; + projectTrusted: boolean; + }): Promise { + assert( + input.parentRunId.length > 0, + "WorkflowService.ensureChildWorkflowRun: parentRunId required" + ); + assert( + input.childRunId.length > 0, + "WorkflowService.ensureChildWorkflowRun: childRunId required" + ); + assert(input.stepId.length > 0, "WorkflowService.ensureChildWorkflowRun: stepId required"); + assert( + input.inputHash.length > 0, + "WorkflowService.ensureChildWorkflowRun: inputHash required" + ); + const parent = await this.runStore.getRun(input.parentRunId); + const depth = (parent.parentWorkflow?.depth ?? -1) + 1; + assert( + depth <= MAX_NESTED_WORKFLOW_DEPTH, + `Nested workflow depth limit exceeded (${MAX_NESTED_WORKFLOW_DEPTH})` + ); + + try { + const existing = await this.runStore.getRun(input.childRunId); + assert( + existing.workspaceId === parent.workspaceId, + "Child workflow workspace must match parent" + ); + assert( + existing.definition.name === input.name, + "Child workflow name must match existing run" + ); + assert( + existing.parentWorkflow?.runId === input.parentRunId && + existing.parentWorkflow.stepId === input.stepId && + existing.parentWorkflow.inputHash === input.inputHash, + "Existing child workflow run must match parent replay identity" + ); + return; + } catch (error) { + if (!isMissingWorkflowRunError(error)) { + throw error; + } + } + + const definition = await this.definitionStore.readDefinition(input.name, { + projectTrusted: input.projectTrusted, + }); + await this.runStore.createRunIfAbsent({ + id: input.childRunId, + workspaceId: parent.workspaceId, + definition: definition.descriptor, + definitionSource: definition.source, + args: input.args, + ...(this.defaultActionCwd != null ? { defaultActionCwd: this.defaultActionCwd } : {}), + parentWorkflow: { + runId: input.parentRunId, + stepId: input.stepId, + inputHash: input.inputHash, + depth, + }, + now: this.clock?.nowIso() ?? new Date().toISOString(), + }); + } + private async createRunner(runId: string, projectTrusted: boolean): Promise { // The run record always exists by the time a runner is created (create/resume/retry // paths persist it first), so resolve the definition name for task labeling here. @@ -775,6 +915,7 @@ export class WorkflowService { runtimeFactory: this.runtimeFactory, taskAdapter: this.taskAdapterFactory?.(runId, workflowName) ?? this.requireTaskAdapter(), actionRegistry: this.actionRegistry, + childRunAdapter: this.createChildRunAdapter(projectTrusted), actionRunner: this.actionRunner, getProjectTrusted: () => this.resolveCurrentProjectTrust(projectTrusted), projectTrusted, @@ -803,6 +944,13 @@ export class WorkflowService { } } +function isMissingWorkflowRunError(error: unknown): boolean { + return ( + error instanceof Error && + (("code" in error && error.code === "ENOENT") || error.message.includes("ENOENT")) + ); +} + function isWorkflowRunAlreadyActiveError(error: unknown, runId: string): boolean { return error instanceof Error && error.message === `Workflow run is already active: ${runId}`; } diff --git a/src/node/services/workflows/builtInWorkflowActions.ts b/src/node/services/workflows/builtInWorkflowActions.ts index 8bad24d515..645ad073ba 100644 --- a/src/node/services/workflows/builtInWorkflowActions.ts +++ b/src/node/services/workflows/builtInWorkflowActions.ts @@ -881,7 +881,30 @@ module.exports.execute = async function (rawInput, ctx) { module.exports.reconcile = module.exports.execute; `; +const WORKFLOWS_START_SOURCE = String.raw` +module.exports.metadata = { + version: 1, + description: "Start a child workflow from the current workflow and wait for its terminal result", + effect: "workspace", + inputSchema: { + type: "object", + required: ["name"], + properties: { + name: { type: "string" }, + args: {}, + }, + }, + outputSchema: { type: "object" }, + timeoutMs: 86400000, +}; + +module.exports.execute = async function () { + throw new Error("workflows.start is executed by the workflow runner"); +}; +`; + const STATIC_BUILT_IN_WORKFLOW_ACTION_SOURCES: Record = { + "workflows.start": WORKFLOWS_START_SOURCE, "git.status": GIT_STATUS_SOURCE, "git.commitsBetween": GIT_COMMITS_BETWEEN_SOURCE, "git.diff": GIT_DIFF_SOURCE, diff --git a/src/node/services/workflows/nestedWorkflowRuns.ts b/src/node/services/workflows/nestedWorkflowRuns.ts new file mode 100644 index 0000000000..ff13532019 --- /dev/null +++ b/src/node/services/workflows/nestedWorkflowRuns.ts @@ -0,0 +1,31 @@ +import * as crypto from "node:crypto"; + +import { WorkflowRunIdSchema } from "@/common/orpc/schemas"; +import assert from "@/common/utils/assert"; + +export const MAX_NESTED_WORKFLOW_DEPTH = 8; + +export function deriveChildWorkflowRunId(input: { + parentRunId: string; + stepId: string; + inputHash: string; +}): string { + assert(input.parentRunId.length > 0, "deriveChildWorkflowRunId: parentRunId is required"); + assert(input.stepId.length > 0, "deriveChildWorkflowRunId: stepId is required"); + assert(input.inputHash.length > 0, "deriveChildWorkflowRunId: inputHash is required"); + const digest = crypto + .createHash("sha256") + .update(input.parentRunId) + .update("\0") + .update(input.stepId) + .update("\0") + .update(input.inputHash) + .digest("base64url") + .slice(0, 32); + const runId = `wfr_child_${digest}`; + assert( + WorkflowRunIdSchema.safeParse(runId).success, + "deriveChildWorkflowRunId: derived child run id must satisfy workflow run id schema" + ); + return runId; +} From 38ea9e4f70de6774b5f55c1250174653fedea6b5 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Mon, 15 Jun 2026 17:30:23 +0000 Subject: [PATCH 2/6] =?UTF-8?q?=F0=9F=A4=96=20fix:=20harden=20nested=20wor?= =?UTF-8?q?kflow=20lifecycle?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address deep-review findings for nested workflow runs: re-check current project trust, finalize failed child steps/events, avoid interrupt/create orphans, tolerate child lease contention, recover partial child run directories, expose runner-coordinated workflows.start for runtime-backed workspaces, repair missing terminal events, and document nested discovery semantics. Validation: - bun test src/node/services/workflows/WorkflowService.test.ts src/node/services/workflows/WorkflowRunStore.test.ts src/node/services/workflows/WorkflowActionRegistry.test.ts src/browser/features/Tools/WorkflowRunToolCall.test.tsx src/common/utils/workflowRunMessages.test.ts --timeout 20000 - make typecheck - MUX_ESLINT_CONCURRENCY=2 make static-check --- _Generated with `mux` • Model: `openai:gpt-5.5` • Thinking: `xhigh` • Cost: `2628699{MUX_COSTS_USD:-unknown}`_ --- docs/hooks/tools.mdx | 4 +- src/common/utils/tools/toolDefinitions.ts | 8 +- src/node/builtinSkills/workflow-authoring.md | 1 + .../builtInSkillContent.generated.ts | 5 +- .../workflows/WorkflowActionRegistry.test.ts | 14 ++ .../workflows/WorkflowActionRegistry.ts | 10 +- .../workflows/WorkflowRunStore.test.ts | 55 +++++ .../services/workflows/WorkflowRunStore.ts | 84 ++++--- src/node/services/workflows/WorkflowRunner.ts | 123 +++++++++- .../workflows/WorkflowService.test.ts | 123 ++++++++++ .../services/workflows/WorkflowService.ts | 218 +++++++++++++++--- 11 files changed, 549 insertions(+), 96 deletions(-) diff --git a/docs/hooks/tools.mdx b/docs/hooks/tools.mdx index baa5986320..b0b6fb846a 100644 --- a/docs/hooks/tools.mdx +++ b/docs/hooks/tools.mdx @@ -659,8 +659,8 @@ If a value is too large for the environment, it may be omitted (not set). Mux al | `MUX_TOOL_INPUT_FILTER` | `filter` | string | Optional regex to filter bash task output lines. By default, only matching lines are returned. When filter_exclude is true, matching lines are excluded instead. Non-matching lines are discarded and cannot be retrieved later. | | `MUX_TOOL_INPUT_FILTER_EXCLUDE` | `filter_exclude` | boolean | When true, lines matching 'filter' are excluded instead of kept. Requires 'filter' to be set. | | `MUX_TOOL_INPUT_MIN_COMPLETED` | `min_completed` | number | Number of awaited tasks that must complete before this call returns. Defaults to 1, so by default task_await returns as soon as the FIRST awaited task completes, letting you act on it while the rest keep running. The result still includes every task complete at that moment plus current status (running/queued) for the rest. Tasks that have not yet completed keep running and remain re-awaitable on a later task_await call. Raise this (e.g. set it to the total number of awaited tasks) when you genuinely need more before proceeding — for example best-of-N synthesis that must compare every candidate. Clamped to the number of awaited tasks; values above that behave like 'wait for all'. | -| `MUX_TOOL_INPUT_TASK_IDS_` | `task_ids[]` | string | List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and workflow runs of the current workspace, excluding workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs. | -| `MUX_TOOL_INPUT_TASK_IDS_COUNT` | `task_ids.length` | number | Number of elements in task_ids (List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and workflow runs of the current workspace, excluding workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs.) | +| `MUX_TOOL_INPUT_TASK_IDS_` | `task_ids[]` | string | List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but top-level workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and top-level workflow runs of the current workspace, excluding workflow-owned sub-agents/background bash tasks and nested child workflow runs because those results are consumed through parent workflow runs. | +| `MUX_TOOL_INPUT_TASK_IDS_COUNT` | `task_ids.length` | number | Number of elements in task_ids (List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but top-level workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and top-level workflow runs of the current workspace, excluding workflow-owned sub-agents/background bash tasks and nested child workflow runs because those results are consumed through parent workflow runs.) | | `MUX_TOOL_INPUT_TIMEOUT_SECS` | `timeout_secs` | number | Maximum time to wait in seconds for each task. For bash tasks, this waits for NEW output (or process exit). If exceeded, the result returns status=queued\|starting\|running\|awaiting_report (task is still active). Defaults to 600 seconds (10 minutes) if not specified. Set to 0 for a non-blocking status check. | diff --git a/src/common/utils/tools/toolDefinitions.ts b/src/common/utils/tools/toolDefinitions.ts index e19a92705f..aa6e98cd88 100644 --- a/src/common/utils/tools/toolDefinitions.ts +++ b/src/common/utils/tools/toolDefinitions.ts @@ -509,8 +509,8 @@ export const TaskAwaitToolArgsSchema = z .nullish() .describe( "List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. " + - "task_list can rediscover sub-agent/background bash IDs, but workflow run rediscovery is done by omitting task_ids. " + - "When omitted, waits for active descendant tasks and workflow runs of the current workspace, excluding workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs." + "task_list can rediscover sub-agent/background bash IDs, but top-level workflow run rediscovery is done by omitting task_ids. " + + "When omitted, waits for active descendant tasks and top-level workflow runs of the current workspace, excluding workflow-owned sub-agents/background bash tasks and nested child workflow runs because those results are consumed through parent workflow runs." ), filter: z .string() @@ -1780,7 +1780,7 @@ export const TOOL_DEFINITIONS = { "\n\nIMPORTANT: Do not call task_await in the same parallel tool-call batch as task, bash, or workflow_run — " + "the taskId/runId is not available until the spawning tool returns. " + "Always wait for the task/bash/workflow_run tool result first, then call task_await in a subsequent step. " + - "When omitting task_ids to await active tasks/workflows, ensure at least one background task or workflow was already spawned in a prior step. Omitted task_ids exclude workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs. " + + "When omitting task_ids to await active tasks/workflows, ensure at least one background task or workflow was already spawned in a prior step. Omitted task_ids discover top-level workflow runs only and exclude workflow-owned sub-agents/background bash tasks and nested child workflow runs because those results are consumed through parent workflow runs. " + "\n\nAgent tasks and workflow runs return reports when completed. " + "Completed reports are persisted on disk and survive context compaction: calling task_await on an already-completed task/workflow run ID (timeout_secs: 0 for non-blocking) re-fetches the full report instead of re-running the work. " + "Bash tasks return incremental output while running and a final reportMarkdown when they exit. " + @@ -1807,7 +1807,7 @@ export const TOOL_DEFINITIONS = { task_list: { description: "List descendant tasks for the current workspace, including status + metadata. " + - "This includes sub-agent tasks, background bash tasks, and workflow runs, but omits workflow-owned sub-agents (and their background bash tasks) whose reports are consumed through their workflow run. " + + "This includes sub-agent tasks, background bash tasks, and top-level workflow runs, but omits workflow-owned sub-agents/background bash tasks and nested child workflow runs whose reports are consumed through parent workflow runs. " + "Use this after compaction, interruptions, or an app restart to rediscover active tasks and resumable workflow runs (statuses interrupted/failed; resume with workflow_resume). " + "This is a discovery tool, NOT a waiting mechanism. If the current request actually depends on a task's output, call task_await with the specific task IDs you need; do not await all active tasks just because they appear here.", schema: TaskListToolArgsSchema, diff --git a/src/node/builtinSkills/workflow-authoring.md b/src/node/builtinSkills/workflow-authoring.md index 4bc4d119d6..98172a4287 100644 --- a/src/node/builtinSkills/workflow-authoring.md +++ b/src/node/builtinSkills/workflow-authoring.md @@ -225,6 +225,7 @@ Replay semantics: - Child workflow step IDs stay local to the child run; they do not collide with parent step IDs. - Parent replay/resume reuses the linked child run, even if the child workflow source has changed since the child run was created. - Child completion returns `{ runId, status, name, reportMarkdown, structuredOutput? }` to the parent. +- Child runs are omitted from top-level `task_list` and omitted-ID `task_await` discovery because the parent consumes their result; explicitly awaiting a known child `runId` still works. - Child failure fails the parent step; parent interruption cascades to active child workflow runs. - V1 is same-workspace and wait-to-terminal only; fire-and-forget child workflows are intentionally rejected. diff --git a/src/node/services/agentSkills/builtInSkillContent.generated.ts b/src/node/services/agentSkills/builtInSkillContent.generated.ts index 189d9378b4..80ff8d9ccb 100644 --- a/src/node/services/agentSkills/builtInSkillContent.generated.ts +++ b/src/node/services/agentSkills/builtInSkillContent.generated.ts @@ -4589,8 +4589,8 @@ export const BUILTIN_SKILL_FILES: Record> = { "| `MUX_TOOL_INPUT_FILTER` | `filter` | string | Optional regex to filter bash task output lines. By default, only matching lines are returned. When filter_exclude is true, matching lines are excluded instead. Non-matching lines are discarded and cannot be retrieved later. |", "| `MUX_TOOL_INPUT_FILTER_EXCLUDE` | `filter_exclude` | boolean | When true, lines matching 'filter' are excluded instead of kept. Requires 'filter' to be set. |", "| `MUX_TOOL_INPUT_MIN_COMPLETED` | `min_completed` | number | Number of awaited tasks that must complete before this call returns. Defaults to 1, so by default task_await returns as soon as the FIRST awaited task completes, letting you act on it while the rest keep running. The result still includes every task complete at that moment plus current status (running/queued) for the rest. Tasks that have not yet completed keep running and remain re-awaitable on a later task_await call. Raise this (e.g. set it to the total number of awaited tasks) when you genuinely need more before proceeding — for example best-of-N synthesis that must compare every candidate. Clamped to the number of awaited tasks; values above that behave like 'wait for all'. |", - "| `MUX_TOOL_INPUT_TASK_IDS_` | `task_ids[]` | string | List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and workflow runs of the current workspace, excluding workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs. |", - "| `MUX_TOOL_INPUT_TASK_IDS_COUNT` | `task_ids.length` | number | Number of elements in task_ids (List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and workflow runs of the current workspace, excluding workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs.) |", + "| `MUX_TOOL_INPUT_TASK_IDS_` | `task_ids[]` | string | List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but top-level workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and top-level workflow runs of the current workspace, excluding workflow-owned sub-agents/background bash tasks and nested child workflow runs because those results are consumed through parent workflow runs. |", + "| `MUX_TOOL_INPUT_TASK_IDS_COUNT` | `task_ids.length` | number | Number of elements in task_ids (List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but top-level workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and top-level workflow runs of the current workspace, excluding workflow-owned sub-agents/background bash tasks and nested child workflow runs because those results are consumed through parent workflow runs.) |", "| `MUX_TOOL_INPUT_TIMEOUT_SECS` | `timeout_secs` | number | Maximum time to wait in seconds for each task. For bash tasks, this waits for NEW output (or process exit). If exceeded, the result returns status=queued\\|starting\\|running\\|awaiting_report (task is still active). Defaults to 600 seconds (10 minutes) if not specified. Set to 0 for a non-blocking status check. |", "", "", @@ -7169,6 +7169,7 @@ export const BUILTIN_SKILL_FILES: Record> = { "- Child workflow step IDs stay local to the child run; they do not collide with parent step IDs.", "- Parent replay/resume reuses the linked child run, even if the child workflow source has changed since the child run was created.", "- Child completion returns `{ runId, status, name, reportMarkdown, structuredOutput? }` to the parent.", + "- Child runs are omitted from top-level `task_list` and omitted-ID `task_await` discovery because the parent consumes their result; explicitly awaiting a known child `runId` still works.", "- Child failure fails the parent step; parent interruption cascades to active child workflow runs.", "- V1 is same-workspace and wait-to-terminal only; fire-and-forget child workflows are intentionally rejected.", "", diff --git a/src/node/services/workflows/WorkflowActionRegistry.test.ts b/src/node/services/workflows/WorkflowActionRegistry.test.ts index a038809b38..a4a1bcd1de 100644 --- a/src/node/services/workflows/WorkflowActionRegistry.test.ts +++ b/src/node/services/workflows/WorkflowActionRegistry.test.ts @@ -258,6 +258,20 @@ describe("WorkflowActionRegistry", () => { } }); + test("allows runner-coordinated workflows.start in runtime-backed workspaces", async () => { + using tmp = new DisposableTempDir("workflow-actions-runtime-workflows-start"); + const registry = new WorkflowActionRegistry({ + projectRoot: "/runtime/project/.mux/actions", + globalRoot: path.join(tmp.path, "global-actions"), + projectRuntime: createRuntimeWithoutActions(), + projectCwd: "/runtime/project", + }); + + const action = await registry.resolveAction("workflows.start", { projectTrusted: true }); + + expect(action).toMatchObject({ name: "workflows.start", scope: "built-in" }); + }); + test("blocks project-local actions without Project Trust while allowing global actions", async () => { using tmp = new DisposableTempDir("workflow-actions-trust"); const projectRoot = path.join(tmp.path, "project-actions"); diff --git a/src/node/services/workflows/WorkflowActionRegistry.ts b/src/node/services/workflows/WorkflowActionRegistry.ts index c6b07b2eeb..0253ddb4bb 100644 --- a/src/node/services/workflows/WorkflowActionRegistry.ts +++ b/src/node/services/workflows/WorkflowActionRegistry.ts @@ -74,7 +74,7 @@ export class WorkflowActionRegistry { if (builtInAction == null) { throw new Error(`Built-in workflow action not found: ${normalizedName}`); } - if (this.projectRuntime != null) { + if (this.projectRuntime != null && !isRunnerCoordinatedBuiltInAction(normalizedName)) { throw new Error("Workflow actions are not supported for runtime-backed workspaces yet"); } return builtInAction; @@ -101,7 +101,7 @@ export class WorkflowActionRegistry { const builtInAction = this.readBuiltInAction(normalizedName); if (builtInAction != null) { - if (this.projectRuntime != null) { + if (this.projectRuntime != null && !isRunnerCoordinatedBuiltInAction(normalizedName)) { throw new Error("Workflow actions are not supported for runtime-backed workspaces yet"); } return builtInAction; @@ -213,7 +213,11 @@ export class WorkflowActionRegistry { } } -export function normalizeWorkflowActionName(name: string): string { +function isRunnerCoordinatedBuiltInAction(name: string): boolean { + return name === "workflows.start"; +} + +function normalizeWorkflowActionName(name: string): string { assert(typeof name === "string", "Workflow action name must be a string"); const normalized = name.trim(); assert(normalized.length > 0, "Workflow action name is required"); diff --git a/src/node/services/workflows/WorkflowRunStore.test.ts b/src/node/services/workflows/WorkflowRunStore.test.ts index 3ba1690876..3013fe3ee2 100644 --- a/src/node/services/workflows/WorkflowRunStore.test.ts +++ b/src/node/services/workflows/WorkflowRunStore.test.ts @@ -68,6 +68,61 @@ describe("WorkflowRunStore", () => { ).rejects.toThrow(/runId must match/); }); + test("createRunIfAbsent recovers an incomplete deterministic run directory", async () => { + using tmp = new DisposableTempDir("workflow-runs-partial-child"); + const store = new WorkflowRunStore({ sessionDir: tmp.path }); + await fs.mkdir(path.join(tmp.path, "workflows", "wfr_child_partial"), { recursive: true }); + + const run = await store.createRunIfAbsent({ + id: "wfr_child_partial", + workspaceId: "workspace-1", + definition, + definitionSource: source, + args: { topic: "nested" }, + parentWorkflow: { + runId: "wfr_parent", + stepId: "child", + inputHash: "hash:child", + depth: 0, + }, + now: "2026-05-29T00:00:00.000Z", + }); + + expect(run.id).toBe("wfr_child_partial"); + await expect(store.getRun("wfr_child_partial")).resolves.toMatchObject({ + id: "wfr_child_partial", + parentWorkflow: { runId: "wfr_parent" }, + }); + }); + + test("createRunIfAbsent reuses a snapshotted child run after definition source changes", async () => { + using tmp = new DisposableTempDir("workflow-runs-child-source-change"); + const store = new WorkflowRunStore({ sessionDir: tmp.path }); + const input = { + id: "wfr_child_source_change", + workspaceId: "workspace-1", + definition, + args: { topic: "nested" }, + parentWorkflow: { + runId: "wfr_parent", + stepId: "child", + inputHash: "hash:child", + depth: 0, + }, + now: "2026-05-29T00:00:00.000Z", + }; + const created = await store.createRunIfAbsent({ ...input, definitionSource: source }); + + const reused = await store.createRunIfAbsent({ + ...input, + definitionSource: + "export default function workflow() { return { reportMarkdown: 'new' }; }\n", + }); + + expect(reused.id).toBe(created.id); + expect(reused.definitionSource).toBe(source); + }); + test("ignores malformed journal lines while preserving valid events and steps", async () => { using tmp = new DisposableTempDir("workflow-runs"); const store = await createStore(tmp.path); diff --git a/src/node/services/workflows/WorkflowRunStore.ts b/src/node/services/workflows/WorkflowRunStore.ts index cae36c708c..d2bb743c7e 100644 --- a/src/node/services/workflows/WorkflowRunStore.ts +++ b/src/node/services/workflows/WorkflowRunStore.ts @@ -126,43 +126,60 @@ export class WorkflowRunStore { const runDir = this.runDir(input.id); await fs.mkdir(this.workflowsDir(), { recursive: true }); + const lockDir = `${runDir}.create.lock`; + await acquireWorkflowMutationLock( + lockDir, + this.leaseMutationLockStaleMs(), + this.leaseMutationWaitTimeoutMs() + ); try { + const existing = await this.getRunIfFullyCreated(input.id); + if (existing != null) { + assertSameWorkflowRunIdentity(existing, input); + return existing; + } + + // A deterministic child run ID must be recoverable after a crash between mkdir and + // run.json. Treat an unreadable run directory as an incomplete create, not identity. + await fs.rm(runDir, { recursive: true, force: true }); await fs.mkdir(runDir, { recursive: false }); - } catch (error) { - if (!isAlreadyExistsError(error)) { + try { + await fs.writeFile(path.join(runDir, "definition.js"), input.definitionSource, "utf-8"); + await fs.writeFile(path.join(runDir, "events.jsonl"), "", { flag: "a" }); + await fs.writeFile(path.join(runDir, "steps.jsonl"), "", { flag: "a" }); + + const run = WorkflowRunRecordSchema.parse({ + id: input.id, + workspaceId: input.workspaceId, + definition: input.definition, + definitionSource: input.definitionSource, + definitionHash: hashSource(input.definitionSource), + args: input.args, + ...(input.defaultActionCwd != null ? { defaultActionCwd: input.defaultActionCwd } : {}), + ...(input.parentWorkflow != null ? { parentWorkflow: input.parentWorkflow } : {}), + status: "pending", + createdAt: input.now, + updatedAt: input.now, + events: [], + steps: [], + }); + + await this.writeRunFile(input.id, run); + return run; + } catch (error) { + await fs.rm(runDir, { recursive: true, force: true }); throw error; } - const existing = await this.getRun(input.id); - assertSameWorkflowRunIdentity(existing, input); - return existing; + } finally { + await fs.rm(lockDir, { recursive: true, force: true }); } + } + private async getRunIfFullyCreated(runId: string): Promise { try { - await fs.writeFile(path.join(runDir, "definition.js"), input.definitionSource, "utf-8"); - await fs.writeFile(path.join(runDir, "events.jsonl"), "", { flag: "a" }); - await fs.writeFile(path.join(runDir, "steps.jsonl"), "", { flag: "a" }); - - const run = WorkflowRunRecordSchema.parse({ - id: input.id, - workspaceId: input.workspaceId, - definition: input.definition, - definitionSource: input.definitionSource, - definitionHash: hashSource(input.definitionSource), - args: input.args, - ...(input.defaultActionCwd != null ? { defaultActionCwd: input.defaultActionCwd } : {}), - ...(input.parentWorkflow != null ? { parentWorkflow: input.parentWorkflow } : {}), - status: "pending", - createdAt: input.now, - updatedAt: input.now, - events: [], - steps: [], - }); - - await this.writeRunFile(input.id, run); - return run; - } catch (error) { - await fs.rm(runDir, { recursive: true, force: true }); - throw error; + return await this.getRun(runId); + } catch { + return null; } } @@ -908,21 +925,14 @@ function assertValidWorkflowRunId(runId: string): void { ); } -function isAlreadyExistsError(error: unknown): boolean { - return error instanceof Error && "code" in error && error.code === "EEXIST"; -} - function assertSameWorkflowRunIdentity( run: WorkflowRunRecord, input: CreateWorkflowRunInput ): void { - const expectedDefinitionHash = hashSource(input.definitionSource); const sameIdentity = run.id === input.id && run.workspaceId === input.workspaceId && run.definition.name === input.definition.name && - run.definition.scope === input.definition.scope && - run.definitionHash === expectedDefinitionHash && JSON.stringify(run.args) === JSON.stringify(input.args) && JSON.stringify(run.parentWorkflow ?? null) === JSON.stringify(input.parentWorkflow ?? null); assert( diff --git a/src/node/services/workflows/WorkflowRunner.ts b/src/node/services/workflows/WorkflowRunner.ts index fb61d8948a..a34dc35675 100644 --- a/src/node/services/workflows/WorkflowRunner.ts +++ b/src/node/services/workflows/WorkflowRunner.ts @@ -843,7 +843,15 @@ export class WorkflowRunner { }); const existingStep = await this.runStore.getStep(runId, spec.id, inputHash); if (existingStep?.status === "completed" && existingStep.result?.structuredOutput != null) { - return normalizeWorkflowNestedResult(existingStep.result.structuredOutput); + const cached = normalizeWorkflowNestedResult(existingStep.result.structuredOutput); + await this.recordWorkflowTerminalEventIfMissing(runId, sequence, { + stepId: spec.id, + runId: cached.runId, + name: cached.name, + status: "completed", + details: cached, + }); + return cached; } const run = await this.runStore.getRun(runId); @@ -877,16 +885,29 @@ export class WorkflowRunner { status: "started", }); - const child = await childRunAdapter.runChildWorkflowToTerminal({ - parentRunId: runId, - stepId: spec.id, - inputHash, - childRunId, - name: childInput.name, - args: childInput.args, - backgroundOnMessageQueued: options.backgroundOnMessageQueued, - abortSignal: options.abortSignal, - }); + let child: Awaited>; + try { + child = await childRunAdapter.runChildWorkflowToTerminal({ + parentRunId: runId, + stepId: spec.id, + inputHash, + childRunId, + name: childInput.name, + args: childInput.args, + backgroundOnMessageQueued: options.backgroundOnMessageQueued, + abortSignal: options.abortSignal, + }); + } catch (error) { + await this.recordNestedWorkflowFailed(runId, sequence, { + stepId: spec.id, + inputHash, + childRunId, + name: childInput.name, + startedAt, + error, + }); + throw error; + } if (child.status === "backgrounded" && options.backgroundOnMessageQueued !== false) { await this.appendEvent(runId, { sequence: sequence.next(), @@ -957,6 +978,86 @@ export class WorkflowRunner { return nestedResult; } + private async recordNestedWorkflowFailed( + runId: string, + sequence: WorkflowEventSequence, + input: { + stepId: string; + inputHash: string; + childRunId: string; + name: string; + startedAt: string; + error: unknown; + } + ): Promise { + const message = getErrorMessage(input.error); + try { + await this.recordStepFailed(runId, { + stepId: input.stepId, + inputHash: input.inputHash, + taskId: input.childRunId, + error: message, + startedAt: input.startedAt, + completedAt: this.clock.nowIso(), + }); + await this.recordWorkflowTerminalEventIfMissing(runId, sequence, { + stepId: input.stepId, + runId: input.childRunId, + name: input.name, + status: "failed", + details: { error: message }, + }); + } catch (recordError) { + const run = await this.runStore.getRun(runId); + if (run.status !== "interrupted") { + throw recordError; + } + } + } + + private async recordWorkflowTerminalEventIfMissing( + runId: string, + sequence: WorkflowEventSequence, + workflow: { + stepId: string; + runId: string; + name: string; + status: "completed" | "failed" | "interrupted"; + details?: unknown; + } + ): Promise { + assert( + runId.length > 0, + "WorkflowRunner.recordWorkflowTerminalEventIfMissing: runId is required" + ); + assert(workflow.stepId.length > 0, "WorkflowRunner: workflow event stepId is required"); + assert(workflow.runId.length > 0, "WorkflowRunner: workflow event runId is required"); + assert(workflow.name.length > 0, "WorkflowRunner: workflow event name is required"); + + await using _lock = await this.taskEventMutex.acquire(); + const run = await this.runStore.getRun(runId); + const alreadyRecorded = run.events.some( + (event) => + event.type === "workflow" && + event.stepId === workflow.stepId && + event.runId === workflow.runId && + event.status === workflow.status + ); + if (alreadyRecorded) { + return; + } + await this.appendEvent(runId, { + sequence: sequence.next(), + type: "workflow", + at: this.clock.nowIso(), + stepId: workflow.stepId, + runId: workflow.runId, + name: workflow.name, + status: workflow.status, + details: workflow.details, + }); + } + private async getCachedWorkflowActionEffect( runId: string, stepId: string, diff --git a/src/node/services/workflows/WorkflowService.test.ts b/src/node/services/workflows/WorkflowService.test.ts index f9c85da499..f0bef9ebbb 100644 --- a/src/node/services/workflows/WorkflowService.test.ts +++ b/src/node/services/workflows/WorkflowService.test.ts @@ -369,6 +369,129 @@ export default function workflow({ args, agent }) { ).toHaveLength(1); }); + test("records terminal parent step state when a nested child workflow fails", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-failure"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + globalRoot, + "child-fails", + "// description: Child fails\nexport default function workflow() { throw new Error('child boom'); }\n" + ); + await writeWorkflow( + globalRoot, + "parent-catches-child-failure", + `// description: Parent catches child failure +export default function workflow({ action }) { + try { + action.workflows.start({ + id: "child-fails", + input: { name: "child-fails", args: {} }, + }); + } catch (error) { + return { reportMarkdown: "caught " + error.message }; + } + return { reportMarkdown: "unexpected" }; +}\n` + ); + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + const service = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent() { + throw new Error("agent should not run"); + }, + }, + generateRunId: () => "wfr_parent_catches_child_failure", + runnerId: "runner-a", + }); + + const result = await service.startNamedWorkflow({ + name: "parent-catches-child-failure", + workspaceId: "workspace-1", + projectTrusted: true, + args: {}, + }); + const parentRun = await runStore.getRun("wfr_parent_catches_child_failure"); + const childStep = parentRun.steps.find((step) => step.stepId === "child-fails"); + + expect(result.status).toBe("completed"); + expect(childStep?.status).toBe("failed"); + expect(parentRun.events).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + type: "workflow", + stepId: "child-fails", + status: "failed", + }), + ]) + ); + }); + + test("applies current project trust before starting nested project workflows", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-trust"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + projectRoot, + "child-project", + "// description: Project child\nexport default function workflow() { return { reportMarkdown: 'project child' }; }\n" + ); + await writeWorkflow( + globalRoot, + "parent-project-child", + `// description: Parent project child +export default function workflow({ action }) { + return action.workflows.start({ + id: "child-project", + input: { name: "child-project", args: {} }, + }); +}\n` + ); + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + const service = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent() { + throw new Error("agent should not run"); + }, + }, + getCurrentProjectTrusted: () => false, + generateRunId: () => "wfr_parent_project_child", + runnerId: "runner-a", + }); + + await expect( + service.startNamedWorkflow({ + name: "parent-project-child", + workspaceId: "workspace-1", + projectTrusted: true, + args: {}, + }) + ).rejects.toThrow(/Project trust|Workflow definition not found/); + const parentRun = await runStore.getRun("wfr_parent_project_child"); + + expect(parentRun.steps.find((step) => step.stepId === "child-project")?.status).toBe("failed"); + }); + test("runs workspace scratch workflow definitions authored as files", async () => { using tmp = new DisposableTempDir("workflow-service"); const workspaceRoot = path.join(tmp.path, "project"); diff --git a/src/node/services/workflows/WorkflowService.ts b/src/node/services/workflows/WorkflowService.ts index df95a2df1d..726543686d 100644 --- a/src/node/services/workflows/WorkflowService.ts +++ b/src/node/services/workflows/WorkflowService.ts @@ -97,6 +97,8 @@ export interface StartNamedWorkflowResult { result: unknown; } +const CHILD_WORKFLOW_ACTIVE_POLL_MS = 1_000; + const WORKFLOW_BACKGROUND_CONTINUATION_STATUSES = new Set([ "completed", "failed", @@ -343,7 +345,7 @@ export class WorkflowService { >; backgroundedFailureMessage: string; }): Promise { - if (input.abortSignal?.aborted === true) { + if (isAbortSignalAborted(input.abortSignal)) { // The caller was aborted before the runner started; leave the run in its current // (still resumable) state instead of churning status transitions. throw new Error(`Workflow run interrupted: ${input.runId}`); @@ -463,7 +465,7 @@ export class WorkflowService { async startNamedWorkflow(input: StartNamedWorkflowInput): Promise { const runId = await this.createNamedWorkflowRun(input); - if (input.abortSignal?.aborted === true) { + if (isAbortSignalAborted(input.abortSignal)) { await this.interruptRun({ workspaceId: input.workspaceId, runId }); throw new Error(`Workflow run interrupted: ${runId}`); } @@ -778,54 +780,165 @@ export class WorkflowService { (run) => run.workspaceId === workspaceId && run.parentWorkflow?.runId === parentRunId && - (run.status === "pending" || run.status === "running" || run.status === "backgrounded") + isInterruptibleWorkflowRunStatus(run.status) ); for (const child of activeChildren) { - if (seenRunIds.has(child.id)) { - continue; - } - seenRunIds.add(child.id); - this.abortActiveRunner(child.id); - await this.runStore.appendStatus( - child.id, + await this.interruptChildWorkflowRun(child, workspaceId, seenRunIds); + } + } + + private async interruptChildWorkflowRun( + child: WorkflowRunRecord, + workspaceId: string, + seenRunIds: Set + ): Promise { + if (seenRunIds.has(child.id)) { + return; + } + seenRunIds.add(child.id); + this.abortActiveRunner(child.id); + const interrupted = await this.appendInterruptedIfActive(child.id); + if ( + !isInterruptibleWorkflowRunStatus(interrupted.status) && + interrupted.status !== "interrupted" + ) { + return; + } + await this.interruptChildWorkflowRuns(child.id, workspaceId, seenRunIds); + await (this.taskAdapterFactory?.(child.id) ?? this.requireTaskAdapter()).interruptRun?.(); + } + + private async appendInterruptedIfActive(runId: string): Promise { + const run = await this.runStore.getRun(runId); + if (!isInterruptibleWorkflowRunStatus(run.status)) { + return run; + } + try { + return await this.runStore.appendStatus( + runId, "interrupted", this.clock?.nowIso() ?? new Date().toISOString() ); - await this.interruptChildWorkflowRuns(child.id, workspaceId, seenRunIds); - await (this.taskAdapterFactory?.(child.id) ?? this.requireTaskAdapter()).interruptRun?.(); + } catch (error) { + const latest = await this.runStore.getRun(runId); + if (!isInterruptibleWorkflowRunStatus(latest.status)) { + return latest; + } + throw error; + } + } + + private async interruptExistingChildWorkflowRun( + runId: string + ): Promise { + const run = await this.getRunIfPresent(runId); + if (run == null) { + return null; + } + this.abortActiveRunner(runId); + return await this.appendInterruptedIfActive(runId); + } + + private async getRunIfPresent(runId: string): Promise { + try { + return await this.runStore.getRun(runId); + } catch (error) { + if (isMissingWorkflowRunError(error)) { + return null; + } + throw error; } } private createChildRunAdapter(projectTrusted: boolean): WorkflowChildRunAdapter { return { runChildWorkflowToTerminal: async (input) => { - await this.ensureChildWorkflowRun({ ...input, projectTrusted }); - const run = await this.runStore.getRun(input.childRunId); - if (run.status === "completed") { - return { - runId: input.childRunId, - status: "completed", - result: run.events.findLast((event) => event.type === "result")?.result ?? null, - }; - } - if (run.status === "failed") { - return { runId: input.childRunId, status: "failed", result: null }; - } + while (true) { + const currentProjectTrusted = await this.resolveCurrentProjectTrust(projectTrusted); + if (isAbortSignalAborted(input.abortSignal)) { + const interrupted = await this.interruptExistingChildWorkflowRun(input.childRunId); + return { + runId: input.childRunId, + status: interrupted?.status ?? "interrupted", + result: null, + }; + } - const runner = await this.createRunner(input.childRunId, projectTrusted); - try { - const result = await runner.run(input.childRunId, { - abortSignal: input.abortSignal, - backgroundOnMessageQueued: input.backgroundOnMessageQueued ?? false, - allowResumeFromInterrupted: run.status === "interrupted", - }); - return { runId: input.childRunId, status: "completed", result }; - } catch (error) { - if (error instanceof WorkflowRunBackgroundedError) { - const currentRun = await this.runStore.getRun(input.childRunId); - return { runId: input.childRunId, status: currentRun.status, result: null }; + await this.ensureChildWorkflowRun({ ...input, projectTrusted: currentProjectTrusted }); + let run = await this.runStore.getRun(input.childRunId); + if (run.status === "completed") { + return { + runId: input.childRunId, + status: "completed", + result: getWorkflowRunResult(run), + }; + } + if (run.status === "failed") { + return { runId: input.childRunId, status: "failed", result: getWorkflowRunResult(run) }; + } + assertRunCanResumeWithCurrentTrust(run, currentProjectTrusted); + + if (isAbortSignalAborted(input.abortSignal)) { + const interrupted = await this.appendInterruptedIfActive(input.childRunId); + return { runId: input.childRunId, status: interrupted.status, result: null }; + } + + const retryDelayMs = await this.runStore.getLeaseRetryDelayMs( + input.childRunId, + this.clock?.nowMs() ?? Date.now() + ); + if (retryDelayMs > 0) { + if (input.backgroundOnMessageQueued !== false) { + return { runId: input.childRunId, status: "backgrounded", result: null }; + } + await waitForAbortableDelay( + Math.min(retryDelayMs, CHILD_WORKFLOW_ACTIVE_POLL_MS), + input.abortSignal + ); + continue; + } + + const runner = await this.createRunner(input.childRunId, currentProjectTrusted); + try { + const result = await runner.run(input.childRunId, { + abortSignal: input.abortSignal, + backgroundOnMessageQueued: input.backgroundOnMessageQueued ?? false, + allowResumeFromInterrupted: run.status === "interrupted", + }); + return { runId: input.childRunId, status: "completed", result }; + } catch (error) { + if (error instanceof WorkflowRunBackgroundedError) { + run = await this.runStore.getRun(input.childRunId); + return { runId: input.childRunId, status: run.status, result: null }; + } + if (isWorkflowRunAlreadyActiveError(error, input.childRunId)) { + if (input.backgroundOnMessageQueued !== false) { + return { runId: input.childRunId, status: "backgrounded", result: null }; + } + const activeRetryDelayMs = await this.runStore.getLeaseRetryDelayMs( + input.childRunId, + this.clock?.nowMs() ?? Date.now() + ); + await waitForAbortableDelay( + Math.min(Math.max(1, activeRetryDelayMs), CHILD_WORKFLOW_ACTIVE_POLL_MS), + input.abortSignal + ); + continue; + } + if (isAbortSignalAborted(input.abortSignal)) { + const interrupted = await this.appendInterruptedIfActive(input.childRunId); + return { runId: input.childRunId, status: interrupted.status, result: null }; + } + const currentRun = await this.getRunIfPresent(input.childRunId); + if (currentRun?.status === "failed" || currentRun?.status === "interrupted") { + return { + runId: input.childRunId, + status: currentRun.status, + result: getWorkflowRunResult(currentRun), + }; + } + throw error; } - throw error; } }, }; @@ -964,6 +1077,37 @@ function unrefTimer(timer: ReturnType): void { } } +function isAbortSignalAborted(abortSignal?: AbortSignal): boolean { + return abortSignal?.aborted === true; +} + +function isInterruptibleWorkflowRunStatus(status: WorkflowRunStatus): boolean { + return status === "pending" || status === "running" || status === "backgrounded"; +} + +function getWorkflowRunResult(run: WorkflowRunRecord): unknown { + return run.events.findLast((event) => event.type === "result")?.result ?? null; +} + +async function waitForAbortableDelay(delayMs: number, abortSignal?: AbortSignal): Promise { + assert(delayMs > 0, "WorkflowService.waitForAbortableDelay: delayMs must be positive"); + if (abortSignal?.aborted === true) { + return; + } + await new Promise((resolve) => { + const timer = setTimeout(resolve, delayMs); + unrefTimer(timer); + abortSignal?.addEventListener( + "abort", + () => { + clearTimeout(timer); + resolve(); + }, + { once: true } + ); + }); +} + function canResumeRunWithCurrentTrust(run: WorkflowRunRecord, projectTrusted: boolean): boolean { return ( (run.definition.scope !== "project" && run.definition.scope !== "scratch") || projectTrusted From b9ae78b4b7b412f233421225860a1fd1420683d1 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Mon, 15 Jun 2026 17:44:34 +0000 Subject: [PATCH 3/6] =?UTF-8?q?=F0=9F=A4=96=20fix:=20address=20nested=20wo?= =?UTF-8?q?rkflow=20review=20gaps?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Preserve explicit null args for nested workflow starts and keep completed child workflow reports out of post-compaction top-level report indexes. Validation: - bun test src/node/services/workflows/WorkflowService.test.ts src/node/services/attachmentService.completedReports.test.ts --timeout 20000 - MUX_ESLINT_CONCURRENCY=2 make static-check --- _Generated with `mux` • Model: `openai:gpt-5.5` • Thinking: `xhigh` • Cost: `2874038{MUX_COSTS_USD:-unknown}`_ --- ...attachmentService.completedReports.test.ts | 18 +++++++ src/node/services/attachmentService.ts | 8 ++- src/node/services/workflows/WorkflowRunner.ts | 5 +- .../workflows/WorkflowService.test.ts | 50 +++++++++++++++++++ 4 files changed, 78 insertions(+), 3 deletions(-) diff --git a/src/node/services/attachmentService.completedReports.test.ts b/src/node/services/attachmentService.completedReports.test.ts index 1e56c69ff0..c1c22a23b9 100644 --- a/src/node/services/attachmentService.completedReports.test.ts +++ b/src/node/services/attachmentService.completedReports.test.ts @@ -44,6 +44,12 @@ async function writeWorkflowRun( workspaceId?: string; completedAt?: string; failed?: boolean; + parentWorkflow?: { + runId: string; + stepId: string; + inputHash: string; + depth: number; + }; reportMarkdown?: string; } ): Promise { @@ -57,6 +63,7 @@ async function writeWorkflowRun( scope: "built-in" as const, executable: true, }, + ...(params.parentWorkflow !== undefined ? { parentWorkflow: params.parentWorkflow } : {}), definitionSource: "export default async function workflow() { return 'ok'; }\n", args: {}, now: "2026-06-01T00:00:00.000Z", @@ -171,6 +178,17 @@ describe("AttachmentService.generateCompletedReportsAttachment", () => { completedAt: "2026-06-01T12:00:00.000Z", }); + await writeWorkflowRun(tmp.path, { + runId: "wfr_child_done", + completedAt: "2026-06-01T12:00:00.000Z", + parentWorkflow: { + runId: "wfr_parent_done", + stepId: "child", + inputHash: "hash:child", + depth: 0, + }, + }); + const attachment = await AttachmentService.generateCompletedReportsAttachment({ workspaceId: WORKSPACE_ID, sessionDir: tmp.path, diff --git a/src/node/services/attachmentService.ts b/src/node/services/attachmentService.ts index 7a1149a88a..515c4da76a 100644 --- a/src/node/services/attachmentService.ts +++ b/src/node/services/attachmentService.ts @@ -7,7 +7,7 @@ import type { CompletedReportEntry, CompletedReportsIndexAttachment, } from "@/common/types/attachment"; -import type { WorkflowRunEvent } from "@/common/types/workflow"; +import { isNestedWorkflowRun, type WorkflowRunEvent } from "@/common/types/workflow"; import { getPlanFilePath, getLegacyPlanFilePath } from "@/common/utils/planStorage"; import type { FileEditDiff } from "@/common/utils/messages/extractEditedFiles"; import assert from "@/common/utils/assert"; @@ -190,7 +190,11 @@ export class AttachmentService { const runStore = new WorkflowRunStore({ sessionDir: params.sessionDir }); const runs = await runStore.listRuns(); for (const run of runs) { - if (run.workspaceId !== params.workspaceId || run.status !== "completed") { + if ( + run.workspaceId !== params.workspaceId || + run.status !== "completed" || + isNestedWorkflowRun(run) + ) { continue; } const resultEvent = run.events.findLast( diff --git a/src/node/services/workflows/WorkflowRunner.ts b/src/node/services/workflows/WorkflowRunner.ts index a34dc35675..ec14f3222d 100644 --- a/src/node/services/workflows/WorkflowRunner.ts +++ b/src/node/services/workflows/WorkflowRunner.ts @@ -2179,7 +2179,10 @@ function parseWorkflowStartInput(rawInput: unknown): WorkflowStartInput { input.wait !== false && input.runInBackground !== true && input.run_in_background !== true, "workflows.start currently waits for the child workflow to reach a terminal result" ); - return { name: input.name, args: input.args ?? {} }; + return { + name: input.name, + args: Object.prototype.hasOwnProperty.call(input, "args") ? input.args : {}, + }; } function buildWorkflowStartReplayInput(input: WorkflowStartInput): unknown { diff --git a/src/node/services/workflows/WorkflowService.test.ts b/src/node/services/workflows/WorkflowService.test.ts index f0bef9ebbb..05445c0a7a 100644 --- a/src/node/services/workflows/WorkflowService.test.ts +++ b/src/node/services/workflows/WorkflowService.test.ts @@ -369,6 +369,56 @@ export default function workflow({ args, agent }) { ).toHaveLength(1); }); + test("preserves explicit null args for nested child workflows", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-null-args"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + globalRoot, + "child-null-args", + "// description: Child null args\nexport default function workflow({ args }) { return { reportMarkdown: String(args === null) }; }\n" + ); + await writeWorkflow( + globalRoot, + "parent-null-args", + `// description: Parent null args +export default function workflow({ action }) { + return action.workflows.start({ + id: "child-null-args", + input: { name: "child-null-args", args: null }, + }); +}\n` + ); + const service = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore: new WorkflowRunStore({ sessionDir: tmp.path }), + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent() { + throw new Error("agent should not run"); + }, + }, + generateRunId: () => "wfr_parent_null_args", + runnerId: "runner-a", + }); + + const result = await service.startNamedWorkflow({ + name: "parent-null-args", + workspaceId: "workspace-1", + projectTrusted: true, + args: {}, + }); + + expect(result.result).toMatchObject({ reportMarkdown: "true" }); + }); + test("records terminal parent step state when a nested child workflow fails", async () => { using tmp = new DisposableTempDir("workflow-service-nested-failure"); const workspaceRoot = path.join(tmp.path, "project"); From 195272ec8aafb4641788fd708035b5ff64b313d1 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Mon, 15 Jun 2026 18:32:50 +0000 Subject: [PATCH 4/6] =?UTF-8?q?=F0=9F=A4=96=20fix:=20preserve=20nested=20w?= =?UTF-8?q?orkflow=20cwd=20and=20discovery?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address Codex follow-up feedback by snapshotting child runs with the parent run default action cwd and listing runner-coordinated workflows.start actions in runtime-backed workspaces. Validation: - bun test src/node/services/workflows/WorkflowService.test.ts src/node/services/workflows/WorkflowActionRegistry.test.ts --timeout 20000 - MUX_ESLINT_CONCURRENCY=2 make static-check --- _Generated with `mux` • Model: `openai:gpt-5.5` • Thinking: `xhigh` • Cost: `3783076{MUX_COSTS_USD:-unknown}`_ --- .../workflows/WorkflowActionRegistry.test.ts | 12 +++- .../workflows/WorkflowActionRegistry.ts | 4 +- .../workflows/WorkflowService.test.ts | 67 +++++++++++++++++++ .../services/workflows/WorkflowService.ts | 4 +- 4 files changed, 83 insertions(+), 4 deletions(-) diff --git a/src/node/services/workflows/WorkflowActionRegistry.test.ts b/src/node/services/workflows/WorkflowActionRegistry.test.ts index a4a1bcd1de..01f423894f 100644 --- a/src/node/services/workflows/WorkflowActionRegistry.test.ts +++ b/src/node/services/workflows/WorkflowActionRegistry.test.ts @@ -211,7 +211,9 @@ describe("WorkflowActionRegistry", () => { await expectRuntimeProjectActionRejection(registry); const actions = await registry.listActions({ projectTrusted: true }); - expect(actions).toEqual([]); + expect(actions.map((action) => ({ name: action.name, scope: action.scope }))).toEqual([ + { name: "workflows.start", scope: "built-in" }, + ]); }); test("rejects runtime-backed global actions instead of using a remote cwd locally", async () => { @@ -235,7 +237,9 @@ describe("WorkflowActionRegistry", () => { } const actions = await registry.listActions({ projectTrusted: true }); - expect(actions).toEqual([]); + expect(actions.map((action) => ({ name: action.name, scope: action.scope }))).toEqual([ + { name: "workflows.start", scope: "built-in" }, + ]); }); test("rejects runtime-backed built-in actions instead of using a remote cwd locally", async () => { @@ -268,8 +272,12 @@ describe("WorkflowActionRegistry", () => { }); const action = await registry.resolveAction("workflows.start", { projectTrusted: true }); + const actions = await registry.listActions({ projectTrusted: true }); expect(action).toMatchObject({ name: "workflows.start", scope: "built-in" }); + expect(actions.map((action) => ({ name: action.name, scope: action.scope }))).toEqual([ + { name: "workflows.start", scope: "built-in" }, + ]); }); test("blocks project-local actions without Project Trust while allowing global actions", async () => { diff --git a/src/node/services/workflows/WorkflowActionRegistry.ts b/src/node/services/workflows/WorkflowActionRegistry.ts index 0253ddb4bb..29d9e2bff6 100644 --- a/src/node/services/workflows/WorkflowActionRegistry.ts +++ b/src/node/services/workflows/WorkflowActionRegistry.ts @@ -58,7 +58,9 @@ export class WorkflowActionRegistry { async listActions(options: { projectTrusted: boolean }): Promise { if (this.projectRuntime != null) { - return []; + return scanBuiltInActions() + .filter((action) => isRunnerCoordinatedBuiltInAction(action.name)) + .sort((a, b) => a.name.localeCompare(b.name)); } const byName = await this.collectActions(options); return Array.from(byName.values()).sort((a, b) => a.name.localeCompare(b.name)); diff --git a/src/node/services/workflows/WorkflowService.test.ts b/src/node/services/workflows/WorkflowService.test.ts index 05445c0a7a..bddb74c4af 100644 --- a/src/node/services/workflows/WorkflowService.test.ts +++ b/src/node/services/workflows/WorkflowService.test.ts @@ -369,6 +369,73 @@ export default function workflow({ args, agent }) { ).toHaveLength(1); }); + test("snapshots parent action cwd when replay creates a nested child run", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-cwd"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + globalRoot, + "child-cwd", + "// description: Child cwd\nexport default function workflow() { return { reportMarkdown: 'child' }; }\n" + ); + const parentSource = `// description: Parent cwd +export default function workflow({ action }) { + return action.workflows.start({ + id: "child-cwd", + input: { name: "child-cwd", args: {} }, + }); +}\n`; + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + const parentCwd = path.join(tmp.path, "parent-cwd"); + await runStore.createRun({ + id: "wfr_parent_cwd", + workspaceId: "workspace-1", + definition: { + name: "parent-cwd", + description: "Parent cwd", + scope: "global", + executable: true, + }, + definitionSource: parentSource, + args: {}, + defaultActionCwd: parentCwd, + now: "2026-05-29T00:00:00.000Z", + }); + const service = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent() { + throw new Error("agent should not run"); + }, + }, + defaultActionCwd: path.join(tmp.path, "service-cwd"), + runnerId: "runner-a", + }); + + await service.resumeRun({ + workspaceId: "workspace-1", + runId: "wfr_parent_cwd", + projectTrusted: true, + }); + const parentRun = await runStore.getRun("wfr_parent_cwd"); + const childRunId = parentRun.steps.find((step) => step.stepId === "child-cwd")?.taskId; + if (childRunId == null) { + throw new Error("Expected nested cwd workflow to record a child run id"); + } + const childRun = await runStore.getRun(childRunId); + + expect(childRun.defaultActionCwd).toBe(parentCwd); + }); + test("preserves explicit null args for nested child workflows", async () => { using tmp = new DisposableTempDir("workflow-service-nested-null-args"); const workspaceRoot = path.join(tmp.path, "project"); diff --git a/src/node/services/workflows/WorkflowService.ts b/src/node/services/workflows/WorkflowService.ts index 726543686d..6594c89dd8 100644 --- a/src/node/services/workflows/WorkflowService.ts +++ b/src/node/services/workflows/WorkflowService.ts @@ -1005,7 +1005,9 @@ export class WorkflowService { definition: definition.descriptor, definitionSource: definition.source, args: input.args, - ...(this.defaultActionCwd != null ? { defaultActionCwd: this.defaultActionCwd } : {}), + ...(parent.defaultActionCwd != null || this.defaultActionCwd != null + ? { defaultActionCwd: parent.defaultActionCwd ?? this.defaultActionCwd } + : {}), parentWorkflow: { runId: input.parentRunId, stepId: input.stepId, From cc0a91ade419f8e10aa06e0f605404f26e46825f Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Mon, 15 Jun 2026 18:49:45 +0000 Subject: [PATCH 5/6] =?UTF-8?q?=F0=9F=A4=96=20fix:=20register=20active=20n?= =?UTF-8?q?ested=20child=20runners?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Register nested child workflow runners with the active abort-controller map once their lease is acquired so direct interrupts by child run ID abort the child runtime in addition to marking status. Validation: - bun test src/node/services/workflows/WorkflowService.test.ts --timeout 20000 - MUX_ESLINT_CONCURRENCY=2 make static-check --- _Generated with `mux` • Model: `openai:gpt-5.5` • Thinking: `xhigh` • Cost: `3989322{MUX_COSTS_USD:-unknown}`_ --- .../workflows/WorkflowService.test.ts | 102 ++++++++++++++++++ .../services/workflows/WorkflowService.ts | 33 +++++- 2 files changed, 134 insertions(+), 1 deletion(-) diff --git a/src/node/services/workflows/WorkflowService.test.ts b/src/node/services/workflows/WorkflowService.test.ts index bddb74c4af..664593d547 100644 --- a/src/node/services/workflows/WorkflowService.test.ts +++ b/src/node/services/workflows/WorkflowService.test.ts @@ -1055,6 +1055,108 @@ export default function workflow({ action }) { expect(runError instanceof Error ? runError.message : "").toMatch(/interrupted|aborted/i); }); + test("interruptRun aborts an active nested child workflow by child run id", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-direct-interrupt"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + globalRoot, + "child-direct-interruptible", + "// description: Child direct interruptible\nexport default function workflow({ agent }) { return agent({ id: 'slow-child', prompt: 'slow' }); }\n" + ); + await writeWorkflow( + globalRoot, + "parent-direct-interruptible", + `// description: Parent direct interruptible +export default function workflow({ action }) { + return action.workflows.start({ + id: "child-direct-interruptible", + input: { name: "child-direct-interruptible", args: {} }, + }); +}\n` + ); + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + let agentWaitStarted = false; + let agentAbortObserved = false; + const starterService = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent(_spec, _lifecycle, waitOptions) { + agentWaitStarted = true; + return await new Promise((_, reject) => { + waitOptions?.abortSignal?.addEventListener( + "abort", + () => { + agentAbortObserved = true; + reject(new Error("Task interrupted")); + }, + { once: true } + ); + }); + }, + }, + generateRunId: () => "wfr_nested_direct_interrupt_parent", + runnerId: "runner-a", + }); + const interruptService = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent() { + throw new Error("interrupt service runAgent should not be called"); + }, + async interruptRun() { + // child task adapter interrupt is supplementary; the child workflow runtime must + // also abort through the active child runner controller. + }, + }, + runnerId: "runner-b", + }); + + const runPromise = starterService.startNamedWorkflow({ + name: "parent-direct-interruptible", + workspaceId: "workspace-1", + projectTrusted: true, + args: {}, + }); + const runErrorPromise = runPromise.then( + () => null, + (error: unknown) => error + ); + await waitForCondition("nested child agent to start", () => agentWaitStarted); + const parentRun = await runStore.getRun("wfr_nested_direct_interrupt_parent"); + const childRunId = parentRun.steps.find( + (step) => step.stepId === "child-direct-interruptible" + )?.taskId; + if (childRunId == null) { + throw new Error("Expected direct interrupt workflow to record a child run id"); + } + + const interrupted = await interruptService.interruptRun({ + workspaceId: "workspace-1", + runId: childRunId, + }); + const runError = await runErrorPromise; + + expect(interrupted.status).toBe("interrupted"); + expect(agentAbortObserved).toBe(true); + expect(runError).toBeInstanceOf(Error); + }); + test("backgrounds and resumes parent workflows when a nested child workflow backgrounds", async () => { using tmp = new DisposableTempDir("workflow-service-nested-background"); const workspaceRoot = path.join(tmp.path, "project"); diff --git a/src/node/services/workflows/WorkflowService.ts b/src/node/services/workflows/WorkflowService.ts index 6594c89dd8..6ed713ba3b 100644 --- a/src/node/services/workflows/WorkflowService.ts +++ b/src/node/services/workflows/WorkflowService.ts @@ -899,11 +899,23 @@ export class WorkflowService { } const runner = await this.createRunner(input.childRunId, currentProjectTrusted); + const childAbortController = new AbortController(); + const removeParentAbortForwarding = forwardAbortSignal( + input.abortSignal, + childAbortController + ); + let unregisterChildRunnerAbort: () => void = () => undefined; try { const result = await runner.run(input.childRunId, { - abortSignal: input.abortSignal, + abortSignal: childAbortController.signal, backgroundOnMessageQueued: input.backgroundOnMessageQueued ?? false, allowResumeFromInterrupted: run.status === "interrupted", + onLeaseAcquired: () => { + unregisterChildRunnerAbort = this.registerActiveRunnerAbortController( + input.childRunId, + childAbortController + ); + }, }); return { runId: input.childRunId, status: "completed", result }; } catch (error) { @@ -938,6 +950,9 @@ export class WorkflowService { }; } throw error; + } finally { + removeParentAbortForwarding(); + unregisterChildRunnerAbort(); } } }, @@ -1079,6 +1094,22 @@ function unrefTimer(timer: ReturnType): void { } } +function forwardAbortSignal( + abortSignal: AbortSignal | undefined, + controller: AbortController +): () => void { + if (abortSignal == null) { + return () => undefined; + } + if (abortSignal.aborted) { + controller.abort(); + return () => undefined; + } + const abort = () => controller.abort(); + abortSignal.addEventListener("abort", abort, { once: true }); + return () => abortSignal.removeEventListener("abort", abort); +} + function isAbortSignalAborted(abortSignal?: AbortSignal): boolean { return abortSignal?.aborted === true; } From 0895bbfa7481d742a602ef4ee1b5423079b2a8ce Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Mon, 15 Jun 2026 18:59:00 +0000 Subject: [PATCH 6/6] =?UTF-8?q?=F0=9F=A4=96=20fix:=20cascade=20interrupts?= =?UTF-8?q?=20through=20interrupted=20children?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Include already-interrupted nested children in parent interrupt cascades so workflow-owned child task adapters are still interrupted after the child run status was marked interrupted first. Validation: - bun test src/node/services/workflows/WorkflowService.test.ts --timeout 20000 - MUX_ESLINT_CONCURRENCY=2 make static-check --- _Generated with `mux` • Model: `openai:gpt-5.5` • Thinking: `xhigh` • Cost: `4091238{MUX_COSTS_USD:-unknown}`_ --- .../workflows/WorkflowService.test.ts | 70 +++++++++++++++++++ .../services/workflows/WorkflowService.ts | 2 +- 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/src/node/services/workflows/WorkflowService.test.ts b/src/node/services/workflows/WorkflowService.test.ts index 664593d547..a46796fb1e 100644 --- a/src/node/services/workflows/WorkflowService.test.ts +++ b/src/node/services/workflows/WorkflowService.test.ts @@ -1055,6 +1055,76 @@ export default function workflow({ action }) { expect(runError instanceof Error ? runError.message : "").toMatch(/interrupted|aborted/i); }); + test("interruptRun still interrupts tasks for already-interrupted nested children", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-interrupted-task-cascade"); + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + await runStore.createRun({ + id: "wfr_parent_interrupted_child_task", + workspaceId: "workspace-1", + definition: { name: "parent", description: "Parent", scope: "built-in", executable: true }, + definitionSource: + "export default function workflow() { return { reportMarkdown: 'parent' }; }\n", + args: {}, + now: "2026-05-29T00:00:00.000Z", + }); + await runStore.appendStatus( + "wfr_parent_interrupted_child_task", + "running", + "2026-05-29T00:00:01.000Z" + ); + await runStore.createRun({ + id: "wfr_child_already_interrupted", + workspaceId: "workspace-1", + definition: { name: "child", description: "Child", scope: "built-in", executable: true }, + definitionSource: + "export default function workflow() { return { reportMarkdown: 'child' }; }\n", + args: {}, + parentWorkflow: { + runId: "wfr_parent_interrupted_child_task", + stepId: "child", + inputHash: "hash:child", + depth: 0, + }, + now: "2026-05-29T00:00:00.000Z", + }); + await runStore.appendStatus( + "wfr_child_already_interrupted", + "running", + "2026-05-29T00:00:01.000Z" + ); + await runStore.appendStatus( + "wfr_child_already_interrupted", + "interrupted", + "2026-05-29T00:00:02.000Z" + ); + const interruptedTaskAdapters: string[] = []; + const service = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ + projectRoot: path.join(tmp.path, "project"), + globalRoot: path.join(tmp.path, "global"), + builtIns: [], + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapterFactory: (runId) => ({ + async runAgent() { + throw new Error("agent should not run"); + }, + async interruptRun() { + interruptedTaskAdapters.push(runId); + }, + }), + runnerId: "runner-a", + }); + + await service.interruptRun({ + workspaceId: "workspace-1", + runId: "wfr_parent_interrupted_child_task", + }); + + expect(interruptedTaskAdapters).toContain("wfr_child_already_interrupted"); + }); + test("interruptRun aborts an active nested child workflow by child run id", async () => { using tmp = new DisposableTempDir("workflow-service-nested-direct-interrupt"); const workspaceRoot = path.join(tmp.path, "project"); diff --git a/src/node/services/workflows/WorkflowService.ts b/src/node/services/workflows/WorkflowService.ts index 6ed713ba3b..1ce2967e8e 100644 --- a/src/node/services/workflows/WorkflowService.ts +++ b/src/node/services/workflows/WorkflowService.ts @@ -780,7 +780,7 @@ export class WorkflowService { (run) => run.workspaceId === workspaceId && run.parentWorkflow?.runId === parentRunId && - isInterruptibleWorkflowRunStatus(run.status) + (isInterruptibleWorkflowRunStatus(run.status) || run.status === "interrupted") ); for (const child of activeChildren) { await this.interruptChildWorkflowRun(child, workspaceId, seenRunIds);