From 21b0f11e181f978bc4f8a987b8714bc21dd40936 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 17 Jun 2026 15:43:11 +0000 Subject: [PATCH 1/9] Show active workflow runs in workspace sidebar --- .../AgentListItem/AgentListItem.stories.tsx | 33 +++++ .../AgentListItem/AgentListItem.test.tsx | 26 ++++ .../AgentListItem/AgentListItem.tsx | 38 +++++- .../PinnedTodoList/PinnedTodoList.test.tsx | 1 + .../ProjectSidebar/ProjectSidebar.test.tsx | 1 + .../ProjectSidebar/ProjectSidebar.tsx | 15 ++- .../WorkspaceStatusIndicator.test.tsx | 3 + .../ImmersiveReviewAgentStatusBar.test.tsx | 1 + src/browser/stores/WorkspaceStore.test.ts | 3 + src/browser/stores/WorkspaceStore.ts | 7 + src/browser/utils/commands/sources.test.ts | 1 + src/common/orpc/schemas/workspace.ts | 4 + src/node/orpc/router.ts | 1 + .../workflows/WorkflowService.test.ts | 8 ++ .../services/workflows/WorkflowService.ts | 52 ++++++++ src/node/services/workspaceService.ts | 120 +++++++++++++++++- 16 files changed, 302 insertions(+), 12 deletions(-) diff --git a/src/browser/components/AgentListItem/AgentListItem.stories.tsx b/src/browser/components/AgentListItem/AgentListItem.stories.tsx index 4f4829eca7..0b3214ad68 100644 --- a/src/browser/components/AgentListItem/AgentListItem.stories.tsx +++ b/src/browser/components/AgentListItem/AgentListItem.stories.tsx @@ -459,9 +459,42 @@ function renderAppSidebarThreeActiveSubAgents() { ); } +function renderWorkflowOnlyActivity() { + const workflowWorkspace = createWorkspace({ + id: "ws-workflow-only", + name: "workflow-only", + title: "Workflow-only run", + projectName: PROJECT_NAME, + projectPath: PROJECT_PATH, + createdAt: new Date(NOW - 6_000).toISOString(), + }); + + return ( + + + + ); +} + // Composite gallery covering the primary single-workspace states. Replaces the // former FigmaStates, Selected, Active, ErrorState, Archiving, Question, and // Draft stories โ€” one snapshot, all states preserved and labeled. +export const WorkflowOnlyActivity: Story = { + args: undefined as never, + render: renderWorkflowOnlyActivity, +}; + export const States: Story = { args: undefined as never, render: renderStatesGallery, diff --git a/src/browser/components/AgentListItem/AgentListItem.test.tsx b/src/browser/components/AgentListItem/AgentListItem.test.tsx index 2841b32ba5..6fb2b0c96b 100644 --- a/src/browser/components/AgentListItem/AgentListItem.test.tsx +++ b/src/browser/components/AgentListItem/AgentListItem.test.tsx @@ -75,6 +75,7 @@ function createWorkspaceSidebarState( loadedSkills: [], skillLoadErrors: [], agentStatus: undefined, + activeWorkflowRunCount: 0, terminalActiveCount: 0, terminalSessionCount: 0, ...overrides, @@ -362,6 +363,31 @@ describe("AgentListItem", () => { expect(customTitle.view.getByText("My renamed run")).toBeTruthy(); }); + test("shows workflow-only activity on idle workspace rows", () => { + mockWorkspaceSidebarState = createWorkspaceSidebarState({ activeWorkflowRunCount: 1 }); + + const { row } = renderWorkspaceItem(); + const rowView = within(row); + + expect(row.querySelector(".workspace-status-dot-active")).toBeTruthy(); + expect(rowView.getByText("Workflow running")).toBeTruthy(); + expect(rowView.queryByTestId(`workspace-status-indicator-${TEST_WORKSPACE_ID}`)).toBeNull(); + }); + + test("keeps sidebar status text while workflow-only activity drives the active dot", () => { + mockWorkspaceSidebarState = createWorkspaceSidebarState({ + activeWorkflowRunCount: 1, + agentStatus: { emoji: "๐Ÿ”„", message: "Verifying workflow output" }, + }); + + const { row } = renderWorkspaceItem(); + const rowView = within(row); + + expect(row.querySelector(".workspace-status-dot-active")).toBeTruthy(); + expect(rowView.getByTestId(`workspace-status-indicator-${TEST_WORKSPACE_ID}`)).toBeTruthy(); + expect(rowView.queryByText("Workflow running")).toBeNull(); + }); + test("shows active delegated workflow work on idle workspace rows", () => { const { row } = renderWorkspaceItem({ delegatedActivity: { diff --git a/src/browser/components/AgentListItem/AgentListItem.tsx b/src/browser/components/AgentListItem/AgentListItem.tsx index 3c3a09e30a..69deb4d61d 100644 --- a/src/browser/components/AgentListItem/AgentListItem.tsx +++ b/src/browser/components/AgentListItem/AgentListItem.tsx @@ -16,6 +16,7 @@ import { type AgentRowRenderMeta, type WorkspaceDelegatedActivity, } from "@/browser/utils/ui/workspaceFiltering"; +import assert from "@/common/utils/assert"; import { cn } from "@/common/lib/utils"; import { TASK_GROUP_KIND, @@ -218,6 +219,24 @@ function formatDelegatedActivityText(activity: WorkspaceDelegatedActivity): stri return parts.length > 0 ? parts.join(" ยท ") : null; } +function formatWorkflowRunCount(count: number): string { + assert(count > 0, "formatWorkflowRunCount requires a positive count"); + return count === 1 ? "Workflow running" : `${count} workflows running`; +} + +function WorkflowActivityIndicator(props: { workspaceId: string; activeWorkflowRunCount: number }) { + const statusText = formatWorkflowRunCount(props.activeWorkflowRunCount); + + return ( +
+ {statusText} +
+ ); +} + function DelegatedActivityIndicator(props: { workspaceId: string; activity: WorkspaceDelegatedActivity; @@ -612,6 +631,7 @@ function RegularAgentListItemInner(props: AgentListItemProps) { awaitingUserQuestion, isStarting, agentStatus, + activeWorkflowRunCount, terminalActiveCount, lastAbortReason, } = useWorkspaceSidebarState(workspaceId); @@ -626,13 +646,19 @@ function RegularAgentListItemInner(props: AgentListItemProps) { useWorkspaceStreamingStatusPhase(streamingStatusPhase); const isWorking = displayStreamingStatusPhase !== null && !awaitingUserQuestion; const hasError = lastAbortReason?.reason === "system"; + const hasActiveWorkflowRun = activeWorkflowRunCount > 0; const hasActiveDelegatedWork = (delegatedActivity?.activeCount ?? 0) > 0; const delegatedStatusText = delegatedActivity ? formatDelegatedActivityText(delegatedActivity) : null; const hasDelegatedStatusText = delegatedStatusText != null; + const shouldShowWorkflowStatus = + hasActiveWorkflowRun && !agentStatus && displayStreamingStatusPhase === null; const hasOwnLiveStatusText = - awaitingUserQuestion || displayStreamingStatusPhase !== null || isRemoving; + awaitingUserQuestion || + displayStreamingStatusPhase !== null || + isRemoving || + shouldShowWorkflowStatus; const shouldShowDelegatedStatus = hasDelegatedStatusText && !hasOwnLiveStatusText && !hasError; const visualState = getVisualState({ awaitingUserQuestion, @@ -641,7 +667,7 @@ function RegularAgentListItemInner(props: AgentListItemProps) { isArchiving: isArchiving === true, isWorking, isStarting: displayStreamingStatusPhase === "starting", - hasActiveDelegatedWork, + hasActiveDelegatedWork: hasActiveDelegatedWork || hasActiveWorkflowRun, isUnread, isSelected, hasError, @@ -653,7 +679,8 @@ function RegularAgentListItemInner(props: AgentListItemProps) { Boolean(agentStatus) || awaitingUserQuestion || displayStreamingStatusPhase !== null || - isRemoving; + isRemoving || + shouldShowWorkflowStatus; // Keep archiving feedback inline with the title so the row doesn't jump to a // two-line layout right before it disappears from the sidebar. const shouldShowInlineArchivingStatus = isArchiving === true && !isRemoving; @@ -1145,6 +1172,11 @@ function RegularAgentListItemInner(props: AgentListItemProps) { workspaceId={workspaceId} activity={delegatedActivity} /> + ) : shouldShowWorkflowStatus ? ( + ) : ( 0) && + !sidebarState.awaitingUserQuestion; return { isWorking, awaitingUserQuestion: sidebarState.awaitingUserQuestion, @@ -1171,12 +1174,10 @@ const ProjectSidebarInner: React.FC = ({ const workspaceHasAttention = useCallback( (workspace: FrontendWorkspaceMetadata) => { const workspaceId = workspace.id; - const aggregator = workspaceStore.getAggregator(workspaceId); - const hasActiveStreams = aggregator?.hasInterruptibleActiveStream() ?? false; - const isStarting = aggregator?.getPendingStreamStartTime() != null && !hasActiveStreams; - const awaitingUserQuestion = aggregator?.hasAwaitingUserQuestion() ?? false; - const isWorking = (hasActiveStreams || isStarting) && !awaitingUserQuestion; - const hasError = aggregator?.getLastAbortReason()?.reason === "system"; + const attentionSignal = getWorkspaceAttentionSignal(workspaceStore, workspaceId); + const isWorking = attentionSignal?.isWorking === true; + const awaitingUserQuestion = attentionSignal?.awaitingUserQuestion === true; + const hasError = attentionSignal?.hasSystemError === true; const isRemoving = workspace.isRemoving === true; const isArchiving = archivingWorkspaceIds.has(workspaceId); const isInitializing = workspace.isInitializing === true; diff --git a/src/browser/components/WorkspaceStatusIndicator/WorkspaceStatusIndicator.test.tsx b/src/browser/components/WorkspaceStatusIndicator/WorkspaceStatusIndicator.test.tsx index afe5981d21..5d57e0ddbe 100644 --- a/src/browser/components/WorkspaceStatusIndicator/WorkspaceStatusIndicator.test.tsx +++ b/src/browser/components/WorkspaceStatusIndicator/WorkspaceStatusIndicator.test.tsx @@ -23,6 +23,7 @@ function mockSidebarState( loadedSkills: [], skillLoadErrors: [], agentStatus: undefined, + activeWorkflowRunCount: 0, terminalActiveCount: 0, terminalSessionCount: 0, ...overrides, @@ -143,6 +144,7 @@ describe("WorkspaceStatusIndicator", () => { loadedSkills: [], skillLoadErrors: [], agentStatus: undefined, + activeWorkflowRunCount: 0, terminalActiveCount: 0, terminalSessionCount: 0, }; @@ -201,6 +203,7 @@ describe("WorkspaceStatusIndicator", () => { loadedSkills: [], skillLoadErrors: [], agentStatus: undefined, + activeWorkflowRunCount: 0, terminalActiveCount: 0, terminalSessionCount: 0, }; diff --git a/src/browser/features/RightSidebar/CodeReview/ImmersiveReviewAgentStatusBar.test.tsx b/src/browser/features/RightSidebar/CodeReview/ImmersiveReviewAgentStatusBar.test.tsx index 176bfb0618..a7a1aa3ffb 100644 --- a/src/browser/features/RightSidebar/CodeReview/ImmersiveReviewAgentStatusBar.test.tsx +++ b/src/browser/features/RightSidebar/CodeReview/ImmersiveReviewAgentStatusBar.test.tsx @@ -58,6 +58,7 @@ function buildState(workspaceId: string, input: SeedInput): WorkspaceState { loadedSkills: [], skillLoadErrors: [], agentStatus: undefined, + activeWorkflowRunCount: 0, lastAbortReason: null, pendingStreamStartTime: null, pendingStreamModel: null, diff --git a/src/browser/stores/WorkspaceStore.test.ts b/src/browser/stores/WorkspaceStore.test.ts index b141648b27..c712469162 100644 --- a/src/browser/stores/WorkspaceStore.test.ts +++ b/src/browser/stores/WorkspaceStore.test.ts @@ -2947,6 +2947,7 @@ describe("WorkspaceStore", () => { streaming: true, lastModel: "claude-sonnet-4", lastThinkingLevel: "high", + activeWorkflowRunCount: 1, todoStatus: { emoji: "๐Ÿ”„", message: "Run checks" }, hasTodos: true, }; @@ -2971,6 +2972,8 @@ describe("WorkspaceStore", () => { expect(state.canInterrupt).toBe(true); expect(state.currentModel).toBe(activitySnapshot.lastModel); expect(state.currentThinkingLevel).toBe(activitySnapshot.lastThinkingLevel); + expect(state.activeWorkflowRunCount).toBe(1); + expect(store.getWorkspaceSidebarState(workspaceId).activeWorkflowRunCount).toBe(1); expect(state.agentStatus).toEqual(activitySnapshot.todoStatus ?? undefined); expect(state.recencyTimestamp).toBe(activitySnapshot.recency); }); diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index 00a73afc7f..4f88180ee8 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -123,6 +123,7 @@ export interface WorkspaceState { loadedSkills: LoadedSkill[]; skillLoadErrors: SkillLoadError[]; agentStatus: { emoji: string; message: string; url?: string } | undefined; + activeWorkflowRunCount: number; lastAbortReason: StreamAbortReasonSnapshot | null; pendingStreamStartTime: number | null; // Model used for the pending send (used during "starting" phase) @@ -181,6 +182,7 @@ export interface WorkspaceSidebarState { loadedSkills: LoadedSkill[]; skillLoadErrors: SkillLoadError[]; agentStatus: { emoji: string; message: string; url?: string } | undefined; + activeWorkflowRunCount: number; terminalActiveCount: number; terminalSessionCount: number; goal?: GoalSnapshot | null; @@ -1974,6 +1976,7 @@ export class WorkspaceStore { (activity?.hasTodos === false ? undefined : deriveTodoStatus(aggregatorTodos))); const agentStatus = displayStatus ?? liveTodoStatus ?? fallbackAgentStatus ?? persistedTodoStatus; + const activeWorkflowRunCount = activity?.activeWorkflowRunCount ?? 0; const goal = activity?.goal ?? null; return { @@ -1998,6 +2001,7 @@ export class WorkspaceStore { skillLoadErrors: aggregator.getSkillLoadErrors(), lastAbortReason: aggregator.getLastAbortReason(), agentStatus, + activeWorkflowRunCount, pendingStreamStartTime, pendingStreamModel: aggregator.getPendingStreamModel(), autoRetryStatus: transient.autoRetryStatus, @@ -2108,6 +2112,7 @@ export class WorkspaceStore { cached.loadedSkills === fullState.loadedSkills && cached.skillLoadErrors === fullState.skillLoadErrors && cached.agentStatus === fullState.agentStatus && + cached.activeWorkflowRunCount === fullState.activeWorkflowRunCount && cached.terminalActiveCount === terminalActiveCount && cached.terminalSessionCount === terminalSessionCount && cached.goal === fullState.goal @@ -2130,6 +2135,7 @@ export class WorkspaceStore { loadedSkills: fullState.loadedSkills, skillLoadErrors: fullState.skillLoadErrors, agentStatus: fullState.agentStatus, + activeWorkflowRunCount: fullState.activeWorkflowRunCount, terminalActiveCount, terminalSessionCount, goal: fullState.goal, @@ -2791,6 +2797,7 @@ export class WorkspaceStore { previous?.lastThinkingLevel !== snapshot?.lastThinkingLevel || previous?.recency !== snapshot?.recency || previous?.hasTodos !== snapshot?.hasTodos || + (previous?.activeWorkflowRunCount ?? 0) !== (snapshot?.activeWorkflowRunCount ?? 0) || !areAgentStatusesEqual(previous?.displayStatus, snapshot?.displayStatus) || !areAgentStatusesEqual(previous?.todoStatus, snapshot?.todoStatus) || previous?.goal?.goalId !== snapshot?.goal?.goalId || diff --git a/src/browser/utils/commands/sources.test.ts b/src/browser/utils/commands/sources.test.ts index a5dc8f10c8..79bad7c14a 100644 --- a/src/browser/utils/commands/sources.test.ts +++ b/src/browser/utils/commands/sources.test.ts @@ -536,6 +536,7 @@ function makeWorkspaceState(goal: WorkspaceState["goal"]): WorkspaceState { loadedSkills: [], skillLoadErrors: [], agentStatus: undefined, + activeWorkflowRunCount: 0, lastAbortReason: null, pendingStreamStartTime: null, pendingStreamModel: null, diff --git a/src/common/orpc/schemas/workspace.ts b/src/common/orpc/schemas/workspace.ts index 50de07df93..e18b29c056 100644 --- a/src/common/orpc/schemas/workspace.ts +++ b/src/common/orpc/schemas/workspace.ts @@ -375,6 +375,10 @@ export const WorkspaceActivitySnapshotSchema = z.object({ isIdleCompaction: z.boolean().optional().meta({ description: "Whether the current streaming activity is an idle (background) compaction", }), + activeWorkflowRunCount: z.number().int().nonnegative().optional().meta({ + description: + "Number of top-level workflow runs in this workspace that are pending, running, or backgrounded.", + }), goal: GoalSnapshotSchema.nullable().optional().meta({ description: "Current workspace goal snapshot for sidebar indicators and the Goal tab", }), diff --git a/src/node/orpc/router.ts b/src/node/orpc/router.ts index 1080f846ec..1f767b1495 100644 --- a/src/node/orpc/router.ts +++ b/src/node/orpc/router.ts @@ -456,6 +456,7 @@ export async function resolveWorkflowContext( subagentFileReports: subagentFileReportsExperimentEnabled, }, }), + onRunStatusChanged: (event) => context.workspaceService.emitWorkflowRunActivity(event), ...(options.onBackgroundRunTerminal != null ? { onBackgroundRunTerminal: options.onBackgroundRunTerminal } : {}), diff --git a/src/node/services/workflows/WorkflowService.test.ts b/src/node/services/workflows/WorkflowService.test.ts index 02da576eea..3fb399e4b8 100644 --- a/src/node/services/workflows/WorkflowService.test.ts +++ b/src/node/services/workflows/WorkflowService.test.ts @@ -240,10 +240,14 @@ export default function workflow({ args, agent }) { return { taskId: "task_1", reportMarkdown: "child summary" }; }, }; + const statusEvents: Array<{ workspaceId: string; runId: string; status: string }> = []; const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); const service = new WorkflowService({ definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), runStore, + onRunStatusChanged: (event) => { + statusEvents.push(event); + }, runtimeFactory: new QuickJSRuntimeFactory(), taskAdapter, generateRunId: () => "wfr_demo", @@ -267,6 +271,10 @@ export default function workflow({ args, agent }) { status: "completed", result: { reportMarkdown: "Final child summary" }, }); + expect(statusEvents).toEqual([ + { workspaceId: "workspace-1", runId: "wfr_demo", status: "pending" }, + { workspaceId: "workspace-1", runId: "wfr_demo", status: "completed" }, + ]); expect(run.definitionSource).toBe(source); expect(run.definition.scope).toBe("global"); }); diff --git a/src/node/services/workflows/WorkflowService.ts b/src/node/services/workflows/WorkflowService.ts index ab42d2b306..856db0d969 100644 --- a/src/node/services/workflows/WorkflowService.ts +++ b/src/node/services/workflows/WorkflowService.ts @@ -38,6 +38,12 @@ export interface WorkflowBackgroundRunTerminalEvent { run: WorkflowRunRecord; } +export interface WorkflowRunStatusChangedEvent { + workspaceId: string; + runId: string; + status: WorkflowRunStatus; +} + export interface WorkflowServiceOptions { definitionStore: WorkflowDefinitionStore; runStore: WorkflowRunStore; @@ -49,6 +55,7 @@ export interface WorkflowServiceOptions { /** workflowName is the human-readable definition name, used to label spawned tasks. */ taskAdapterFactory?: (runId: string, workflowName?: string) => WorkflowTaskAdapter; onBackgroundRunTerminal?: (event: WorkflowBackgroundRunTerminalEvent) => Promise | void; + onRunStatusChanged?: (event: WorkflowRunStatusChangedEvent) => Promise | void; /** When true, background terminal notifications also fire for interrupted runs. */ notifyInterruptedBackgroundRunTerminal?: boolean; generateRunId?: () => string; @@ -140,6 +147,9 @@ export class WorkflowService { private readonly onBackgroundRunTerminal?: ( event: WorkflowBackgroundRunTerminalEvent ) => Promise | void; + private readonly onRunStatusChanged?: ( + event: WorkflowRunStatusChangedEvent + ) => Promise | void; private readonly notifyInterruptedBackgroundRunTerminal: boolean; private readonly generateRunId: () => string; private readonly getCurrentProjectTrusted?: () => boolean | Promise; @@ -163,6 +173,7 @@ export class WorkflowService { this.taskAdapter = options.taskAdapter; this.taskAdapterFactory = options.taskAdapterFactory; this.onBackgroundRunTerminal = options.onBackgroundRunTerminal; + this.onRunStatusChanged = options.onRunStatusChanged; this.notifyInterruptedBackgroundRunTerminal = options.notifyInterruptedBackgroundRunTerminal === true; this.generateRunId = options.generateRunId ?? generateWorkflowRunId; @@ -271,6 +282,31 @@ export class WorkflowService { } } + private async notifyRunStatusChanged( + run: WorkflowRunRecord, + status: WorkflowRunStatus = run.status + ): Promise { + if (this.onRunStatusChanged == null || run.parentWorkflow != null) { + return; + } + try { + await this.onRunStatusChanged({ workspaceId: run.workspaceId, runId: run.id, status }); + } catch (error) { + console.error("Workflow run activity notification failed:", error); + } + } + + private async notifyLatestRunStatus(runId: string): Promise { + if (this.onRunStatusChanged == null) { + return; + } + try { + await this.notifyRunStatusChanged(await this.runStore.getRun(runId)); + } catch (error) { + console.error("Failed to load workflow run for activity notification:", error); + } + } + async interruptRun(input: { workspaceId: string; runId: string }): Promise { const run = await this.requireRunForWorkspace(input); assertWorkflowRunCanTransition(run.status, "interrupted"); @@ -294,6 +330,7 @@ export class WorkflowService { this.clock?.nowIso() ?? new Date().toISOString() ); settleStatusWrite(); + await this.notifyRunStatusChanged(interrupted); await this.interruptChildWorkflowRuns(input.runId, input.workspaceId, new Set([input.runId])); await (this.taskAdapterFactory?.(input.runId) ?? this.requireTaskAdapter()).interruptRun?.(); return interrupted; @@ -317,6 +354,7 @@ export class WorkflowService { allowRetryFromFailedCheckpoint: true, projectTrusted: input.projectTrusted, }); + await this.notifyRunStatusChanged(run, "running"); return { runId: input.runId, status: "running", result: null }; } @@ -332,6 +370,7 @@ export class WorkflowService { allowResumeFromInterrupted: run.status === "interrupted", projectTrusted: input.projectTrusted, }); + await this.notifyRunStatusChanged(run, "running"); return { runId: input.runId, status: "running", result: null }; } @@ -394,6 +433,8 @@ export class WorkflowService { throw new Error(`Workflow run interrupted: ${input.runId}`); } + await this.notifyRunStatusChanged(await this.runStore.getRun(input.runId), "running"); + const runnerAbortController = new AbortController(); let unregisterRunnerAbort: () => void = () => undefined; const abortInterrupt = this.interruptRunOnAbort( @@ -414,6 +455,7 @@ export class WorkflowService { }, ...input.runnerOptions, }); + await this.notifyLatestRunStatus(input.runId); return { runId: input.runId, status: "completed", result }; } catch (error) { if (error instanceof WorkflowRunBackgroundedError) { @@ -425,6 +467,7 @@ export class WorkflowService { // silently reverted back to `running`. Likewise skip the continuation entirely // when this call was aborted (interruptRunOnAbort aborts our runner controller and // is concurrently transitioning the run to `interrupted`). + await this.notifyLatestRunStatus(input.runId); if (!runnerAbortController.signal.aborted) { void this.runInBackground(input.runId, input.backgroundedFailureMessage, { projectTrusted: input.projectTrusted, @@ -432,6 +475,7 @@ export class WorkflowService { } return { runId: input.runId, status: "backgrounded", result: null }; } + await this.notifyLatestRunStatus(input.runId); throw error; } finally { abortInterrupt.remove(); @@ -495,12 +539,14 @@ export class WorkflowService { ): Promise { const createdRun = await this.createNamedWorkflowRun(input); const runId = createdRun.id; + await this.notifyRunStatusChanged(createdRun); await input.onRunCreated?.({ runId, status: "pending", result: null, run: createdRun }); const run = await this.runStore.appendStatus( runId, "running", this.clock?.nowIso() ?? new Date().toISOString() ); + await this.notifyRunStatusChanged(run); await input.onBackgroundRunCreated?.({ runId, status: "running", result: null, run }); void this.runInBackground(runId, "Background workflow run failed:", { projectTrusted: input.projectTrusted, @@ -511,6 +557,7 @@ export class WorkflowService { async startNamedWorkflow(input: StartNamedWorkflowInput): Promise { const createdRun = await this.createNamedWorkflowRun(input); const runId = createdRun.id; + await this.notifyRunStatusChanged(createdRun); await input.onRunCreated?.({ runId, status: "pending", result: null, run: createdRun }); if (isAbortSignalAborted(input.abortSignal)) { await this.interruptRun({ workspaceId: input.workspaceId, runId }); @@ -539,14 +586,17 @@ export class WorkflowService { ); }, }); + await this.notifyLatestRunStatus(runId); return { runId, status: "completed", result }; } catch (error) { if (error instanceof WorkflowRunBackgroundedError) { + await this.notifyLatestRunStatus(runId); void this.runInBackground(runId, "Backgrounded workflow run failed:", { projectTrusted: input.projectTrusted, }).catch(() => undefined); return { runId, status: "backgrounded", result: null }; } + await this.notifyLatestRunStatus(runId); throw error; } finally { abortInterrupt.remove(); @@ -777,6 +827,7 @@ export class WorkflowService { ...runnerOptions, }) .then(async (result) => { + await this.notifyLatestRunStatus(runId); await this.notifyBackgroundRunTerminal(runId, result); }) .catch(async (error: unknown) => { @@ -788,6 +839,7 @@ export class WorkflowService { if (!(await this.isInterruptedBackgroundRun(runId))) { console.error(failureMessage, error); } + await this.notifyLatestRunStatus(runId); await this.notifyBackgroundRunTerminal(runId, null); }); this.backgroundRuns.add(runPromise); diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index f2621743a4..67d41ed9d3 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -127,7 +127,12 @@ import { type MuxMessageMetadata, type MuxMessage, } from "@/common/types/message"; -import type { WorkflowRunRecord } from "@/common/types/workflow"; +import { + isNestedWorkflowRun, + type WorkflowRunRecord, + type WorkflowRunStatus, +} from "@/common/types/workflow"; +import { WorkflowRunStore } from "@/node/services/workflows/WorkflowRunStore"; import { WORKFLOW_RESULT_METADATA_TYPE, WORKFLOW_RUN_CARD_DISPLAY_METADATA_TYPE, @@ -1454,6 +1459,21 @@ export declare interface WorkspaceService { ): boolean; } +const ACTIVE_WORKFLOW_RUN_STATUSES = new Set([ + "pending", + "running", + "backgrounded", +]); + +function createDefaultActivitySnapshot(): WorkspaceActivitySnapshot { + return { + recency: 0, + streaming: false, + lastModel: null, + lastThinkingLevel: null, + }; +} + // eslint-disable-next-line @typescript-eslint/no-unsafe-declaration-merging export class WorkspaceService extends EventEmitter { private readonly sessions = new Map(); @@ -1882,6 +1902,70 @@ export class WorkspaceService extends EventEmitter { }); } + private async getActiveWorkflowRunCount( + workspaceId: string, + changedRun?: { runId: string; status: WorkflowRunStatus } + ): Promise { + assert(workspaceId.length > 0, "getActiveWorkflowRunCount requires workspaceId"); + const activeRunIds = new Set(); + try { + const runStore = new WorkflowRunStore({ sessionDir: this.config.getSessionDir(workspaceId) }); + const runs = await runStore.listRuns(); + for (const run of runs) { + if ( + run.workspaceId === workspaceId && + !isNestedWorkflowRun(run) && + ACTIVE_WORKFLOW_RUN_STATUSES.has(run.status) + ) { + activeRunIds.add(run.id); + } + } + } catch (error) { + log.debug("Failed to inspect active workflow runs for workspace activity", { + workspaceId, + error, + }); + } + + if (changedRun != null) { + if (ACTIVE_WORKFLOW_RUN_STATUSES.has(changedRun.status)) { + activeRunIds.add(changedRun.runId); + } else { + activeRunIds.delete(changedRun.runId); + } + } + + return activeRunIds.size; + } + + private async withActiveWorkflowRunCount( + workspaceId: string, + snapshot: WorkspaceActivitySnapshot | null, + changedRun?: { runId: string; status: WorkflowRunStatus } + ): Promise { + const activeWorkflowRunCount = await this.getActiveWorkflowRunCount(workspaceId, changedRun); + const baseSnapshot = snapshot ?? createDefaultActivitySnapshot(); + return { + ...baseSnapshot, + ...(activeWorkflowRunCount > 0 ? { activeWorkflowRunCount } : {}), + }; + } + + public async emitWorkflowRunActivity(event: { + workspaceId: string; + runId: string; + status: WorkflowRunStatus; + }): Promise { + assert(event.workspaceId.length > 0, "emitWorkflowRunActivity requires workspaceId"); + assert(event.runId.length > 0, "emitWorkflowRunActivity requires runId"); + const snapshot = await this.withActiveWorkflowRunCount( + event.workspaceId, + await this.extensionMetadata.getSnapshot(event.workspaceId), + { runId: event.runId, status: event.status } + ); + this.emitWorkspaceActivity(event.workspaceId, snapshot); + } + /** * Public so AgentStatusService can broadcast a snapshot it produced after * a direct setX call. (Most callers use emitWorkspaceActivityUpdate, which @@ -7683,7 +7767,39 @@ export class WorkspaceService extends EventEmitter { async getActivityList(): Promise> { try { const snapshots = await this.extensionMetadata.getAllSnapshots(); - return Object.fromEntries(snapshots.entries()); + const workspaceIds = new Set(snapshots.keys()); + try { + for (const metadata of await this.config.getAllWorkspaceMetadata()) { + workspaceIds.add(metadata.id); + } + } catch (error) { + log.debug("Failed to include all workspaces while listing activity", { error }); + } + + const entries = await Promise.all( + Array.from( + workspaceIds, + async (workspaceId): Promise => { + const snapshot = snapshots.get(workspaceId) ?? null; + const activeWorkflowRunCount = await this.getActiveWorkflowRunCount(workspaceId); + if (snapshot == null && activeWorkflowRunCount === 0) { + return null; + } + return [ + workspaceId, + { + ...(snapshot ?? createDefaultActivitySnapshot()), + ...(activeWorkflowRunCount > 0 ? { activeWorkflowRunCount } : {}), + }, + ] as const; + } + ) + ); + return Object.fromEntries( + entries.filter( + (entry): entry is readonly [string, WorkspaceActivitySnapshot] => entry != null + ) + ); } catch (error) { log.error("Failed to list activity:", error); return {}; From c8d53b4cdafaf3bdfc159f4589921df4d2adfde3 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 17 Jun 2026 16:08:06 +0000 Subject: [PATCH 2/9] =?UTF-8?q?=F0=9F=A4=96=20fix:=20simplify=20workflow?= =?UTF-8?q?=20activity=20status=20handling?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cache top-level active workflow run IDs per workspace, use lightweight workflow run status snapshots for activity notifications, centralize workflow status classifiers, and deduplicate sidebar activity indicator markup. --- .../AgentListItem/AgentListItem.tsx | 29 +++--- src/common/types/workflow.ts | 16 +++- src/node/services/taskService.ts | 16 +--- src/node/services/tools/task_await.ts | 19 ++-- .../workflows/WorkflowRunStore.test.ts | 34 +++++++ .../services/workflows/WorkflowRunStore.ts | 47 ++++++++++ .../workflows/WorkflowService.test.ts | 10 +++ .../services/workflows/WorkflowService.ts | 72 +++++++-------- src/node/services/workspaceService.test.ts | 83 +++++++++++++++++ src/node/services/workspaceService.ts | 90 +++++++++++-------- 10 files changed, 301 insertions(+), 115 deletions(-) diff --git a/src/browser/components/AgentListItem/AgentListItem.tsx b/src/browser/components/AgentListItem/AgentListItem.tsx index 69deb4d61d..c8d2aa0e86 100644 --- a/src/browser/components/AgentListItem/AgentListItem.tsx +++ b/src/browser/components/AgentListItem/AgentListItem.tsx @@ -224,19 +224,28 @@ function formatWorkflowRunCount(count: number): string { return count === 1 ? "Workflow running" : `${count} workflows running`; } -function WorkflowActivityIndicator(props: { workspaceId: string; activeWorkflowRunCount: number }) { - const statusText = formatWorkflowRunCount(props.activeWorkflowRunCount); - +function SidebarActivityIndicator(props: { text: string; testId: string }) { return (
- {statusText} + {props.text}
); } +function WorkflowActivityIndicator(props: { workspaceId: string; activeWorkflowRunCount: number }) { + const statusText = formatWorkflowRunCount(props.activeWorkflowRunCount); + + return ( + + ); +} + function DelegatedActivityIndicator(props: { workspaceId: string; activity: WorkspaceDelegatedActivity; @@ -247,12 +256,10 @@ function DelegatedActivityIndicator(props: { } return ( -
- {statusText} -
+ ); } diff --git a/src/common/types/workflow.ts b/src/common/types/workflow.ts index d85af42755..44fa3c3d8c 100644 --- a/src/common/types/workflow.ts +++ b/src/common/types/workflow.ts @@ -41,7 +41,21 @@ export type WorkflowStepRecord = z.infer; export type WorkflowRunParent = z.infer; export type WorkflowRunRecord = z.infer; -export function isNestedWorkflowRun(run: WorkflowRunRecord): boolean { +const ACTIVE_WORKFLOW_RUN_STATUSES = new Set([ + "pending", + "running", + "backgrounded", +]); + +export function isActiveWorkflowRunStatus(status: WorkflowRunStatus): boolean { + return ACTIVE_WORKFLOW_RUN_STATUSES.has(status); +} + +export function isTerminalWorkflowRunStatus(status: WorkflowRunStatus): boolean { + return status === "completed" || status === "failed" || status === "interrupted"; +} + +export function isNestedWorkflowRun(run: { parentWorkflow?: WorkflowRunParent | null }): boolean { return run.parentWorkflow != null; } diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index 352590d88a..5e5f4b62a0 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -74,7 +74,7 @@ import type { WorkspaceGoalService } from "@/node/services/workspaceGoalService" import { getTotalCost, sumUsageHistory } from "@/common/utils/tokens/usageAggregator"; import type { ParsedThinkingInput, ThinkingLevel } from "@/common/types/thinking"; import type { ErrorEvent, StreamEndEvent } from "@/common/types/stream"; -import type { WorkflowRunStatus } from "@/common/types/workflow"; +import { isActiveWorkflowRunStatus, isTerminalWorkflowRunStatus } from "@/common/types/workflow"; import { isDynamicToolPart, type DynamicToolPart } from "@/common/types/toolParts"; import { isWorkflowDisplayOnlyMessage, @@ -410,12 +410,6 @@ function isWorkspaceBusyIdleOnlySend(error: unknown): boolean { ); } -const ACTIVE_BACKGROUND_WORKFLOW_RUN_STATUSES = new Set([ - "pending", - "running", - "backgrounded", -]); - const COMPLETED_REPORT_CACHE_MAX_ENTRIES = 128; /** Maximum consecutive auto-resumes before stopping. Prevents infinite loops when descendants are stuck. */ @@ -690,10 +684,6 @@ function isWorkflowSupersessionMessage(message: MuxMessage): boolean { return isManualUserSupersessionMessage(message) || isResetBoundaryMessage(message); } -function isTerminalWorkflowRunStatus(status: WorkflowRunStatus): boolean { - return status === "completed" || status === "failed" || status === "interrupted"; -} - function isFailedWorkflowRunSnapshot(value: unknown, runId: string): boolean { if (value == null || typeof value !== "object") { return false; @@ -978,7 +968,7 @@ export class TaskService { (run) => referencedRunIdSet.has(run.id) && run.workspaceId === workspaceId && - ACTIVE_BACKGROUND_WORKFLOW_RUN_STATUSES.has(run.status) + isActiveWorkflowRunStatus(run.status) ) .map((run) => run.id); } catch (error: unknown) { @@ -1011,7 +1001,7 @@ export class TaskService { if (!referencedRunIdSet.has(run.id) || run.workspaceId !== workspaceId) { continue; } - if (ACTIVE_BACKGROUND_WORKFLOW_RUN_STATUSES.has(run.status)) { + if (isActiveWorkflowRunStatus(run.status)) { blockingRunIds.push(run.id); continue; } diff --git a/src/node/services/tools/task_await.ts b/src/node/services/tools/task_await.ts index abb7883b07..532e2e077f 100644 --- a/src/node/services/tools/task_await.ts +++ b/src/node/services/tools/task_await.ts @@ -10,9 +10,10 @@ import { } from "@/common/utils/tools/toolDefinitions"; import { canRetryWorkflowFromCheckpoint } from "@/common/utils/workflowRetryEligibility"; import { + isActiveWorkflowRunStatus, isNestedWorkflowRun, + isTerminalWorkflowRunStatus, type WorkflowRunRecord, - type WorkflowRunStatus, } from "@/common/types/workflow"; import { fromBashTaskId, isWorkflowRunTaskId, toBashTaskId } from "./taskId"; @@ -93,14 +94,6 @@ function buildTaskAwaitSequencingError(taskId: string, suggestedTaskIds: string[ }; } -function isWorkflowRunAwaitableStatus(status: WorkflowRunStatus): boolean { - return status === "pending" || status === "running" || status === "backgrounded"; -} - -function isWorkflowRunTerminalStatus(status: WorkflowRunStatus): boolean { - return status === "completed" || status === "failed" || status === "interrupted"; -} - function parseWorkflowRun(value: unknown): WorkflowRunRecord { return WorkflowRunRecordSchema.parse(value); } @@ -111,7 +104,7 @@ function getWorkflowRunElapsedMs(run: WorkflowRunRecord): number | undefined { return undefined; } const updatedAtMs = parseTimestampMs(run.updatedAt); - const endAtMs = isWorkflowRunTerminalStatus(run.status) ? updatedAtMs : Date.now(); + const endAtMs = isTerminalWorkflowRunStatus(run.status) ? updatedAtMs : Date.now(); return Math.max(0, (endAtMs ?? Date.now()) - createdAtMs); } @@ -286,7 +279,7 @@ export const createTaskAwaitTool: ToolFactory = (config: ToolConfiguration) => { const parsed = WorkflowRunRecordSchema.safeParse(rawRun); if ( !parsed.success || - !isWorkflowRunAwaitableStatus(parsed.data.status) || + !isActiveWorkflowRunStatus(parsed.data.status) || isNestedWorkflowRun(parsed.data) ) { continue; @@ -384,12 +377,12 @@ export const createTaskAwaitTool: ToolFactory = (config: ToolConfiguration) => { if (run == null) { return { status: "not_found" as const, taskId: runId }; } - if (timeoutMs === 0 || isWorkflowRunTerminalStatus(run.status)) { + if (timeoutMs === 0 || isTerminalWorkflowRunStatus(run.status)) { return buildWorkflowAwaitResult(run); } const deadline = Date.now() + (timeoutMs ?? DEFAULT_TASK_AWAIT_TIMEOUT_MS); - while (!isWorkflowRunTerminalStatus(run.status)) { + while (!isTerminalWorkflowRunStatus(run.status)) { if (abortSignal?.aborted) { return { status: "error" as const, taskId: runId, error: "Interrupted" }; } diff --git a/src/node/services/workflows/WorkflowRunStore.test.ts b/src/node/services/workflows/WorkflowRunStore.test.ts index 3013fe3ee2..2329a6ae55 100644 --- a/src/node/services/workflows/WorkflowRunStore.test.ts +++ b/src/node/services/workflows/WorkflowRunStore.test.ts @@ -48,6 +48,40 @@ describe("WorkflowRunStore", () => { expect(run.events.map((event) => event.sequence)).toEqual([1]); }); + test("lists lightweight run status snapshots without hydrating journals or source", async () => { + using tmp = new DisposableTempDir("workflow-runs-status-snapshots"); + const store = await createStore(tmp.path); + await store.createRun({ + id: "wfr_child", + workspaceId: "workspace-1", + definition, + definitionSource: source, + args: {}, + parentWorkflow: { runId: "wfr_123", stepId: "child", inputHash: "hash", depth: 0 }, + now: "2026-05-29T00:00:01.000Z", + }); + await store.appendStatus("wfr_123", "running", "2026-05-29T00:00:02.000Z"); + + await fs.writeFile(path.join(tmp.path, "workflows", "wfr_123", "definition.js"), "broken"); + await fs.writeFile( + path.join(tmp.path, "workflows", "wfr_123", "events.jsonl"), + "{not-json}\n", + "utf-8" + ); + + await expect(store.getRun("wfr_123")).resolves.toMatchObject({ definitionSource: "broken" }); + const snapshots = await store.listRunStatusSnapshots(); + + expect(snapshots).toHaveLength(2); + expect(snapshots[0]).toMatchObject({ + id: "wfr_123", + workspaceId: "workspace-1", + status: "running", + }); + expect(snapshots[1]?.id).toBe("wfr_child"); + expect(snapshots[1]?.parentWorkflow?.runId).toBe("wfr_123"); + }); + test("rejects invalid run ids before resolving run file paths", async () => { using tmp = new DisposableTempDir("workflow-runs"); const store = new WorkflowRunStore({ sessionDir: tmp.path }); diff --git a/src/node/services/workflows/WorkflowRunStore.ts b/src/node/services/workflows/WorkflowRunStore.ts index ebba5ac3c6..563a18932a 100644 --- a/src/node/services/workflows/WorkflowRunStore.ts +++ b/src/node/services/workflows/WorkflowRunStore.ts @@ -3,6 +3,8 @@ import type { Dirent } from "node:fs"; import * as fs from "node:fs/promises"; import * as path from "node:path"; +import type { z } from "zod"; + import writeFileAtomic from "write-file-atomic"; import { @@ -25,6 +27,17 @@ import assert from "@/common/utils/assert"; import { getErrorMessage } from "@/common/utils/errors"; import { log } from "@/node/services/log"; +const WorkflowRunStatusSnapshotSchema = WorkflowRunRecordSchema.pick({ + id: true, + workspaceId: true, + status: true, + parentWorkflow: true, + createdAt: true, + updatedAt: true, +}); + +export type WorkflowRunStatusSnapshot = z.infer; + export interface WorkflowRunStoreOptions { sessionDir: string; staleLeaseMs?: number; @@ -192,6 +205,40 @@ export class WorkflowRunStore { return await this.getRunUnlocked(runId); } + async getRunStatusSnapshot(runId: string): Promise { + assertValidWorkflowRunId(runId); + const rawRun = JSON.parse(await fs.readFile(this.runFile(runId), "utf-8")) as unknown; + return WorkflowRunStatusSnapshotSchema.parse(rawRun); + } + + async listRunStatusSnapshots(): Promise { + let entries: Dirent[]; + try { + entries = await fs.readdir(this.workflowsDir(), { withFileTypes: true }); + } catch { + return []; + } + + const snapshots = await Promise.all( + entries + .filter((entry) => entry.isDirectory()) + .map(async (entry): Promise => { + try { + return await this.getRunStatusSnapshot(entry.name); + } catch (error) { + log.warn( + `Skipping unreadable workflow run status '${entry.name}': ${getErrorMessage(error)}` + ); + return null; + } + }) + ); + + return snapshots + .filter((snapshot): snapshot is WorkflowRunStatusSnapshot => snapshot != null) + .sort((a, b) => a.createdAt.localeCompare(b.createdAt)); + } + async listRuns(): Promise { let entries: Dirent[]; try { diff --git a/src/node/services/workflows/WorkflowService.test.ts b/src/node/services/workflows/WorkflowService.test.ts index 3fb399e4b8..6d658e384c 100644 --- a/src/node/services/workflows/WorkflowService.test.ts +++ b/src/node/services/workflows/WorkflowService.test.ts @@ -1793,9 +1793,13 @@ export default function workflow({ action }) { const runStore = new WorkflowRunStore({ sessionDir: tmp.path }); let calls = 0; const backgroundFlags: Array = []; + const statusEvents: Array<{ workspaceId: string; runId: string; status: string }> = []; const service = new WorkflowService({ definitionStore: new WorkflowDefinitionStore({ projectRoot, globalRoot, builtIns: [] }), runStore, + onRunStatusChanged: (event) => { + statusEvents.push(event); + }, runtimeFactory: new QuickJSRuntimeFactory(), taskAdapter: { async runAgent(_spec, _lifecycle, waitOptions) { @@ -1823,6 +1827,12 @@ export default function workflow({ action }) { await waitForWorkflowRunFileStatus(tmp.path, "wfr_backgrounded", "completed"); expect(calls).toBe(2); expect(backgroundFlags).toEqual([true, false]); + await waitForCondition("backgrounded workflow status events", () => statusEvents.length === 3); + expect(statusEvents).toEqual([ + { workspaceId: "workspace-1", runId: "wfr_backgrounded", status: "pending" }, + { workspaceId: "workspace-1", runId: "wfr_backgrounded", status: "backgrounded" }, + { workspaceId: "workspace-1", runId: "wfr_backgrounded", status: "completed" }, + ]); }); test("can keep foreground workflow waits in the foreground", async () => { diff --git a/src/node/services/workflows/WorkflowService.ts b/src/node/services/workflows/WorkflowService.ts index 856db0d969..3bfa43d946 100644 --- a/src/node/services/workflows/WorkflowService.ts +++ b/src/node/services/workflows/WorkflowService.ts @@ -1,10 +1,11 @@ import * as crypto from "node:crypto"; -import type { - WorkflowActionDescriptor, - WorkflowDefinitionDescriptor, - WorkflowRunRecord, - WorkflowRunStatus, +import { + isActiveWorkflowRunStatus, + type WorkflowActionDescriptor, + type WorkflowDefinitionDescriptor, + type WorkflowRunRecord, + type WorkflowRunStatus, } from "@/common/types/workflow"; import assert from "@/common/utils/assert"; import { getErrorMessage } from "@/common/utils/errors"; @@ -19,7 +20,7 @@ import type { WorkflowDefinitionSummary, WorkflowPromotionLocation, } from "./WorkflowDefinitionStore"; -import type { WorkflowRunStore } from "./WorkflowRunStore"; +import type { WorkflowRunStatusSnapshot, WorkflowRunStore } from "./WorkflowRunStore"; import { WorkflowRunBackgroundedError, WorkflowRunner, @@ -283,7 +284,7 @@ export class WorkflowService { } private async notifyRunStatusChanged( - run: WorkflowRunRecord, + run: WorkflowRunStatusSnapshot, status: WorkflowRunStatus = run.status ): Promise { if (this.onRunStatusChanged == null || run.parentWorkflow != null) { @@ -301,7 +302,7 @@ export class WorkflowService { return; } try { - await this.notifyRunStatusChanged(await this.runStore.getRun(runId)); + await this.notifyRunStatusChanged(await this.runStore.getRunStatusSnapshot(runId)); } catch (error) { console.error("Failed to load workflow run for activity notification:", error); } @@ -385,7 +386,7 @@ export class WorkflowService { assertWorkflowRunCanTransition(run.status, "running"); return await this.runForegroundWithAbortInterrupt({ workspaceId: input.workspaceId, - runId: input.runId, + run, projectTrusted: input.projectTrusted, abortSignal: input.abortSignal, runnerOptions: { allowResumeFromInterrupted: run.status === "interrupted" }, @@ -404,7 +405,7 @@ export class WorkflowService { assertWorkflowRunCanRetryFromCheckpoint(run); return await this.runForegroundWithAbortInterrupt({ workspaceId: input.workspaceId, - runId: input.runId, + run, projectTrusted: input.projectTrusted, abortSignal: input.abortSignal, runnerOptions: { allowRetryFromFailedCheckpoint: true }, @@ -418,7 +419,7 @@ export class WorkflowService { */ private async runForegroundWithAbortInterrupt(input: { workspaceId: string; - runId: string; + run: WorkflowRunStatusSnapshot; projectTrusted: boolean; abortSignal?: AbortSignal; runnerOptions: Pick< @@ -427,36 +428,37 @@ export class WorkflowService { >; backgroundedFailureMessage: string; }): Promise { + const runId = input.run.id; if (isAbortSignalAborted(input.abortSignal)) { // The caller was aborted before the runner started; leave the run in its current // (still resumable) state instead of churning status transitions. - throw new Error(`Workflow run interrupted: ${input.runId}`); + throw new Error(`Workflow run interrupted: ${runId}`); } - await this.notifyRunStatusChanged(await this.runStore.getRun(input.runId), "running"); + await this.notifyRunStatusChanged(input.run, "running"); const runnerAbortController = new AbortController(); let unregisterRunnerAbort: () => void = () => undefined; const abortInterrupt = this.interruptRunOnAbort( input.workspaceId, - input.runId, + runId, input.abortSignal, runnerAbortController ); try { - const runner = await this.createRunner(input.runId, input.projectTrusted); - const result = await runner.run(input.runId, { + const runner = await this.createRunner(runId, input.projectTrusted); + const result = await runner.run(runId, { abortSignal: runnerAbortController.signal, onLeaseAcquired: () => { unregisterRunnerAbort = this.registerActiveRunnerAbortController( - input.runId, + runId, runnerAbortController ); }, ...input.runnerOptions, }); - await this.notifyLatestRunStatus(input.runId); - return { runId: input.runId, status: "completed", result }; + await this.notifyRunStatusChanged(input.run, "completed"); + return { runId, status: "completed", result }; } catch (error) { if (error instanceof WorkflowRunBackgroundedError) { // The runner durably appended `backgrounded` before throwing, so the continuation @@ -467,21 +469,21 @@ export class WorkflowService { // silently reverted back to `running`. Likewise skip the continuation entirely // when this call was aborted (interruptRunOnAbort aborts our runner controller and // is concurrently transitioning the run to `interrupted`). - await this.notifyLatestRunStatus(input.runId); + await this.notifyRunStatusChanged(input.run, "backgrounded"); if (!runnerAbortController.signal.aborted) { - void this.runInBackground(input.runId, input.backgroundedFailureMessage, { + void this.runInBackground(runId, input.backgroundedFailureMessage, { projectTrusted: input.projectTrusted, }).catch(() => undefined); } - return { runId: input.runId, status: "backgrounded", result: null }; + return { runId, status: "backgrounded", result: null }; } - await this.notifyLatestRunStatus(input.runId); + await this.notifyLatestRunStatus(runId); throw error; } finally { abortInterrupt.remove(); try { await abortInterrupt.wait(); - await this.ensureInterruptedAfterAbort(input.workspaceId, input.runId, input.abortSignal); + await this.ensureInterruptedAfterAbort(input.workspaceId, runId, input.abortSignal); } finally { unregisterRunnerAbort(); } @@ -586,11 +588,11 @@ export class WorkflowService { ); }, }); - await this.notifyLatestRunStatus(runId); + await this.notifyRunStatusChanged(createdRun, "completed"); return { runId, status: "completed", result }; } catch (error) { if (error instanceof WorkflowRunBackgroundedError) { - await this.notifyLatestRunStatus(runId); + await this.notifyRunStatusChanged(createdRun, "backgrounded"); void this.runInBackground(runId, "Backgrounded workflow run failed:", { projectTrusted: input.projectTrusted, }).catch(() => undefined); @@ -786,6 +788,7 @@ export class WorkflowService { projectTrusted: boolean; } ): Promise { + const runStatus = await this.runStore.getRunStatusSnapshot(runId); const runner = await this.createRunner(runId, runnerOptions.projectTrusted); const runnerAbortController = new AbortController(); let unregisterRunnerAbort: () => void = () => undefined; @@ -827,7 +830,7 @@ export class WorkflowService { ...runnerOptions, }) .then(async (result) => { - await this.notifyLatestRunStatus(runId); + await this.notifyRunStatusChanged(runStatus, "completed"); await this.notifyBackgroundRunTerminal(runId, result); }) .catch(async (error: unknown) => { @@ -900,7 +903,7 @@ export class WorkflowService { (run) => run.workspaceId === workspaceId && run.parentWorkflow?.runId === parentRunId && - (isInterruptibleWorkflowRunStatus(run.status) || run.status === "interrupted") + (isActiveWorkflowRunStatus(run.status) || run.status === "interrupted") ); for (const child of activeChildren) { await this.interruptChildWorkflowRun(child, workspaceId, seenRunIds); @@ -918,10 +921,7 @@ export class WorkflowService { seenRunIds.add(child.id); this.abortActiveRunner(child.id); const interrupted = await this.appendInterruptedIfActive(child.id); - if ( - !isInterruptibleWorkflowRunStatus(interrupted.status) && - interrupted.status !== "interrupted" - ) { + if (!isActiveWorkflowRunStatus(interrupted.status) && interrupted.status !== "interrupted") { return; } await this.interruptChildWorkflowRuns(child.id, workspaceId, seenRunIds); @@ -930,7 +930,7 @@ export class WorkflowService { private async appendInterruptedIfActive(runId: string): Promise { const run = await this.runStore.getRun(runId); - if (!isInterruptibleWorkflowRunStatus(run.status)) { + if (!isActiveWorkflowRunStatus(run.status)) { return run; } try { @@ -941,7 +941,7 @@ export class WorkflowService { ); } catch (error) { const latest = await this.runStore.getRun(runId); - if (!isInterruptibleWorkflowRunStatus(latest.status)) { + if (!isActiveWorkflowRunStatus(latest.status)) { return latest; } throw error; @@ -1235,10 +1235,6 @@ function isAbortSignalAborted(abortSignal?: AbortSignal): boolean { return abortSignal?.aborted === true; } -function isInterruptibleWorkflowRunStatus(status: WorkflowRunStatus): boolean { - return status === "pending" || status === "running" || status === "backgrounded"; -} - function getWorkflowRunResult(run: WorkflowRunRecord): unknown { return run.events.findLast((event) => event.type === "result")?.result ?? null; } diff --git a/src/node/services/workspaceService.test.ts b/src/node/services/workspaceService.test.ts index 0553d05f4b..0f3eaa5f5f 100644 --- a/src/node/services/workspaceService.test.ts +++ b/src/node/services/workspaceService.test.ts @@ -47,6 +47,7 @@ import * as forkOrchestratorModule from "@/node/services/utils/forkOrchestrator" import * as runtimeExecHelpers from "@/node/utils/runtime/helpers"; import * as removeManagedGitWorktreeModule from "@/node/worktree/removeManagedGitWorktree"; import * as workspaceTitleGenerator from "./workspaceTitleGenerator"; +import { WorkflowRunStore } from "./workflows/WorkflowRunStore"; import { WorkspaceGoalService } from "./workspaceGoalService"; import { IdleDispatcher } from "./idleDispatcher"; import type { GoalRecordV1 } from "@/common/types/goal"; @@ -214,6 +215,88 @@ function createFrontendWorkspaceMetadata( }; } +describe("WorkspaceService workflow activity", () => { + test("caches active workflow run counts and updates emitted activity from status events", async () => { + const { config, historyService, cleanup } = await createTestHistoryService(); + const listStatusSnapshotsSpy = spyOn(WorkflowRunStore.prototype, "listRunStatusSnapshots"); + try { + const workspaceId = "workflow-activity"; + const projectPath = path.join(config.rootDir, "project"); + await config.addWorkspace(projectPath, { + id: workspaceId, + name: "workflow-activity", + projectName: "project", + projectPath, + createdAt: "2026-06-17T00:00:00.000Z", + runtimeConfig: { type: "local" }, + }); + const extensionMetadata = new ExtensionMetadataService( + path.join(config.rootDir, "extensionMetadata.json") + ); + const workspaceService = createWorkspaceServiceForTest({ + config, + historyService, + extensionMetadata, + }); + const runStore = new WorkflowRunStore({ sessionDir: config.getSessionDir(workspaceId) }); + const definition = { + name: "demo", + description: "Demo workflow", + scope: "global" as const, + executable: true, + }; + await runStore.createRun({ + id: "wfr_active", + workspaceId, + definition, + definitionSource: "export default function workflow() { return {}; }", + args: {}, + now: "2026-06-17T00:00:00.000Z", + }); + await runStore.createRun({ + id: "wfr_nested", + workspaceId, + definition, + definitionSource: "export default function workflow() { return {}; }", + args: {}, + parentWorkflow: { runId: "wfr_active", stepId: "child", inputHash: "hash", depth: 0 }, + now: "2026-06-17T00:00:01.000Z", + }); + + expect((await workspaceService.getActivityList())[workspaceId]?.activeWorkflowRunCount).toBe( + 1 + ); + expect((await workspaceService.getActivityList())[workspaceId]?.activeWorkflowRunCount).toBe( + 1 + ); + expect(listStatusSnapshotsSpy).toHaveBeenCalledTimes(1); + + const activityEvents: Array<{ + workspaceId: string; + activity: WorkspaceActivitySnapshot | null; + }> = []; + workspaceService.on("activity", (event) => activityEvents.push(event)); + await workspaceService.emitWorkflowRunActivity({ + workspaceId, + runId: "wfr_active", + status: "completed", + }); + expect(activityEvents.at(-1)?.activity?.activeWorkflowRunCount).toBeUndefined(); + + await workspaceService.emitWorkflowRunActivity({ + workspaceId, + runId: "wfr_next", + status: "running", + }); + expect(activityEvents.at(-1)?.activity?.activeWorkflowRunCount).toBe(1); + expect(listStatusSnapshotsSpy).toHaveBeenCalledTimes(1); + } finally { + listStatusSnapshotsSpy.mockRestore(); + await cleanup(); + } + }); +}); + describe("WorkspaceService workflow invocation events", () => { test("emits workflow slash invocation rows through the active session chat stream", async () => { const { config, historyService, cleanup } = await createTestHistoryService(); diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index 67d41ed9d3..6e57218726 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -128,6 +128,7 @@ import { type MuxMessage, } from "@/common/types/message"; import { + isActiveWorkflowRunStatus, isNestedWorkflowRun, type WorkflowRunRecord, type WorkflowRunStatus, @@ -1459,12 +1460,6 @@ export declare interface WorkspaceService { ): boolean; } -const ACTIVE_WORKFLOW_RUN_STATUSES = new Set([ - "pending", - "running", - "backgrounded", -]); - function createDefaultActivitySnapshot(): WorkspaceActivitySnapshot { return { recency: 0, @@ -1474,6 +1469,20 @@ function createDefaultActivitySnapshot(): WorkspaceActivitySnapshot { }; } +function mergeActiveWorkflowRunCount( + snapshot: WorkspaceActivitySnapshot | null, + activeWorkflowRunCount: number +): WorkspaceActivitySnapshot { + assert(activeWorkflowRunCount >= 0, "active workflow run count must be non-negative"); + const merged: WorkspaceActivitySnapshot = { ...(snapshot ?? createDefaultActivitySnapshot()) }; + if (activeWorkflowRunCount > 0) { + merged.activeWorkflowRunCount = activeWorkflowRunCount; + } else { + delete merged.activeWorkflowRunCount; + } + return merged; +} + // eslint-disable-next-line @typescript-eslint/no-unsafe-declaration-merging export class WorkspaceService extends EventEmitter { private readonly sessions = new Map(); @@ -1485,6 +1494,9 @@ export class WorkspaceService extends EventEmitter { { chat: () => void; metadata: () => void } >(); + // Lazily bootstrapped workflow activity cache so sidebar refreshes don't rescan run history. + private readonly activeWorkflowRunIdsByWorkspace = new Map>(); + // Debounce post-compaction metadata refreshes (file_edit_* can fire rapidly) private readonly postCompactionRefreshTimers = new Map>(); // Tracks workspaces currently being renamed to prevent streaming during rename @@ -1902,20 +1914,22 @@ export class WorkspaceService extends EventEmitter { }); } - private async getActiveWorkflowRunCount( - workspaceId: string, - changedRun?: { runId: string; status: WorkflowRunStatus } - ): Promise { - assert(workspaceId.length > 0, "getActiveWorkflowRunCount requires workspaceId"); + private async getActiveWorkflowRunIds(workspaceId: string): Promise> { + assert(workspaceId.length > 0, "getActiveWorkflowRunIds requires workspaceId"); + const cached = this.activeWorkflowRunIdsByWorkspace.get(workspaceId); + if (cached != null) { + return cached; + } + const activeRunIds = new Set(); try { const runStore = new WorkflowRunStore({ sessionDir: this.config.getSessionDir(workspaceId) }); - const runs = await runStore.listRuns(); + const runs = await runStore.listRunStatusSnapshots(); for (const run of runs) { if ( run.workspaceId === workspaceId && !isNestedWorkflowRun(run) && - ACTIVE_WORKFLOW_RUN_STATUSES.has(run.status) + isActiveWorkflowRunStatus(run.status) ) { activeRunIds.add(run.id); } @@ -1927,28 +1941,26 @@ export class WorkspaceService extends EventEmitter { }); } - if (changedRun != null) { - if (ACTIVE_WORKFLOW_RUN_STATUSES.has(changedRun.status)) { - activeRunIds.add(changedRun.runId); - } else { - activeRunIds.delete(changedRun.runId); - } - } + this.activeWorkflowRunIdsByWorkspace.set(workspaceId, activeRunIds); + return activeRunIds; + } - return activeRunIds.size; + private async getActiveWorkflowRunCount(workspaceId: string): Promise { + return (await this.getActiveWorkflowRunIds(workspaceId)).size; } - private async withActiveWorkflowRunCount( - workspaceId: string, - snapshot: WorkspaceActivitySnapshot | null, - changedRun?: { runId: string; status: WorkflowRunStatus } - ): Promise { - const activeWorkflowRunCount = await this.getActiveWorkflowRunCount(workspaceId, changedRun); - const baseSnapshot = snapshot ?? createDefaultActivitySnapshot(); - return { - ...baseSnapshot, - ...(activeWorkflowRunCount > 0 ? { activeWorkflowRunCount } : {}), - }; + private async updateActiveWorkflowRunCount(event: { + workspaceId: string; + runId: string; + status: WorkflowRunStatus; + }): Promise { + const activeRunIds = await this.getActiveWorkflowRunIds(event.workspaceId); + if (isActiveWorkflowRunStatus(event.status)) { + activeRunIds.add(event.runId); + } else { + activeRunIds.delete(event.runId); + } + return activeRunIds.size; } public async emitWorkflowRunActivity(event: { @@ -1958,10 +1970,10 @@ export class WorkspaceService extends EventEmitter { }): Promise { assert(event.workspaceId.length > 0, "emitWorkflowRunActivity requires workspaceId"); assert(event.runId.length > 0, "emitWorkflowRunActivity requires runId"); - const snapshot = await this.withActiveWorkflowRunCount( - event.workspaceId, + const activeWorkflowRunCount = await this.updateActiveWorkflowRunCount(event); + const snapshot = mergeActiveWorkflowRunCount( await this.extensionMetadata.getSnapshot(event.workspaceId), - { runId: event.runId, status: event.status } + activeWorkflowRunCount ); this.emitWorkspaceActivity(event.workspaceId, snapshot); } @@ -7768,6 +7780,9 @@ export class WorkspaceService extends EventEmitter { try { const snapshots = await this.extensionMetadata.getAllSnapshots(); const workspaceIds = new Set(snapshots.keys()); + for (const workspaceId of this.activeWorkflowRunIdsByWorkspace.keys()) { + workspaceIds.add(workspaceId); + } try { for (const metadata of await this.config.getAllWorkspaceMetadata()) { workspaceIds.add(metadata.id); @@ -7787,10 +7802,7 @@ export class WorkspaceService extends EventEmitter { } return [ workspaceId, - { - ...(snapshot ?? createDefaultActivitySnapshot()), - ...(activeWorkflowRunCount > 0 ? { activeWorkflowRunCount } : {}), - }, + mergeActiveWorkflowRunCount(snapshot, activeWorkflowRunCount), ] as const; } ) From 78bbcc2c16b0080e6d569488e469a040d0a58ca9 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 17 Jun 2026 16:19:07 +0000 Subject: [PATCH 3/9] Preserve workflow activity across sidebar updates --- src/node/services/workspaceService.test.ts | 14 +++++++ src/node/services/workspaceService.ts | 44 ++++++++++++++++++---- 2 files changed, 51 insertions(+), 7 deletions(-) diff --git a/src/node/services/workspaceService.test.ts b/src/node/services/workspaceService.test.ts index 0f3eaa5f5f..2d20d1381d 100644 --- a/src/node/services/workspaceService.test.ts +++ b/src/node/services/workspaceService.test.ts @@ -289,6 +289,20 @@ describe("WorkspaceService workflow activity", () => { status: "running", }); expect(activityEvents.at(-1)?.activity?.activeWorkflowRunCount).toBe(1); + await workspaceService.updateAgentStatus(workspaceId, { + emoji: "๐Ÿ”„", + message: "Still running workflow", + }); + expect(activityEvents.at(-1)?.activity?.activeWorkflowRunCount).toBe(1); + + workspaceService.emitWorkspaceActivity(workspaceId, { + recency: Date.now(), + streaming: false, + lastModel: null, + lastThinkingLevel: null, + }); + expect(activityEvents.at(-1)?.activity?.activeWorkflowRunCount).toBe(1); + expect(listStatusSnapshotsSpy).toHaveBeenCalledTimes(1); } finally { listStatusSnapshotsSpy.mockRestore(); diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index 6e57218726..e346d83cfc 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -1963,6 +1963,27 @@ export class WorkspaceService extends EventEmitter { return activeRunIds.size; } + private mergeCachedActiveWorkflowRunCount( + workspaceId: string, + snapshot: WorkspaceActivitySnapshot | null + ): WorkspaceActivitySnapshot | null { + const activeRunIds = this.activeWorkflowRunIdsByWorkspace.get(workspaceId); + if (activeRunIds == null) { + return snapshot; + } + if (snapshot == null && activeRunIds.size === 0) { + return null; + } + return mergeActiveWorkflowRunCount(snapshot, activeRunIds.size); + } + + private async mergeCurrentActiveWorkflowRunCount( + workspaceId: string, + snapshot: WorkspaceActivitySnapshot + ): Promise { + return mergeActiveWorkflowRunCount(snapshot, await this.getActiveWorkflowRunCount(workspaceId)); + } + public async emitWorkflowRunActivity(event: { workspaceId: string; runId: string; @@ -1987,7 +2008,10 @@ export class WorkspaceService extends EventEmitter { workspaceId: string, snapshot: WorkspaceActivitySnapshot | null ): void { - this.emit("activity", { workspaceId, activity: snapshot }); + this.emit("activity", { + workspaceId, + activity: this.mergeCachedActiveWorkflowRunCount(workspaceId, snapshot), + }); } private async emitWorkspaceActivityUpdate( @@ -1996,7 +2020,10 @@ export class WorkspaceService extends EventEmitter { update: () => Promise ): Promise { try { - this.emitWorkspaceActivity(workspaceId, await update()); + this.emitWorkspaceActivity( + workspaceId, + await this.mergeCurrentActiveWorkflowRunCount(workspaceId, await update()) + ); } catch (error) { log.error(`Failed to ${description}`, { workspaceId, error }); } @@ -2083,11 +2110,14 @@ export class WorkspaceService extends EventEmitter { const shouldTagCompaction = !streaming && this.compactionStreamGenerations.get(workspaceId) === streamGeneration; const shouldTagIdleCompaction = !streaming && this.idleCompactingWorkspaces.has(workspaceId); - this.emitWorkspaceActivity(workspaceId, { - ...snapshot, - ...(shouldTagCompaction ? { isCompaction: true } : {}), - ...(shouldTagIdleCompaction ? { isIdleCompaction: true } : {}), - }); + this.emitWorkspaceActivity( + workspaceId, + await this.mergeCurrentActiveWorkflowRunCount(workspaceId, { + ...snapshot, + ...(shouldTagCompaction ? { isCompaction: true } : {}), + ...(shouldTagIdleCompaction ? { isIdleCompaction: true } : {}), + }) + ); } catch (error) { log.error("Failed to update workspace streaming status", { workspaceId, error }); } finally { From 20829916c48e82f75cd07abd44fef18625d31f97 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 17 Jun 2026 16:31:24 +0000 Subject: [PATCH 4/9] Wire workflow tool activity updates --- src/node/services/aiService.ts | 17 ++++++++++++++++- src/node/services/coreServices.ts | 7 ++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/node/services/aiService.ts b/src/node/services/aiService.ts index 01bf973c21..b2c339abf7 100644 --- a/src/node/services/aiService.ts +++ b/src/node/services/aiService.ts @@ -152,7 +152,10 @@ import { WorkflowDefinitionStore, } from "@/node/services/workflows/WorkflowDefinitionStore"; import { WorkflowRunStore } from "@/node/services/workflows/WorkflowRunStore"; -import { WorkflowService } from "@/node/services/workflows/WorkflowService"; +import { + WorkflowService, + type WorkflowRunStatusChangedEvent, +} from "@/node/services/workflows/WorkflowService"; import { WorkflowTaskServiceAdapter } from "@/node/services/workflows/WorkflowTaskServiceAdapter"; import { resolveWorkflowScratchRoots } from "@/node/services/workflows/workflowScratchRoots"; import { isProjectTrusted } from "@/node/utils/projectTrust"; @@ -438,6 +441,9 @@ export class AIService extends EventEmitter { private workflowHostActions?: ReadonlyMap; private memoryService?: MemoryService; private extraTools?: Record; + private onWorkflowRunStatusChanged?: ( + event: WorkflowRunStatusChangedEvent + ) => Promise | void; private workflowResultContinuationSender?: WorkflowResultContinuationSender; private analyticsService?: { executeRawQuery(sql: string): Promise }; private desktopSessionManager?: DesktopSessionManager; @@ -594,6 +600,12 @@ export class AIService extends EventEmitter { } } + setWorkflowRunStatusChangedHandler( + handler: (event: WorkflowRunStatusChangedEvent) => Promise | void + ): void { + this.onWorkflowRunStatusChanged = handler; + } + setWorkflowResultContinuationSender(sender: WorkflowResultContinuationSender): void { this.workflowResultContinuationSender = sender; } @@ -1754,6 +1766,9 @@ export class AIService extends EventEmitter { runStore: new WorkflowRunStore({ sessionDir: this.config.getSessionDir(workspaceId), }), + ...(this.onWorkflowRunStatusChanged != null + ? { onRunStatusChanged: this.onWorkflowRunStatusChanged } + : {}), // workspace.* built-ins run in-process with backend services. actionRunner: new WorkflowActionRunner({ hostActions: this.workflowHostActions, diff --git a/src/node/services/coreServices.ts b/src/node/services/coreServices.ts index e3f7f24f13..73bc83520a 100644 --- a/src/node/services/coreServices.ts +++ b/src/node/services/coreServices.ts @@ -160,12 +160,17 @@ export function createCoreServices(opts: CoreServicesOptions): CoreServices { opts.sessionTimingService, opts.opResolver ); + // Tool-started workflows share the same sidebar activity cache as ORPC-started workflows, + // so terminal updates must prune active run counts regardless of launch path. + aiService.setWorkflowRunStatusChangedHandler((event) => + workspaceService.emitWorkflowRunActivity(event) + ); aiService.setWorkflowResultContinuationSender(workspaceService); workspaceService.setMemoryConsolidationService(memoryConsolidationService); workspaceService.setMCPServerManager(mcpServerManager); workspaceService.setWorkspaceGoalService(workspaceGoalService); workspaceGoalService.setOnActivityChange((workspaceId, snapshot) => { - workspaceService.emit("activity", { workspaceId, activity: snapshot }); + workspaceService.emitWorkspaceActivity(workspaceId, snapshot); }); // Wire user-initiated `promoteUpcomingGoal` through `interruptStream` // so promoting mid-stream cleanly aborts the in-flight turn before From 8386b413a6c064b07884b40cd10660ea92915cbd Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 17 Jun 2026 16:39:00 +0000 Subject: [PATCH 5/9] Serialize workflow activity cache bootstrap --- src/node/services/workspaceService.test.ts | 55 ++++++++++++++++++++++ src/node/services/workspaceService.ts | 42 +++++++++++++---- 2 files changed, 88 insertions(+), 9 deletions(-) diff --git a/src/node/services/workspaceService.test.ts b/src/node/services/workspaceService.test.ts index 2d20d1381d..324a6e2224 100644 --- a/src/node/services/workspaceService.test.ts +++ b/src/node/services/workspaceService.test.ts @@ -309,6 +309,61 @@ describe("WorkspaceService workflow activity", () => { await cleanup(); } }); + + test("shares initial active workflow cache bootstrap across parallel status events", async () => { + const { config, historyService, cleanup } = await createTestHistoryService(); + const scanStarted = createDeferred(); + const releaseScan = createDeferred(); + const listStatusSnapshotsSpy = spyOn( + WorkflowRunStore.prototype, + "listRunStatusSnapshots" + ).mockImplementation(async () => { + scanStarted.resolve(); + await releaseScan.promise; + return []; + }); + + try { + const workspaceId = "workflow-activity-race"; + const workspaceService = createWorkspaceServiceForTest({ + config, + historyService, + extensionMetadata: new ExtensionMetadataService( + path.join(config.rootDir, "extensionMetadata.json") + ), + }); + const activityEvents: Array<{ + workspaceId: string; + activity: WorkspaceActivitySnapshot | null; + }> = []; + workspaceService.on("activity", (event) => activityEvents.push(event)); + + const first = workspaceService.emitWorkflowRunActivity({ + workspaceId, + runId: "wfr_first", + status: "running", + }); + await scanStarted.promise; + const second = workspaceService.emitWorkflowRunActivity({ + workspaceId, + runId: "wfr_second", + status: "running", + }); + + releaseScan.resolve(); + await Promise.all([first, second]); + + expect(listStatusSnapshotsSpy).toHaveBeenCalledTimes(1); + expect(activityEvents.at(-1)?.activity?.activeWorkflowRunCount).toBe(2); + expect((await workspaceService.getActivityList())[workspaceId]?.activeWorkflowRunCount).toBe( + 2 + ); + } finally { + listStatusSnapshotsSpy.mockRestore(); + releaseScan.resolve(); + await cleanup(); + } + }); }); describe("WorkspaceService workflow invocation events", () => { diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index e346d83cfc..8edde9b96a 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -1495,6 +1495,10 @@ export class WorkspaceService extends EventEmitter { >(); // Lazily bootstrapped workflow activity cache so sidebar refreshes don't rescan run history. + private readonly activeWorkflowRunIdBootstrapsByWorkspace = new Map< + string, + Promise> + >(); private readonly activeWorkflowRunIdsByWorkspace = new Map>(); // Debounce post-compaction metadata refreshes (file_edit_* can fire rapidly) @@ -1914,14 +1918,10 @@ export class WorkspaceService extends EventEmitter { }); } - private async getActiveWorkflowRunIds(workspaceId: string): Promise> { - assert(workspaceId.length > 0, "getActiveWorkflowRunIds requires workspaceId"); - const cached = this.activeWorkflowRunIdsByWorkspace.get(workspaceId); - if (cached != null) { - return cached; - } - - const activeRunIds = new Set(); + private async populateActiveWorkflowRunIds( + workspaceId: string, + activeRunIds: Set + ): Promise> { try { const runStore = new WorkflowRunStore({ sessionDir: this.config.getSessionDir(workspaceId) }); const runs = await runStore.listRunStatusSnapshots(); @@ -1940,9 +1940,33 @@ export class WorkspaceService extends EventEmitter { error, }); } + return activeRunIds; + } + private async getActiveWorkflowRunIds(workspaceId: string): Promise> { + assert(workspaceId.length > 0, "getActiveWorkflowRunIds requires workspaceId"); + const cached = this.activeWorkflowRunIdsByWorkspace.get(workspaceId); + if (cached != null) { + const bootstrap = this.activeWorkflowRunIdBootstrapsByWorkspace.get(workspaceId); + if (bootstrap != null) { + await bootstrap; + } + return cached; + } + + // Install the shared Set before awaiting disk so parallel workflow status events + // mutate the same cache instead of racing to replace each other after bootstrap. + const activeRunIds = new Set(); this.activeWorkflowRunIdsByWorkspace.set(workspaceId, activeRunIds); - return activeRunIds; + const bootstrap = this.populateActiveWorkflowRunIds(workspaceId, activeRunIds); + this.activeWorkflowRunIdBootstrapsByWorkspace.set(workspaceId, bootstrap); + try { + return await bootstrap; + } finally { + if (this.activeWorkflowRunIdBootstrapsByWorkspace.get(workspaceId) === bootstrap) { + this.activeWorkflowRunIdBootstrapsByWorkspace.delete(workspaceId); + } + } } private async getActiveWorkflowRunCount(workspaceId: string): Promise { From 65bb30d8bbe7620dd90ac24490ac785cb583383d Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 17 Jun 2026 16:49:22 +0000 Subject: [PATCH 6/9] Emit current workflow activity counts --- src/node/services/workspaceService.test.ts | 69 ++++++++++++++++++++++ src/node/services/workspaceService.ts | 9 ++- 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/src/node/services/workspaceService.test.ts b/src/node/services/workspaceService.test.ts index 324a6e2224..7e260a61db 100644 --- a/src/node/services/workspaceService.test.ts +++ b/src/node/services/workspaceService.test.ts @@ -364,6 +364,75 @@ describe("WorkspaceService workflow activity", () => { await cleanup(); } }); + + test("emits current workflow count after overlapping metadata snapshot reads", async () => { + const { config, historyService, cleanup } = await createTestHistoryService(); + const firstSnapshotStarted = createDeferred(); + const releaseFirstSnapshot = createDeferred(); + const extensionMetadata = new ExtensionMetadataService( + path.join(config.rootDir, "extensionMetadata.json") + ); + const getSnapshotSpy = spyOn(extensionMetadata, "getSnapshot"); + + try { + const workspaceId = "workflow-activity-overlap"; + const workspaceService = createWorkspaceServiceForTest({ + config, + historyService, + extensionMetadata, + }); + await workspaceService.emitWorkflowRunActivity({ + workspaceId, + runId: "wfr_first", + status: "running", + }); + await workspaceService.emitWorkflowRunActivity({ + workspaceId, + runId: "wfr_second", + status: "running", + }); + + let shouldDelayNextSnapshot = true; + getSnapshotSpy.mockImplementation(async (id: string) => { + if (shouldDelayNextSnapshot) { + shouldDelayNextSnapshot = false; + firstSnapshotStarted.resolve(); + await releaseFirstSnapshot.promise; + } + return ExtensionMetadataService.prototype.getSnapshot.call(extensionMetadata, id); + }); + const activityEvents: Array<{ + workspaceId: string; + activity: WorkspaceActivitySnapshot | null; + }> = []; + workspaceService.on("activity", (event) => activityEvents.push(event)); + + const first = workspaceService.emitWorkflowRunActivity({ + workspaceId, + runId: "wfr_first", + status: "completed", + }); + await firstSnapshotStarted.promise; + const second = workspaceService.emitWorkflowRunActivity({ + workspaceId, + runId: "wfr_second", + status: "completed", + }); + + await second; + releaseFirstSnapshot.resolve(); + await first; + + expect(activityEvents.at(-1)?.activity?.activeWorkflowRunCount).toBeUndefined(); + expect( + (await workspaceService.getActivityList())[workspaceId]?.activeWorkflowRunCount + ).toBeUndefined(); + } finally { + getSnapshotSpy.mockRestore(); + releaseFirstSnapshot.resolve(); + await cleanup(); + } + }); }); describe("WorkspaceService workflow invocation events", () => { diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index 8edde9b96a..bb596c4644 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -2015,12 +2015,11 @@ export class WorkspaceService extends EventEmitter { }): Promise { assert(event.workspaceId.length > 0, "emitWorkflowRunActivity requires workspaceId"); assert(event.runId.length > 0, "emitWorkflowRunActivity requires runId"); - const activeWorkflowRunCount = await this.updateActiveWorkflowRunCount(event); - const snapshot = mergeActiveWorkflowRunCount( - await this.extensionMetadata.getSnapshot(event.workspaceId), - activeWorkflowRunCount + await this.updateActiveWorkflowRunCount(event); + this.emitWorkspaceActivity( + event.workspaceId, + await this.extensionMetadata.getSnapshot(event.workspaceId) ); - this.emitWorkspaceActivity(event.workspaceId, snapshot); } /** From 501526ffa90328a3aa981b63bbc4e19e4d7e5075 Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 17 Jun 2026 16:58:41 +0000 Subject: [PATCH 7/9] Reconcile workflow status snapshots --- .../workflows/WorkflowRunStore.test.ts | 22 ++++++++++++ .../services/workflows/WorkflowRunStore.ts | 36 ++++++++++++++----- 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/src/node/services/workflows/WorkflowRunStore.test.ts b/src/node/services/workflows/WorkflowRunStore.test.ts index 2329a6ae55..02eabb4757 100644 --- a/src/node/services/workflows/WorkflowRunStore.test.ts +++ b/src/node/services/workflows/WorkflowRunStore.test.ts @@ -82,6 +82,28 @@ describe("WorkflowRunStore", () => { expect(snapshots[1]?.parentWorkflow?.runId).toBe("wfr_123"); }); + test("reconciles active status snapshots with terminal journal events", async () => { + using tmp = new DisposableTempDir("workflow-runs-status-reconcile"); + const store = await createStore(tmp.path); + await store.appendStatus("wfr_123", "running", "2026-05-29T00:00:01.000Z"); + await store.appendStatus("wfr_123", "completed", "2026-05-29T00:00:02.000Z"); + + const runFile = path.join(tmp.path, "workflows", "wfr_123", "run.json"); + const staleRun = JSON.parse(await fs.readFile(runFile, "utf-8")) as Record; + staleRun.status = "running"; + staleRun.updatedAt = "2026-05-29T00:00:01.000Z"; + await fs.writeFile(runFile, JSON.stringify(staleRun, null, 2), "utf-8"); + + await expect(store.getRun("wfr_123")).resolves.toMatchObject({ + status: "completed", + updatedAt: "2026-05-29T00:00:02.000Z", + }); + await expect(store.getRunStatusSnapshot("wfr_123")).resolves.toMatchObject({ + status: "completed", + updatedAt: "2026-05-29T00:00:02.000Z", + }); + }); + test("rejects invalid run ids before resolving run file paths", async () => { using tmp = new DisposableTempDir("workflow-runs"); const store = new WorkflowRunStore({ sessionDir: tmp.path }); diff --git a/src/node/services/workflows/WorkflowRunStore.ts b/src/node/services/workflows/WorkflowRunStore.ts index 563a18932a..cee1eaff37 100644 --- a/src/node/services/workflows/WorkflowRunStore.ts +++ b/src/node/services/workflows/WorkflowRunStore.ts @@ -14,14 +14,15 @@ import { WorkflowRunRecordSchema, WorkflowStepRecordSchema, } from "@/common/orpc/schemas"; -import type { - StructuredTaskOutput, - WorkflowDefinitionDescriptor, - WorkflowRunEvent, - WorkflowRunParent, - WorkflowRunRecord, - WorkflowRunStatus, - WorkflowStepRecord, +import { + isActiveWorkflowRunStatus, + type StructuredTaskOutput, + type WorkflowDefinitionDescriptor, + type WorkflowRunEvent, + type WorkflowRunParent, + type WorkflowRunRecord, + type WorkflowRunStatus, + type WorkflowStepRecord, } from "@/common/types/workflow"; import assert from "@/common/utils/assert"; import { getErrorMessage } from "@/common/utils/errors"; @@ -208,7 +209,24 @@ export class WorkflowRunStore { async getRunStatusSnapshot(runId: string): Promise { assertValidWorkflowRunId(runId); const rawRun = JSON.parse(await fs.readFile(this.runFile(runId), "utf-8")) as unknown; - return WorkflowRunStatusSnapshotSchema.parse(rawRun); + const snapshot = WorkflowRunStatusSnapshotSchema.parse(rawRun); + if ( + !isActiveWorkflowRunStatus(snapshot.status) || + (await this.hasActiveWorkflowMutationLock(runId)) + ) { + return snapshot; + } + + // Crash recovery: status events hit the journal before run.json is rewritten. + // Active-looking snapshots must consult the journal so completed workflows do not + // keep the sidebar active forever after a mid-transition crash. + const events = await this.readEvents(runId); + const latestEvent = events.at(-1); + return WorkflowRunStatusSnapshotSchema.parse({ + ...snapshot, + status: getRunStatusFromEvents(events) ?? snapshot.status, + updatedAt: latestEvent?.at ?? snapshot.updatedAt, + }); } async listRunStatusSnapshots(): Promise { From 669b56c3dcc1427f49ecf22052caa8c80db63fbe Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 17 Jun 2026 17:06:48 +0000 Subject: [PATCH 8/9] Reconcile resumed workflow status snapshots --- .../workflows/WorkflowRunStore.test.ts | 24 +++++++++++++++++++ .../services/workflows/WorkflowRunStore.ts | 10 +++----- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/src/node/services/workflows/WorkflowRunStore.test.ts b/src/node/services/workflows/WorkflowRunStore.test.ts index 02eabb4757..e6d7c2e98e 100644 --- a/src/node/services/workflows/WorkflowRunStore.test.ts +++ b/src/node/services/workflows/WorkflowRunStore.test.ts @@ -104,6 +104,30 @@ describe("WorkflowRunStore", () => { }); }); + test("reconciles inactive status snapshots with resumed journal events", async () => { + using tmp = new DisposableTempDir("workflow-runs-status-resume-reconcile"); + const store = await createStore(tmp.path); + await store.appendStatus("wfr_123", "interrupted", "2026-05-29T00:00:01.000Z"); + await store.appendStatus("wfr_123", "running", "2026-05-29T00:00:02.000Z", { + allowInterruptedResume: true, + }); + + const runFile = path.join(tmp.path, "workflows", "wfr_123", "run.json"); + const staleRun = JSON.parse(await fs.readFile(runFile, "utf-8")) as Record; + staleRun.status = "interrupted"; + staleRun.updatedAt = "2026-05-29T00:00:01.000Z"; + await fs.writeFile(runFile, JSON.stringify(staleRun, null, 2), "utf-8"); + + await expect(store.getRun("wfr_123")).resolves.toMatchObject({ + status: "running", + updatedAt: "2026-05-29T00:00:02.000Z", + }); + await expect(store.getRunStatusSnapshot("wfr_123")).resolves.toMatchObject({ + status: "running", + updatedAt: "2026-05-29T00:00:02.000Z", + }); + }); + test("rejects invalid run ids before resolving run file paths", async () => { using tmp = new DisposableTempDir("workflow-runs"); const store = new WorkflowRunStore({ sessionDir: tmp.path }); diff --git a/src/node/services/workflows/WorkflowRunStore.ts b/src/node/services/workflows/WorkflowRunStore.ts index cee1eaff37..d3bdaeff97 100644 --- a/src/node/services/workflows/WorkflowRunStore.ts +++ b/src/node/services/workflows/WorkflowRunStore.ts @@ -15,7 +15,6 @@ import { WorkflowStepRecordSchema, } from "@/common/orpc/schemas"; import { - isActiveWorkflowRunStatus, type StructuredTaskOutput, type WorkflowDefinitionDescriptor, type WorkflowRunEvent, @@ -210,16 +209,13 @@ export class WorkflowRunStore { assertValidWorkflowRunId(runId); const rawRun = JSON.parse(await fs.readFile(this.runFile(runId), "utf-8")) as unknown; const snapshot = WorkflowRunStatusSnapshotSchema.parse(rawRun); - if ( - !isActiveWorkflowRunStatus(snapshot.status) || - (await this.hasActiveWorkflowMutationLock(runId)) - ) { + if (await this.hasActiveWorkflowMutationLock(runId)) { return snapshot; } // Crash recovery: status events hit the journal before run.json is rewritten. - // Active-looking snapshots must consult the journal so completed workflows do not - // keep the sidebar active forever after a mid-transition crash. + // Status snapshots consult the journal so recovered workflows do not keep stale + // active or inactive sidebar state forever after a mid-transition crash. const events = await this.readEvents(runId); const latestEvent = events.at(-1); return WorkflowRunStatusSnapshotSchema.parse({ From de6de19208b217570086ba3ff48c420914276beb Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Wed, 17 Jun 2026 17:19:57 +0000 Subject: [PATCH 9/9] List cleared workflow activity tombstones --- src/node/services/workspaceService.test.ts | 4 ++++ src/node/services/workspaceService.ts | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/node/services/workspaceService.test.ts b/src/node/services/workspaceService.test.ts index 7e260a61db..ea46b5b146 100644 --- a/src/node/services/workspaceService.test.ts +++ b/src/node/services/workspaceService.test.ts @@ -283,6 +283,10 @@ describe("WorkspaceService workflow activity", () => { }); expect(activityEvents.at(-1)?.activity?.activeWorkflowRunCount).toBeUndefined(); + const clearedActivityList = await workspaceService.getActivityList(); + expect(clearedActivityList[workspaceId]).toBeDefined(); + expect(clearedActivityList[workspaceId]?.activeWorkflowRunCount).toBeUndefined(); + await workspaceService.emitWorkflowRunActivity({ workspaceId, runId: "wfr_next", diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index bb596c4644..3df4092952 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -7849,8 +7849,11 @@ export class WorkspaceService extends EventEmitter { workspaceIds, async (workspaceId): Promise => { const snapshot = snapshots.get(workspaceId) ?? null; + const hadWorkflowActivityCache = this.activeWorkflowRunIdsByWorkspace.has(workspaceId); const activeWorkflowRunCount = await this.getActiveWorkflowRunCount(workspaceId); - if (snapshot == null && activeWorkflowRunCount === 0) { + // Keep a zero-count tombstone for workspaces whose workflow-only activity + // was cleared while a frontend activity subscription was disconnected. + if (snapshot == null && activeWorkflowRunCount === 0 && !hadWorkflowActivityCache) { return null; } return [