Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions src/node/services/workflows/WorkflowService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ export default function workflow({ action }) {
);

let releaseAgent: ((value: { taskId: string; reportMarkdown: string }) => void) | undefined;
const runCreatedGate = Promise.withResolvers<void>();
const lifecycleEvents: string[] = [];
const runStore = new WorkflowRunStore({ sessionDir: tmp.path });
const service = new WorkflowService({
Expand All @@ -638,6 +639,8 @@ export default function workflow({ action }) {
runnerId: "runner-a",
});

// Hold the callback open so this assertion checks WorkflowService's ordering instead of
// racing the runner after a synchronous onRunCreated callback returns.
const resultPromise = service.startNamedWorkflow({
name: "scheduled-scan",
workspaceId: "workspace-1",
Expand All @@ -656,13 +659,16 @@ export default function workflow({ action }) {
args: { severity: "high" },
},
});
return runCreatedGate.promise;
},
});

await waitForCondition("foreground run creation callback", () =>
lifecycleEvents.includes("run-created")
);
expect(lifecycleEvents).toEqual(["run-created"]);
expect(releaseAgent).toBeUndefined();
runCreatedGate.resolve();
await waitForCondition("foreground agent to start", () => releaseAgent != null);
expect(lifecycleEvents).toEqual(["run-created", "agent-started"]);

Expand Down Expand Up @@ -2580,6 +2586,81 @@ export default function workflow({ action }) {
expect(terminalEvents).toEqual([]);
});

test("notifies requested interrupted background runs without logging a failure", async () => {
  // Scenario: a background run whose agent step only ends when its abort signal fires is
  // interrupted on request. The terminal callback must report "interrupted" exactly once and
  // nothing may be written through console.error.
  using tmp = new DisposableTempDir("workflow-service");
  const workflowName = "logged-interrupt-background";
  const interruptedRunId = "wfr_background_interrupt_logged";
  const workspaceId = "workspace-1";
  const globalRoot = path.join(tmp.path, "mux-home", "workflows");
  const projectRoot = path.join(tmp.path, "project", ".mux", "workflows");
  await writeWorkflow(
    globalRoot,
    workflowName,
    "// description: Logged interrupt background workflow\nexport default function workflow({ agent }) { return agent({ id: 'slow-step', prompt: 'slow' }); }\n"
  );
  const runStore = new WorkflowRunStore({ sessionDir: tmp.path });
  let agentRunning = false;
  const terminalNotifications: Array<{ runId: string; status: string; result: unknown }> = [];

  // Swap console.error for a recorder; the finally block below puts the real one back.
  const loggedErrorCalls: Array<Parameters<typeof console.error>> = [];
  const realConsoleError = console.error;
  console.error = (...args: Parameters<typeof console.error>) => {
    loggedErrorCalls.push(args);
  };
  try {
    const definitionStore = new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] });
    const service = new WorkflowService({
      definitionStore,
      runStore,
      runtimeFactory: new QuickJSRuntimeFactory(),
      taskAdapter: {
        // Never resolves: the step only finishes by rejecting once the run's abort signal fires.
        async runAgent(_spec, _lifecycle, waitOptions) {
          agentRunning = true;
          const signal = waitOptions?.abortSignal;
          if (signal == null) {
            throw new Error("expected a background run abort signal");
          }
          return await new Promise<{ taskId: string; reportMarkdown: string }>(
            (_resolve, reject) => {
              const rejectAsInterrupted = () => reject(new Error("Task interrupted"));
              signal.addEventListener("abort", rejectAsInterrupted, { once: true });
            }
          );
        },
      },
      onBackgroundRunTerminal(event) {
        const { runId, status, result } = event;
        terminalNotifications.push({ runId, status, result });
      },
      notifyInterruptedBackgroundRunTerminal: true,
      generateRunId: () => interruptedRunId,
      runnerId: "runner-a",
    });

    await service.startNamedWorkflowInBackground({
      name: workflowName,
      workspaceId,
      projectTrusted: false,
      args: {},
    });
    await waitForCondition("background agent to start", () => agentRunning);

    const interruptedRun = await service.interruptRun({ workspaceId, runId: interruptedRunId });
    expect(interruptedRun.status).toBe("interrupted");

    await waitForCondition(
      "interrupted background terminal callback",
      () => terminalNotifications.length === 1
    );
    expect(terminalNotifications).toEqual([
      { runId: interruptedRunId, status: "interrupted", result: null },
    ]);
    expect(loggedErrorCalls).toEqual([]);
  } finally {
    console.error = realConsoleError;
  }
});

test("auto-resumes crash-recovered running runs without resuming user-interrupted runs", async () => {
using tmp = new DisposableTempDir("workflow-service");
const runStore = new WorkflowRunStore({ sessionDir: tmp.path });
Expand Down
61 changes: 48 additions & 13 deletions src/node/services/workflows/WorkflowService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ const WORKFLOW_BACKGROUND_CONTINUATION_STATUSES = new Set<WorkflowRunStatus>([
// oRPC creates a WorkflowService per request, so workflow lifecycle state that spans requests
// needs process-wide registries.
// NOTE(review): presumably crash-resume timers keyed by run ID — confirm against the code that
// schedules them.
const pendingCrashResumeTimers = new Map<string, ReturnType<typeof setTimeout>>();
// Keyed by run ID. interruptRun publishes a promise here that settles once its durable
// "interrupted" status write has finished (or failed); isInterruptedBackgroundRun awaits that
// promise before it reads the run status.
const activeWorkflowInterruptStatusWrites = new Map<string, Promise<void>>();
// NOTE(review): presumably the controllers that abortActiveRunner signals, keyed by run ID —
// confirm against abortActiveRunner.
const activeWorkflowRunnerAbortControllers = new Map<string, AbortController>();

export class WorkflowService {
Expand Down Expand Up @@ -263,17 +264,35 @@ export class WorkflowService {
/**
 * Interrupts a workflow run on behalf of a workspace and returns the run record produced by
 * the "interrupted" status write.
 *
 * Order of operations:
 * 1. requireRunForWorkspace and assertWorkflowRunCanTransition run first, so nothing is aborted
 *    for a request that fails validation.
 * 2. A pending-status-write promise is published in activeWorkflowInterruptStatusWrites before
 *    the runner is aborted, so isInterruptedBackgroundRun can wait for the durable status
 *    instead of racing it.
 * 3. The active runner is aborted, the "interrupted" status is appended, and then child
 *    workflow runs and the task adapter are interrupted.
 *
 * @param input.workspaceId Workspace the request is scoped to.
 * @param input.runId Run to interrupt.
 * @returns The record returned by runStore.appendStatus for the "interrupted" status.
 */
async interruptRun(input: { workspaceId: string; runId: string }): Promise<WorkflowRunRecord> {
  const run = await this.requireRunForWorkspace(input);
  assertWorkflowRunCanTransition(run.status, "interrupted");
  const interruptStatusWrite = Promise.withResolvers<void>();
  activeWorkflowInterruptStatusWrites.set(input.runId, interruptStatusWrite.promise);
  try {
    // Stop the active coordinator only after ownership is validated; child cleanup and status
    // writes can block on I/O, but a mis-scoped request must not abort another workspace's run.
    this.abortActiveRunner(input.runId);
    const interrupted = await this.runStore.appendStatus(
      input.runId,
      "interrupted",
      this.clock?.nowIso() ?? new Date().toISOString()
    );
    // Release waiters as soon as the status is durable; the child and adapter cleanup below
    // may take much longer and must not delay them.
    interruptStatusWrite.resolve();
    await this.interruptChildWorkflowRuns(input.runId, input.workspaceId, new Set([input.runId]));
    await (this.taskAdapterFactory?.(input.runId) ?? this.requireTaskAdapter()).interruptRun?.();
    return interrupted;
  } finally {
    // Also release waiters when the status write itself threw. Resolving an already-resolved
    // promise is a no-op, so no "already settled" flag is needed.
    interruptStatusWrite.resolve();
    // Only remove our own entry; a later interruptRun call for the same run may have replaced it.
    if (activeWorkflowInterruptStatusWrites.get(input.runId) === interruptStatusWrite.promise) {
      activeWorkflowInterruptStatusWrites.delete(input.runId);
    }
  }
}

async retryRunFromCheckpointInBackground(input: {
Expand Down Expand Up @@ -750,10 +769,13 @@ export class WorkflowService {
.catch(async (error: unknown) => {
  // Snapshot startedSettled before markStartFailed is called.
  // NOTE(review): presumably markStartFailed settles the start promise — confirm.
  const hadStarted = startedSettled;
  markStartFailed(error);
  // A run that never started because it is already active was reported through the start
  // failure above; it gets no failure log and no terminal notification.
  if (!hadStarted && isWorkflowRunAlreadyActiveError(error, runId)) {
    return;
  }
  // A requested interrupt makes the runner reject too; that is an expected exit, so only log
  // when the run did not end up in the "interrupted" status.
  if (!(await this.isInterruptedBackgroundRun(runId))) {
    console.error(failureMessage, error);
  }
  // Notify exactly once, for interrupted and failed runs alike.
  await this.notifyBackgroundRunTerminal(runId, null);
});
this.backgroundRuns.add(runPromise);
void runPromise.finally(() => {
Expand All @@ -763,6 +785,19 @@ export class WorkflowService {
return started;
}

/**
 * Reports whether the stored status of a background run is "interrupted".
 *
 * interruptRun aborts the active runner before its durable status write completes, so any
 * in-flight write registered for this run is awaited first; otherwise an interrupted run could
 * still be read with its previous status.
 *
 * @param runId Non-empty run ID (asserted).
 * @returns true when the run's status is "interrupted"; false otherwise, including when the
 *          run cannot be read from the store.
 */
private async isInterruptedBackgroundRun(runId: string): Promise<boolean> {
  assert(runId.length > 0, "WorkflowService.isInterruptedBackgroundRun: runId required");
  // May be undefined when no interrupt is in flight; awaiting undefined simply continues.
  const pendingInterruptWrite = activeWorkflowInterruptStatusWrites.get(runId);
  await pendingInterruptWrite;
  try {
    const { status } = await this.runStore.getRun(runId);
    return status === "interrupted";
  } catch {
    // Best effort: a run that cannot be read is treated as not interrupted.
    return false;
  }
}

private async notifyBackgroundRunTerminal(runId: string, result: unknown): Promise<void> {
if (this.onBackgroundRunTerminal == null) {
return;
Expand Down
Loading