From 8f833042040158c000e9975c3d6279e7e845e94a Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Tue, 16 Jun 2026 08:54:42 +0000 Subject: [PATCH] tests: stabilize WorkflowService background interrupts --- .../workflows/WorkflowService.test.ts | 81 +++++++++++++++++++ .../services/workflows/WorkflowService.ts | 61 +++++++++++--- 2 files changed, 129 insertions(+), 13 deletions(-) diff --git a/src/node/services/workflows/WorkflowService.test.ts b/src/node/services/workflows/WorkflowService.test.ts index 1286115ca8..cec9e62461 100644 --- a/src/node/services/workflows/WorkflowService.test.ts +++ b/src/node/services/workflows/WorkflowService.test.ts @@ -620,6 +620,7 @@ export default function workflow({ action }) { ); let releaseAgent: ((value: { taskId: string; reportMarkdown: string }) => void) | undefined; + const runCreatedGate = Promise.withResolvers(); const lifecycleEvents: string[] = []; const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); const service = new WorkflowService({ @@ -638,6 +639,8 @@ export default function workflow({ action }) { runnerId: "runner-a", }); + // Hold the callback open so this assertion checks WorkflowService's ordering instead of + // racing the runner after a synchronous onRunCreated callback returns. const resultPromise = service.startNamedWorkflow({ name: "scheduled-scan", workspaceId: "workspace-1", @@ -656,6 +659,7 @@ export default function workflow({ action }) { args: { severity: "high" }, }, }); + return runCreatedGate.promise; }, }); @@ -663,6 +667,8 @@ export default function workflow({ action }) { lifecycleEvents.includes("run-created") ); expect(lifecycleEvents).toEqual(["run-created"]); + expect(releaseAgent).toBeUndefined(); + runCreatedGate.resolve(); await waitForCondition("foreground agent to start", () => releaseAgent != null); expect(lifecycleEvents).toEqual(["run-created", "agent-started"]); @@ -2580,6 +2586,81 @@ export default function workflow({ action }) { expect(terminalEvents).toEqual([]); }); + test("notifies requested interrupted background runs without logging a failure", async () => { + using tmp = new DisposableTempDir("workflow-service"); + const projectRoot = path.join(tmp.path, "project", ".mux", "workflows"); + const globalRoot = path.join(tmp.path, "mux-home", "workflows"); + await writeWorkflow( + globalRoot, + "logged-interrupt-background", + "// description: Logged interrupt background workflow\nexport default function workflow({ agent }) { return agent({ id: 'slow-step', prompt: 'slow' }); }\n" + ); + const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); + let agentStarted = false; + const terminalEvents: Array<{ runId: string; status: string; result: unknown }> = []; + const consoleErrors: Array> = []; + const originalConsoleError = console.error; + console.error = (...args: Parameters) => { + consoleErrors.push(args); + }; + try { + const service = new WorkflowService({ + definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), + runStore, + runtimeFactory: new QuickJSRuntimeFactory(), + taskAdapter: { + async runAgent(_spec, _lifecycle, waitOptions) { + agentStarted = true; + const abortSignal = waitOptions?.abortSignal; + if (abortSignal == null) { + throw new Error("expected a background run abort signal"); + } + return await new Promise<{ taskId: string; reportMarkdown: string }>((_, reject) => { + abortSignal.addEventListener("abort", () => reject(new Error("Task interrupted")), { + once: true, + }); + }); + }, + }, + onBackgroundRunTerminal(event) { + terminalEvents.push({ runId: event.runId, status: event.status, result: event.result }); + }, + notifyInterruptedBackgroundRunTerminal: true, + generateRunId: () => "wfr_background_interrupt_logged", + runnerId: "runner-a", + }); + + await service.startNamedWorkflowInBackground({ + name: "logged-interrupt-background", + workspaceId: "workspace-1", + projectTrusted: false, + args: {}, + }); + await waitForCondition("background agent to start", () => agentStarted); + + const interrupted = await service.interruptRun({ + workspaceId: "workspace-1", + runId: "wfr_background_interrupt_logged", + }); + + expect(interrupted.status).toBe("interrupted"); + await waitForCondition( + "interrupted background terminal callback", + () => terminalEvents.length === 1 + ); + expect(terminalEvents).toEqual([ + { + runId: "wfr_background_interrupt_logged", + status: "interrupted", + result: null, + }, + ]); + expect(consoleErrors).toEqual([]); + } finally { + console.error = originalConsoleError; + } + }); + test("auto-resumes crash-recovered running runs without resuming user-interrupted runs", async () => { using tmp = new DisposableTempDir("workflow-service"); const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); diff --git a/src/node/services/workflows/WorkflowService.ts b/src/node/services/workflows/WorkflowService.ts index 0902b9fbfe..be2353ea9d 100644 --- a/src/node/services/workflows/WorkflowService.ts +++ b/src/node/services/workflows/WorkflowService.ts @@ -118,6 +118,7 @@ const WORKFLOW_BACKGROUND_CONTINUATION_STATUSES = new Set([ // oRPC creates a WorkflowService per request, so workflow lifecycle state that spans requests // needs process-wide registries. const pendingCrashResumeTimers = new Map>(); +const activeWorkflowInterruptStatusWrites = new Map>(); const activeWorkflowRunnerAbortControllers = new Map(); export class WorkflowService { @@ -263,17 +264,35 @@ export class WorkflowService { async interruptRun(input: { workspaceId: string; runId: string }): Promise { const run = await this.requireRunForWorkspace(input); assertWorkflowRunCanTransition(run.status, "interrupted"); - // Stop the active coordinator only after ownership is validated; child cleanup and status - // writes can block on I/O, but a mis-scoped request must not abort another workspace's run. - this.abortActiveRunner(input.runId); - const interrupted = await this.runStore.appendStatus( - input.runId, - "interrupted", - this.clock?.nowIso() ?? new Date().toISOString() - ); - await this.interruptChildWorkflowRuns(input.runId, input.workspaceId, new Set([input.runId])); - await (this.taskAdapterFactory?.(input.runId) ?? this.requireTaskAdapter()).interruptRun?.(); - return interrupted; + const interruptStatusWrite = Promise.withResolvers(); + activeWorkflowInterruptStatusWrites.set(input.runId, interruptStatusWrite.promise); + let statusWriteSettled = false; + const settleStatusWrite = () => { + if (statusWriteSettled) { + return; + } + statusWriteSettled = true; + interruptStatusWrite.resolve(); + }; + try { + // Stop the active coordinator only after ownership is validated; child cleanup and status + // writes can block on I/O, but a mis-scoped request must not abort another workspace's run. + this.abortActiveRunner(input.runId); + const interrupted = await this.runStore.appendStatus( + input.runId, + "interrupted", + this.clock?.nowIso() ?? new Date().toISOString() + ); + settleStatusWrite(); + await this.interruptChildWorkflowRuns(input.runId, input.workspaceId, new Set([input.runId])); + await (this.taskAdapterFactory?.(input.runId) ?? this.requireTaskAdapter()).interruptRun?.(); + return interrupted; + } finally { + settleStatusWrite(); + if (activeWorkflowInterruptStatusWrites.get(input.runId) === interruptStatusWrite.promise) { + activeWorkflowInterruptStatusWrites.delete(input.runId); + } + } } async retryRunFromCheckpointInBackground(input: { @@ -750,10 +769,13 @@ export class WorkflowService { .catch(async (error: unknown) => { const hadStarted = startedSettled; markStartFailed(error); - if (hadStarted || !isWorkflowRunAlreadyActiveError(error, runId)) { + if (!hadStarted && isWorkflowRunAlreadyActiveError(error, runId)) { + return; + } + if (!(await this.isInterruptedBackgroundRun(runId))) { console.error(failureMessage, error); - await this.notifyBackgroundRunTerminal(runId, null); } + await this.notifyBackgroundRunTerminal(runId, null); }); this.backgroundRuns.add(runPromise); void runPromise.finally(() => { @@ -763,6 +785,19 @@ export class WorkflowService { return started; } + private async isInterruptedBackgroundRun(runId: string): Promise { + assert(runId.length > 0, "WorkflowService.isInterruptedBackgroundRun: runId required"); + // interruptRun aborts the active runner before writing the durable interrupted status so + // cancellation is prompt; wait for that status write before classifying the background exit. + await activeWorkflowInterruptStatusWrites.get(runId); + try { + const run = await this.runStore.getRun(runId); + return run.status === "interrupted"; + } catch { + return false; + } + } + private async notifyBackgroundRunTerminal(runId: string, result: unknown): Promise { if (this.onBackgroundRunTerminal == null) { return;