Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/node/services/agentSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ interface AgentSessionOptions {
keepBackgroundProcesses?: boolean;
/** Called when compaction completes (e.g., to clear idle compaction pending state) */
onCompactionComplete?: (metadata: CompactionCompletionMetadata) => void;
/** Called with the terminal outcome of an idle compaction (persisted success / post-stream failure) */
onIdleCompactionOutcome?: (success: boolean) => void;
/** Called when post-compaction context state may have changed (plan/file edits) */
onPostCompactionStateChange?: () => void;
}
Expand Down Expand Up @@ -515,6 +517,7 @@ export class AgentSession {
workspaceGoalService,
keepBackgroundProcesses,
onCompactionComplete,
onIdleCompactionOutcome,
onPostCompactionStateChange,
} = options;

Expand All @@ -539,6 +542,7 @@ export class AgentSession {
telemetryService,
emitter: this.emitter,
onCompactionComplete,
onIdleCompactionOutcome,
});

this.compactionMonitor = new CompactionMonitor(
Expand Down
68 changes: 68 additions & 0 deletions src/node/services/compactionHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,74 @@ describe("CompactionHandler", () => {
expect(metadata?.previousBoundaryHistorySequence).toBe(1);
});

describe("onIdleCompactionOutcome", () => {
const createIdleCompactionRequest = (id = "idle-req"): MuxMessage =>
createMuxMessage(id, "user", "Please summarize the conversation", {
muxMetadata: {
type: "compaction-request",
rawCommand: "/compact",
parsed: {},
source: "idle-compaction",
},
});

it("reports success only after the summary is persisted", async () => {
const onIdleCompactionOutcome = mock((_success: boolean) => undefined);
handler = new CompactionHandler({
workspaceId,
historyService,
sessionDir,
telemetryService,
emitter: mockEmitter,
onIdleCompactionOutcome,
});
await seedHistory(createIdleCompactionRequest());

const handled = await handler.handleCompletion(createStreamEndEvent("Summary"));

expect(handled).toBe(true);
expect(onIdleCompactionOutcome.mock.calls).toEqual([[true]]);
});

it("reports failure when the post-stream summary is empty", async () => {
const onIdleCompactionOutcome = mock((_success: boolean) => undefined);
handler = new CompactionHandler({
workspaceId,
historyService,
sessionDir,
telemetryService,
emitter: mockEmitter,
onIdleCompactionOutcome,
});
await seedHistory(createIdleCompactionRequest());

// An empty summary means the provider stream ended but produced no usable
// content, so compaction cannot be persisted.
const handled = await handler.handleCompletion(createStreamEndEvent(" "));

expect(handled).toBe(false);
expect(onIdleCompactionOutcome.mock.calls).toEqual([[false]]);
});

it("does not report for a non-idle (manual) compaction", async () => {
const onIdleCompactionOutcome = mock((_success: boolean) => undefined);
handler = new CompactionHandler({
workspaceId,
historyService,
sessionDir,
telemetryService,
emitter: mockEmitter,
onIdleCompactionOutcome,
});
await seedHistory(createCompactionRequest());

const handled = await handler.handleCompletion(createStreamEndEvent("Summary"));

expect(handled).toBe(true);
expect(onIdleCompactionOutcome).not.toHaveBeenCalled();
});
});

it("should capture compaction_completed telemetry on successful compaction", async () => {
const compactionReq = createCompactionRequest();
await seedHistory(compactionReq);
Expand Down
25 changes: 21 additions & 4 deletions src/node/services/compactionHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,13 @@ interface CompactionHandlerOptions {
emitter: EventEmitter;
/** Called when compaction completes successfully (e.g., to clear idle compaction pending state) */
onCompactionComplete?: (metadata: CompactionCompletionMetadata) => void;
/**
* Called with the terminal outcome of an idle compaction (source === "idle-compaction"),
* after the summary is actually persisted (success) or a post-stream persistence failure
* (empty/invalid summary, history write error). Lets the idle loop stop re-attempting a
* workspace whose compaction keeps failing even though the provider stream ended cleanly.
*/
onIdleCompactionOutcome?: (success: boolean) => void;
}

/**
Expand All @@ -353,6 +360,7 @@ export class CompactionHandler {
private readonly processedCompactionRequestIds: Set<string> = new Set<string>();

private readonly onCompactionComplete?: (metadata: CompactionCompletionMetadata) => void;
private readonly onIdleCompactionOutcome?: (success: boolean) => void;

/** Flag indicating post-compaction attachments should be generated on next turn */
private postCompactionAttachmentsPending = false;
Expand All @@ -376,6 +384,7 @@ export class CompactionHandler {
this.telemetryService = options.telemetryService;
this.emitter = options.emitter;
this.onCompactionComplete = options.onCompactionComplete;
this.onIdleCompactionOutcome = options.onIdleCompactionOutcome;
}

private async loadPersistedPendingStateIfNeeded(): Promise<void> {
Expand Down Expand Up @@ -752,6 +761,11 @@ export class CompactionHandler {
return false;
}

// Determine idle-compaction (auto-triggered due to inactivity) up-front so the
// post-stream failure paths below can report a terminal outcome to the idle loop.
const isIdleCompaction =
muxMeta?.type === "compaction-request" && muxMeta.source === "idle-compaction";

// Dedupe: If we've already processed this compaction-request, skip
if (this.processedCompactionRequestIds.has(lastUserMsg.id)) {
return true;
Expand All @@ -777,6 +791,7 @@ export class CompactionHandler {
parts: partsSummary,
});
// Don't mark as processed so user can retry
if (isIdleCompaction) this.onIdleCompactionOutcome?.(false);
return false;
}

Expand All @@ -792,13 +807,10 @@ export class CompactionHandler {
}
);
// Don't mark as processed so user can retry
if (isIdleCompaction) this.onIdleCompactionOutcome?.(false);
return false;
}

// Check if this was an idle-compaction (auto-triggered due to inactivity)
const isIdleCompaction =
muxMeta?.type === "compaction-request" && muxMeta.source === "idle-compaction";

// Extract follow-up content to attach to summary for crash-safe dispatch
const pendingFollowUp = getCompactionFollowUpContent(muxMeta);

Expand All @@ -825,6 +837,7 @@ export class CompactionHandler {
);
if (!result.success) {
log.error("Compaction failed:", result.error);
if (isIdleCompaction) this.onIdleCompactionOutcome?.(false);
return false;
}

Expand All @@ -849,6 +862,10 @@ export class CompactionHandler {
// Notify that compaction completed (clears idle compaction pending state)
this.onCompactionComplete?.(result.data);

// Report the idle-compaction success only after the summary is actually persisted,
// so the idle loop's failure streak is reset on real success (not just stream end).
if (isIdleCompaction) this.onIdleCompactionOutcome?.(true);

// Emit a sanitized stream-end so UI can close streaming state without
// re-introducing stale provider metadata from the pre-compaction row.
this.emitChatEvent(this.sanitizeCompactionStreamEndEvent(event));
Expand Down
62 changes: 62 additions & 0 deletions src/node/services/idleCompactionService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,4 +387,66 @@ describe("IdleCompactionService", () => {
expect(executeIdleCompactionMock).not.toHaveBeenCalled();
});
});

describe("recordOutcome (failure suppression)", () => {
const threshold24h = 24 * oneHourMs;

test("stops the loop after two consecutive failures", async () => {
service.recordOutcome(testWorkspaceId, { success: false, modelNotFound: false });

// One failure is not enough to suppress.
const afterOne = await service.checkEligibility(testWorkspaceId, threshold24h, now);
expect(afterOne.eligible).toBe(true);

service.recordOutcome(testWorkspaceId, { success: false, modelNotFound: false });

const afterTwo = await service.checkEligibility(testWorkspaceId, threshold24h, now);
expect(afterTwo.eligible).toBe(false);
expect(afterTwo.reason).toBe("suppressed_after_failures");
});

test("stops the loop immediately on a model_not_found failure", async () => {
service.recordOutcome(testWorkspaceId, { success: false, modelNotFound: true });

const result = await service.checkEligibility(testWorkspaceId, threshold24h, now);
expect(result.eligible).toBe(false);
expect(result.reason).toBe("suppressed_after_failures");
});

test("a success between failures resets the consecutive failure streak", async () => {
service.recordOutcome(testWorkspaceId, { success: false, modelNotFound: false });
service.recordOutcome(testWorkspaceId, { success: true });
service.recordOutcome(testWorkspaceId, { success: false, modelNotFound: false });

// Only one failure since the last success, so the workspace is still eligible.
const result = await service.checkEligibility(testWorkspaceId, threshold24h, now);
expect(result.eligible).toBe(true);
});

test("a later success lifts suppression (self-healing)", async () => {
// Two failures suppress the workspace.
service.recordOutcome(testWorkspaceId, { success: false, modelNotFound: false });
service.recordOutcome(testWorkspaceId, { success: false, modelNotFound: false });
expect((await service.checkEligibility(testWorkspaceId, threshold24h, now)).eligible).toBe(
false
);

// An in-flight retry that actually persists a compaction clears suppression.
service.recordOutcome(testWorkspaceId, { success: true });

const result = await service.checkEligibility(testWorkspaceId, threshold24h, now);
expect(result.eligible).toBe(true);
});

test("checkAllWorkspaces no longer queues a suppressed workspace", async () => {
// A non-recoverable failure suppresses the workspace immediately.
service.recordOutcome(testWorkspaceId, { success: false, modelNotFound: true });

await service.checkAllWorkspaces();

// Give the (fire-and-forget) queue a chance to run; it must not execute.
await new Promise((resolve) => setTimeout(resolve, 20));
expect(executeIdleCompactionMock).not.toHaveBeenCalled();
});
});
});
62 changes: 62 additions & 0 deletions src/node/services/idleCompactionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,29 @@ const INITIAL_CHECK_DELAY_MS = 60 * 1000; // 1 minute - let startup initializati
const CHECK_INTERVAL_MS = 60 * 60 * 1000; // 1 hour
const HOURS_TO_MS = 60 * 60 * 1000;

/**
* Stop attempting idle compaction for a workspace once it has failed this many
* times in a row. The hourly loop would otherwise re-queue a persistently
* failing workspace forever (a failed compaction neither marks the workspace
* `compacted` nor refreshes recency, so it stays eligible).
*/
const MAX_CONSECUTIVE_IDLE_COMPACTION_FAILURES = 2;

interface QueuedIdleCompaction {
workspaceId: string;
thresholdMs: number;
}

/**
* Terminal outcome of an idle compaction attempt, reported back to the service
* so it can decide whether to keep attempting compaction for a workspace.
*
* `modelNotFound` is treated as non-recoverable: the configured compaction model
* does not exist / is not available, and retrying will keep failing the same way,
* so we stop after a single occurrence rather than waiting for two failures.
*/
export type IdleCompactionOutcome = { success: true } | { success: false; modelNotFound: boolean };

/**
* IdleCompactionService monitors workspaces for idle time and executes
* compaction directly through a backend callback.
Expand All @@ -33,6 +51,11 @@ export class IdleCompactionService {
private readonly activeWorkspaceIds = new Set<string>();
private isProcessingQueue = false;
private stopped = false;
// Per-workspace count of consecutive failed idle compactions (reset on success).
private readonly consecutiveFailures = new Map<string, number>();
// Workspaces for which the idle compaction loop has been stopped after repeated
// (or non-recoverable) failures. Sticky for the service lifetime; cleared on restart.
private readonly suppressedWorkspaceIds = new Set<string>();

constructor(
config: Config,
Expand Down Expand Up @@ -223,6 +246,12 @@ export class IdleCompactionService {
thresholdMs: number,
now: number
): Promise<{ eligible: boolean; reason?: string }> {
// 0. Has the loop been stopped for this workspace after repeated/non-recoverable
// failures? Skip before touching history so a failing workspace stops re-queuing.
if (this.suppressedWorkspaceIds.has(workspaceId)) {
return { eligible: false, reason: "suppressed_after_failures" };
}

// 1. Has messages? Only need tail messages — recency + last-message checks don't need full history.
const historyResult = await this.historyService.getLastMessages(workspaceId, 50);
if (!historyResult.success || historyResult.data.length === 0) {
Expand Down Expand Up @@ -260,4 +289,37 @@ export class IdleCompactionService {

return { eligible: true };
}

/**
* Record the terminal outcome of an idle compaction attempt so the loop can
* stop re-attempting a persistently failing workspace.
*
* - Success resets the failure count AND lifts any suppression — a compaction that
* actually persisted means the workspace is healthy again (self-healing). This also
* covers an in-flight retry that succeeds after suppression was set.
* - A `model_not_found` failure is non-recoverable, so the loop stops immediately.
* - Any other failure stops the loop once it happens twice in a row.
*
* Suppression is in-memory (also cleared on restart, e.g. after fixing the model config).
*/
recordOutcome(workspaceId: string, outcome: IdleCompactionOutcome): void {
if (outcome.success) {
this.consecutiveFailures.delete(workspaceId);
this.suppressedWorkspaceIds.delete(workspaceId);
return;
Comment thread
ammar-agent marked this conversation as resolved.
}

const failures = (this.consecutiveFailures.get(workspaceId) ?? 0) + 1;
this.consecutiveFailures.set(workspaceId, failures);

if (outcome.modelNotFound || failures >= MAX_CONSECUTIVE_IDLE_COMPACTION_FAILURES) {
this.suppressedWorkspaceIds.add(workspaceId);
this.consecutiveFailures.delete(workspaceId);
log.warn("Stopping idle compaction for workspace after failure", {
workspaceId,
reason: outcome.modelNotFound ? "model_not_found" : "consecutive_failures",
failures,
});
}
}
}
6 changes: 6 additions & 0 deletions src/node/services/serviceContainer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ export class ServiceContainer {
this.extensionMetadata,
(workspaceId) => this.workspaceService.executeIdleCompaction(workspaceId)
);
// Forward terminal idle-compaction outcomes so the loop stops re-attempting a
// persistently failing workspace (immediately on model_not_found, otherwise after
// two consecutive failures).
this.workspaceService.setIdleCompactionOutcomeListener((workspaceId, outcome) =>
this.idleCompactionService.recordOutcome(workspaceId, outcome)
);
// IdleDispatcher + goal continuation bridge are owned by createCoreServices
// so the wiring works for `mux run` too. Share the same dispatcher with
// HeartbeatService — its priority ordering ensures an active goal
Expand Down
Loading
Loading