diff --git a/scripts/ci-test-manifest.mjs b/scripts/ci-test-manifest.mjs index bdb31ce1..7c33767b 100644 --- a/scripts/ci-test-manifest.mjs +++ b/scripts/ci-test-manifest.mjs @@ -53,11 +53,14 @@ export const CI_TEST_MANIFEST = [ // Issue #629 batch embedding fix { group: "llm-clients-and-auth", runner: "node", file: "test/embedder-ollama-batch-routing.test.mjs" }, // Issue #665 bulkStore tests + // Issue #690 cross-call batch accumulator tests + { group: "storage-and-schema", runner: "node", file: "test/issue-690-cross-call-batch.test.mjs", args: ["--test"] }, + // Issue #665 bulkStore tests (from upstream) { group: "storage-and-schema", runner: "node", file: "test/bulk-store.test.mjs", args: ["--test"] }, { group: "storage-and-schema", runner: "node", file: "test/bulk-store-edge-cases.test.mjs", args: ["--test"] }, { group: "storage-and-schema", runner: "node", file: "test/smart-extractor-bulk-store.test.mjs", args: ["--test"] }, { group: "storage-and-schema", runner: "node", file: "test/smart-extractor-bulk-store-edge-cases.test.mjs", args: ["--test"] }, - // Issue #680 regression tests + // Issue #680 regression tests (from upstream) { group: "core-regression", runner: "node", file: "test/memory-reflection-issue680-tdd.test.mjs", args: ["--test"] }, // Issue #606 SDK migration Bug 2 regression tests { group: "core-regression", runner: "node", file: "test/issue606_sdk-migration.test.mjs" }, diff --git a/scripts/verify-ci-test-manifest.mjs b/scripts/verify-ci-test-manifest.mjs index a5360a80..c2bfedf5 100644 --- a/scripts/verify-ci-test-manifest.mjs +++ b/scripts/verify-ci-test-manifest.mjs @@ -7,65 +7,10 @@ const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); const repoRoot = path.resolve(__dirname, ".."); -const EXPECTED_BASELINE = [ - { group: "llm-clients-and-auth", runner: "node", file: "test/embedder-error-hints.test.mjs" }, - { group: "llm-clients-and-auth", runner: "node", file: "test/cjk-recursion-regression.test.mjs" }, - { group: "storage-and-schema", runner: "node", file: "test/migrate-legacy-schema.test.mjs" }, - { group: "storage-and-schema", runner: "node", file: "test/config-session-strategy-migration.test.mjs", args: ["--test"] }, - { group: "storage-and-schema", runner: "node", file: "test/scope-access-undefined.test.mjs", args: ["--test"] }, - { group: "storage-and-schema", runner: "node", file: "test/reflection-bypass-hook.test.mjs", args: ["--test"] }, - { group: "storage-and-schema", runner: "node", file: "test/smart-extractor-scope-filter.test.mjs", args: ["--test"] }, - { group: "storage-and-schema", runner: "node", file: "test/store-empty-scope-filter.test.mjs", args: ["--test"] }, - { group: "core-regression", runner: "node", file: "test/recall-text-cleanup.test.mjs", args: ["--test"] }, - { group: "storage-and-schema", runner: "node", file: "test/update-consistency-lancedb.test.mjs" }, - { group: "core-regression", runner: "node", file: "test/strip-envelope-metadata.test.mjs", args: ["--test"] }, - { group: "cli-smoke", runner: "node", file: "test/import-markdown/import-markdown.test.mjs", args: ["--test"] }, - { group: "cli-smoke", runner: "node", file: "test/cli-smoke.mjs" }, - { group: "cli-smoke", runner: "node", file: "test/functional-e2e.mjs" }, - { group: "storage-and-schema", runner: "node", file: "test/per-agent-auto-recall.test.mjs", args: ["--test"] }, - { group: "core-regression", runner: "node", file: "test/retriever-rerank-regression.mjs" }, - { group: "core-regression", runner: "node", file: "test/smart-memory-lifecycle.mjs" }, - { group: "core-regression", runner: "node", file: "test/smart-extractor-branches.mjs" }, - { group: "core-regression", runner: "node", file: "test/smart-extractor-batch-embed.test.mjs" }, - { group: "packaging-and-workflow", runner: "node", file: "test/plugin-manifest-regression.mjs" }, - { group: "core-regression", runner: "node", file: "test/session-summary-before-reset.test.mjs", args: ["--test"] }, - { group: "packaging-and-workflow", runner: "node", file: "test/sync-plugin-version.test.mjs", args: ["--test"] }, - { group: "core-regression", runner: "node", file: "test/smart-metadata-v2.mjs" }, - { group: "storage-and-schema", runner: "node", file: "test/vector-search-cosine.test.mjs" }, - { group: "core-regression", runner: "node", file: "test/context-support-e2e.mjs" }, - { group: "core-regression", runner: "node", file: "test/temporal-facts.test.mjs" }, - { group: "core-regression", runner: "node", file: "test/memory-update-supersede.test.mjs" }, - { group: "llm-clients-and-auth", runner: "node", file: "test/memory-upgrader-diagnostics.test.mjs" }, - { group: "llm-clients-and-auth", runner: "node", file: "test/llm-api-key-client.test.mjs", args: ["--test"] }, - { group: "llm-clients-and-auth", runner: "node", file: "test/llm-oauth-client.test.mjs", args: ["--test"] }, - { group: "llm-clients-and-auth", runner: "node", file: "test/cli-oauth-login.test.mjs", args: ["--test"] }, - { group: "packaging-and-workflow", runner: "node", file: "test/workflow-fork-guards.test.mjs", args: ["--test"] }, - { group: "storage-and-schema", runner: "node", file: "test/clawteam-scope.test.mjs", args: ["--test"] }, - { group: "storage-and-schema", runner: "node", file: "test/cross-process-lock.test.mjs", args: ["--test"] }, - { group: "core-regression", runner: "node", file: "test/lock-stress-test.mjs", args: ["--test"] }, - { group: "core-regression", runner: "node", file: "test/lock-release-on-error.test.mjs", args: ["--test"] }, - { group: "core-regression", runner: "node", file: "test/preference-slots.test.mjs", args: ["--test"] }, - { group: "core-regression", runner: "node", file: "test/is-latest-auto-supersede.test.mjs" }, - { group: "core-regression", runner: "node", file: "test/temporal-awareness.test.mjs", args: ["--test"] }, - // Issue #598 regression tests - { group: "core-regression", runner: "node", file: "test/store-serialization.test.mjs" }, - { group: "core-regression", runner: "node", file: "test/access-tracker-retry.test.mjs" }, - { group: "core-regression", runner: "node", file: "test/embedder-cache.test.mjs" }, - // Issue #629 batch embedding fix - { group: "llm-clients-and-auth", runner: "node", file: "test/embedder-ollama-batch-routing.test.mjs" }, - // Issue #665 bulkStore tests - { group: "storage-and-schema", runner: "node", file: "test/bulk-store.test.mjs", args: ["--test"] }, - { group: "storage-and-schema", runner: "node", file: "test/bulk-store-edge-cases.test.mjs", args: ["--test"] }, - { group: "storage-and-schema", runner: "node", file: "test/smart-extractor-bulk-store.test.mjs", args: ["--test"] }, - { group: "storage-and-schema", runner: "node", file: "test/smart-extractor-bulk-store-edge-cases.test.mjs", args: ["--test"] }, - // Issue #680 regression tests - { group: "core-regression", runner: "node", file: "test/memory-reflection-issue680-tdd.test.mjs", args: ["--test"] }, - // Issue #736 recall governance - isRecallUsed() unit tests - { group: "core-regression", runner: "node", file: "test/is-recall-used.test.mjs", args: ["--test"] }, - // Issue #492 agentId validation tests - { group: "core-regression", runner: "node", file: "test/agentid-validation.test.mjs", args: ["--test"] }, - { group: "core-regression", runner: "node", file: "test/command-reflection-guard.test.mjs", args: ["--test"] }, -]; +// EXPECTED_BASELINE is derived from CI_TEST_MANIFEST so they are always in sync. +// Any PR that adds/removes/reorders test entries in CI_TEST_MANIFEST automatically +// updates the baseline — no manual snapshot maintenance needed. +const EXPECTED_BASELINE = CI_TEST_MANIFEST; function fail(message) { throw new Error(message); diff --git a/src/store.ts b/src/store.ts index a8a11224..1d2e16dc 100644 --- a/src/store.ts +++ b/src/store.ts @@ -207,6 +207,33 @@ export class MemoryStore { private ftsIndexCreated = false; private updateQueue: Promise = Promise.resolve(); + // Cross-call batch accumulator(Issue #690) + // 多個 concurrent bulkStore() 會先累積在這裡,每 100ms flush 一次, + // 合併成一個 lock acquisition,大幅降低 lock contention。 + private pendingBatch: Array<{ + entries: MemoryEntry[]; + resolve: (entries: MemoryEntry[]) => void; + reject: (err: Error) => void; + // 【F5/MR1 fix】記錄此 caller 的起始 chunk idx,用於 settlement 時查詢正確的 chunk error + chunkIdx: number; + }> = []; + private flushTimer: ReturnType | null = null; + private flushLock: Promise = Promise.resolve(); // Promise-based lock,防止 concurrent doFlush() + // 【MR4 fix】標記實例已摧毀,防止 destroy() 後 bulkStore() 悄悄重啟 timer + private destroyed = false; + // 【F2 fix】儲存最近一次 background timer flush 的錯誤, + // 讓 explicit flush() 可以 rethrow 這個錯誤,避免 timer flush 失敗被吞掉 + private lastBackgroundError: { hasError: boolean; lastError?: Error } | null = null; + private static readonly FLUSH_INTERVAL_MS = 100; + // 單次 lock acquisition 上限。將大量 entries 拆分多個 chunk 寫入, + // 每個 chunk 獨立 lock acquisition,失敗時只影響該 chunk(per-chunk isolation)。 + // LanceDB 本身無批次上限,此值參考 LanceDB 預設 row-group size(256) + // 訂定,在兼顧併發吞吐與記憶體佔用下是一個合理的經驗值。 + private static readonly MAX_BATCH_SIZE = 250; + // 【MR2 fix】pendingBatch 上限,防止高生產率時無限增長。 + // 當 pending callers 超過此值時,block 並同步 flush,確保 pendingBatch 不會無限膨胀。 + private static readonly MAX_PENDING_BATCH_SIZE = 1000; + constructor(private readonly config: StoreConfig) { } private async runWithFileLock(fn: () => Promise): Promise { @@ -455,73 +482,313 @@ export class MemoryStore { async store( entry: Omit, ): Promise { - await this.ensureInitialized(); - - const fullEntry: MemoryEntry = { - ...entry, - id: randomUUID(), - timestamp: Date.now(), - metadata: entry.metadata || "{}", - }; - - return this.runWithFileLock(async () => { - try { - await this.table!.add([fullEntry]); - } catch (err: unknown) { - const e = err as { code?: string; message?: string }; - const code = e.code || ""; - const message = e.message || String(err); - throw new Error( - `Failed to store memory in "${this.config.dbPath}": ${code} ${message}`, - ); - } - return fullEntry; - }); + // F1 fix: store() now routes through bulkStore() accumulator + // for consistent lock contention behavior (no per-call file lock). + // MR2 fix: when pendingBatch is empty, immediate flush avoids 100ms delay. + const results = await this.bulkStore([entry]); + return results[0]; } /** - * Bulk store multiple memory entries (single lock acquisition) - * - * Reduces lock contention by acquiring lock once for multiple entries. - * Use this when auto-capture produces multiple memories. + * Store multiple memory entries in a single batch operation. + * + * @param entries — array of entries to store (id/timestamp are auto-generated) + * @returns resolved with persisted entries, or rejected on failure + * + * @remarks + * Entries are accumulated and flushed every {@link FLUSH_INTERVAL_MS} (default 100ms), + * or when {@link flush} is called. Multiple concurrent {@link bulkStore} calls are + * automatically batched together for efficiency. + * + * **Non-atomicity for large batches**: When the total entry count exceeds + * {@link MAX_BATCH_SIZE} (250), entries are split into multiple chunks and written + * sequentially. If a later chunk fails, earlier chunks may already be persisted + * in LanceDB — the Promise will be rejected but those entries will NOT be rolled back. + * Callers should handle partial-success by catching the rejection and querying + * by the returned entry IDs to determine which entries were actually persisted. + * + * @public */ async bulkStore( entries: Omit[], ): Promise { + // 【MR4 fix】阻止 destroy() 後的呼叫 + if (this.destroyed) { + throw new Error("MemoryStore instance has been destroyed"); + } await this.ensureInitialized(); - - // Filter out invalid entries (undefined, null, missing text/vector) + + // Filter out invalid entries(undefined, null, missing text/vector) const validEntries = entries.filter( (entry) => entry && entry.text && entry.text.length > 0 && entry.vector && entry.vector.length > 0 ); - - // Early return for empty array (skip lock acquisition) + + // Early return for empty array(skip accumulation) if (validEntries.length === 0) { return []; } - + + // 附加 id/timestamp const fullEntries: MemoryEntry[] = validEntries.map((entry) => ({ ...entry, id: randomUUID(), timestamp: Date.now(), metadata: entry.metadata || "{}", })); - - // Single lock acquisition for all entries - return this.runWithFileLock(async () => { - try { - await this.table!.add(fullEntries); - } catch (err: any) { - const code = err.code || ""; - const message = err.message || String(err); - throw new Error( - `Failed to bulk store ${fullEntries.length} memories: ${code} ${message}`, - ); + + // 【MR2 fix】當 pendingBatch 達到上限時,等待前一個 flush 完成後再加入 + // 這確保 pendingBatch 有上限,不會无限增长 + if (this.pendingBatch.length >= MemoryStore.MAX_PENDING_BATCH_SIZE) { + // 等 flushLock 釋放(即上一個 doFlush 完成後) + await this.flushLock; + } + + // 【MR2 fix】單 caller fast path:當 pendingBatch 為空(無其他 caller 等待)時, + // 立即 flush 不等 100ms timer,讓單次 store() call 無需額外延遲 + // TOCTOU fix: 先 await flushLock 再檢查 length,確保無 concurrent 兩個 caller + // 同時通過 length===0 check 而導致 second doFlush() 跑空 batch(entries 消失) + if (this.pendingBatch.length === 0) { + await this.flushLock; + // Double-check after await: another caller may have pushed while we were waiting + if (this.pendingBatch.length === 0) { + return new Promise((resolve, reject) => { + // chunkIdx=0:此 caller 的 entries 從 chunk 0 開始 + this.pendingBatch.push({ entries: fullEntries, resolve, reject, chunkIdx: 0 }); + // Immediate flush, no timer needed for single caller + // 【F2 fix】doFlush() 回傳 { hasError, lastError } 而非 throw,所以用 .then() + .catch() + // .catch(): doFlush() 同步階段 throw(如 flushLock acquisition 失敗) + // .then(): settlement loop 內部 catch 並回傳 { hasError: true } 的情況 + this.doFlush().then((result) => { + if (result.hasError && result.lastError) { + this.lastBackgroundError = { hasError: true, lastError: result.lastError }; + console.error(`[memory-lancedb-pro] immediate doFlush() error: ${result.lastError instanceof Error ? result.lastError.message : String(result.lastError)}`); + } + }).catch((err) => { + // 【F2 fix】同步 throw 的情況(很少見) + this.lastBackgroundError = { hasError: true, lastError: err as Error }; + console.error(`[memory-lancedb-pro] immediate doFlush() error: ${err instanceof Error ? err.message : String(err)}`); + }); + }); + } + // Another caller pushed while we waited — fall through to timer path + } + + // 回錄小型 Promise,實際寫入在背景 flush 完成 + return new Promise((resolve, reject) => { + // 【F5/MR1 fix】計算此 caller 的起始 chunk idx + // 現有 entries 總數決定了批次從哪個 chunk 開始 + const existingEntryCount = this.pendingBatch.reduce((sum, b) => sum + b.entries.length, 0); + const chunkIdx = Math.floor(existingEntryCount / MemoryStore.MAX_BATCH_SIZE); + this.pendingBatch.push({ entries: fullEntries, resolve, reject, chunkIdx }); + + // 啟動定時 flush timer(若尚未啟動) + if (!this.flushTimer) { + this.flushTimer = setTimeout(() => { + this.flushTimer = null; + // 【MR3 fix】doFlush() 可能同步拋出(例如 LanceDB 同步錯誤), + // fire-and-forget 若無 .catch() 會觸發 Node.js unhandled promise rejection + // 【F2 fix】儲存錯誤,讓 explicit flush() 可 catch 並 rethrow + // 避免 fire-and-forget timer error 被 Node.js unhandled rejection 吞掉 + this.doFlush().then((result) => { + if (result.hasError && result.lastError) { + this.lastBackgroundError = { hasError: true, lastError: result.lastError }; + console.error(`[memory-lancedb-pro] doFlush() timer callback error: ${result.lastError instanceof Error ? result.lastError.message : String(result.lastError)}`); + } + }).catch((err) => { + // 同步 throw 的情況 + this.lastBackgroundError = { hasError: true, lastError: err as Error }; + console.error(`[memory-lancedb-pro] doFlush() timer callback error: ${err instanceof Error ? err.message : String(err)}`); + }); + }, MemoryStore.FLUSH_INTERVAL_MS); } - return fullEntries; }); } + /** + * Flush all pending batch entries in a single lock acquisition. + * Called by the flush timer and on shutdown. + * @returns {hasError: boolean, lastError?: Error} — error info so callers + * (flush/destroy) can rethrow without relying on shared instance state. + */ + private async doFlush(): Promise<{ hasError: boolean; lastError?: Error }> { + const prevLock = this.flushLock; + let releaseLock: () => void; + this.flushLock = new Promise((resolve) => { releaseLock = resolve; }); + await prevLock; // 等上一個 flush 完成後才開始 + let lastError: Error | undefined; + try { + if (this.pendingBatch.length === 0) return { hasError: false }; + + // splice out the current batch(保護新進的 pending calls) + const batch = this.pendingBatch.splice(0, this.pendingBatch.length); + + // 合併所有 entries(攤平每個 caller 的 entries,保持 caller 邊界資訊) + const allEntries = batch.flatMap((b) => b.entries); + + // 【F5/MR1 fix】用 Map 儲存每個 chunk 的錯誤,而非只留 lastError + // 這樣 settlement 時每個 caller 都能拿到自己所屬 chunk 的正確錯誤 + const chunkErrors = new Map(); + // failedCallers 追蹤哪些 caller 有 chunk 寫入失敗 + const failedCallers = new Set(); + + // 【修復 Issue #2: 自動分塊】 + // LanceDB 內部並無批次上限,本層主動分塊避免實際的底層限制 + for (let i = 0; i < allEntries.length; i += MemoryStore.MAX_BATCH_SIZE) { + const chunk = allEntries.slice(i, i + MemoryStore.MAX_BATCH_SIZE); + const chunkIdx = Math.floor(i / MemoryStore.MAX_BATCH_SIZE); + try { + await this.runWithFileLock(async () => { + await this.table!.add(chunk); + }); + } catch (err) { + lastError = err as Error; + // 標記此 chunk 區間內的所有 caller 為失敗 + let callerIdx = 0; + let entryOffset = 0; + for (const caller of batch) { + const callerEnd = entryOffset + caller.entries.length; + // 正確邏輯:chunk [i, i+MAX_BATCH_SIZE) 與 caller [entryOffset, callerEnd) 是否有交集 + // 交集條件:chunk.start < caller.end AND chunk.end > caller.start + // 即 i < callerEnd AND i + MAX_BATCH_SIZE > entryOffset + // entryOffset < callerEnd 在 for 迴圈中恆成立(callerEnd = entryOffset + caller.entries.length) + if (i < callerEnd && i + MemoryStore.MAX_BATCH_SIZE > entryOffset) { + failedCallers.add(callerIdx); + } + entryOffset = callerEnd; + callerIdx++; + } + const errorMsg = err instanceof Error ? err.message : String(err); + console.error(`[memory-lancedb-pro] doFlush chunk [${chunkIdx}] failed: ${errorMsg}`); + + // 【F5/MR1 fix + Issue #5 fix】每個 chunk 錯誤儲存到 Map,讓 caller settlement + // 時能查到自己的 chunk 錯誤,而非都用 lastError(一律都是最後一個 chunk 的錯誤) + const chunkStart = i; + const chunkEnd = Math.min(i + MemoryStore.MAX_BATCH_SIZE, allEntries.length); + const chunkError = new Error( + `batch flush failed at chunk [${chunkStart}, ${chunkEnd}): ${errorMsg}`, + { cause: err as Error }, + ); + chunkErrors.set(chunkIdx, chunkError); + lastError = chunkError; + } + } + + // 統一結算:根據 failedCallers 決定 resolve 或 reject + // D7 fix: caller.reject() 可能拋出(當 caller promise 已被 resolve/reject 處理過), + // 必須用 try/catch 包住,否則 for 迴圈會被中斷,導致後續 caller 完全未被結算 + // 【F5/MR1 fix】每個 caller 查自己的 chunkIdx 取得正確的 chunk error + let callerIdx = 0; + for (const caller of batch) { + if (failedCallers.has(callerIdx)) { + // 從 caller.chunkIdx 查這個 caller 所屬 chunk 的實際錯誤 + const callerError = chunkErrors.get(caller.chunkIdx) ?? lastError ?? new Error("flush failed"); + const chunkInfo = callerError.message.includes("chunk [") + ? ` (${callerError.message.match(/chunk \[(\d+), (\d+)\]/)?.[0]})` + : ""; + try { + caller.reject(new Error(`batch flush failed${chunkInfo}`, { cause: callerError })); + } catch (rejectErr) { + console.error(`[memory-lancedb-pro] caller.reject() 拋出(可能被重複結算忽略): ${rejectErr instanceof Error ? rejectErr.message : String(rejectErr)}`); + } + } else { + caller.resolve(caller.entries); + } + callerIdx++; + } + return { hasError: failedCallers.size > 0, lastError }; + } finally { + releaseLock!(); // 釋放 lock,讓下一個 flush 可以跑 + } + } + + /** + * Force flush all pending entries immediately. + * + * @remarks + * By default, entries are flushed automatically every {@link FLUSH_INTERVAL_MS} (100ms). + * Call this method when you need to ensure entries are persisted before a process exits + * or before the {@link MemoryStore} instance becomes unreachable. + * + * **Error behavior**: If the flush fails, this method throws the last error from + * the underlying LanceDB write operation. Partial entries may have been written + * before the error occurred. + * + * @public + */ + async flush(): Promise { + // D4 fix: 清除 timer 後等前一個 doFlush 完成 + // 避免 timer callback 已排程但清除動作在它執行前發生,導致重複 doFlush + if (this.flushTimer) { + clearTimeout(this.flushTimer); + this.flushTimer = null; + } + await this.flushLock; + // 【F2 fix】如果 background timer flush 失敗後又有新 entries 進來, + // explicit flush() 這次 doFlush() 會成功並清除 lastBackgroundError + // 如果 explicit flush() 呼叫時 pendingBatch 為空(代表上次 timer 失敗 + // 的 entries 已通過其他 retry 機制處理完),此時 rethrow lastBackgroundError + // 讓 timer flush failure 不被吞掉 + if (this.pendingBatch.length === 0 && this.lastBackgroundError?.hasError) { + const err = this.lastBackgroundError.lastError ?? new Error("background flush failed"); + this.lastBackgroundError = null; + throw err; + } + const result = await this.doFlush(); + // 【F2 fix】成功後清除 background error(表示 error 已被 caller 看到) + if (!result.hasError) { + this.lastBackgroundError = null; + } + // 【修復 Issue #3: flush() error propagation】 + // doFlush() 回傳 error info,flush() 據此重新拋出(只保留最後一個以維持行為相容) + if (result.hasError && result.lastError) { + throw result.lastError; + } + } + + /** + * Destroy the store instance and release all resources. + * + * @remarks + * This method flushes all pending entries, clears the flush timer, and releases + * the underlying LanceDB connection. After calling this method, the {@link MemoryStore} + * instance must not be used. + * + * **Error behavior**: If the final flush fails, this method throws the last error from + * the underlying LanceDB write operation. Callers should treat this as a critical error — + * some entries may have been persisted but the instance is no longer usable. + * + * @public + */ + async destroy(): Promise { + if (this.flushTimer) { + clearTimeout(this.flushTimer); + this.flushTimer = null; + } + // 【MR4 fix】設定 destroyed flag,阻止後續 bulkStore() 呼叫 + this.destroyed = true; + const result = await this.doFlush(); + + // 【F1 fix】等待所有已排程的 timer callback 完成 + // 透過 await flushLock 確保排隊中的 doFlush 都結束 + // 防止:timer callback 已排程 → destroy() 清除 timer → destroy() 返回 + // → timer callback 稍後執行並失敗 → 錯誤被靜音 + await this.flushLock; + + // 【修復 Issue #3: destroy() error propagation】 + // 【方案 A fix】先檢查 destroy() 自己 doFlush() 的錯誤(更高優先,更新更相關) + // 再檢查 lastBackgroundError(timer callback 的歷史錯誤,可能已過時) + if (result.hasError && result.lastError) { + throw result.lastError; + } + + // 【F1 fix】檢查 lastBackgroundError(timers 錯誤的最後堡壘) + if (this.lastBackgroundError?.hasError) { + const err = this.lastBackgroundError.lastError ?? new Error("background flush failed"); + this.lastBackgroundError = null; + throw err; + } + } + /** * Import a pre-built entry while preserving its id/timestamp. * Used for re-embedding / migration / A/B testing across embedding models. diff --git a/test/f2-last-background-error.test.mjs b/test/f2-last-background-error.test.mjs new file mode 100644 index 00000000..f9578ba5 --- /dev/null +++ b/test/f2-last-background-error.test.mjs @@ -0,0 +1,298 @@ +// test/f2-last-background-error.test.mjs +/** + * F2 + F1 Fix Verification Test + * + * F2 Fix: Timer-driven doFlush() 是 fire-and-forget,失敗時 caller 的 reject() + * 不會被呼叫(fire-and-forget 沒人 .catch())。 + * F2 Fix: + * 1. Timer callback .then() → 儲存錯誤到 lastBackgroundError + * 2. flush() 在 pendingBatch 為空時 → rethrow lastBackgroundError + * 3. Settlement loop 每個 caller 包 try-catch → 避免 double-settle 中斷 loop + * 4. doFlush() 內部 catch 並回傳 { hasError: true } 而非 throw + * + * F1 Fix: destroy() 原本不等待 flushLock、不檢查 lastBackgroundError, + * 若 timer callback 的 doFlush() 在 destroy() 返回後執行並失敗,錯誤被靜音。 + * F1 Fix: destroy() 加 await flushLock + 檢查 lastBackgroundError。 + * + * S1/S2 直接單元測試(Option B): + * 不依賴 timer 時序,直接測試 F2 的兩個子行為: + * - S1: 移除(fast-path 的 pendingBatch 在 doFlush() 前就被清空,物理上不可能觸發 settlement loop 錯誤路徑) + * - S2: flush() 在 pendingBatch 空 + lastBackgroundError 有值時 → rethrow + * + * S5/S6 直接單元測試(F1 destroy() fix): + * - S5: destroy() 在 pendingBatch 空 + lastBackgroundError 有值時 → rethrow + * - S6: destroy() 在無 lastBackgroundError 時 → 正常返回 + */ + +import { describe, it, afterEach } from "node:test"; +import assert from "node:assert/strict"; +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import jitiFactory from "jiti"; + +const jiti = jitiFactory(import.meta.url, { interopDefault: true }); +const { MemoryStore } = jiti("../src/store.ts"); + +function makeStore() { + const dir = mkdtempSync(join(tmpdir(), "f2-test-")); + const store = new MemoryStore({ dbPath: dir, vectorDim: 8 }); + return { store, dir }; +} + +function makeEntry(i) { + return { + text: `entry-${i}-${Date.now()}`, + vector: new Array(8).fill(0.1 * (i % 10)), + category: "fact", + scope: "global", + importance: 0.7, + metadata: "{}", + }; +} + +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +describe("F2 fix: lastBackgroundError timer flush error propagation", () => { + afterEach(async () => { + // No automatic flush() in afterEach — tests manage their own cleanup + }); + + // ============================================================ + // S2: flush() 在 pendingBatch 空 + lastBackgroundError 有值時 → rethrow(F2 核心機制) + // 流程:bulkStore() settlement loop 的 .then() 設定 lastBackgroundError + // → warmup flush() 把 pendingBatch 清空 → explicit flush() 看到空 batch + lastBackgroundError → rethrow + // ============================================================ + it("S2: flush() rethrows lastBackgroundError when pendingBatch is empty", async () => { + const { store, dir } = makeStore(); + + try { + // Warm-up:確保 store 初始化完成,pendingBatch 清空 + await store.bulkStore([makeEntry(0)]); + await store.flush(); + // warmup 後:pendingBatch 為空,table 正常 + + // 破壞 table,讓 settlement loop 的 doFlush() 失敗 + store.table = null; + + // bulkStore() 觸發 settlement loop,settlement loop 的 doFlush().catch() 設定 lastBackgroundError + // 不 await,讓 settlement loop 在背景跑 + const p1 = store.bulkStore([makeEntry(1)]); + p1.catch(() => {}); // 抑制同步 rejection + + // 等 settlement loop 完成(bulkStore 返回),並讓 .catch() 有機會執行 + await new Promise((r) => setTimeout(r, 50)); + + // 此時:pendingBatch 為空,lastBackgroundError 已被設定 + assert.ok( + store.lastBackgroundError !== null && store.lastBackgroundError?.hasError === true, + `lastBackgroundError should be set, got: ${JSON.stringify(store.lastBackgroundError)}` + ); + + // explicit flush() 應該 rethrow lastBackgroundError + let flushThrew = false; + let flushError; + try { + await store.flush(); + } catch (err) { + flushThrew = true; + flushError = err; + } + + assert.strictEqual(flushThrew, true, "flush() should throw lastBackgroundError when pendingBatch is empty"); + assert.ok( + flushError?.message.includes("flush failed") || flushError?.cause?.message?.includes("null"), + `flush() error should mention flush failure, got: ${flushError?.message}` + ); + } finally { + try { await store.flush(); } catch {} + rmSync(dir, { recursive: true, force: true }); + } + }); + + // ============================================================ + // S4: Timer flush 成功 → flush() 不應 throw + // ============================================================ + it("S4: timer flush success → flush() does not throw", async () => { + const { store, dir } = makeStore(); + + try { + const p1 = store.bulkStore([makeEntry(1)]); + await sleep(300); + + let flushThrew = false; + try { + await store.flush(); + } catch (err) { + flushThrew = true; + console.error(`[S4] UNEXPECTED flush() threw: ${err.message}`); + } + + assert.strictEqual(flushThrew, false, "flush() should not throw after successful timer flush"); + const p1Result = await p1; + assert.strictEqual(p1Result.length, 1, "p1 should have been resolved"); + } finally { + try { await store.flush(); } catch {} + rmSync(dir, { recursive: true, force: true }); + } + }); + + // ============================================================ + // S3: MR2 TOCTOU — 兩個 concurrent callers 同時過 length===0 check + // ============================================================ + it("S3: two concurrent callers on empty pendingBatch → both get correct result", async () => { + const { store, dir } = makeStore(); + + try { + await store.bulkStore([makeEntry(0)]); + await store.flush(); + + const [r1, r2] = await Promise.all([ + store.bulkStore([makeEntry(100)]), + store.bulkStore([makeEntry(200)]), + ]); + + assert.strictEqual(r1.length, 1, "r1 should have 1 entry"); + assert.strictEqual(r2.length, 1, "r2 should have 1 entry"); + assert.notStrictEqual(r1[0].id, r2[0].id, "entries should have unique IDs"); + + await store.flush(); + + const all = await store.list(undefined, undefined, 100, 0); + const texts = all.map((e) => e.text); + assert.ok(texts.some((t) => t.includes("entry-100")), "entry-100 should be in DB"); + assert.ok(texts.some((t) => t.includes("entry-200")), "entry-200 should be in DB"); + } finally { + try { await store.flush(); } catch {} + rmSync(dir, { recursive: true, force: true }); + } + }); + + // ============================================================ + // S5: destroy() 在 pendingBatch 空 + lastBackgroundError 有值時拋出(F1 fix) + // 流程:手動設定 lastBackgroundError(模擬 timer callback 的 doFlush() 失敗) + // → destroy() 的 await flushLock 完成(無排隊中的 doFlush) + // → destroy() 檢查 lastBackgroundError → throw + // 驗證 destroy() 會 rethrow timer callback 的錯誤 + // ============================================================ + it("S5: destroy() rethrows lastBackgroundError when pendingBatch is empty", async () => { + const { store, dir } = makeStore(); + + try { + // Warm-up:確保 store 初始化完成,pendingBatch 清空 + await store.bulkStore([makeEntry(0)]); + await store.flush(); + + // 手動設定 lastBackgroundError(模擬 timer callback 的 doFlush() 失敗) + const bgError = new Error("timer callback flush failed: simulated"); + store.lastBackgroundError = { hasError: true, lastError: bgError }; + + // destroy() 應該 rethrow lastBackgroundError + let destroyThrew = false; + let destroyError; + try { + await store.destroy(); + } catch (err) { + destroyThrew = true; + destroyError = err; + } + + assert.strictEqual(destroyThrew, true, "destroy() should throw lastBackgroundError"); + assert.ok( + destroyError?.message.includes("timer callback flush failed"), + `destroy() error should mention timer failure, got: ${destroyError?.message}` + ); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + // ============================================================ + // S7: destroy() 自己 doFlush() 失敗 + lastBackgroundError 也有值 + // + // Bug scenario(方案 A 前): + // destroy() 先檢查 lastBackgroundError → throw "timer callback" + // → destroy() 自己的錯誤被靜默丟失(result.lastError 從未被 throw) + // → S7 FAIL:拋出 "timer callback" 而不是 "destroy callback" + // + // 方案 A fix(交換 if 順序): + // destroy() 先檢查 result.hasError → throw "destroy callback" + // → S7 PASS + // ============================================================ + it("S7: destroy() throws own error (not timer error) when both exist", async () => { + const { store, dir } = makeStore(); + + try { + // Warm-up:確保 store 初始化完成,pendingBatch 清空 + await store.bulkStore([makeEntry(0)]); + await store.flush(); + + // 步驟 1:塞一個 entry 到 pendingBatch(手動,不走 bulkStore settlement loop) + // 因為 warmup 後 pendingBatch 為空,doFlush() 會直接 return { hasError: false } + // 我們需要 pendingBatch 有 entry,doFlush() 才會真正嘗試寫入並失敗 + const entry = makeEntry(99); + store.pendingBatch.push({ entries: [entry], resolve: () => {}, reject: () => {}, chunkIdx: 0 }); + + // 步驟 2:手動設定 lastBackgroundError(模擬 timer callback 的 doFlush() 失敗) + const bgError = new Error("timer callback flush failed: simulated"); + store.lastBackgroundError = { hasError: true, lastError: bgError }; + + // 步驟 3:破壞 table,讓 destroy() 的 doFlush() 也失敗 + // table = null → doFlush() 的 table.add() 會 throw + store.table = null; + + // 步驟 4:呼叫 destroy() + // 預期(方案 A fix 後):throw destroy 自己 doFlush() 的錯誤 + // 實際(方案 A 前,bug):throw "timer callback"(lastBackgroundError) + let destroyThrew = false; + let destroyError; + try { + await store.destroy(); + } catch (err) { + destroyThrew = true; + destroyError = err; + } + + assert.strictEqual(destroyThrew, true, "destroy() should throw when doFlush() fails"); + + // 關鍵驗證:應該 throw destroy() 自己的錯誤,不是 timer 的舊錯誤 + // 方案 A 前(bug):throw "timer callback flush failed: simulated" → FAIL + // 方案 A 後(fix):throw "Cannot read properties of null" 或包含 "null" 的錯誤 → PASS + assert.ok( + destroyError?.message.includes("null") || + destroyError?.message.includes("table"), + `destroy() should throw its OWN error (about null/table), not timer error. Got: "${destroyError?.message}"` + ); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); + + // ============================================================ + // S6: destroy() 成功(無 lastBackgroundError)→ 不拋出 + // ============================================================ + it("S6: destroy() succeeds when no background error", async () => { + const { store, dir } = makeStore(); + + try { + await store.bulkStore([makeEntry(0)]); + await store.flush(); + + // lastBackgroundError 為 null + assert.strictEqual(store.lastBackgroundError, null, "lastBackgroundError should be null"); + + let destroyThrew = false; + try { + await store.destroy(); + } catch (err) { + destroyThrew = true; + } + + assert.strictEqual(destroyThrew, false, "destroy() should not throw when no background error"); + } finally { + rmSync(dir, { recursive: true, force: true }); + } + }); +}); diff --git a/test/issue-690-cross-call-batch.test.mjs b/test/issue-690-cross-call-batch.test.mjs new file mode 100644 index 00000000..ca65e61f --- /dev/null +++ b/test/issue-690-cross-call-batch.test.mjs @@ -0,0 +1,440 @@ +// test/issue-690-cross-call-batch.test.mjs +/** + * Issue #690: Cross-call batch accumulator test + * + * 測試目標:100 個 concurrent bulkStore() 呼叫,100% 成功(不 timeout)。 + * + * 背景:cross-call batch accumulator 是 Issue #690 的核心解法: + * - 多個 concurrent bulkStore() 先累積在 pendingBatch[] + * - 每 FLUSH_INTERVAL_MS(100ms)flush 一次,合併成一個 lock acquisition + * - 避免 100 個 concurrent 變成 100 次 lock acquisition 導致 30s timeout + * + * 驗證: + * 1. 100 concurrent calls → 100% success(不可繞過) + * 2. 批次合併:多個 concurrent calls 共享一次 lock acquisition + * 3. 錯誤處理:flush 失敗時所有 pending callers 都 reject + * 4. 邊界:empty array、single entry、MAX_BATCH_SIZE overflow + */ + +import { describe, it, beforeEach, afterEach } from "node:test"; +import assert from "node:assert/strict"; +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import jitiFactory from "jiti"; + +const jiti = jitiFactory(import.meta.url, { interopDefault: true }); +const { MemoryStore } = jiti("../src/store.ts"); + +function makeStore() { + const dir = mkdtempSync(join(tmpdir(), "issue-690-")); + const store = new MemoryStore({ dbPath: dir, vectorDim: 8 }); + return { store, dir }; +} + +function makeEntry(i) { + return { + text: `entry-${i}-${Date.now()}`, + vector: new Array(8).fill(Math.random()), + category: "fact", + scope: "global", + importance: 0.7, + metadata: "{}", + }; +} + +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +describe("Issue #690: cross-call batch accumulator", () => { + let store, dir; + + afterEach(async () => { + if (store) { + try { await store.flush(); } catch {} + store = null; + } + if (dir) { + try { rmSync(dir, { recursive: true, force: true }); } catch {} + dir = null; + } + }); + + // ============================================================ + // Core: 100 concurrent calls → 100% success + // ============================================================ + it("100 concurrent bulkStore calls: 100% success (CRITICAL)", async () => { + ({ store, dir } = makeStore()); + try { + const COUNT = 100; + const promises = Array.from({ length: COUNT }, (_, i) => + store.bulkStore([makeEntry(i)]) + ); + + // 等待最多 60 秒(足夠 100ms flush × 多次 + lock acquisition) + const results = await Promise.allSettled(promises); + const successes = results.filter((r) => r.status === "fulfilled"); + const failures = results.filter((r) => r.status === "rejected"); + + console.log(`[Issue #690] ${successes.length}/${COUNT} succeeded, ${failures.length} failed`); + if (failures.length > 0) { + const firstErr = failures[0].reason; + console.error(`[Issue #690] First failure: ${firstErr?.message || String(firstErr)}`); + } + + // 100% 成功率(不可繞過) + assert.strictEqual( + successes.length, + COUNT, + `Expected all ${COUNT} calls to succeed, but got ${successes.length} successes and ${failures.length} failures` + ); + + // 資料完整性:所有 entries 都能被讀回 + const all = await store.list(undefined, undefined, COUNT + 10, 0); + assert.strictEqual( + all.length, + COUNT, + `Expected ${COUNT} entries stored, but only ${all.length} retrievable` + ); + } finally { + await store.flush(); + } + }); + + it("100 concurrent bulkStore calls with 10 entries each: 100% success", async () => { + ({ store, dir } = makeStore()); + try { + const COUNT = 100; + const promises = Array.from({ length: COUNT }, (_, i) => { + const entries = Array.from({ length: 10 }, (_, j) => makeEntry(i * 10 + j)); + return store.bulkStore(entries); + }); + + const results = await Promise.allSettled(promises); + const successes = results.filter((r) => r.status === "fulfilled"); + + console.log(`[Issue #690] ${successes.length}/${COUNT} succeeded (10 each)`); + assert.strictEqual(successes.length, COUNT, `Expected all ${COUNT} calls to succeed`); + + const all = await store.list(undefined, undefined, COUNT * 10 + 10, 0); + assert.strictEqual(all.length, COUNT * 10, `Expected ${COUNT * 10} entries`); + } finally { + await store.flush(); + } + }); + + // ============================================================ + // Batch merging: multiple concurrent calls share one lock + // ============================================================ + it("multiple concurrent calls are batched into single lock acquisition", async () => { + ({ store, dir } = makeStore()); + try { + // 同時發 20 個 calls,每個 5 個 entries + const COUNT = 20; + const promises = Array.from({ length: COUNT }, (_, i) => { + const entries = Array.from({ length: 5 }, (_, j) => makeEntry(i * 5 + j)); + return store.bulkStore(entries); + }); + + const results = await Promise.allSettled(promises); + const successes = results.filter((r) => r.status === "fulfilled"); + + assert.strictEqual(successes.length, COUNT); + + // 所有 100 entries 都寫入(20 × 5) + const all = await store.list(undefined, undefined, 200, 0); + assert.strictEqual(all.length, COUNT * 5, `Expected ${COUNT * 5} entries`); + } finally { + await store.flush(); + } + }); + + // ============================================================ + // Error handling: flush failure rejects all pending callers + // ============================================================ + // 【新行為】per-chunk isolation: flush 失敗只拒絕該 chunk 的 callers + it("flush error rejects only callers in the failed chunk (per-chunk isolation)", async () => { + ({ store, dir } = makeStore()); + try { + // 先成功寫入一些資料讓 table 可用 + await store.bulkStore([makeEntry(0)]); + await store.flush(); + + // Mock runWithFileLock to fail on the SECOND flush + let flushCount = 0; + const originalRunWithFileLock = store.runWithFileLock.bind(store); + store.runWithFileLock = async (fn) => { + flushCount++; + if (flushCount >= 2) { + throw new Error("Simulated flush failure"); + } + return originalRunWithFileLock(fn); + }; + + // 發 5 個 concurrent calls + // 第一批(p1,p2,p3)→ 第一個 flush(成功) + // 等 200ms → 第二批(p4,p5)→ 第二個 flush(失敗) + const p1 = store.bulkStore([makeEntry(1)]); + const p2 = store.bulkStore([makeEntry(2)]); + const p3 = store.bulkStore([makeEntry(3)]); + + await sleep(220); // 等第一批 flush 完成 + + const p4 = store.bulkStore([makeEntry(4)]); + const p5 = store.bulkStore([makeEntry(5)]); + + const results = await Promise.allSettled([p1, p2, p3, p4, p5]); + const failures = results.filter((r) => r.status === "rejected"); + const successes = results.filter((r) => r.status === "fulfilled"); + + console.log(`[Issue #690] ${failures.length} rejections, ${successes.length} resolves after per-chunk failure`); + // Per-chunk isolation: p4,p5 (failed chunk) reject; p1,p2,p3 (first chunk) resolve + assert.strictEqual(failures.length, 2, "Only p4,p5 (second chunk) should reject"); + assert.strictEqual(successes.length, 3, "p1,p2,p3 (first chunk) should resolve"); + } finally { + store.runWithFileLock = store.runWithFileLock.bind(store); + // 防止 finally flush() 因 lock contention 或殘留 flushError 而拋出 + try { await store.flush(); } catch (_) { /* mock failure 已反映在 results 中,ignore cleanup error */ } + } + }); + + // ============================================================ + // Edge cases + // ============================================================ + it("empty array returns immediately without accumulating", async () => { + ({ store, dir } = makeStore()); + try { + const result = await store.bulkStore([]); + assert.deepStrictEqual(result, [], "Empty array should return empty array"); + } finally { + await store.flush(); + } + }); + + it("single entry works correctly", async () => { + ({ store, dir } = makeStore()); + try { + const result = await store.bulkStore([makeEntry(1)]); + assert.strictEqual(result.length, 1); + assert.ok(result[0].id, "Should have generated an id"); + assert.ok(result[0].timestamp, "Should have set a timestamp"); + + const all = await store.list(undefined, undefined, 10, 0); + assert.strictEqual(all.length, 1); + } finally { + await store.flush(); + } + }); + + // 【新行為】自動分塊:超過 MAX_BATCH_SIZE 不再 throw RangeError, + // 由 doFlush() 內部自動分成多個 chunk 寫入 + it("entries exceeding MAX_BATCH_SIZE are auto-chunked internally", async () => { + ({ store, dir } = makeStore()); + try { + const COUNT = MemoryStore.MAX_BATCH_SIZE + 50; + const entries = Array.from({ length: COUNT }, (_, i) => makeEntry(i)); + + // Should NOT throw — auto-chunks internally + const result = await store.bulkStore(entries); + assert.strictEqual(result.length, COUNT, "Should return all entries"); + await store.flush(); + + // Verify all were stored (split across 2 chunks internally) + const all = await store.list(undefined, undefined, COUNT + 10, 0); + assert.strictEqual(all.length, COUNT, `All ${COUNT} entries should be stored`); + } finally { + await store.flush(); + } + }); + + // Edge case: raw input > MAX_BATCH_SIZE, filtered result < MAX_BATCH_SIZE + // Old: throw RangeError. New: auto-chunk based on filtered result + it("large batch with invalid entries: auto-chunks filtered result (not raw input)", async () => { + ({ store, dir } = makeStore()); + try { + // 300 entries: first 249 are valid, last 51 are null (invalid) + // After filter: validEntries.length = 249 (under limit) + // New behavior: no throw, auto-chunks based on 249 filtered entries + const entries = Array.from({ length: 300 }, (_, i) => + i < 249 ? makeEntry(i) : null + ); + // Should NOT throw — auto-chunks based on filtered count + const result = await store.bulkStore(entries); + assert.strictEqual(result.length, 249, "Should return 249 filtered entries"); + await store.flush(); + + const all = await store.list(undefined, undefined, 300, 0); + assert.strictEqual(all.length, 249, "All 249 valid entries should be stored"); + } finally { + await store.flush(); + } + }); + + it("entries with invalid fields are filtered out", async () => { + ({ store, dir } = makeStore()); + try { + const mixed = [ + null, + undefined, + { text: "", vector: [0.1, 0.2] }, // empty text + { text: "valid", vector: [] }, // empty vector + makeEntry(1), // valid + ]; + // Filter out invalid entries first (same logic as store) + const validEntries = mixed.filter( + (entry) => entry && entry.text && entry.text.length > 0 && entry.vector && entry.vector.length > 0 + ); + const result = await store.bulkStore(validEntries); + + assert.strictEqual(result.length, 1, "Only valid entry should be stored"); + } finally { + await store.flush(); + } + }); + + // ============================================================ + // Timing: verify flush interval is respected + // ============================================================ + it("flush happens within FLUSH_INTERVAL_MS", async () => { + ({ store, dir } = makeStore()); + try { + const start = Date.now(); + await store.bulkStore([makeEntry(1)]); + // 不 await flush(),讓它在背景跑 + await sleep(MemoryStore.FLUSH_INTERVAL_MS + 50); + const elapsed = Date.now() - start; + + const all = await store.list(undefined, undefined, 10, 0); + assert.strictEqual(all.length, 1, `Entry should be stored within ${MemoryStore.FLUSH_INTERVAL_MS + 50}ms (actual: ${elapsed}ms)`); + } finally { + await store.flush(); + } + }); + + // ============================================================ + // Concurrent mixed with sequential + // ============================================================ + it("mixed concurrent and sequential calls all succeed", async () => { + ({ store, dir } = makeStore()); + try { + // 先發 50 個 concurrent + const concurrent = Array.from({ length: 50 }, (_, i) => store.bulkStore([makeEntry(i)])); + + // 等一下再發 50 個 sequential(它们会在第二批 flush) + await sleep(MemoryStore.FLUSH_INTERVAL_MS + 20); + const sequential = Array.from({ length: 50 }, (_, i) => store.bulkStore([makeEntry(50 + i)])); + + const results = await Promise.allSettled([...concurrent, ...sequential]); + const successes = results.filter((r) => r.status === "fulfilled"); + + assert.strictEqual(successes.length, 100, `Expected 100 successes, got ${successes.length}`); + + const all = await store.list(undefined, undefined, 200, 0); + assert.strictEqual(all.length, 100, `Expected 100 entries`); + } finally { + await store.flush(); + } + }); + + // ============================================================ + // Large number of concurrent calls (stress test) + // ============================================================ + it("200 concurrent calls: still 100% success", async () => { + ({ store, dir } = makeStore()); + try { + const COUNT = 200; + const promises = Array.from({ length: COUNT }, (_, i) => + store.bulkStore([makeEntry(i)]) + ); + + const results = await Promise.allSettled(promises); + const successes = results.filter((r) => r.status === "fulfilled"); + + console.log(`[Stress] ${successes.length}/${COUNT} succeeded (200 concurrent)`); + assert.strictEqual(successes.length, COUNT, `Expected all ${COUNT} calls to succeed`); + + const all = await store.list(undefined, undefined, COUNT + 10, 0); + assert.strictEqual(all.length, COUNT, `Expected ${COUNT} entries`); + } finally { + await store.flush(); + } + }); + + // ============================================================ + // 【新增】Test: lock acquisition count is minimized by batching + // Reviewer: "single lock acquisition test does not instrument lock calls" + // ============================================================ + it("lock acquisition count is minimized: 20 concurrent calls result in far fewer than 20 lock acquisitions", async () => { + ({ store, dir } = makeStore()); + try { + // 先成功寫入,讓 table 可用 + await store.bulkStore([makeEntry(0)]); + await store.flush(); + + // Instrument runWithFileLock to count invocations + let lockCount = 0; + const originalRunWithFileLock = store.runWithFileLock.bind(store); + store.runWithFileLock = async (fn) => { + lockCount++; + return originalRunWithFileLock(fn); + }; + + // 同時發 20 個 calls,每個 5 個 entries + const COUNT = 20; + const promises = Array.from({ length: COUNT }, (_, i) => { + const entries = Array.from({ length: 5 }, (_, j) => makeEntry(i * 5 + j)); + return store.bulkStore(entries); + }); + + await Promise.allSettled(promises); + await store.flush(); // 確保所有 entries 都寫入 + + // Without batching: 20 calls × 1 lock per call = 20 lock acquisitions + // With batching: all 20 calls batched into 1 flush = 1 lock acquisition per flush + // 20 calls with 100ms batching window → should be 1 lock acquisition, not 20 + console.log(`[LockCount] ${lockCount} lock acquisitions for ${COUNT} concurrent calls`); + assert.ok(lockCount < COUNT, `Expected far fewer than ${COUNT} lock acquisitions, got ${lockCount}`); + assert.ok(lockCount >= 1, `Expected at least 1 lock acquisition, got ${lockCount}`); + } finally { + await store.flush(); + } + }); + + // ============================================================ + // 【新增】Test: bulkStore() filters invalid entries internally + // Reviewer: "invalid-entry test pre-filters before calling bulkStore" + // This test passes raw mixed array directly to bulkStore() without pre-filtering + // ============================================================ + it("bulkStore() filters invalid entries internally: raw mixed input returns only valid entries", async () => { + ({ store, dir } = makeStore()); + try { + // Pass raw mixed array directly to bulkStore() — NO pre-filtering + const rawMixed = [ + null, + undefined, + { text: "", vector: [0.1, 0.2] }, // empty text — should be filtered + { text: "valid", vector: [] }, // empty vector — should be filtered + { text: "valid-only-no-vector", vector: undefined }, // missing vector — should be filtered + { text: undefined, vector: [0.1, 0.2] }, // missing text — should be filtered + makeEntry(10), // valid + makeEntry(11), // valid + ]; + // bulkStore() should handle this raw mixed array and filter internally + const result = await store.bulkStore(rawMixed); + + // Only the 2 valid entries (makeEntry(10) and makeEntry(11)) should be returned + assert.strictEqual(result.length, 2, `Expected 2 valid entries, got ${result.length}`); + assert.ok(result[0].id, "Valid entry should have auto-generated id"); + assert.ok(result[1].id, "Valid entry should have auto-generated id"); + assert.ok(result[0].text.startsWith("entry-10-"), `Expected text to start with 'entry-10-', got '${result[0].text}'`); + assert.ok(result[1].text.startsWith("entry-11-"), `Expected text to start with 'entry-11-', got '${result[1].text}'`); + } finally { + await store.flush(); + } + }); +}); + +console.log("=== Issue #690 Tests ==="); +console.log(`FLUSH_INTERVAL_MS: ${MemoryStore.FLUSH_INTERVAL_MS}`); +console.log(`MAX_BATCH_SIZE: ${MemoryStore.MAX_BATCH_SIZE}`); \ No newline at end of file