Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
} from "./src/utils/clean-context-runner.js";
import { SessionFilter } from "./src/utils/session-filter.js";
import { LocalMemoryCleaner } from "./src/utils/memory-cleaner.js";
import { CheckpointManager } from "./src/utils/checkpoint.js";
import { registerMemoryTdaiCli } from "./src/cli/index.js";
import { initDataDirectories, resetStores } from "./src/utils/pipeline-factory.js";
import { getOrCreateInstanceId, initReporter, report, resetReporter } from "./src/core/report/reporter.js";
Expand Down Expand Up @@ -305,6 +306,7 @@ export default function register(api: OpenClawPluginApi) {
retentionDays: cfg.memoryCleanup.retentionDays,
cleanTime: cfg.memoryCleanup.cleanTime,
logger: api.logger,
checkpoint: new CheckpointManager(pluginDataDir, api.logger),
});
sharedMemoryCleaner.start();
api.logger.debug?.(`${TAG} Memory cleaner started (singleton)`);
Expand Down
17 changes: 17 additions & 0 deletions src/core/tdai-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,23 @@ export class TdaiCore {
this.schedulerStartPromise = (async () => {
try {
const checkpoint = new CheckpointManager(this.dataDir, this.logger);
if (this.vectorStore && !this.vectorStore.isDegraded()) {
try {
const [actualL0, actualL1] = await Promise.all([
this.vectorStore.countL0(),
this.vectorStore.countL1(),
]);
await checkpoint.recalibrateCounts({
l0ConversationsCount: actualL0,
totalMemoriesExtracted: actualL1,
});
} catch (err) {
this.logger.warn(
`${TAG} Checkpoint recalibration failed on scheduler start (non-fatal): ` +
`${err instanceof Error ? err.message : String(err)}`,
);
}
}
const cp = await checkpoint.read();
scheduler.start(checkpoint.getAllPipelineStates(cp));
this.logger.debug?.(`${TAG} Scheduler started`);
Expand Down
76 changes: 76 additions & 0 deletions src/utils/checkpoint.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";

import { CheckpointManager } from "./checkpoint.js";

const tempDirs: string[] = [];

async function makeTempDir(): Promise<string> {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "tdai-checkpoint-"));
tempDirs.push(dir);
return dir;
}

afterEach(async () => {
await Promise.all(tempDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true })));
});

describe("CheckpointManager.recalibrateCounts", () => {
it("reconciles L0 and L1 counters with actual storage counts", async () => {
const dataDir = await makeTempDir();
const checkpoint = new CheckpointManager(dataDir);

await checkpoint.write({
last_captured_timestamp: 0,
total_processed: 12,
last_persona_at: 0,
last_persona_time: "",
request_persona_update: false,
persona_update_reason: "",
memories_since_last_persona: 8,
scenes_processed: 0,
runner_states: {},
pipeline_states: {},
l0_conversations_count: 12,
total_memories_extracted: 10,
});

await checkpoint.recalibrateCounts({
l0ConversationsCount: 4,
totalMemoriesExtracted: 6,
});

const cp = await checkpoint.read();
expect(cp.l0_conversations_count).toBe(4);
expect(cp.total_memories_extracted).toBe(6);
expect(cp.memories_since_last_persona).toBe(4);
});

it("does not let memories_since_last_persona go below zero", async () => {
const dataDir = await makeTempDir();
const checkpoint = new CheckpointManager(dataDir);

await checkpoint.write({
last_captured_timestamp: 0,
total_processed: 0,
last_persona_at: 0,
last_persona_time: "",
request_persona_update: false,
persona_update_reason: "",
memories_since_last_persona: 2,
scenes_processed: 0,
runner_states: {},
pipeline_states: {},
l0_conversations_count: 0,
total_memories_extracted: 10,
});

await checkpoint.recalibrateCounts({ totalMemoriesExtracted: 1 });

const cp = await checkpoint.read();
expect(cp.total_memories_extracted).toBe(1);
expect(cp.memories_since_last_persona).toBe(0);
});
});
35 changes: 35 additions & 0 deletions src/utils/checkpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,41 @@ export class CheckpointManager {
this.logger.info(`[checkpoint] incrementScenesProcessed: scenes_processed=${cp.scenes_processed}`);
}

/**
* Reconcile aggregate counters with the current storage backend.
*
* L0/L1 records can be removed outside the normal append path (TTL cleanup,
* manual JSONL pruning, SQLite maintenance). The checkpoint counters are
* increment-only during capture/extraction, so they otherwise drift upward
* forever after deletes.
*/
async recalibrateCounts(actual: {
l0ConversationsCount?: number;
totalMemoriesExtracted?: number;
}): Promise<void> {
const cp = await this.mutate((cp) => {
if (actual.l0ConversationsCount !== undefined) {
cp.l0_conversations_count = Math.max(0, actual.l0ConversationsCount);
}

if (actual.totalMemoriesExtracted !== undefined) {
const nextTotal = Math.max(0, actual.totalMemoriesExtracted);
const removedMemories = Math.max(0, cp.total_memories_extracted - nextTotal);
cp.total_memories_extracted = nextTotal;
if (removedMemories > 0) {
cp.memories_since_last_persona = Math.max(0, cp.memories_since_last_persona - removedMemories);
}
}
});

this.logger.info(
`[checkpoint] recalibrateCounts: ` +
`l0_conversations_count=${cp.l0_conversations_count}, ` +
`total_memories_extracted=${cp.total_memories_extracted}, ` +
`memories_since_last_persona=${cp.memories_since_last_persona}`,
);
}

// ============================
// Per-session helpers — runner state (L0/L1 owned)
// ============================
Expand Down
26 changes: 26 additions & 0 deletions src/utils/memory-cleaner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import type { IMemoryStore } from "../core/store/types.js";
import { ManagedTimer } from "./managed-timer.js";
import type { Logger } from "../core/types.js";
import { formatLocalDateTime, startOfLocalDay } from "./time.js";
import { CheckpointManager } from "./checkpoint.js";

export interface MemoryCleanerOptions {
baseDir: string;
retentionDays: number;
cleanTime: string;
logger?: Logger;
vectorStore?: IMemoryStore;
checkpoint?: CheckpointManager;
}

interface CleanupStats {
Expand All @@ -33,16 +35,22 @@ export class LocalMemoryCleaner {
private readonly timer: ManagedTimer;
private destroyed = false;
private vectorStore?: IMemoryStore;
private checkpoint?: CheckpointManager;

constructor(private readonly opts: MemoryCleanerOptions) {
this.timer = new ManagedTimer("memory-tdai-cleaner", () => this.destroyed);
this.vectorStore = opts.vectorStore;
this.checkpoint = opts.checkpoint;
}

setVectorStore(vectorStore: IMemoryStore | undefined): void {
this.vectorStore = vectorStore;
}

setCheckpoint(checkpoint: CheckpointManager | undefined): void {
this.checkpoint = checkpoint;
}

start(): void {
if (this.destroyed) return;

Expand Down Expand Up @@ -165,6 +173,24 @@ export class LocalMemoryCleaner {
total.changedFiles += 1;
}

if ((removedL1 > 0 || removedL0 > 0) && this.checkpoint) {
try {
const [actualL0, actualL1] = await Promise.all([
vectorStore.countL0(),
vectorStore.countL1(),
]);
await this.checkpoint.recalibrateCounts({
l0ConversationsCount: actualL0,
totalMemoriesExtracted: actualL1,
});
} catch (err) {
this.opts.logger?.warn(
`${TAG} Checkpoint recalibration failed after cleanup (non-fatal): ` +
`${err instanceof Error ? err.message : String(err)}`,
);
}
}

// ── Post-delete: audit summary ──
const durationMs = Date.now() - startMs;
const remainingL0 = totalL0 - removedL0;
Expand Down