From ae311e25c741df51feabc15e31196b8226be0c80 Mon Sep 17 00:00:00 2001
From: Ziyang Guo <121015044+RerankerGuo@users.noreply.github.com>
Date: Wed, 10 Jun 2026 22:56:11 +0800
Subject: [PATCH] fix(checkpoint): recalibrate counters after cleanup

Signed-off-by: Ziyang Guo <121015044+RerankerGuo@users.noreply.github.com>
---
Reviewer notes (below the "---" marker, so not part of the commit message):

- CheckpointManager.recalibrateCounts() overwrites l0_conversations_count and
  total_memories_extracted with the counts passed in (clamped at 0). When the
  L1 total shrinks, memories_since_last_persona is lowered by the same amount
  and also clamped at 0. A field left undefined is not touched.
- It is called in two places: on scheduler start (src/core/tdai-core.ts) when a
  vector store is present and not degraded, and at the end of a cleanup run
  (src/utils/memory-cleaner.ts) when at least one L0 or L1 record was removed
  and a CheckpointManager was supplied. Both call sites catch errors and log a
  warning instead of failing.
- index.ts hands the cleaner its own `new CheckpointManager(pluginDataDir, ...)`
  while tdai-core.ts builds another one from `this.dataDir`.
  NOTE(review): confirm both point at the same checkpoint file and that
  mutate() is safe across separate instances.
- NOTE(review): this copy of the patch had its line breaks collapsed. Leading
  indentation of context and added lines was reconstructed assuming 2-space
  indentation; hunk line counts were checked against the headers. If it does
  not apply cleanly, try `git apply --ignore-whitespace`.
- The `Promise<string>` / `Promise<void>` return annotations had lost their
  type arguments in this copy and were restored.

 index.ts                     |  2 +
 src/core/tdai-core.ts        | 17 ++++++++
 src/utils/checkpoint.test.ts | 76 ++++++++++++++++++++++++++++++++++++
 src/utils/checkpoint.ts      | 35 +++++++++++++++++
 src/utils/memory-cleaner.ts  | 26 ++++++++++++
 5 files changed, 156 insertions(+)
 create mode 100644 src/utils/checkpoint.test.ts

diff --git a/index.ts b/index.ts
index 868a770..6c96616 100644
--- a/index.ts
+++ b/index.ts
@@ -31,6 +31,7 @@ import {
 } from "./src/utils/clean-context-runner.js";
 import { SessionFilter } from "./src/utils/session-filter.js";
 import { LocalMemoryCleaner } from "./src/utils/memory-cleaner.js";
+import { CheckpointManager } from "./src/utils/checkpoint.js";
 import { registerMemoryTdaiCli } from "./src/cli/index.js";
 import { initDataDirectories, resetStores } from "./src/utils/pipeline-factory.js";
 import { getOrCreateInstanceId, initReporter, report, resetReporter } from "./src/core/report/reporter.js";
@@ -305,6 +306,7 @@ export default function register(api: OpenClawPluginApi) {
         retentionDays: cfg.memoryCleanup.retentionDays,
         cleanTime: cfg.memoryCleanup.cleanTime,
         logger: api.logger,
+        checkpoint: new CheckpointManager(pluginDataDir, api.logger),
       });
       sharedMemoryCleaner.start();
       api.logger.debug?.(`${TAG} Memory cleaner started (singleton)`);
diff --git a/src/core/tdai-core.ts b/src/core/tdai-core.ts
index 977d4a2..a0fb6f5 100644
--- a/src/core/tdai-core.ts
+++ b/src/core/tdai-core.ts
@@ -513,6 +513,23 @@ export class TdaiCore {
     this.schedulerStartPromise = (async () => {
       try {
         const checkpoint = new CheckpointManager(this.dataDir, this.logger);
+        if (this.vectorStore && !this.vectorStore.isDegraded()) {
+          try {
+            const [actualL0, actualL1] = await Promise.all([
+              this.vectorStore.countL0(),
+              this.vectorStore.countL1(),
+            ]);
+            await checkpoint.recalibrateCounts({
+              l0ConversationsCount: actualL0,
+              totalMemoriesExtracted: actualL1,
+            });
+          } catch (err) {
+            this.logger.warn(
+              `${TAG} Checkpoint recalibration failed on scheduler start (non-fatal): ` +
+                `${err instanceof Error ? err.message : String(err)}`,
+            );
+          }
+        }
         const cp = await checkpoint.read();
         scheduler.start(checkpoint.getAllPipelineStates(cp));
         this.logger.debug?.(`${TAG} Scheduler started`);
diff --git a/src/utils/checkpoint.test.ts b/src/utils/checkpoint.test.ts
new file mode 100644
index 0000000..58fbac9
--- /dev/null
+++ b/src/utils/checkpoint.test.ts
@@ -0,0 +1,76 @@
+import fs from "node:fs/promises";
+import os from "node:os";
+import path from "node:path";
+import { afterEach, describe, expect, it } from "vitest";
+
+import { CheckpointManager } from "./checkpoint.js";
+
+const tempDirs: string[] = [];
+
+async function makeTempDir(): Promise<string> {
+  const dir = await fs.mkdtemp(path.join(os.tmpdir(), "tdai-checkpoint-"));
+  tempDirs.push(dir);
+  return dir;
+}
+
+afterEach(async () => {
+  await Promise.all(tempDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true })));
+});
+
+describe("CheckpointManager.recalibrateCounts", () => {
+  it("reconciles L0 and L1 counters with actual storage counts", async () => {
+    const dataDir = await makeTempDir();
+    const checkpoint = new CheckpointManager(dataDir);
+
+    await checkpoint.write({
+      last_captured_timestamp: 0,
+      total_processed: 12,
+      last_persona_at: 0,
+      last_persona_time: "",
+      request_persona_update: false,
+      persona_update_reason: "",
+      memories_since_last_persona: 8,
+      scenes_processed: 0,
+      runner_states: {},
+      pipeline_states: {},
+      l0_conversations_count: 12,
+      total_memories_extracted: 10,
+    });
+
+    await checkpoint.recalibrateCounts({
+      l0ConversationsCount: 4,
+      totalMemoriesExtracted: 6,
+    });
+
+    const cp = await checkpoint.read();
+    expect(cp.l0_conversations_count).toBe(4);
+    expect(cp.total_memories_extracted).toBe(6);
+    expect(cp.memories_since_last_persona).toBe(4);
+  });
+
+  it("does not let memories_since_last_persona go below zero", async () => {
+    const dataDir = await makeTempDir();
+    const checkpoint = new CheckpointManager(dataDir);
+
+    await checkpoint.write({
+      last_captured_timestamp: 0,
+      total_processed: 0,
+      last_persona_at: 0,
+      last_persona_time: "",
+      request_persona_update: false,
+      persona_update_reason: "",
+      memories_since_last_persona: 2,
+      scenes_processed: 0,
+      runner_states: {},
+      pipeline_states: {},
+      l0_conversations_count: 0,
+      total_memories_extracted: 10,
+    });
+
+    await checkpoint.recalibrateCounts({ totalMemoriesExtracted: 1 });
+
+    const cp = await checkpoint.read();
+    expect(cp.total_memories_extracted).toBe(1);
+    expect(cp.memories_since_last_persona).toBe(0);
+  });
+});
diff --git a/src/utils/checkpoint.ts b/src/utils/checkpoint.ts
index 301fc0d..8df218e 100644
--- a/src/utils/checkpoint.ts
+++ b/src/utils/checkpoint.ts
@@ -332,6 +332,41 @@ export class CheckpointManager {
     this.logger.info(`[checkpoint] incrementScenesProcessed: scenes_processed=${cp.scenes_processed}`);
   }
 
+  /**
+   * Reconcile aggregate counters with the current storage backend.
+   *
+   * L0/L1 records can be removed outside the normal append path (TTL cleanup,
+   * manual JSONL pruning, SQLite maintenance). The checkpoint counters are
+   * increment-only during capture/extraction, so they otherwise drift upward
+   * forever after deletes.
+   */
+  async recalibrateCounts(actual: {
+    l0ConversationsCount?: number;
+    totalMemoriesExtracted?: number;
+  }): Promise<void> {
+    const cp = await this.mutate((cp) => {
+      if (actual.l0ConversationsCount !== undefined) {
+        cp.l0_conversations_count = Math.max(0, actual.l0ConversationsCount);
+      }
+
+      if (actual.totalMemoriesExtracted !== undefined) {
+        const nextTotal = Math.max(0, actual.totalMemoriesExtracted);
+        const removedMemories = Math.max(0, cp.total_memories_extracted - nextTotal);
+        cp.total_memories_extracted = nextTotal;
+        if (removedMemories > 0) {
+          cp.memories_since_last_persona = Math.max(0, cp.memories_since_last_persona - removedMemories);
+        }
+      }
+    });
+
+    this.logger.info(
+      `[checkpoint] recalibrateCounts: ` +
+        `l0_conversations_count=${cp.l0_conversations_count}, ` +
+        `total_memories_extracted=${cp.total_memories_extracted}, ` +
+        `memories_since_last_persona=${cp.memories_since_last_persona}`,
+    );
+  }
+
   // ============================
   // Per-session helpers — runner state (L0/L1 owned)
   // ============================
diff --git a/src/utils/memory-cleaner.ts b/src/utils/memory-cleaner.ts
index 509f4fb..c7abbac 100644
--- a/src/utils/memory-cleaner.ts
+++ b/src/utils/memory-cleaner.ts
@@ -5,6 +5,7 @@ import type { IMemoryStore } from "../core/store/types.js";
 import { ManagedTimer } from "./managed-timer.js";
 import type { Logger } from "../core/types.js";
 import { formatLocalDateTime, startOfLocalDay } from "./time.js";
+import { CheckpointManager } from "./checkpoint.js";
 
 export interface MemoryCleanerOptions {
   baseDir: string;
@@ -12,6 +13,7 @@ export interface MemoryCleanerOptions {
   cleanTime: string;
   logger?: Logger;
   vectorStore?: IMemoryStore;
+  checkpoint?: CheckpointManager;
 }
 
 interface CleanupStats {
@@ -33,16 +35,22 @@ export class LocalMemoryCleaner {
   private readonly timer: ManagedTimer;
   private destroyed = false;
   private vectorStore?: IMemoryStore;
+  private checkpoint?: CheckpointManager;
 
   constructor(private readonly opts: MemoryCleanerOptions) {
     this.timer = new ManagedTimer("memory-tdai-cleaner", () => this.destroyed);
     this.vectorStore = opts.vectorStore;
+    this.checkpoint = opts.checkpoint;
   }
 
   setVectorStore(vectorStore: IMemoryStore | undefined): void {
     this.vectorStore = vectorStore;
   }
 
+  setCheckpoint(checkpoint: CheckpointManager | undefined): void {
+    this.checkpoint = checkpoint;
+  }
+
   start(): void {
     if (this.destroyed) return;
 
@@ -165,6 +173,24 @@ export class LocalMemoryCleaner {
       total.changedFiles += 1;
     }
 
+    if ((removedL1 > 0 || removedL0 > 0) && this.checkpoint) {
+      try {
+        const [actualL0, actualL1] = await Promise.all([
+          vectorStore.countL0(),
+          vectorStore.countL1(),
+        ]);
+        await this.checkpoint.recalibrateCounts({
+          l0ConversationsCount: actualL0,
+          totalMemoriesExtracted: actualL1,
+        });
+      } catch (err) {
+        this.opts.logger?.warn(
+          `${TAG} Checkpoint recalibration failed after cleanup (non-fatal): ` +
+            `${err instanceof Error ? err.message : String(err)}`,
+        );
+      }
+    }
+
     // ── Post-delete: audit summary ──
     const durationMs = Date.now() - startMs;
     const remainingL0 = totalL0 - removedL0;