From cb3c846083e325772084ba416d0a5ea24d4eb1cb Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Mon, 15 Jun 2026 18:04:39 +0000 Subject: [PATCH] =?UTF-8?q?=F0=9F=A4=96=20fix:=20read=20compaction=20epoch?= =?UTF-8?q?s=20from=20archived=20history?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Memory harvest reads compaction epoch evidence after sealed-history rotation. The epoch can now span chat-archive.jsonl and chat.jsonl, so use the archive-aware full-history iterator instead of passing the workspace ID to the file-path iterator. Validation: - bun test src/node/services/historyService.test.ts - bun test src/node/services/memoryConsolidationService.test.ts - make typecheck - MUX_ESLINT_CONCURRENCY=1 make static-check --- Generated with mux • Model: openai:gpt-5.5 • Thinking: xhigh • Cost: $3.31 --- src/node/services/historyService.test.ts | 150 +++++++++++++++++++++++ src/node/services/historyService.ts | 46 ++++--- 2 files changed, 177 insertions(+), 19 deletions(-) diff --git a/src/node/services/historyService.test.ts b/src/node/services/historyService.test.ts index 9e77709374..297717bbc5 100644 --- a/src/node/services/historyService.test.ts +++ b/src/node/services/historyService.test.ts @@ -1149,6 +1149,156 @@ describe("HistoryService", () => { } }); + it("deduplicates rotation replay rows across archive and active history", async () => { + const workspaceId = "ws-compaction-epoch-rotation-replay"; + const workspaceDir = config.getSessionDir(workspaceId); + await fs.mkdir(workspaceDir, { recursive: true }); + + const replayedPrefix = [ + messageLine( + workspaceId, + createMuxMessage("old-boundary", "assistant", "old summary", { + historySequence: 0, + compactionBoundary: true, + compacted: "user", + compactionEpoch: 1, + }) + ), + messageLine( + workspaceId, + createMuxMessage("kept-user", "user", "durable preference", { historySequence: 1 }) + ), + messageLine( + workspaceId, + createMuxMessage("compact-request", "user", "Please compact", { + historySequence: 2, + muxMetadata: { type: "compaction-request", rawCommand: "/compact", parsed: {} }, + }) + ), + ]; + const summary = messageLine( + workspaceId, + createMuxMessage("new-summary", "assistant", "new summary", { + historySequence: 3, + compactionBoundary: true, + compacted: "user", + compactionEpoch: 2, + }) + ); + + await fs.writeFile( + path.join(workspaceDir, "chat-archive.jsonl"), + replayedPrefix.join("\n") + "\n" + ); + await fs.writeFile( + path.join(workspaceDir, "chat.jsonl"), + [...replayedPrefix, summary].join("\n") + "\n" + ); + + const result = await service.getMessagesForCompactionEpoch(workspaceId, { + workspaceId, + summaryMessageId: "new-summary", + summaryHistorySequence: 3, + compactionEpoch: 2, + previousBoundaryHistorySequence: 0, + compactionRequestMessageId: "compact-request", + }); + + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.messages.map((message) => message.id)).toEqual(["kept-user"]); + expect(result.data.summary.id).toBe("new-summary"); + } + }); + + it("holds the workspace lock while scanning archive and active history", async () => { + const workspaceId = "ws-compaction-epoch-lock"; + await writeHistoryLines(config, workspaceId, [ + messageLine( + workspaceId, + createMuxMessage("old-boundary", "assistant", "old summary", { + historySequence: 0, + compactionBoundary: true, + compacted: "user", + compactionEpoch: 1, + }) + ), + messageLine( + workspaceId, + createMuxMessage("kept-user", "user", "durable preference", { historySequence: 1 }) + ), + messageLine( + workspaceId, + createMuxMessage("compact-request", "user", "Please compact", { + historySequence: 2, + muxMetadata: { type: "compaction-request", rawCommand: "/compact", parsed: {} }, + }) + ), + messageLine( + workspaceId, + createMuxMessage("summary", "assistant", "summary", { + historySequence: 3, + compactionBoundary: true, + compacted: "user", + compactionEpoch: 2, + }) + ), + ]); + + let releaseScan!: () => void; + const scanReleased = new Promise((resolve) => { + releaseScan = resolve; + }); + let markScanStarted!: () => void; + const scanStarted = new Promise((resolve) => { + markScanStarted = resolve; + }); + const originalIterateFullHistory: HistoryService["iterateFullHistory"] = + service.iterateFullHistory.bind(service); + const blockingIterateFullHistory: HistoryService["iterateFullHistory"] = async ( + workspaceIdArg, + direction, + visitor + ) => { + markScanStarted(); + await scanReleased; + return originalIterateFullHistory(workspaceIdArg, direction, visitor); + }; + service.iterateFullHistory = blockingIterateFullHistory; + + const scan = service.getMessagesForCompactionEpoch(workspaceId, { + workspaceId, + summaryMessageId: "summary", + summaryHistorySequence: 3, + compactionEpoch: 2, + previousBoundaryHistorySequence: 0, + compactionRequestMessageId: "compact-request", + }); + await scanStarted; + + interface WorkspaceLockProbe { + fileLocks: { + withLock(key: string, operation: () => Promise): Promise; + }; + } + const { fileLocks } = service as unknown as WorkspaceLockProbe; + let probeStarted = false; + const probe = fileLocks.withLock(workspaceId, () => { + probeStarted = true; + return Promise.resolve(); + }); + await Promise.resolve(); + + expect(probeStarted).toBe(false); + releaseScan(); + + const result = await scan; + await probe; + + expect(result.success).toBe(true); + expect(probeStarted).toBe(true); + }); + it("uses reset boundaries as lower bounds and excludes the reset marker", async () => { const workspaceId = "ws-compaction-reset-epoch"; await writeHistoryLines(config, workspaceId, [ diff --git a/src/node/services/historyService.ts b/src/node/services/historyService.ts index 8a9f6e6c87..8bd35d18e2 100644 --- a/src/node/services/historyService.ts +++ b/src/node/services/historyService.ts @@ -900,27 +900,35 @@ export class HistoryService { const messages: MuxMessage[] = []; let summary: MuxMessage | undefined; const lowerBound = metadata.previousBoundaryHistorySequence; + const seenHistorySequences = new Set(); + + // The just-compacted epoch can straddle chat-archive.jsonl and chat.jsonl after + // sealed-history rotation, so scan the full logical history under the workspace + // lock; otherwise a concurrent boundary rotation can move rows between files mid-scan. + const iteration = await this.fileLocks.withLock(workspaceId, () => + this.iterateFullHistory(workspaceId, "forward", (chunk) => { + for (const message of chunk) { + const sequence = message.metadata?.historySequence; + if (!isNonNegativeInteger(sequence)) continue; + if (seenHistorySequences.has(sequence)) continue; + seenHistorySequences.add(sequence); + + if ( + sequence === metadata.summaryHistorySequence && + message.id === metadata.summaryMessageId + ) { + summary = message; + continue; + } - const iteration = await this.iterateFullHistory(workspaceId, "forward", (chunk) => { - for (const message of chunk) { - const sequence = message.metadata?.historySequence; - if (!isNonNegativeInteger(sequence)) continue; - - if ( - sequence === metadata.summaryHistorySequence && - message.id === metadata.summaryMessageId - ) { - summary = message; - continue; + if (sequence >= metadata.summaryHistorySequence) continue; + if (lowerBound !== undefined && sequence <= lowerBound) continue; + if (message.id === metadata.compactionRequestMessageId) continue; + if (isDurableContextBoundaryMarker(message)) continue; + messages.push(message); } - - if (sequence >= metadata.summaryHistorySequence) continue; - if (lowerBound !== undefined && sequence <= lowerBound) continue; - if (message.id === metadata.compactionRequestMessageId) continue; - if (isDurableContextBoundaryMarker(message)) continue; - messages.push(message); - } - }); + }) + ); if (!iteration.success) { return Err(`Failed to read compaction epoch messages: ${iteration.error}`); }