diff --git a/docs/hooks/tools.mdx b/docs/hooks/tools.mdx index baa5986320..b0b6fb846a 100644 --- a/docs/hooks/tools.mdx +++ b/docs/hooks/tools.mdx @@ -659,8 +659,8 @@ If a value is too large for the environment, it may be omitted (not set). Mux al | `MUX_TOOL_INPUT_FILTER` | `filter` | string | Optional regex to filter bash task output lines. By default, only matching lines are returned. When filter_exclude is true, matching lines are excluded instead. Non-matching lines are discarded and cannot be retrieved later. | | `MUX_TOOL_INPUT_FILTER_EXCLUDE` | `filter_exclude` | boolean | When true, lines matching 'filter' are excluded instead of kept. Requires 'filter' to be set. | | `MUX_TOOL_INPUT_MIN_COMPLETED` | `min_completed` | number | Number of awaited tasks that must complete before this call returns. Defaults to 1, so by default task_await returns as soon as the FIRST awaited task completes, letting you act on it while the rest keep running. The result still includes every task complete at that moment plus current status (running/queued) for the rest. Tasks that have not yet completed keep running and remain re-awaitable on a later task_await call. Raise this (e.g. set it to the total number of awaited tasks) when you genuinely need more before proceeding — for example best-of-N synthesis that must compare every candidate. Clamped to the number of awaited tasks; values above that behave like 'wait for all'. | -| `MUX_TOOL_INPUT_TASK_IDS_` | `task_ids[]` | string | List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and workflow runs of the current workspace, excluding workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs. | -| `MUX_TOOL_INPUT_TASK_IDS_COUNT` | `task_ids.length` | number | Number of elements in task_ids (List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and workflow runs of the current workspace, excluding workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs.) | +| `MUX_TOOL_INPUT_TASK_IDS_` | `task_ids[]` | string | List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but top-level workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and top-level workflow runs of the current workspace, excluding workflow-owned sub-agents/background bash tasks and nested child workflow runs because those results are consumed through parent workflow runs. | +| `MUX_TOOL_INPUT_TASK_IDS_COUNT` | `task_ids.length` | number | Number of elements in task_ids (List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but top-level workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and top-level workflow runs of the current workspace, excluding workflow-owned sub-agents/background bash tasks and nested child workflow runs because those results are consumed through parent workflow runs.) | | `MUX_TOOL_INPUT_TIMEOUT_SECS` | `timeout_secs` | number | Maximum time to wait in seconds for each task. For bash tasks, this waits for NEW output (or process exit). If exceeded, the result returns status=queued\|starting\|running\|awaiting_report (task is still active). Defaults to 600 seconds (10 minutes) if not specified. Set to 0 for a non-blocking status check. | diff --git a/src/browser/features/Tools/WorkflowRunToolCall.test.tsx b/src/browser/features/Tools/WorkflowRunToolCall.test.tsx index b33aa90052..d56b4799df 100644 --- a/src/browser/features/Tools/WorkflowRunToolCall.test.tsx +++ b/src/browser/features/Tools/WorkflowRunToolCall.test.tsx @@ -626,6 +626,74 @@ describe("WorkflowRunToolCall", () => { expect(view.container.textContent).toContain("durationMs"); }); + test("coalesces nested workflow start and completion events into one row with child run details", () => { + const view = render( + + + + + + ); + + fireEvent.click(getWorkflowHeader(view)); + + expect(view.getByText("Workflow events (1)")).toBeTruthy(); + expect(view.getByText("child-simple / child-simple / wfr_child_abc / completed")).toBeTruthy(); + expect(view.queryByText("#2")).toBeNull(); + }); + test("coalesces patch start and applied events into one row with combined details", () => { const view = render( diff --git a/src/browser/features/Tools/WorkflowRunToolCall.tsx b/src/browser/features/Tools/WorkflowRunToolCall.tsx index ba4c00d5bd..abaf358a73 100644 --- a/src/browser/features/Tools/WorkflowRunToolCall.tsx +++ b/src/browser/features/Tools/WorkflowRunToolCall.tsx @@ -209,16 +209,19 @@ function getStructuredOutput(value: unknown): unknown { type WorkflowTaskEvent = Extract; type WorkflowActionEvent = Extract; +type WorkflowChildEvent = Extract; type WorkflowPatchEvent = Extract; type WorkflowDisplayRow = | { kind: "event"; event: WorkflowRunEvent } | { kind: "task"; firstEvent: WorkflowTaskEvent; latestEvent: WorkflowTaskEvent } | { kind: "action"; firstEvent: WorkflowActionEvent; latestEvent: WorkflowActionEvent } + | { kind: "workflow"; firstEvent: WorkflowChildEvent; latestEvent: WorkflowChildEvent } | { kind: "patch"; firstEvent: WorkflowPatchEvent; latestEvent: WorkflowPatchEvent }; type WorkflowTaskRow = Extract; type WorkflowActionRow = Extract; +type WorkflowChildRow = Extract; type WorkflowPatchRow = Extract; interface PendingWorkflowActionRows { rows: WorkflowActionRow[]; @@ -229,6 +232,10 @@ function getTaskEventKey(event: WorkflowTaskEvent): string { return `task:${event.stepId}:${event.taskId}`; } +function getWorkflowChildEventKey(event: WorkflowChildEvent): string { + return `workflow:${event.stepId}:${event.runId}`; +} + function getPatchEventKey(event: WorkflowPatchEvent): string { return `patch:${event.stepId}:${event.sourceTaskId}`; } @@ -273,6 +280,7 @@ function getWorkflowDisplayRows(events: readonly WorkflowRunEvent[]): WorkflowDi const rows: WorkflowDisplayRow[] = []; const taskRows = new Map(); const actionRows = new Map(); + const workflowRows = new Map(); const patchRows = new Map(); for (const event of events) { @@ -297,6 +305,24 @@ function getWorkflowDisplayRows(events: readonly WorkflowRunEvent[]): WorkflowDi continue; } + if (event.type === "workflow") { + const key = getWorkflowChildEventKey(event); + const existingRow = workflowRows.get(key); + if (existingRow != null) { + existingRow.latestEvent = event; + continue; + } + + const row: WorkflowChildRow = { + kind: "workflow", + firstEvent: event, + latestEvent: event, + }; + workflowRows.set(key, row); + rows.push(row); + continue; + } + if (event.type === "patch") { // A patch step emits started → applied/conflict/failed for the same // stepId+sourceTaskId; collapse them into one row (latest status wins), @@ -377,6 +403,9 @@ function getDisplayRowKey(row: WorkflowDisplayRow): string { if (row.kind === "action") { return `${getActionEventKey(row.firstEvent)}:${row.firstEvent.sequence}`; } + if (row.kind === "workflow") { + return getWorkflowChildEventKey(row.firstEvent); + } if (row.kind === "patch") { return getPatchEventKey(row.firstEvent); } @@ -396,6 +425,8 @@ function getWorkflowEventLabel(event: WorkflowRunEvent): string { // Prefer the human-readable sub-agent title (matches the spawned // workspace title); fall back to stepId for legacy events without one. return `${event.title ?? event.stepId} / ${event.taskId} / ${event.status}`; + case "workflow": + return `${event.stepId} / ${event.name} / ${event.runId} / ${event.status}`; case "patch": return `${event.stepId} / ${event.sourceTaskId} / ${event.status}`; case "action": @@ -423,6 +454,8 @@ function getWorkflowEventDetail(event: WorkflowRunEvent): unknown { return event.data; case "result": return event.result; + case "workflow": + return event.details; case "patch": return event.details; case "action": @@ -459,6 +492,13 @@ function getEventTone(event: WorkflowRunEvent): "normal" | "success" | "warning" } return event.status === "failed" ? "warning" : "normal"; } + if (event.type === "workflow") { + return event.status === "completed" + ? "success" + : event.status === "failed" || event.status === "interrupted" + ? "warning" + : "normal"; + } if (event.type === "result") { return "success"; } @@ -551,7 +591,9 @@ function WorkflowEventTooltip(props: { ); } -function getWorkflowMergedRowDetail(row: WorkflowActionRow | WorkflowPatchRow): unknown { +function getWorkflowMergedRowDetail( + row: WorkflowActionRow | WorkflowChildRow | WorkflowPatchRow +): unknown { const firstDetail = getWorkflowEventDetail(row.firstEvent); const latestDetail = getWorkflowEventDetail(row.latestEvent); if (row.firstEvent === row.latestEvent || row.latestEvent.status === "started") { @@ -1544,7 +1586,7 @@ export const WorkflowRunToolCall: React.FC = ({ /> ); } - if (row.kind === "action" || row.kind === "patch") { + if (row.kind === "action" || row.kind === "workflow" || row.kind === "patch") { return ( ; export type StructuredTaskOutput = z.infer; export type WorkflowRunEvent = z.infer; export type WorkflowStepRecord = z.infer; +export type WorkflowRunParent = z.infer; export type WorkflowRunRecord = z.infer; +export function isNestedWorkflowRun(run: WorkflowRunRecord): boolean { + return run.parentWorkflow != null; +} + export function assertWorkflowRunStatusTransition( from: WorkflowRunStatus, to: WorkflowRunStatus diff --git a/src/common/utils/tools/toolDefinitions.ts b/src/common/utils/tools/toolDefinitions.ts index e19a92705f..aa6e98cd88 100644 --- a/src/common/utils/tools/toolDefinitions.ts +++ b/src/common/utils/tools/toolDefinitions.ts @@ -509,8 +509,8 @@ export const TaskAwaitToolArgsSchema = z .nullish() .describe( "List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. " + - "task_list can rediscover sub-agent/background bash IDs, but workflow run rediscovery is done by omitting task_ids. " + - "When omitted, waits for active descendant tasks and workflow runs of the current workspace, excluding workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs." + "task_list can rediscover sub-agent/background bash IDs, but top-level workflow run rediscovery is done by omitting task_ids. " + + "When omitted, waits for active descendant tasks and top-level workflow runs of the current workspace, excluding workflow-owned sub-agents/background bash tasks and nested child workflow runs because those results are consumed through parent workflow runs." ), filter: z .string() @@ -1780,7 +1780,7 @@ export const TOOL_DEFINITIONS = { "\n\nIMPORTANT: Do not call task_await in the same parallel tool-call batch as task, bash, or workflow_run — " + "the taskId/runId is not available until the spawning tool returns. " + "Always wait for the task/bash/workflow_run tool result first, then call task_await in a subsequent step. " + - "When omitting task_ids to await active tasks/workflows, ensure at least one background task or workflow was already spawned in a prior step. Omitted task_ids exclude workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs. " + + "When omitting task_ids to await active tasks/workflows, ensure at least one background task or workflow was already spawned in a prior step. Omitted task_ids discover top-level workflow runs only and exclude workflow-owned sub-agents/background bash tasks and nested child workflow runs because those results are consumed through parent workflow runs. " + "\n\nAgent tasks and workflow runs return reports when completed. " + "Completed reports are persisted on disk and survive context compaction: calling task_await on an already-completed task/workflow run ID (timeout_secs: 0 for non-blocking) re-fetches the full report instead of re-running the work. " + "Bash tasks return incremental output while running and a final reportMarkdown when they exit. " + @@ -1807,7 +1807,7 @@ export const TOOL_DEFINITIONS = { task_list: { description: "List descendant tasks for the current workspace, including status + metadata. " + - "This includes sub-agent tasks, background bash tasks, and workflow runs, but omits workflow-owned sub-agents (and their background bash tasks) whose reports are consumed through their workflow run. " + + "This includes sub-agent tasks, background bash tasks, and top-level workflow runs, but omits workflow-owned sub-agents/background bash tasks and nested child workflow runs whose reports are consumed through parent workflow runs. " + "Use this after compaction, interruptions, or an app restart to rediscover active tasks and resumable workflow runs (statuses interrupted/failed; resume with workflow_resume). " + "This is a discovery tool, NOT a waiting mechanism. If the current request actually depends on a task's output, call task_await with the specific task IDs you need; do not await all active tasks just because they appear here.", schema: TaskListToolArgsSchema, diff --git a/src/common/utils/workflowRunMessages.test.ts b/src/common/utils/workflowRunMessages.test.ts new file mode 100644 index 0000000000..2f1d395c75 --- /dev/null +++ b/src/common/utils/workflowRunMessages.test.ts @@ -0,0 +1,58 @@ +import { describe, expect, test } from "bun:test"; + +import type { WorkflowRunRecord } from "@/common/types/workflow"; +import { buildWorkflowRunToolPart, stripWorkflowRunRecordForModel } from "./workflowRunMessages"; + +const run: WorkflowRunRecord = { + id: "wfr_test", + workspaceId: "workspace-1", + definition: { + name: "nested-parent-simple", + description: "Nested parent", + scope: "scratch", + executable: true, + }, + definitionSource: "export default function workflow() { return { reportMarkdown: 'done' }; }", + definitionHash: "sha256:test", + args: {}, + status: "completed", + createdAt: "2026-05-29T00:00:00.000Z", + updatedAt: "2026-05-29T00:00:01.000Z", + events: [], + steps: [], +}; + +describe("workflowRunMessages", () => { + test("strips large run snapshots from model-bound workflow tool outputs", () => { + const stripped = stripWorkflowRunRecordForModel("workflow_run", { + status: "completed", + runId: run.id, + result: { reportMarkdown: "done" }, + run, + }); + + expect(stripped).toEqual({ + status: "completed", + runId: run.id, + result: { reportMarkdown: "done" }, + }); + }); + + test("preserves run snapshots in UI workflow card tool parts", () => { + const part = buildWorkflowRunToolPart( + { name: "nested-parent-simple", args: {} }, + { runId: run.id, status: "completed", result: { reportMarkdown: "done" }, run }, + 1_000 + ); + + expect(part.state).toBe("output-available"); + if (part.state !== "output-available") { + throw new Error("Expected workflow run tool part to include output"); + } + expect(part.output).toMatchObject({ + status: "completed", + runId: run.id, + run: { id: run.id }, + }); + }); +}); diff --git a/src/node/builtinSkills/workflow-authoring.md b/src/node/builtinSkills/workflow-authoring.md index 4ec88798d7..98172a4287 100644 --- a/src/node/builtinSkills/workflow-authoring.md +++ b/src/node/builtinSkills/workflow-authoring.md @@ -203,6 +203,32 @@ export async function execute(input, ctx) { } ``` +### `action.workflows.start(spec)` + +Starts another workflow in the same workspace and waits for its terminal result. This is a special built-in workflow action: project/global actions can still override `workflows.start`, but the built-in version is coordinated by the durable workflow runner rather than a normal action process. + +```js +const child = action.workflows.start({ + id: "research", + input: { + name: "deep-research", + args: { input: args.topic }, + }, +}); + +return { reportMarkdown: "Child said:\n\n" + child.reportMarkdown }; +``` + +Replay semantics: + +- Parent `id` + child `{ name, args }` maps to exactly one deterministic child run ID. +- Child workflow step IDs stay local to the child run; they do not collide with parent step IDs. +- Parent replay/resume reuses the linked child run, even if the child workflow source has changed since the child run was created. +- Child completion returns `{ runId, status, name, reportMarkdown, structuredOutput? }` to the parent. +- Child runs are omitted from top-level `task_list` and omitted-ID `task_await` discovery because the parent consumes their result; explicitly awaiting a known child `runId` still works. +- Child failure fails the parent step; parent interruption cascades to active child workflow runs. +- V1 is same-workspace and wait-to-terminal only; fire-and-forget child workflows are intentionally rejected. + ### `applyPatch(spec)` Applies a workflow-owned child task's git patch artifact to the current parent workspace. The host always dry-runs first in a temporary worktree and only performs the real apply when the dry-run succeeds. The conductor never receives raw patch text and cannot apply arbitrary patches. diff --git a/src/node/services/agentSkills/builtInSkillContent.generated.ts b/src/node/services/agentSkills/builtInSkillContent.generated.ts index cd34ae9339..80ff8d9ccb 100644 --- a/src/node/services/agentSkills/builtInSkillContent.generated.ts +++ b/src/node/services/agentSkills/builtInSkillContent.generated.ts @@ -4589,8 +4589,8 @@ export const BUILTIN_SKILL_FILES: Record> = { "| `MUX_TOOL_INPUT_FILTER` | `filter` | string | Optional regex to filter bash task output lines. By default, only matching lines are returned. When filter_exclude is true, matching lines are excluded instead. Non-matching lines are discarded and cannot be retrieved later. |", "| `MUX_TOOL_INPUT_FILTER_EXCLUDE` | `filter_exclude` | boolean | When true, lines matching 'filter' are excluded instead of kept. Requires 'filter' to be set. |", "| `MUX_TOOL_INPUT_MIN_COMPLETED` | `min_completed` | number | Number of awaited tasks that must complete before this call returns. Defaults to 1, so by default task_await returns as soon as the FIRST awaited task completes, letting you act on it while the rest keep running. The result still includes every task complete at that moment plus current status (running/queued) for the rest. Tasks that have not yet completed keep running and remain re-awaitable on a later task_await call. Raise this (e.g. set it to the total number of awaited tasks) when you genuinely need more before proceeding — for example best-of-N synthesis that must compare every candidate. Clamped to the number of awaited tasks; values above that behave like 'wait for all'. |", - "| `MUX_TOOL_INPUT_TASK_IDS_` | `task_ids[]` | string | List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and workflow runs of the current workspace, excluding workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs. |", - "| `MUX_TOOL_INPUT_TASK_IDS_COUNT` | `task_ids.length` | number | Number of elements in task_ids (List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and workflow runs of the current workspace, excluding workflow-owned sub-agents and their background bash tasks because those results are consumed through workflow runs.) |", + "| `MUX_TOOL_INPUT_TASK_IDS_` | `task_ids[]` | string | List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but top-level workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and top-level workflow runs of the current workspace, excluding workflow-owned sub-agents/background bash tasks and nested child workflow runs because those results are consumed through parent workflow runs. |", + "| `MUX_TOOL_INPUT_TASK_IDS_COUNT` | `task_ids.length` | number | Number of elements in task_ids (List of task IDs or workflow run IDs to await — use only real IDs returned by prior task, bash, or workflow_run results; never fabricate an ID. task_list can rediscover sub-agent/background bash IDs, but top-level workflow run rediscovery is done by omitting task_ids. When omitted, waits for active descendant tasks and top-level workflow runs of the current workspace, excluding workflow-owned sub-agents/background bash tasks and nested child workflow runs because those results are consumed through parent workflow runs.) |", "| `MUX_TOOL_INPUT_TIMEOUT_SECS` | `timeout_secs` | number | Maximum time to wait in seconds for each task. For bash tasks, this waits for NEW output (or process exit). If exceeded, the result returns status=queued\\|starting\\|running\\|awaiting_report (task is still active). Defaults to 600 seconds (10 minutes) if not specified. Set to 0 for a non-blocking status check. |", "", "", @@ -7147,6 +7147,32 @@ export const BUILTIN_SKILL_FILES: Record> = { "}", "```", "", + "### `action.workflows.start(spec)`", + "", + "Starts another workflow in the same workspace and waits for its terminal result. This is a special built-in workflow action: project/global actions can still override `workflows.start`, but the built-in version is coordinated by the durable workflow runner rather than a normal action process.", + "", + "```js", + "const child = action.workflows.start({", + ' id: "research",', + " input: {", + ' name: "deep-research",', + " args: { input: args.topic },", + " },", + "});", + "", + 'return { reportMarkdown: "Child said:\\n\\n" + child.reportMarkdown };', + "```", + "", + "Replay semantics:", + "", + "- Parent `id` + child `{ name, args }` maps to exactly one deterministic child run ID.", + "- Child workflow step IDs stay local to the child run; they do not collide with parent step IDs.", + "- Parent replay/resume reuses the linked child run, even if the child workflow source has changed since the child run was created.", + "- Child completion returns `{ runId, status, name, reportMarkdown, structuredOutput? }` to the parent.", + "- Child runs are omitted from top-level `task_list` and omitted-ID `task_await` discovery because the parent consumes their result; explicitly awaiting a known child `runId` still works.", + "- Child failure fails the parent step; parent interruption cascades to active child workflow runs.", + "- V1 is same-workspace and wait-to-terminal only; fire-and-forget child workflows are intentionally rejected.", + "", "### `applyPatch(spec)`", "", "Applies a workflow-owned child task's git patch artifact to the current parent workspace. The host always dry-runs first in a temporary worktree and only performs the real apply when the dry-run succeeds. The conductor never receives raw patch text and cannot apply arbitrary patches.", diff --git a/src/node/services/attachmentService.completedReports.test.ts b/src/node/services/attachmentService.completedReports.test.ts index 1e56c69ff0..c1c22a23b9 100644 --- a/src/node/services/attachmentService.completedReports.test.ts +++ b/src/node/services/attachmentService.completedReports.test.ts @@ -44,6 +44,12 @@ async function writeWorkflowRun( workspaceId?: string; completedAt?: string; failed?: boolean; + parentWorkflow?: { + runId: string; + stepId: string; + inputHash: string; + depth: number; + }; reportMarkdown?: string; } ): Promise { @@ -57,6 +63,7 @@ async function writeWorkflowRun( scope: "built-in" as const, executable: true, }, + ...(params.parentWorkflow !== undefined ? { parentWorkflow: params.parentWorkflow } : {}), definitionSource: "export default async function workflow() { return 'ok'; }\n", args: {}, now: "2026-06-01T00:00:00.000Z", @@ -171,6 +178,17 @@ describe("AttachmentService.generateCompletedReportsAttachment", () => { completedAt: "2026-06-01T12:00:00.000Z", }); + await writeWorkflowRun(tmp.path, { + runId: "wfr_child_done", + completedAt: "2026-06-01T12:00:00.000Z", + parentWorkflow: { + runId: "wfr_parent_done", + stepId: "child", + inputHash: "hash:child", + depth: 0, + }, + }); + const attachment = await AttachmentService.generateCompletedReportsAttachment({ workspaceId: WORKSPACE_ID, sessionDir: tmp.path, diff --git a/src/node/services/attachmentService.ts b/src/node/services/attachmentService.ts index 7a1149a88a..515c4da76a 100644 --- a/src/node/services/attachmentService.ts +++ b/src/node/services/attachmentService.ts @@ -7,7 +7,7 @@ import type { CompletedReportEntry, CompletedReportsIndexAttachment, } from "@/common/types/attachment"; -import type { WorkflowRunEvent } from "@/common/types/workflow"; +import { isNestedWorkflowRun, type WorkflowRunEvent } from "@/common/types/workflow"; import { getPlanFilePath, getLegacyPlanFilePath } from "@/common/utils/planStorage"; import type { FileEditDiff } from "@/common/utils/messages/extractEditedFiles"; import assert from "@/common/utils/assert"; @@ -190,7 +190,11 @@ export class AttachmentService { const runStore = new WorkflowRunStore({ sessionDir: params.sessionDir }); const runs = await runStore.listRuns(); for (const run of runs) { - if (run.workspaceId !== params.workspaceId || run.status !== "completed") { + if ( + run.workspaceId !== params.workspaceId || + run.status !== "completed" || + isNestedWorkflowRun(run) + ) { continue; } const resultEvent = run.events.findLast( diff --git a/src/node/services/tools/task_await.ts b/src/node/services/tools/task_await.ts index 910889d0fa..abb7883b07 100644 --- a/src/node/services/tools/task_await.ts +++ b/src/node/services/tools/task_await.ts @@ -9,7 +9,11 @@ import { TOOL_DEFINITIONS, } from "@/common/utils/tools/toolDefinitions"; import { canRetryWorkflowFromCheckpoint } from "@/common/utils/workflowRetryEligibility"; -import type { WorkflowRunRecord, WorkflowRunStatus } from "@/common/types/workflow"; +import { + isNestedWorkflowRun, + type WorkflowRunRecord, + type WorkflowRunStatus, +} from "@/common/types/workflow"; import { fromBashTaskId, isWorkflowRunTaskId, toBashTaskId } from "./taskId"; import { formatBashOutputReport } from "./bashTaskReport"; @@ -280,7 +284,11 @@ export const createTaskAwaitTool: ToolFactory = (config: ToolConfiguration) => { const runs = await config.workflowService.listRuns({ workspaceId }); for (const rawRun of runs) { const parsed = WorkflowRunRecordSchema.safeParse(rawRun); - if (!parsed.success || !isWorkflowRunAwaitableStatus(parsed.data.status)) { + if ( + !parsed.success || + !isWorkflowRunAwaitableStatus(parsed.data.status) || + isNestedWorkflowRun(parsed.data) + ) { continue; } workflowRunIds.push(parsed.data.id); diff --git a/src/node/services/tools/task_list.ts b/src/node/services/tools/task_list.ts index ae47f8e20a..359edd8715 100644 --- a/src/node/services/tools/task_list.ts +++ b/src/node/services/tools/task_list.ts @@ -5,6 +5,7 @@ import type { ToolConfiguration, ToolFactory } from "@/common/utils/tools/tools" import { WorkflowRunRecordSchema } from "@/common/orpc/schemas"; import { TaskListToolResultSchema, TOOL_DEFINITIONS } from "@/common/utils/tools/toolDefinitions"; +import { isNestedWorkflowRun } from "@/common/types/workflow"; import type { AgentTaskStatus } from "@/node/services/taskService"; import { toBashTaskId } from "./taskId"; @@ -63,7 +64,11 @@ export const createTaskListTool: ToolFactory = (config: ToolConfiguration) => { const runs = await config.workflowService.listRuns({ workspaceId }); for (const rawRun of runs) { const parsed = WorkflowRunRecordSchema.safeParse(rawRun); - if (!parsed.success || !statuses.includes(parsed.data.status)) { + if ( + !parsed.success || + !statuses.includes(parsed.data.status) || + isNestedWorkflowRun(parsed.data) + ) { continue; } tasks.push({ diff --git a/src/node/services/workflows/WorkflowActionRegistry.test.ts b/src/node/services/workflows/WorkflowActionRegistry.test.ts index a038809b38..01f423894f 100644 --- a/src/node/services/workflows/WorkflowActionRegistry.test.ts +++ b/src/node/services/workflows/WorkflowActionRegistry.test.ts @@ -211,7 +211,9 @@ describe("WorkflowActionRegistry", () => { await expectRuntimeProjectActionRejection(registry); const actions = await registry.listActions({ projectTrusted: true }); - expect(actions).toEqual([]); + expect(actions.map((action) => ({ name: action.name, scope: action.scope }))).toEqual([ + { name: "workflows.start", scope: "built-in" }, + ]); }); test("rejects runtime-backed global actions instead of using a remote cwd locally", async () => { @@ -235,7 +237,9 @@ describe("WorkflowActionRegistry", () => { } const actions = await registry.listActions({ projectTrusted: true }); - expect(actions).toEqual([]); + expect(actions.map((action) => ({ name: action.name, scope: action.scope }))).toEqual([ + { name: "workflows.start", scope: "built-in" }, + ]); }); test("rejects runtime-backed built-in actions instead of using a remote cwd locally", async () => { @@ -258,6 +262,24 @@ describe("WorkflowActionRegistry", () => { } }); + test("allows runner-coordinated workflows.start in runtime-backed workspaces", async () => { + using tmp = new DisposableTempDir("workflow-actions-runtime-workflows-start"); + const registry = new WorkflowActionRegistry({ + projectRoot: "/runtime/project/.mux/actions", + globalRoot: path.join(tmp.path, "global-actions"), + projectRuntime: createRuntimeWithoutActions(), + projectCwd: "/runtime/project", + }); + + const action = await registry.resolveAction("workflows.start", { projectTrusted: true }); + const actions = await registry.listActions({ projectTrusted: true }); + + expect(action).toMatchObject({ name: "workflows.start", scope: "built-in" }); + expect(actions.map((action) => ({ name: action.name, scope: action.scope }))).toEqual([ + { name: "workflows.start", scope: "built-in" }, + ]); + }); + test("blocks project-local actions without Project Trust while allowing global actions", async () => { using tmp = new DisposableTempDir("workflow-actions-trust"); const projectRoot = path.join(tmp.path, "project-actions"); diff --git a/src/node/services/workflows/WorkflowActionRegistry.ts b/src/node/services/workflows/WorkflowActionRegistry.ts index c6b07b2eeb..29d9e2bff6 100644 --- a/src/node/services/workflows/WorkflowActionRegistry.ts +++ b/src/node/services/workflows/WorkflowActionRegistry.ts @@ -58,7 +58,9 @@ export class WorkflowActionRegistry { async listActions(options: { projectTrusted: boolean }): Promise { if (this.projectRuntime != null) { - return []; + return scanBuiltInActions() + .filter((action) => isRunnerCoordinatedBuiltInAction(action.name)) + .sort((a, b) => a.name.localeCompare(b.name)); } const byName = await this.collectActions(options); return Array.from(byName.values()).sort((a, b) => a.name.localeCompare(b.name)); @@ -74,7 +76,7 @@ export class WorkflowActionRegistry { if (builtInAction == null) { throw new Error(`Built-in workflow action not found: ${normalizedName}`); } - if (this.projectRuntime != null) { + if (this.projectRuntime != null && !isRunnerCoordinatedBuiltInAction(normalizedName)) { throw new Error("Workflow actions are not supported for runtime-backed workspaces yet"); } return builtInAction; @@ -101,7 +103,7 @@ export class WorkflowActionRegistry { const builtInAction = this.readBuiltInAction(normalizedName); if (builtInAction != null) { - if (this.projectRuntime != null) { + if (this.projectRuntime != null && !isRunnerCoordinatedBuiltInAction(normalizedName)) { throw new Error("Workflow actions are not supported for runtime-backed workspaces yet"); } return builtInAction; @@ -213,7 +215,11 @@ export class WorkflowActionRegistry { } } -export function normalizeWorkflowActionName(name: string): string { +function isRunnerCoordinatedBuiltInAction(name: string): boolean { + return name === "workflows.start"; +} + +function normalizeWorkflowActionName(name: string): string { assert(typeof name === "string", "Workflow action name must be a string"); const normalized = name.trim(); assert(normalized.length > 0, "Workflow action name is required"); diff --git a/src/node/services/workflows/WorkflowRunStore.test.ts b/src/node/services/workflows/WorkflowRunStore.test.ts index 3ba1690876..3013fe3ee2 100644 --- a/src/node/services/workflows/WorkflowRunStore.test.ts +++ b/src/node/services/workflows/WorkflowRunStore.test.ts @@ -68,6 +68,61 @@ describe("WorkflowRunStore", () => { ).rejects.toThrow(/runId must match/); }); + test("createRunIfAbsent recovers an incomplete deterministic run directory", async () => { + using tmp = new DisposableTempDir("workflow-runs-partial-child"); + const store = new WorkflowRunStore({ sessionDir: tmp.path }); + await fs.mkdir(path.join(tmp.path, "workflows", "wfr_child_partial"), { recursive: true }); + + const run = await store.createRunIfAbsent({ + id: "wfr_child_partial", + workspaceId: "workspace-1", + definition, + definitionSource: source, + args: { topic: "nested" }, + parentWorkflow: { + runId: "wfr_parent", + stepId: "child", + inputHash: "hash:child", + depth: 0, + }, + now: "2026-05-29T00:00:00.000Z", + }); + + expect(run.id).toBe("wfr_child_partial"); + await expect(store.getRun("wfr_child_partial")).resolves.toMatchObject({ + id: "wfr_child_partial", + parentWorkflow: { runId: "wfr_parent" }, + }); + }); + + test("createRunIfAbsent reuses a snapshotted child run after definition source changes", async () => { + using tmp = new DisposableTempDir("workflow-runs-child-source-change"); + const store = new WorkflowRunStore({ sessionDir: tmp.path }); + const input = { + id: "wfr_child_source_change", + workspaceId: "workspace-1", + definition, + args: { topic: "nested" }, + parentWorkflow: { + runId: "wfr_parent", + stepId: "child", + inputHash: "hash:child", + depth: 0, + }, + now: "2026-05-29T00:00:00.000Z", + }; + const created = await store.createRunIfAbsent({ ...input, definitionSource: source }); + + const reused = await store.createRunIfAbsent({ + ...input, + definitionSource: + "export default function workflow() { return { reportMarkdown: 'new' }; }\n", + }); + + expect(reused.id).toBe(created.id); + expect(reused.definitionSource).toBe(source); + }); + test("ignores malformed journal lines while preserving valid events and steps", async () => { using tmp = new DisposableTempDir("workflow-runs"); const store = await createStore(tmp.path); diff --git a/src/node/services/workflows/WorkflowRunStore.ts b/src/node/services/workflows/WorkflowRunStore.ts index 4699b54fc5..d2bb743c7e 100644 --- a/src/node/services/workflows/WorkflowRunStore.ts +++ b/src/node/services/workflows/WorkflowRunStore.ts @@ -16,6 +16,7 @@ import type { StructuredTaskOutput, WorkflowDefinitionDescriptor, WorkflowRunEvent, + WorkflowRunParent, WorkflowRunRecord, WorkflowRunStatus, WorkflowStepRecord, @@ -36,6 +37,7 @@ export interface CreateWorkflowRunInput { definitionSource: string; args: unknown; defaultActionCwd?: string; + parentWorkflow?: WorkflowRunParent; now: string; } @@ -99,6 +101,7 @@ export class WorkflowRunStore { definitionHash: hashSource(input.definitionSource), args: input.args, ...(input.defaultActionCwd != null ? { defaultActionCwd: input.defaultActionCwd } : {}), + ...(input.parentWorkflow != null ? { parentWorkflow: input.parentWorkflow } : {}), status: "pending", createdAt: input.now, updatedAt: input.now, @@ -110,6 +113,76 @@ export class WorkflowRunStore { return run; } + async createRunIfAbsent(input: CreateWorkflowRunInput): Promise { + assert(input.id.length > 0, "WorkflowRunStore.createRunIfAbsent: id is required"); + assert( + input.workspaceId.length > 0, + "WorkflowRunStore.createRunIfAbsent: workspaceId is required" + ); + assert( + input.definitionSource.length > 0, + "WorkflowRunStore.createRunIfAbsent: definitionSource is required" + ); + + const runDir = this.runDir(input.id); + await fs.mkdir(this.workflowsDir(), { recursive: true }); + const lockDir = `${runDir}.create.lock`; + await acquireWorkflowMutationLock( + lockDir, + this.leaseMutationLockStaleMs(), + this.leaseMutationWaitTimeoutMs() + ); + try { + const existing = await this.getRunIfFullyCreated(input.id); + if (existing != null) { + assertSameWorkflowRunIdentity(existing, input); + return existing; + } + + // A deterministic child run ID must be recoverable after a crash between mkdir and + // run.json. Treat an unreadable run directory as an incomplete create, not identity. + await fs.rm(runDir, { recursive: true, force: true }); + await fs.mkdir(runDir, { recursive: false }); + try { + await fs.writeFile(path.join(runDir, "definition.js"), input.definitionSource, "utf-8"); + await fs.writeFile(path.join(runDir, "events.jsonl"), "", { flag: "a" }); + await fs.writeFile(path.join(runDir, "steps.jsonl"), "", { flag: "a" }); + + const run = WorkflowRunRecordSchema.parse({ + id: input.id, + workspaceId: input.workspaceId, + definition: input.definition, + definitionSource: input.definitionSource, + definitionHash: hashSource(input.definitionSource), + args: input.args, + ...(input.defaultActionCwd != null ? { defaultActionCwd: input.defaultActionCwd } : {}), + ...(input.parentWorkflow != null ? { parentWorkflow: input.parentWorkflow } : {}), + status: "pending", + createdAt: input.now, + updatedAt: input.now, + events: [], + steps: [], + }); + + await this.writeRunFile(input.id, run); + return run; + } catch (error) { + await fs.rm(runDir, { recursive: true, force: true }); + throw error; + } + } finally { + await fs.rm(lockDir, { recursive: true, force: true }); + } + } + + private async getRunIfFullyCreated(runId: string): Promise { + try { + return await this.getRun(runId); + } catch { + return null; + } + } + async getRun(runId: string): Promise { // UI polling must not wait behind the writer lock; while a mutation is in progress, // fall back to the last atomic run.json snapshot instead of reading half-updated journals. @@ -852,6 +925,22 @@ function assertValidWorkflowRunId(runId: string): void { ); } +function assertSameWorkflowRunIdentity( + run: WorkflowRunRecord, + input: CreateWorkflowRunInput +): void { + const sameIdentity = + run.id === input.id && + run.workspaceId === input.workspaceId && + run.definition.name === input.definition.name && + JSON.stringify(run.args) === JSON.stringify(input.args) && + JSON.stringify(run.parentWorkflow ?? null) === JSON.stringify(input.parentWorkflow ?? null); + assert( + sameIdentity, + `WorkflowRunStore.createRunIfAbsent: existing run identity does not match requested run ${input.id}` + ); +} + function getWorkflowStepKey(step: WorkflowStepLookup): string { return `${step.stepId}\0${step.inputHash}`; } diff --git a/src/node/services/workflows/WorkflowRunner.ts b/src/node/services/workflows/WorkflowRunner.ts index 2d56b2d77d..ec14f3222d 100644 --- a/src/node/services/workflows/WorkflowRunner.ts +++ b/src/node/services/workflows/WorkflowRunner.ts @@ -8,6 +8,7 @@ import type { WorkflowActionMetadata, WorkflowResult, WorkflowRunEvent, + WorkflowRunStatus, WorkflowStepRecord, } from "@/common/types/workflow"; import assert from "@/common/utils/assert"; @@ -24,6 +25,7 @@ import { type WorkflowActionExecutionResult, } from "./WorkflowActionRunner"; import type { AppendWorkflowRunEventOptions, WorkflowRunStore } from "./WorkflowRunStore"; +import { deriveChildWorkflowRunId } from "./nestedWorkflowRuns"; import { assertWorkflowStepId, hashWorkflowStepInput } from "./workflowReplayKey"; export class WorkflowRunBackgroundedError extends Error { @@ -98,6 +100,17 @@ export interface WorkflowActionSpec { cache?: boolean; } +export interface WorkflowStartInput { + name: string; + args: unknown; +} + +export type WorkflowNestedResult = WorkflowResult & { + runId: string; + status: WorkflowRunStatus; + name: string; +}; + export interface WorkflowActionResult { output: unknown; stdout: string; @@ -137,6 +150,19 @@ export interface WorkflowTaskAdapter { onRunEnded?(): Promise | void; } +export interface WorkflowChildRunAdapter { + runChildWorkflowToTerminal(input: { + parentRunId: string; + stepId: string; + inputHash: string; + childRunId: string; + name: string; + args: unknown; + backgroundOnMessageQueued?: boolean; + abortSignal?: AbortSignal; + }): Promise<{ runId: string; status: WorkflowRunStatus; result: unknown }>; +} + export interface WorkflowRunnerRunOptions { onLeaseAcquired?: () => void; abortSignal?: AbortSignal; @@ -159,6 +185,7 @@ export interface WorkflowRunnerOptions { runtimeFactory: IJSRuntimeFactory; taskAdapter: WorkflowTaskAdapter; actionRegistry?: WorkflowActionRegistry; + childRunAdapter?: WorkflowChildRunAdapter; actionRunner?: WorkflowActionRunner; getProjectTrusted?: () => boolean | Promise; projectTrusted?: boolean; @@ -281,6 +308,7 @@ export class WorkflowRunner { private readonly runtimeFactory: IJSRuntimeFactory; private readonly taskAdapter: WorkflowTaskAdapter; private readonly actionRegistry?: WorkflowActionRegistry; + private readonly childRunAdapter?: WorkflowChildRunAdapter; private readonly actionRunner: WorkflowActionRunner; private readonly getProjectTrusted: () => boolean | Promise; private readonly defaultActionCwd?: string; @@ -294,6 +322,7 @@ export class WorkflowRunner { this.runtimeFactory = options.runtimeFactory; this.taskAdapter = options.taskAdapter; this.actionRegistry = options.actionRegistry; + this.childRunAdapter = options.childRunAdapter; this.actionRunner = options.actionRunner ?? new WorkflowActionRunner(); this.getProjectTrusted = options.getProjectTrusted ?? (() => options.projectTrusted ?? false); this.defaultActionCwd = options.defaultActionCwd; @@ -486,6 +515,7 @@ export class WorkflowRunner { try { return await this.runActionStep(runId, sequence, rawName, rawSpec, { abortSignal: setupRuntime.getAbortSignal(), + backgroundOnMessageQueued: options?.backgroundOnMessageQueued ?? true, leaseGuard, }); } catch (error) { @@ -687,9 +717,10 @@ export class WorkflowRunner { rawSpec: unknown, options: { abortSignal?: AbortSignal; + backgroundOnMessageQueued?: boolean; leaseGuard: WorkflowRunnerLeaseGuard; } - ): Promise { + ): Promise { assert(typeof rawName === "string" && rawName.length > 0, "action requires a name"); const spec = parseWorkflowActionSpec(rawSpec); assertWorkflowStepId(spec.id, "action"); @@ -700,6 +731,13 @@ export class WorkflowRunner { projectTrusted: await this.getProjectTrusted(), builtInOnly: spec.builtInOnly, }); + if (isBuiltInWorkflowsStartAction(action)) { + return await this.runNestedWorkflowStep(runId, sequence, spec, { + abortSignal: options.abortSignal, + backgroundOnMessageQueued: options.backgroundOnMessageQueued, + leaseGuard: options.leaseGuard, + }); + } const cwd = getWorkflowActionCwd(spec, action, await this.getDefaultActionCwd(runId)); const inputHash = hashWorkflowStepInput( spec.id, @@ -786,6 +824,240 @@ export class WorkflowRunner { }); } + private async runNestedWorkflowStep( + runId: string, + sequence: WorkflowEventSequence, + spec: WorkflowActionSpec, + options: { + abortSignal?: AbortSignal; + backgroundOnMessageQueued?: boolean; + leaseGuard: WorkflowRunnerLeaseGuard; + } + ): Promise { + const childInput = parseWorkflowStartInput(spec.input); + const inputHash = hashWorkflowStepInput(spec.id, buildWorkflowStartReplayInput(childInput)); + const childRunId = deriveChildWorkflowRunId({ + parentRunId: runId, + stepId: spec.id, + inputHash, + }); + const existingStep = await this.runStore.getStep(runId, spec.id, inputHash); + if (existingStep?.status === "completed" && existingStep.result?.structuredOutput != null) { + const cached = normalizeWorkflowNestedResult(existingStep.result.structuredOutput); + await this.recordWorkflowTerminalEventIfMissing(runId, sequence, { + stepId: spec.id, + runId: cached.runId, + name: cached.name, + status: "completed", + details: cached, + }); + return cached; + } + + const run = await this.runStore.getRun(runId); + const driftedStep = run.steps.find( + (step) => + step.stepId === spec.id && step.inputHash !== inputHash && step.taskId?.startsWith("wfr_") + ); + if (driftedStep != null) { + throw new Error( + `Workflow child step ${spec.id} has a prior replay identity and cannot start a different child workflow` + ); + } + + const childRunAdapter = this.childRunAdapter; + assert(childRunAdapter != null, "Nested workflows are not configured for this workflow runner"); + const startedAt = existingStep?.startedAt ?? this.clock.nowIso(); + options.leaseGuard.throwIfLost(); + await this.recordStepStarted(runId, { + stepId: spec.id, + inputHash, + taskId: childRunId, + startedAt, + }); + await this.appendEvent(runId, { + sequence: sequence.next(), + type: "workflow", + at: this.clock.nowIso(), + stepId: spec.id, + runId: childRunId, + name: childInput.name, + status: "started", + }); + + let child: Awaited>; + try { + child = await childRunAdapter.runChildWorkflowToTerminal({ + parentRunId: runId, + stepId: spec.id, + inputHash, + childRunId, + name: childInput.name, + args: childInput.args, + backgroundOnMessageQueued: options.backgroundOnMessageQueued, + abortSignal: options.abortSignal, + }); + } catch (error) { + await this.recordNestedWorkflowFailed(runId, sequence, { + stepId: spec.id, + inputHash, + childRunId, + name: childInput.name, + startedAt, + error, + }); + throw error; + } + if (child.status === "backgrounded" && options.backgroundOnMessageQueued !== false) { + await this.appendEvent(runId, { + sequence: sequence.next(), + type: "workflow", + at: this.clock.nowIso(), + stepId: spec.id, + runId: childRunId, + name: childInput.name, + status: "backgrounded", + }); + throw createForegroundWaitBackgroundedError(); + } + if (child.status !== "completed") { + const message = `Child workflow ${childInput.name} finished with status ${child.status}`; + await this.recordStepFailed(runId, { + stepId: spec.id, + inputHash, + taskId: childRunId, + error: message, + startedAt, + completedAt: this.clock.nowIso(), + }); + const eventStatus = child.status === "pending" ? "running" : child.status; + await this.appendEvent(runId, { + sequence: sequence.next(), + type: "workflow", + at: this.clock.nowIso(), + stepId: spec.id, + runId: childRunId, + name: childInput.name, + status: eventStatus, + details: { error: message }, + }); + throw new Error(message); + } + + const childResult = normalizeWorkflowResultForEvent(child.result); + const nestedResult: WorkflowNestedResult = { + reportMarkdown: childResult.reportMarkdown, + ...(childResult.structuredOutput !== undefined + ? { structuredOutput: childResult.structuredOutput } + : {}), + runId: child.runId, + status: child.status, + name: childInput.name, + }; + await this.recordStepCompleted(runId, { + stepId: spec.id, + inputHash, + taskId: childRunId, + result: { + reportMarkdown: childResult.reportMarkdown, + structuredOutput: nestedResult, + }, + startedAt, + completedAt: this.clock.nowIso(), + }); + await this.appendEvent(runId, { + sequence: sequence.next(), + type: "workflow", + at: this.clock.nowIso(), + stepId: spec.id, + runId: childRunId, + name: childInput.name, + status: "completed", + details: nestedResult, + }); + return nestedResult; + } + + private async recordNestedWorkflowFailed( + runId: string, + sequence: WorkflowEventSequence, + input: { + stepId: string; + inputHash: string; + childRunId: string; + name: string; + startedAt: string; + error: unknown; + } + ): Promise { + const message = getErrorMessage(input.error); + try { + await this.recordStepFailed(runId, { + stepId: input.stepId, + inputHash: input.inputHash, + taskId: input.childRunId, + error: message, + startedAt: input.startedAt, + completedAt: this.clock.nowIso(), + }); + await this.recordWorkflowTerminalEventIfMissing(runId, sequence, { + stepId: input.stepId, + runId: input.childRunId, + name: input.name, + status: "failed", + details: { error: message }, + }); + } catch (recordError) { + const run = await this.runStore.getRun(runId); + if (run.status !== "interrupted") { + throw recordError; + } + } + } + + private async recordWorkflowTerminalEventIfMissing( + runId: string, + sequence: WorkflowEventSequence, + workflow: { + stepId: string; + runId: string; + name: string; + status: "completed" | "failed" | "interrupted"; + details?: unknown; + } + ): Promise { + assert( + runId.length > 0, + "WorkflowRunner.recordWorkflowTerminalEventIfMissing: runId is required" + ); + assert(workflow.stepId.length > 0, "WorkflowRunner: workflow event stepId is required"); + assert(workflow.runId.length > 0, "WorkflowRunner: workflow event runId is required"); + assert(workflow.name.length > 0, "WorkflowRunner: workflow event name is required"); + + await using _lock = await this.taskEventMutex.acquire(); + const run = await this.runStore.getRun(runId); + const alreadyRecorded = run.events.some( + (event) => + event.type === "workflow" && + event.stepId === workflow.stepId && + event.runId === workflow.runId && + event.status === workflow.status + ); + if (alreadyRecorded) { + return; + } + await this.appendEvent(runId, { + sequence: sequence.next(), + type: "workflow", + at: this.clock.nowIso(), + stepId: workflow.stepId, + runId: workflow.runId, + name: workflow.name, + status: workflow.status, + details: workflow.details, + }); + } + private async getCachedWorkflowActionEffect( runId: string, stepId: string, @@ -1893,6 +2165,38 @@ function parseWorkflowActionSpec(rawSpec: unknown): WorkflowActionSpec { return parsed; } +function parseWorkflowStartInput(rawInput: unknown): WorkflowStartInput { + assert( + rawInput != null && typeof rawInput === "object" && !Array.isArray(rawInput), + "workflows.start input must be an object" + ); + const input = rawInput as Record; + assert( + typeof input.name === "string" && input.name.length > 0, + "workflows.start input.name is required" + ); + assert( + input.wait !== false && input.runInBackground !== true && input.run_in_background !== true, + "workflows.start currently waits for the child workflow to reach a terminal result" + ); + return { + name: input.name, + args: Object.prototype.hasOwnProperty.call(input, "args") ? input.args : {}, + }; +} + +function buildWorkflowStartReplayInput(input: WorkflowStartInput): unknown { + return { + primitive: "workflows.start", + name: input.name, + args: input.args, + }; +} + +function isBuiltInWorkflowsStartAction(action: ResolvedWorkflowAction): boolean { + return action.scope === "built-in" && action.name === "workflows.start"; +} + function buildWorkflowActionReplayInput( action: ResolvedWorkflowAction, spec: WorkflowActionSpec, @@ -2014,6 +2318,31 @@ function workflowActionErrorDetails(error: unknown): Record { return { error: getErrorMessage(error) }; } +function normalizeWorkflowNestedResult(rawResult: unknown): WorkflowNestedResult { + const result = normalizeWorkflowResultForEvent(rawResult); + assert(rawResult != null && typeof rawResult === "object", "workflow result must be an object"); + const record = rawResult as Record; + assert( + typeof record.runId === "string" && record.runId.length > 0, + "workflow result requires runId" + ); + assert( + typeof record.name === "string" && record.name.length > 0, + "workflow result requires name" + ); + assert( + typeof record.status === "string" && record.status.length > 0, + "workflow result requires status" + ); + return { + reportMarkdown: result.reportMarkdown, + ...(result.structuredOutput !== undefined ? { structuredOutput: result.structuredOutput } : {}), + runId: record.runId, + name: record.name, + status: record.status as WorkflowRunStatus, + }; +} + function normalizeWorkflowActionResult(rawResult: unknown): WorkflowActionResult { assert(rawResult != null && typeof rawResult === "object", "action result must be an object"); const record = rawResult as Record; diff --git a/src/node/services/workflows/WorkflowService.test.ts b/src/node/services/workflows/WorkflowService.test.ts index 6b2d66e68c..a46796fb1e 100644 --- a/src/node/services/workflows/WorkflowService.test.ts +++ b/src/node/services/workflows/WorkflowService.test.ts @@ -270,6 +270,345 @@ export default function workflow({ args, agent }) { expect(run.definition.scope).toBe("global"); }); + test("runs a nested workflow through action.workflows.start and reuses the child on replay", async () => { + using tmp = new DisposableTempDir("workflow-service-nested"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + globalRoot, + "child-simple", + "// description: Child workflow\nexport default function workflow({ args }) { return { reportMarkdown: 'Child: ' + args.topic }; }\n" + ); + await writeWorkflow( + globalRoot, + "parent-simple", + `// description: Parent workflow + export default function workflow({ args, action }) { + const child = action.workflows.start({ + id: "child-simple", + input: { name: "child-simple", args: { topic: args.topic } }, + }); + return { reportMarkdown: "Parent saw " + child.reportMarkdown, structuredOutput: { childRunId: child.runId } }; + }\n` + ); + + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + const service = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent() { + throw new Error("agent should not run"); + }, + }, + generateRunId: () => "wfr_parent_simple", + runnerId: "runner-a", + clock: { + nowIso: () => "2026-05-29T00:00:00.000Z", + nowMs: () => 1_000, + }, + }); + + const result = await service.startNamedWorkflow({ + name: "parent-simple", + workspaceId: "workspace-1", + projectTrusted: true, + args: { topic: "nested smoke" }, + }); + const parentRun = await runStore.getRun("wfr_parent_simple"); + const childStep = parentRun.steps.find((step) => step.stepId === "child-simple"); + expect(childStep?.taskId).toMatch(/^wfr_child_/); + const childRunId = childStep?.taskId; + if (childRunId == null) { + throw new Error("Expected nested workflow step to record a child run id"); + } + const childRun = await runStore.getRun(childRunId); + + expect(result).toEqual({ + runId: "wfr_parent_simple", + status: "completed", + result: { + reportMarkdown: "Parent saw Child: nested smoke", + structuredOutput: { childRunId }, + }, + }); + expect(childRun.id).toBe(childRunId); + expect(childRun.workspaceId).toBe("workspace-1"); + expect(childRun.definition.name).toBe("child-simple"); + expect(childRun.parentWorkflow?.runId).toBe("wfr_parent_simple"); + expect(childRun.parentWorkflow?.stepId).toBe("child-simple"); + expect(childRun.status).toBe("completed"); + expect(parentRun.events).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + type: "workflow", + stepId: "child-simple", + runId: childRunId, + status: "started", + }), + expect.objectContaining({ + type: "workflow", + stepId: "child-simple", + runId: childRunId, + status: "completed", + }), + ]) + ); + + const runsAfterCompletion = await runStore.listRuns(); + expect( + runsAfterCompletion.filter((run) => run.parentWorkflow?.runId === "wfr_parent_simple") + ).toHaveLength(1); + }); + + test("snapshots parent action cwd when replay creates a nested child run", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-cwd"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + globalRoot, + "child-cwd", + "// description: Child cwd\nexport default function workflow() { return { reportMarkdown: 'child' }; }\n" + ); + const parentSource = `// description: Parent cwd +export default function workflow({ action }) { + return action.workflows.start({ + id: "child-cwd", + input: { name: "child-cwd", args: {} }, + }); +}\n`; + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + const parentCwd = path.join(tmp.path, "parent-cwd"); + await runStore.createRun({ + id: "wfr_parent_cwd", + workspaceId: "workspace-1", + definition: { + name: "parent-cwd", + description: "Parent cwd", + scope: "global", + executable: true, + }, + definitionSource: parentSource, + args: {}, + defaultActionCwd: parentCwd, + now: "2026-05-29T00:00:00.000Z", + }); + const service = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent() { + throw new Error("agent should not run"); + }, + }, + defaultActionCwd: path.join(tmp.path, "service-cwd"), + runnerId: "runner-a", + }); + + await service.resumeRun({ + workspaceId: "workspace-1", + runId: "wfr_parent_cwd", + projectTrusted: true, + }); + const parentRun = await runStore.getRun("wfr_parent_cwd"); + const childRunId = parentRun.steps.find((step) => step.stepId === "child-cwd")?.taskId; + if (childRunId == null) { + throw new Error("Expected nested cwd workflow to record a child run id"); + } + const childRun = await runStore.getRun(childRunId); + + expect(childRun.defaultActionCwd).toBe(parentCwd); + }); + + test("preserves explicit null args for nested child workflows", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-null-args"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + globalRoot, + "child-null-args", + "// description: Child null args\nexport default function workflow({ args }) { return { reportMarkdown: String(args === null) }; }\n" + ); + await writeWorkflow( + globalRoot, + "parent-null-args", + `// description: Parent null args +export default function workflow({ action }) { + return action.workflows.start({ + id: "child-null-args", + input: { name: "child-null-args", args: null }, + }); +}\n` + ); + const service = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore: new WorkflowRunStore({ sessionDir: tmp.path }), + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent() { + throw new Error("agent should not run"); + }, + }, + generateRunId: () => "wfr_parent_null_args", + runnerId: "runner-a", + }); + + const result = await service.startNamedWorkflow({ + name: "parent-null-args", + workspaceId: "workspace-1", + projectTrusted: true, + args: {}, + }); + + expect(result.result).toMatchObject({ reportMarkdown: "true" }); + }); + + test("records terminal parent step state when a nested child workflow fails", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-failure"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + globalRoot, + "child-fails", + "// description: Child fails\nexport default function workflow() { throw new Error('child boom'); }\n" + ); + await writeWorkflow( + globalRoot, + "parent-catches-child-failure", + `// description: Parent catches child failure +export default function workflow({ action }) { + try { + action.workflows.start({ + id: "child-fails", + input: { name: "child-fails", args: {} }, + }); + } catch (error) { + return { reportMarkdown: "caught " + error.message }; + } + return { reportMarkdown: "unexpected" }; +}\n` + ); + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + const service = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent() { + throw new Error("agent should not run"); + }, + }, + generateRunId: () => "wfr_parent_catches_child_failure", + runnerId: "runner-a", + }); + + const result = await service.startNamedWorkflow({ + name: "parent-catches-child-failure", + workspaceId: "workspace-1", + projectTrusted: true, + args: {}, + }); + const parentRun = await runStore.getRun("wfr_parent_catches_child_failure"); + const childStep = parentRun.steps.find((step) => step.stepId === "child-fails"); + + expect(result.status).toBe("completed"); + expect(childStep?.status).toBe("failed"); + expect(parentRun.events).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + type: "workflow", + stepId: "child-fails", + status: "failed", + }), + ]) + ); + }); + + test("applies current project trust before starting nested project workflows", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-trust"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + projectRoot, + "child-project", + "// description: Project child\nexport default function workflow() { return { reportMarkdown: 'project child' }; }\n" + ); + await writeWorkflow( + globalRoot, + "parent-project-child", + `// description: Parent project child +export default function workflow({ action }) { + return action.workflows.start({ + id: "child-project", + input: { name: "child-project", args: {} }, + }); +}\n` + ); + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + const service = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent() { + throw new Error("agent should not run"); + }, + }, + getCurrentProjectTrusted: () => false, + generateRunId: () => "wfr_parent_project_child", + runnerId: "runner-a", + }); + + await expect( + service.startNamedWorkflow({ + name: "parent-project-child", + workspaceId: "workspace-1", + projectTrusted: true, + args: {}, + }) + ).rejects.toThrow(/Project trust|Workflow definition not found/); + const parentRun = await runStore.getRun("wfr_parent_project_child"); + + expect(parentRun.steps.find((step) => step.stepId === "child-project")?.status).toBe("failed"); + }); + test("runs workspace scratch workflow definitions authored as files", async () => { using tmp = new DisposableTempDir("workflow-service"); const workspaceRoot = path.join(tmp.path, "project"); @@ -610,6 +949,365 @@ export default function workflow({ args, agent }) { expect(runError instanceof Error ? runError.message : "").toMatch(/interrupted|aborted/i); }); + test("interruptRun cascades to an active nested child workflow", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-interrupt"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + globalRoot, + "child-interruptible", + "// description: Child interruptible\nexport default function workflow({ agent }) { return agent({ id: 'slow-child', prompt: 'slow' }); }\n" + ); + await writeWorkflow( + globalRoot, + "parent-interruptible", + `// description: Parent interruptible +export default function workflow({ action }) { + return action.workflows.start({ + id: "child-interruptible", + input: { name: "child-interruptible", args: {} }, + }); +}\n` + ); + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + let agentWaitStarted = false; + let agentAbortObserved = false; + const starterService = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent(_spec, _lifecycle, waitOptions) { + agentWaitStarted = true; + return await new Promise((_, reject) => { + waitOptions?.abortSignal?.addEventListener( + "abort", + () => { + agentAbortObserved = true; + reject(new Error("Task interrupted")); + }, + { once: true } + ); + }); + }, + }, + generateRunId: () => "wfr_nested_interrupt_parent", + runnerId: "runner-a", + }); + let interruptCalls = 0; + const interruptService = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent() { + throw new Error("interrupt service runAgent should not be called"); + }, + async interruptRun() { + interruptCalls += 1; + }, + }, + runnerId: "runner-b", + }); + + const runPromise = starterService.startNamedWorkflow({ + name: "parent-interruptible", + workspaceId: "workspace-1", + projectTrusted: true, + args: {}, + }); + const runErrorPromise = runPromise.then( + () => null, + (error: unknown) => error + ); + await waitForCondition("nested child agent to start", () => agentWaitStarted); + + const interrupted = await interruptService.interruptRun({ + workspaceId: "workspace-1", + runId: "wfr_nested_interrupt_parent", + }); + const parentRun = await runStore.getRun("wfr_nested_interrupt_parent"); + const childRunId = parentRun.steps.find( + (step) => step.stepId === "child-interruptible" + )?.taskId; + if (childRunId == null) { + throw new Error("Expected nested interrupt workflow to record a child run id"); + } + const childRun = await runStore.getRun(childRunId); + const runError = await runErrorPromise; + + expect(interrupted.status).toBe("interrupted"); + expect(childRun.status).toBe("interrupted"); + expect(agentAbortObserved).toBe(true); + expect(interruptCalls).toBeGreaterThanOrEqual(1); + expect(runError).toBeInstanceOf(Error); + expect(runError instanceof Error ? runError.message : "").toMatch(/interrupted|aborted/i); + }); + + test("interruptRun still interrupts tasks for already-interrupted nested children", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-interrupted-task-cascade"); + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + await runStore.createRun({ + id: "wfr_parent_interrupted_child_task", + workspaceId: "workspace-1", + definition: { name: "parent", description: "Parent", scope: "built-in", executable: true }, + definitionSource: + "export default function workflow() { return { reportMarkdown: 'parent' }; }\n", + args: {}, + now: "2026-05-29T00:00:00.000Z", + }); + await runStore.appendStatus( + "wfr_parent_interrupted_child_task", + "running", + "2026-05-29T00:00:01.000Z" + ); + await runStore.createRun({ + id: "wfr_child_already_interrupted", + workspaceId: "workspace-1", + definition: { name: "child", description: "Child", scope: "built-in", executable: true }, + definitionSource: + "export default function workflow() { return { reportMarkdown: 'child' }; }\n", + args: {}, + parentWorkflow: { + runId: "wfr_parent_interrupted_child_task", + stepId: "child", + inputHash: "hash:child", + depth: 0, + }, + now: "2026-05-29T00:00:00.000Z", + }); + await runStore.appendStatus( + "wfr_child_already_interrupted", + "running", + "2026-05-29T00:00:01.000Z" + ); + await runStore.appendStatus( + "wfr_child_already_interrupted", + "interrupted", + "2026-05-29T00:00:02.000Z" + ); + const interruptedTaskAdapters: string[] = []; + const service = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ + projectRoot: path.join(tmp.path, "project"), + globalRoot: path.join(tmp.path, "global"), + builtIns: [], + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapterFactory: (runId) => ({ + async runAgent() { + throw new Error("agent should not run"); + }, + async interruptRun() { + interruptedTaskAdapters.push(runId); + }, + }), + runnerId: "runner-a", + }); + + await service.interruptRun({ + workspaceId: "workspace-1", + runId: "wfr_parent_interrupted_child_task", + }); + + expect(interruptedTaskAdapters).toContain("wfr_child_already_interrupted"); + }); + + test("interruptRun aborts an active nested child workflow by child run id", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-direct-interrupt"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + globalRoot, + "child-direct-interruptible", + "// description: Child direct interruptible\nexport default function workflow({ agent }) { return agent({ id: 'slow-child', prompt: 'slow' }); }\n" + ); + await writeWorkflow( + globalRoot, + "parent-direct-interruptible", + `// description: Parent direct interruptible +export default function workflow({ action }) { + return action.workflows.start({ + id: "child-direct-interruptible", + input: { name: "child-direct-interruptible", args: {} }, + }); +}\n` + ); + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + let agentWaitStarted = false; + let agentAbortObserved = false; + const starterService = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent(_spec, _lifecycle, waitOptions) { + agentWaitStarted = true; + return await new Promise((_, reject) => { + waitOptions?.abortSignal?.addEventListener( + "abort", + () => { + agentAbortObserved = true; + reject(new Error("Task interrupted")); + }, + { once: true } + ); + }); + }, + }, + generateRunId: () => "wfr_nested_direct_interrupt_parent", + runnerId: "runner-a", + }); + const interruptService = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent() { + throw new Error("interrupt service runAgent should not be called"); + }, + async interruptRun() { + // child task adapter interrupt is supplementary; the child workflow runtime must + // also abort through the active child runner controller. + }, + }, + runnerId: "runner-b", + }); + + const runPromise = starterService.startNamedWorkflow({ + name: "parent-direct-interruptible", + workspaceId: "workspace-1", + projectTrusted: true, + args: {}, + }); + const runErrorPromise = runPromise.then( + () => null, + (error: unknown) => error + ); + await waitForCondition("nested child agent to start", () => agentWaitStarted); + const parentRun = await runStore.getRun("wfr_nested_direct_interrupt_parent"); + const childRunId = parentRun.steps.find( + (step) => step.stepId === "child-direct-interruptible" + )?.taskId; + if (childRunId == null) { + throw new Error("Expected direct interrupt workflow to record a child run id"); + } + + const interrupted = await interruptService.interruptRun({ + workspaceId: "workspace-1", + runId: childRunId, + }); + const runError = await runErrorPromise; + + expect(interrupted.status).toBe("interrupted"); + expect(agentAbortObserved).toBe(true); + expect(runError).toBeInstanceOf(Error); + }); + + test("backgrounds and resumes parent workflows when a nested child workflow backgrounds", async () => { + using tmp = new DisposableTempDir("workflow-service-nested-background"); + const workspaceRoot = path.join(tmp.path, "project"); + const projectRoot = path.join(workspaceRoot, ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + const actionProjectRoot = path.join(workspaceRoot, ".mux", "actions"); + const actionGlobalRoot = path.join(tmp.path, "mux-home", "actions"); + await writeWorkflow( + globalRoot, + "child-backgroundable", + "// description: Child backgroundable\nexport default function workflow({ agent }) { const result = agent({ id: 'slow-child', prompt: 'slow' }); return { reportMarkdown: 'Child ' + result.reportMarkdown }; }\n" + ); + await writeWorkflow( + globalRoot, + "parent-backgroundable", + `// description: Parent backgroundable +export default function workflow({ action }) { + const child = action.workflows.start({ + id: "child-backgroundable", + input: { name: "child-backgroundable", args: {} }, + }); + return { reportMarkdown: "Parent " + child.reportMarkdown }; +}\n` + ); + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + let calls = 0; + const backgroundFlags: Array = []; + const service = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + actionRegistry: new WorkflowActionRegistry({ + projectRoot: actionProjectRoot, + globalRoot: actionGlobalRoot, + }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent(_spec, _lifecycle, waitOptions) { + calls += 1; + backgroundFlags.push(waitOptions?.backgroundOnMessageQueued); + if (calls === 1) { + throw new ForegroundWaitBackgroundedError(); + } + return { taskId: "task_slow_child", reportMarkdown: "done" }; + }, + }, + generateRunId: () => "wfr_nested_background_parent", + runnerId: "runner-a", + }); + + const result = await service.startNamedWorkflow({ + name: "parent-backgroundable", + workspaceId: "workspace-1", + projectTrusted: true, + args: {}, + }); + + expect(result).toEqual({ + runId: "wfr_nested_background_parent", + status: "backgrounded", + result: null, + }); + await waitForWorkflowStatus(runStore, "wfr_nested_background_parent", "completed"); + const parentRun = await runStore.getRun("wfr_nested_background_parent"); + const childRunId = parentRun.steps.find( + (step) => step.stepId === "child-backgroundable" + )?.taskId; + if (childRunId == null) { + throw new Error("Expected nested background workflow to record a child run id"); + } + const childRun = await runStore.getRun(childRunId); + + expect(childRun.status).toBe("completed"); + expect(parentRun.events).toEqual( + expect.arrayContaining([ + expect.objectContaining({ type: "workflow", status: "backgrounded", runId: childRunId }), + expect.objectContaining({ type: "workflow", status: "completed", runId: childRunId }), + ]) + ); + expect(backgroundFlags).toEqual([true, false]); + }); + test("moves foreground workflow runs to background when child waits are backgrounded", async () => { using tmp = new DisposableTempDir("workflow-service"); const projectRoot = path.join(tmp.path, "project", ".mux", "workflows"); diff --git a/src/node/services/workflows/WorkflowService.ts b/src/node/services/workflows/WorkflowService.ts index c60a422294..1ce2967e8e 100644 --- a/src/node/services/workflows/WorkflowService.ts +++ b/src/node/services/workflows/WorkflowService.ts @@ -22,10 +22,12 @@ import type { WorkflowRunStore } from "./WorkflowRunStore"; import { WorkflowRunBackgroundedError, WorkflowRunner, + type WorkflowChildRunAdapter, type WorkflowRunnerClock, type WorkflowRunnerRunOptions, type WorkflowTaskAdapter, } from "./WorkflowRunner"; +import { MAX_NESTED_WORKFLOW_DEPTH } from "./nestedWorkflowRuns"; export interface WorkflowBackgroundRunTerminalEvent { runId: string; @@ -95,6 +97,8 @@ export interface StartNamedWorkflowResult { result: unknown; } +const CHILD_WORKFLOW_ACTIVE_POLL_MS = 1_000; + const WORKFLOW_BACKGROUND_CONTINUATION_STATUSES = new Set([ "completed", "failed", @@ -199,7 +203,9 @@ export class WorkflowService { async listRuns(input: { workspaceId: string }): Promise { assert(input.workspaceId.length > 0, "WorkflowService.listRuns: workspaceId is required"); const runs = await this.runStore.listRuns(); - return runs.filter((run) => run.workspaceId === input.workspaceId); + return runs.filter( + (run) => run.workspaceId === input.workspaceId && run.parentWorkflow == null + ); } async resumeCrashedRuns(input: { @@ -251,6 +257,7 @@ export class WorkflowService { "interrupted", this.clock?.nowIso() ?? new Date().toISOString() ); + await this.interruptChildWorkflowRuns(input.runId, input.workspaceId, new Set([input.runId])); await (this.taskAdapterFactory?.(input.runId) ?? this.requireTaskAdapter()).interruptRun?.(); return interrupted; } @@ -338,7 +345,7 @@ export class WorkflowService { >; backgroundedFailureMessage: string; }): Promise { - if (input.abortSignal?.aborted === true) { + if (isAbortSignalAborted(input.abortSignal)) { // The caller was aborted before the runner started; leave the run in its current // (still resumable) state instead of churning status transitions. throw new Error(`Workflow run interrupted: ${input.runId}`); @@ -458,7 +465,7 @@ export class WorkflowService { async startNamedWorkflow(input: StartNamedWorkflowInput): Promise { const runId = await this.createNamedWorkflowRun(input); - if (input.abortSignal?.aborted === true) { + if (isAbortSignalAborted(input.abortSignal)) { await this.interruptRun({ workspaceId: input.workspaceId, runId }); throw new Error(`Workflow run interrupted: ${runId}`); } @@ -763,6 +770,269 @@ export class WorkflowService { } } + private async interruptChildWorkflowRuns( + parentRunId: string, + workspaceId: string, + seenRunIds: Set + ): Promise { + const runs = await this.runStore.listRuns(); + const activeChildren = runs.filter( + (run) => + run.workspaceId === workspaceId && + run.parentWorkflow?.runId === parentRunId && + (isInterruptibleWorkflowRunStatus(run.status) || run.status === "interrupted") + ); + for (const child of activeChildren) { + await this.interruptChildWorkflowRun(child, workspaceId, seenRunIds); + } + } + + private async interruptChildWorkflowRun( + child: WorkflowRunRecord, + workspaceId: string, + seenRunIds: Set + ): Promise { + if (seenRunIds.has(child.id)) { + return; + } + seenRunIds.add(child.id); + this.abortActiveRunner(child.id); + const interrupted = await this.appendInterruptedIfActive(child.id); + if ( + !isInterruptibleWorkflowRunStatus(interrupted.status) && + interrupted.status !== "interrupted" + ) { + return; + } + await this.interruptChildWorkflowRuns(child.id, workspaceId, seenRunIds); + await (this.taskAdapterFactory?.(child.id) ?? this.requireTaskAdapter()).interruptRun?.(); + } + + private async appendInterruptedIfActive(runId: string): Promise { + const run = await this.runStore.getRun(runId); + if (!isInterruptibleWorkflowRunStatus(run.status)) { + return run; + } + try { + return await this.runStore.appendStatus( + runId, + "interrupted", + this.clock?.nowIso() ?? new Date().toISOString() + ); + } catch (error) { + const latest = await this.runStore.getRun(runId); + if (!isInterruptibleWorkflowRunStatus(latest.status)) { + return latest; + } + throw error; + } + } + + private async interruptExistingChildWorkflowRun( + runId: string + ): Promise { + const run = await this.getRunIfPresent(runId); + if (run == null) { + return null; + } + this.abortActiveRunner(runId); + return await this.appendInterruptedIfActive(runId); + } + + private async getRunIfPresent(runId: string): Promise { + try { + return await this.runStore.getRun(runId); + } catch (error) { + if (isMissingWorkflowRunError(error)) { + return null; + } + throw error; + } + } + + private createChildRunAdapter(projectTrusted: boolean): WorkflowChildRunAdapter { + return { + runChildWorkflowToTerminal: async (input) => { + while (true) { + const currentProjectTrusted = await this.resolveCurrentProjectTrust(projectTrusted); + if (isAbortSignalAborted(input.abortSignal)) { + const interrupted = await this.interruptExistingChildWorkflowRun(input.childRunId); + return { + runId: input.childRunId, + status: interrupted?.status ?? "interrupted", + result: null, + }; + } + + await this.ensureChildWorkflowRun({ ...input, projectTrusted: currentProjectTrusted }); + let run = await this.runStore.getRun(input.childRunId); + if (run.status === "completed") { + return { + runId: input.childRunId, + status: "completed", + result: getWorkflowRunResult(run), + }; + } + if (run.status === "failed") { + return { runId: input.childRunId, status: "failed", result: getWorkflowRunResult(run) }; + } + assertRunCanResumeWithCurrentTrust(run, currentProjectTrusted); + + if (isAbortSignalAborted(input.abortSignal)) { + const interrupted = await this.appendInterruptedIfActive(input.childRunId); + return { runId: input.childRunId, status: interrupted.status, result: null }; + } + + const retryDelayMs = await this.runStore.getLeaseRetryDelayMs( + input.childRunId, + this.clock?.nowMs() ?? Date.now() + ); + if (retryDelayMs > 0) { + if (input.backgroundOnMessageQueued !== false) { + return { runId: input.childRunId, status: "backgrounded", result: null }; + } + await waitForAbortableDelay( + Math.min(retryDelayMs, CHILD_WORKFLOW_ACTIVE_POLL_MS), + input.abortSignal + ); + continue; + } + + const runner = await this.createRunner(input.childRunId, currentProjectTrusted); + const childAbortController = new AbortController(); + const removeParentAbortForwarding = forwardAbortSignal( + input.abortSignal, + childAbortController + ); + let unregisterChildRunnerAbort: () => void = () => undefined; + try { + const result = await runner.run(input.childRunId, { + abortSignal: childAbortController.signal, + backgroundOnMessageQueued: input.backgroundOnMessageQueued ?? false, + allowResumeFromInterrupted: run.status === "interrupted", + onLeaseAcquired: () => { + unregisterChildRunnerAbort = this.registerActiveRunnerAbortController( + input.childRunId, + childAbortController + ); + }, + }); + return { runId: input.childRunId, status: "completed", result }; + } catch (error) { + if (error instanceof WorkflowRunBackgroundedError) { + run = await this.runStore.getRun(input.childRunId); + return { runId: input.childRunId, status: run.status, result: null }; + } + if (isWorkflowRunAlreadyActiveError(error, input.childRunId)) { + if (input.backgroundOnMessageQueued !== false) { + return { runId: input.childRunId, status: "backgrounded", result: null }; + } + const activeRetryDelayMs = await this.runStore.getLeaseRetryDelayMs( + input.childRunId, + this.clock?.nowMs() ?? Date.now() + ); + await waitForAbortableDelay( + Math.min(Math.max(1, activeRetryDelayMs), CHILD_WORKFLOW_ACTIVE_POLL_MS), + input.abortSignal + ); + continue; + } + if (isAbortSignalAborted(input.abortSignal)) { + const interrupted = await this.appendInterruptedIfActive(input.childRunId); + return { runId: input.childRunId, status: interrupted.status, result: null }; + } + const currentRun = await this.getRunIfPresent(input.childRunId); + if (currentRun?.status === "failed" || currentRun?.status === "interrupted") { + return { + runId: input.childRunId, + status: currentRun.status, + result: getWorkflowRunResult(currentRun), + }; + } + throw error; + } finally { + removeParentAbortForwarding(); + unregisterChildRunnerAbort(); + } + } + }, + }; + } + + private async ensureChildWorkflowRun(input: { + parentRunId: string; + stepId: string; + inputHash: string; + childRunId: string; + name: string; + args: unknown; + projectTrusted: boolean; + }): Promise { + assert( + input.parentRunId.length > 0, + "WorkflowService.ensureChildWorkflowRun: parentRunId required" + ); + assert( + input.childRunId.length > 0, + "WorkflowService.ensureChildWorkflowRun: childRunId required" + ); + assert(input.stepId.length > 0, "WorkflowService.ensureChildWorkflowRun: stepId required"); + assert( + input.inputHash.length > 0, + "WorkflowService.ensureChildWorkflowRun: inputHash required" + ); + const parent = await this.runStore.getRun(input.parentRunId); + const depth = (parent.parentWorkflow?.depth ?? -1) + 1; + assert( + depth <= MAX_NESTED_WORKFLOW_DEPTH, + `Nested workflow depth limit exceeded (${MAX_NESTED_WORKFLOW_DEPTH})` + ); + + try { + const existing = await this.runStore.getRun(input.childRunId); + assert( + existing.workspaceId === parent.workspaceId, + "Child workflow workspace must match parent" + ); + assert( + existing.definition.name === input.name, + "Child workflow name must match existing run" + ); + assert( + existing.parentWorkflow?.runId === input.parentRunId && + existing.parentWorkflow.stepId === input.stepId && + existing.parentWorkflow.inputHash === input.inputHash, + "Existing child workflow run must match parent replay identity" + ); + return; + } catch (error) { + if (!isMissingWorkflowRunError(error)) { + throw error; + } + } + + const definition = await this.definitionStore.readDefinition(input.name, { + projectTrusted: input.projectTrusted, + }); + await this.runStore.createRunIfAbsent({ + id: input.childRunId, + workspaceId: parent.workspaceId, + definition: definition.descriptor, + definitionSource: definition.source, + args: input.args, + ...(parent.defaultActionCwd != null || this.defaultActionCwd != null + ? { defaultActionCwd: parent.defaultActionCwd ?? this.defaultActionCwd } + : {}), + parentWorkflow: { + runId: input.parentRunId, + stepId: input.stepId, + inputHash: input.inputHash, + depth, + }, + now: this.clock?.nowIso() ?? new Date().toISOString(), + }); + } + private async createRunner(runId: string, projectTrusted: boolean): Promise { // The run record always exists by the time a runner is created (create/resume/retry // paths persist it first), so resolve the definition name for task labeling here. @@ -775,6 +1045,7 @@ export class WorkflowService { runtimeFactory: this.runtimeFactory, taskAdapter: this.taskAdapterFactory?.(runId, workflowName) ?? this.requireTaskAdapter(), actionRegistry: this.actionRegistry, + childRunAdapter: this.createChildRunAdapter(projectTrusted), actionRunner: this.actionRunner, getProjectTrusted: () => this.resolveCurrentProjectTrust(projectTrusted), projectTrusted, @@ -803,6 +1074,13 @@ export class WorkflowService { } } +function isMissingWorkflowRunError(error: unknown): boolean { + return ( + error instanceof Error && + (("code" in error && error.code === "ENOENT") || error.message.includes("ENOENT")) + ); +} + function isWorkflowRunAlreadyActiveError(error: unknown, runId: string): boolean { return error instanceof Error && error.message === `Workflow run is already active: ${runId}`; } @@ -816,6 +1094,53 @@ function unrefTimer(timer: ReturnType): void { } } +function forwardAbortSignal( + abortSignal: AbortSignal | undefined, + controller: AbortController +): () => void { + if (abortSignal == null) { + return () => undefined; + } + if (abortSignal.aborted) { + controller.abort(); + return () => undefined; + } + const abort = () => controller.abort(); + abortSignal.addEventListener("abort", abort, { once: true }); + return () => abortSignal.removeEventListener("abort", abort); +} + +function isAbortSignalAborted(abortSignal?: AbortSignal): boolean { + return abortSignal?.aborted === true; +} + +function isInterruptibleWorkflowRunStatus(status: WorkflowRunStatus): boolean { + return status === "pending" || status === "running" || status === "backgrounded"; +} + +function getWorkflowRunResult(run: WorkflowRunRecord): unknown { + return run.events.findLast((event) => event.type === "result")?.result ?? null; +} + +async function waitForAbortableDelay(delayMs: number, abortSignal?: AbortSignal): Promise { + assert(delayMs > 0, "WorkflowService.waitForAbortableDelay: delayMs must be positive"); + if (abortSignal?.aborted === true) { + return; + } + await new Promise((resolve) => { + const timer = setTimeout(resolve, delayMs); + unrefTimer(timer); + abortSignal?.addEventListener( + "abort", + () => { + clearTimeout(timer); + resolve(); + }, + { once: true } + ); + }); +} + function canResumeRunWithCurrentTrust(run: WorkflowRunRecord, projectTrusted: boolean): boolean { return ( (run.definition.scope !== "project" && run.definition.scope !== "scratch") || projectTrusted diff --git a/src/node/services/workflows/builtInWorkflowActions.ts b/src/node/services/workflows/builtInWorkflowActions.ts index 8bad24d515..645ad073ba 100644 --- a/src/node/services/workflows/builtInWorkflowActions.ts +++ b/src/node/services/workflows/builtInWorkflowActions.ts @@ -881,7 +881,30 @@ module.exports.execute = async function (rawInput, ctx) { module.exports.reconcile = module.exports.execute; `; +const WORKFLOWS_START_SOURCE = String.raw` +module.exports.metadata = { + version: 1, + description: "Start a child workflow from the current workflow and wait for its terminal result", + effect: "workspace", + inputSchema: { + type: "object", + required: ["name"], + properties: { + name: { type: "string" }, + args: {}, + }, + }, + outputSchema: { type: "object" }, + timeoutMs: 86400000, +}; + +module.exports.execute = async function () { + throw new Error("workflows.start is executed by the workflow runner"); +}; +`; + const STATIC_BUILT_IN_WORKFLOW_ACTION_SOURCES: Record = { + "workflows.start": WORKFLOWS_START_SOURCE, "git.status": GIT_STATUS_SOURCE, "git.commitsBetween": GIT_COMMITS_BETWEEN_SOURCE, "git.diff": GIT_DIFF_SOURCE, diff --git a/src/node/services/workflows/nestedWorkflowRuns.ts b/src/node/services/workflows/nestedWorkflowRuns.ts new file mode 100644 index 0000000000..ff13532019 --- /dev/null +++ b/src/node/services/workflows/nestedWorkflowRuns.ts @@ -0,0 +1,31 @@ +import * as crypto from "node:crypto"; + +import { WorkflowRunIdSchema } from "@/common/orpc/schemas"; +import assert from "@/common/utils/assert"; + +export const MAX_NESTED_WORKFLOW_DEPTH = 8; + +export function deriveChildWorkflowRunId(input: { + parentRunId: string; + stepId: string; + inputHash: string; +}): string { + assert(input.parentRunId.length > 0, "deriveChildWorkflowRunId: parentRunId is required"); + assert(input.stepId.length > 0, "deriveChildWorkflowRunId: stepId is required"); + assert(input.inputHash.length > 0, "deriveChildWorkflowRunId: inputHash is required"); + const digest = crypto + .createHash("sha256") + .update(input.parentRunId) + .update("\0") + .update(input.stepId) + .update("\0") + .update(input.inputHash) + .digest("base64url") + .slice(0, 32); + const runId = `wfr_child_${digest}`; + assert( + WorkflowRunIdSchema.safeParse(runId).success, + "deriveChildWorkflowRunId: derived child run id must satisfy workflow run id schema" + ); + return runId; +}