Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions src/node/services/historyService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,156 @@ describe("HistoryService", () => {
}
});

it("deduplicates rotation replay rows across archive and active history", async () => {
const workspaceId = "ws-compaction-epoch-rotation-replay";
const workspaceDir = config.getSessionDir(workspaceId);
await fs.mkdir(workspaceDir, { recursive: true });

const replayedPrefix = [
messageLine(
workspaceId,
createMuxMessage("old-boundary", "assistant", "old summary", {
historySequence: 0,
compactionBoundary: true,
compacted: "user",
compactionEpoch: 1,
})
),
messageLine(
workspaceId,
createMuxMessage("kept-user", "user", "durable preference", { historySequence: 1 })
),
messageLine(
workspaceId,
createMuxMessage("compact-request", "user", "Please compact", {
historySequence: 2,
muxMetadata: { type: "compaction-request", rawCommand: "/compact", parsed: {} },
})
),
];
const summary = messageLine(
workspaceId,
createMuxMessage("new-summary", "assistant", "new summary", {
historySequence: 3,
compactionBoundary: true,
compacted: "user",
compactionEpoch: 2,
})
);

await fs.writeFile(
path.join(workspaceDir, "chat-archive.jsonl"),
replayedPrefix.join("\n") + "\n"
);
await fs.writeFile(
path.join(workspaceDir, "chat.jsonl"),
[...replayedPrefix, summary].join("\n") + "\n"
);

const result = await service.getMessagesForCompactionEpoch(workspaceId, {
workspaceId,
summaryMessageId: "new-summary",
summaryHistorySequence: 3,
compactionEpoch: 2,
previousBoundaryHistorySequence: 0,
compactionRequestMessageId: "compact-request",
});

expect(result.success).toBe(true);
if (result.success) {
expect(result.data.messages.map((message) => message.id)).toEqual(["kept-user"]);
expect(result.data.summary.id).toBe("new-summary");
}
});

it("holds the workspace lock while scanning archive and active history", async () => {
const workspaceId = "ws-compaction-epoch-lock";
await writeHistoryLines(config, workspaceId, [
messageLine(
workspaceId,
createMuxMessage("old-boundary", "assistant", "old summary", {
historySequence: 0,
compactionBoundary: true,
compacted: "user",
compactionEpoch: 1,
})
),
messageLine(
workspaceId,
createMuxMessage("kept-user", "user", "durable preference", { historySequence: 1 })
),
messageLine(
workspaceId,
createMuxMessage("compact-request", "user", "Please compact", {
historySequence: 2,
muxMetadata: { type: "compaction-request", rawCommand: "/compact", parsed: {} },
})
),
messageLine(
workspaceId,
createMuxMessage("summary", "assistant", "summary", {
historySequence: 3,
compactionBoundary: true,
compacted: "user",
compactionEpoch: 2,
})
),
]);

let releaseScan!: () => void;
const scanReleased = new Promise<void>((resolve) => {
releaseScan = resolve;
});
let markScanStarted!: () => void;
const scanStarted = new Promise<void>((resolve) => {
markScanStarted = resolve;
});
const originalIterateFullHistory: HistoryService["iterateFullHistory"] =
service.iterateFullHistory.bind(service);
const blockingIterateFullHistory: HistoryService["iterateFullHistory"] = async (
workspaceIdArg,
direction,
visitor
) => {
markScanStarted();
await scanReleased;
return originalIterateFullHistory(workspaceIdArg, direction, visitor);
};
service.iterateFullHistory = blockingIterateFullHistory;

const scan = service.getMessagesForCompactionEpoch(workspaceId, {
workspaceId,
summaryMessageId: "summary",
summaryHistorySequence: 3,
compactionEpoch: 2,
previousBoundaryHistorySequence: 0,
compactionRequestMessageId: "compact-request",
});
await scanStarted;

interface WorkspaceLockProbe {
fileLocks: {
withLock<T>(key: string, operation: () => Promise<T>): Promise<T>;
};
}
const { fileLocks } = service as unknown as WorkspaceLockProbe;
let probeStarted = false;
const probe = fileLocks.withLock(workspaceId, () => {
probeStarted = true;
return Promise.resolve();
});
await Promise.resolve();

expect(probeStarted).toBe(false);
releaseScan();

const result = await scan;
await probe;

expect(result.success).toBe(true);
expect(probeStarted).toBe(true);
});

it("uses reset boundaries as lower bounds and excludes the reset marker", async () => {
const workspaceId = "ws-compaction-reset-epoch";
await writeHistoryLines(config, workspaceId, [
Expand Down
46 changes: 27 additions & 19 deletions src/node/services/historyService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -900,27 +900,35 @@ export class HistoryService {
const messages: MuxMessage[] = [];
let summary: MuxMessage | undefined;
const lowerBound = metadata.previousBoundaryHistorySequence;
const seenHistorySequences = new Set<number>();

// The just-compacted epoch can straddle chat-archive.jsonl and chat.jsonl after
// sealed-history rotation, so scan the full logical history under the workspace
// lock; otherwise a concurrent boundary rotation can move rows between files mid-scan.
const iteration = await this.fileLocks.withLock(workspaceId, () =>
this.iterateFullHistory(workspaceId, "forward", (chunk) => {
for (const message of chunk) {
const sequence = message.metadata?.historySequence;
if (!isNonNegativeInteger(sequence)) continue;
if (seenHistorySequences.has(sequence)) continue;
seenHistorySequences.add(sequence);

if (
sequence === metadata.summaryHistorySequence &&
message.id === metadata.summaryMessageId
) {
summary = message;
continue;
}

const iteration = await this.iterateFullHistory(workspaceId, "forward", (chunk) => {
for (const message of chunk) {
const sequence = message.metadata?.historySequence;
if (!isNonNegativeInteger(sequence)) continue;

if (
sequence === metadata.summaryHistorySequence &&
message.id === metadata.summaryMessageId
) {
summary = message;
continue;
if (sequence >= metadata.summaryHistorySequence) continue;
if (lowerBound !== undefined && sequence <= lowerBound) continue;
if (message.id === metadata.compactionRequestMessageId) continue;
if (isDurableContextBoundaryMarker(message)) continue;
messages.push(message);
}

if (sequence >= metadata.summaryHistorySequence) continue;
if (lowerBound !== undefined && sequence <= lowerBound) continue;
if (message.id === metadata.compactionRequestMessageId) continue;
if (isDurableContextBoundaryMarker(message)) continue;
messages.push(message);
}
});
})
);
if (!iteration.success) {
return Err(`Failed to read compaction epoch messages: ${iteration.error}`);
}
Expand Down
Loading