diff --git a/README.md b/README.md index 959311c..c02022d 100644 --- a/README.md +++ b/README.md @@ -32,9 +32,17 @@ state libraries start to strain: same schema, selectors, and actions run against a persistent store on the server (SQLite today, pg/mongodb in future). The runtime reads only the rows a selector touches instead of hydrating the whole dataset into memory. +- **Lazy persistent reads when you need them.** `HybridDB` pairs a persistent + primary store with an in-memory cache: reads check memory first, fall through + to persistent storage on a miss, then cache the covered index range for next + time. Cache fills, write-through mutations, and transactions are serialized + per HybridDB instance so async selectors and actions do not overlap against + the in-memory cache tier. - **Synchronous on the frontend.** Against the in-memory driver, selectors and actions execute **synchronously** (no `await`, no microtask hop), so a click - updates the store and the UI in the same tick. + updates the store and the UI in the same tick. `useAsyncSelector` keeps this + fast path when a run completes from memory, then promotes to async only if a + command yields a promise. - **JavaScript selectors and actions.** Selectors and actions are ordinary JS: loops, conditionals, function calls. You get fast indexed lookups underneath, not a query language to learn. @@ -64,7 +72,11 @@ npm install @will-be-done/hyperdb ``` The React devtool ships separately. It traces every selector run and mutation -into a browsable call tree, so you can see which index a slow view scanned: +into a browsable call tree, so you can see which index a slow view scanned. For +HybridDB reads, select nodes are labeled `in-mem` or `persist` to show whether +the returned rows came from the memory cache or the primary persistent store. +When you switch traces, the active detail tab stays selected so comparison stays +focused: ```bash npm install @will-be-done/hyperdb-devtool diff --git a/TODO.md b/TODO.md index 1e461d8..33a61a0 100644 --- a/TODO.md +++ b/TODO.md @@ -10,9 +10,11 @@ Devtool: 1. resetcss 1. fix tabl selection (like call tree) not persisted to local storage +1. for hybrid db - mark whic data was loaded from cahce and which from persistent storage Doc: +1. Make font be local 1. Maybe reframe it and remove mention about sync nature? amybe even async by default? 1. review index.mdx, start/\*(execpt why), index.md, in-memory-persistence.md 1. Add performance compare doc @@ -40,3 +42,11 @@ Others: 1. Understand when normalisation happen. Does it happened in-mem? Indexeddb? When validation happen? 1. intent skills css tanstack support 1. On release - cp readme.md to hyperdb/readme.md + +DB: + +1. start readonly transaction for one selector, if not cached data appeared for hybrid db, and reuse it for other selectors too. Also, don't wait commit to finish for readonly txes. It also means that now beginTx() will accpes modes - readonly | readwrite +2. (? maybe) CoW if new cloned btree appeared. But it still will be locked by idb transaction +3. Allow disable/enable logging of sqlite/idb. Disabled by default +4. Renamer upsert -> put ? +5. Is there any bulk insert for indexeddb? diff --git a/packages/hyperdb-demo/index.html b/packages/hyperdb-demo/index.html index 6aea52f..c8dd80e 100644 --- a/packages/hyperdb-demo/index.html +++ b/packages/hyperdb-demo/index.html @@ -4,12 +4,12 @@ - + - + + + + HyperDB ยท Demo diff --git a/packages/hyperdb-demo/src/BenchmarkApp.tsx b/packages/hyperdb-demo/src/BenchmarkApp.tsx index eb5f514..41a9f4b 100644 --- a/packages/hyperdb-demo/src/BenchmarkApp.tsx +++ b/packages/hyperdb-demo/src/BenchmarkApp.tsx @@ -89,29 +89,37 @@ export function BenchmarkApp() { : 0; const visibleProjectCount = Math.min(projectLimit, dashboard.totalProjects); const directDriver = - storeMode === "idb" || storeMode === "idb-inmem" + storeMode === "idb" || + storeMode === "idb-inmem" || + storeMode === "idb-hybrid" ? "IndexedDB" : "WA-SQLite OPFS"; const hybrid = storeMode === "idb-inmem" || storeMode === "wa-sqlite-inmem"; - const storageStatus = hybrid + const runtimeHybrid = storeMode === "idb-hybrid"; + const storageStatus = runtimeHybrid ? { - dot: - persistence?.draining || persistence?.pendingBatches - ? "animate-blip bg-amber" - : "bg-green", - text: - persistence?.draining || persistence?.pendingBatches - ? `Saving... ${formatNumber(persistence.pendingOps)} ops queued` - : persistence?.lastDurationMs != null - ? `Saved ${formatNumber( - persistence.lastOpCount ?? 0, - )} ops in ${formatDuration(persistence.lastDurationMs)} ms` - : `${directDriver} mirrored behind in-memory reads/writes.`, - } - : { dot: "bg-green", - text: `Direct async ${directDriver} driver; changes survive reloads.`, - }; + text: "HybridDB uses IndexedDB as primary storage with an in-memory cache.", + } + : hybrid + ? { + dot: + persistence?.draining || persistence?.pendingBatches + ? "animate-blip bg-amber" + : "bg-green", + text: + persistence?.draining || persistence?.pendingBatches + ? `Saving... ${formatNumber(persistence.pendingOps)} ops queued` + : persistence?.lastDurationMs != null + ? `Saved ${formatNumber( + persistence.lastOpCount ?? 0, + )} ops in ${formatDuration(persistence.lastDurationMs)} ms` + : `${directDriver} mirrored behind in-memory reads/writes.`, + } + : { + dot: "bg-green", + text: `Direct async ${directDriver} driver; changes survive reloads.`, + }; const runMeasured = ( label: string, @@ -187,6 +195,7 @@ export function BenchmarkApp() { > + diff --git a/packages/hyperdb-demo/src/store-mode.ts b/packages/hyperdb-demo/src/store-mode.ts index e0d298d..05dfade 100644 --- a/packages/hyperdb-demo/src/store-mode.ts +++ b/packages/hyperdb-demo/src/store-mode.ts @@ -1,9 +1,15 @@ -export type StoreMode = "idb" | "idb-inmem" | "wa-sqlite" | "wa-sqlite-inmem"; +export type StoreMode = + | "idb" + | "idb-inmem" + | "idb-hybrid" + | "wa-sqlite" + | "wa-sqlite-inmem"; const STORAGE_KEY = "hyperdb-demo-mode"; const modes = new Set([ "idb", "idb-inmem", + "idb-hybrid", "wa-sqlite", "wa-sqlite-inmem", ]); diff --git a/packages/hyperdb-demo/src/stores.ts b/packages/hyperdb-demo/src/stores.ts index 7865559..5f2c825 100644 --- a/packages/hyperdb-demo/src/stores.ts +++ b/packages/hyperdb-demo/src/stores.ts @@ -7,6 +7,7 @@ import { } from "@will-be-done/hyperdb/drivers/sqlite"; import { DB, + HybridDB, SubscribableDB, asyncDispatch, createAction, @@ -166,23 +167,33 @@ class WaSQLiteWorkerDB implements AsyncSQLiteDB { } function getBackend(mode: StoreMode): PersistentBackend { - return mode === "idb" || mode === "idb-inmem" ? "idb" : "wa-sqlite"; + return mode === "idb" || mode === "idb-inmem" || mode === "idb-hybrid" + ? "idb" + : "wa-sqlite"; } function isHybridMode(mode: StoreMode): boolean { return mode === "idb-inmem" || mode === "wa-sqlite-inmem"; } +function isRuntimeHybridMode(mode: StoreMode): boolean { + return mode === "idb-hybrid"; +} + async function createPersistentDriver(mode: StoreMode) { const backend = getBackend(mode); - const hybrid = isHybridMode(mode); + const hybrid = isHybridMode(mode) || isRuntimeHybridMode(mode); if (backend === "idb") { return { driver: await openIndexedDBDriver( hybrid ? IDB_HYBRID_NAME : IDB_DIRECT_NAME, ), - dbName: hybrid ? "demo-idb-inmem:persistent" : "demo-idb", + dbName: isRuntimeHybridMode(mode) + ? "demo-idb-hybrid:primary" + : hybrid + ? "demo-idb-inmem:persistent" + : "demo-idb", }; } @@ -363,6 +374,26 @@ export type InitResult = { }; export async function initStore(mode: StoreMode): Promise { + if (isRuntimeHybridMode(mode)) { + const { driver, dbName } = await createPersistentDriver(mode); + const primaryDB = new DB(driver, { + freezeArgs: false, + freezeRows: false, + dbName, + }); + const cacheDB = new DB(new BptreeInmemDriver(), { + freezeArgs: false, + freezeRows: false, + dbName: "demo-idb-hybrid:cache", + }); + const db = new SubscribableDB(new HybridDB(primaryDB, cacheDB)); + + await execAsync(db.loadTables(ALL_TABLES)); + installTaskStatsHooks(db); + + return { db, persistence: null }; + } + if (isHybridMode(mode)) { const memDB = createMemDb(mode); const { driver, dbName } = await createPersistentDriver(mode); diff --git a/packages/hyperdb-devtool/README.md b/packages/hyperdb-devtool/README.md index c8975bb..90a3a84 100644 --- a/packages/hyperdb-devtool/README.md +++ b/packages/hyperdb-devtool/README.md @@ -1,6 +1,7 @@ # @will-be-done/hyperdb-devtool -React devtools for HyperDB. +React devtools for HyperDB. The trace details tab stays selected as you switch +between traces, making it easier to compare call trees, queries, and mutations. ```tsx import { HyperDBDevtools } from "@will-be-done/hyperdb-devtool/react"; diff --git a/packages/hyperdb-devtool/src/components.test.tsx b/packages/hyperdb-devtool/src/components.test.tsx index e1d1daf..cd9d87a 100644 --- a/packages/hyperdb-devtool/src/components.test.tsx +++ b/packages/hyperdb-devtool/src/components.test.tsx @@ -23,6 +23,7 @@ import { createTraceFrameMeta, endSelectEventSuccess, endTraceSuccess, + getTraceDBInfo, hyperDBTraceStore, markTraceFrameCached, startRootTrace, @@ -40,6 +41,63 @@ const createDB = (dbName?: string): SubscribableDB => { return db; }; +const traceWithChild = ({ + id, + name, + childName, + startedAt, + dbId, + dbLabel, +}: { + id: string; + name: string; + childName: string; + startedAt: number; + dbId?: string; + dbLabel?: string; +}): RootTrace => { + const childFrame: TraceFrame = { + id: `${id}-child`, + parentId: `${id}-root`, + kind: "selector", + name: childName, + arg: undefined, + startedAt: startedAt + 1, + durationMs: 2, + status: "success", + children: [], + commandIds: [], + mutationIds: [], + }; + const rootFrame: TraceFrame = { + id: `${id}-root`, + kind: "action", + name, + arg: undefined, + startedAt, + durationMs: 5, + status: "success", + children: [childFrame], + commandIds: [], + mutationIds: [], + }; + + return { + id, + ...(dbId !== undefined ? { dbId } : {}), + ...(dbLabel !== undefined ? { dbLabel } : {}), + kind: "action", + name, + arg: undefined, + startedAt, + durationMs: 5, + status: "success", + frames: [rootFrame], + commandEvents: [], + mutationEvents: [], + }; +}; + type HyperDBRegistryGlobal = typeof globalThis & { __hyperdb?: unknown; }; @@ -120,6 +178,82 @@ describe("HyperDBDevtools", () => { expect(html).toContain("Overview"); }); + it("keeps the selected details tab when switching traces", async () => { + const db = createDB("Trace tab test"); + const dbInfo = getTraceDBInfo(db); + + hyperDBTraceStore.addTrace( + traceWithChild({ + id: "older-trace", + name: "olderTrace", + childName: "olderTraceChild", + startedAt: 100, + dbId: dbInfo.id, + dbLabel: dbInfo.label, + }), + ); + hyperDBTraceStore.addTrace( + traceWithChild({ + id: "newer-trace", + name: "newerTrace", + childName: "newerTraceChild", + startedAt: 200, + dbId: dbInfo.id, + dbLabel: dbInfo.label, + }), + ); + const host = document.createElement("div"); + host.style.width = "1000px"; + host.style.height = "600px"; + document.body.append(host); + const root = createRoot(host); + const getDetailTab = (name: string): HTMLButtonElement => { + const tab = Array.from( + host.querySelectorAll('[role="tab"]'), + ).find((element) => element.textContent === name); + + if (!tab) throw new Error(`Missing ${name} tab`); + return tab; + }; + const getButtonContaining = (text: string): HTMLButtonElement => { + const button = Array.from(host.querySelectorAll("button")).find( + (element) => element.textContent?.includes(text), + ); + + if (!button) throw new Error(`Missing button containing ${text}`); + return button; + }; + + await act(async () => { + root.render(); + }); + + expect(getDetailTab("Overview").getAttribute("aria-selected")).toBe("true"); + + await act(async () => { + getDetailTab("Call Tree").click(); + }); + + expect(getDetailTab("Call Tree").getAttribute("aria-selected")).toBe( + "true", + ); + expect(host.textContent).toContain("@newerTraceChild"); + + await act(async () => { + getButtonContaining("olderTrace").click(); + }); + + expect(getDetailTab("Call Tree").getAttribute("aria-selected")).toBe( + "true", + ); + expect(host.textContent).toContain("@olderTraceChild"); + + await act(async () => { + root.unmount(); + }); + host.remove(); + }); + it("renders queried and mutated row totals in the trace list and overview", async () => { activateTraceStore(); const context = startRootTrace( @@ -711,6 +845,105 @@ describe("HyperDBDevtools", () => { ]); }); + it.each(["in-mem", "persist"] as const)( + "adds a %s badge for HybridDB select sources", + (source) => { + const rootFrame: TraceFrame = { + id: "frame-1", + kind: "selector", + name: "readTasks", + arg: undefined, + startedAt: 100, + durationMs: 50, + status: "success", + children: [], + commandIds: ["cmd-1"], + mutationIds: [], + }; + const selectEvent: SelectCommandEvent = { + id: "cmd-1", + frameId: "frame-1", + kind: "select", + tableName: "tasks", + index: "byProject", + where: [], + bounds: [], + startedAt: 110, + durationMs: 7, + status: "success", + resultCount: 3, + source, + }; + const trace: RootTrace = { + id: "trace-1", + kind: "selector", + name: "readTasks", + arg: undefined, + startedAt: 100, + durationMs: 50, + status: "success", + frames: [rootFrame], + commandEvents: [selectEvent], + mutationEvents: [], + }; + + const [operation] = getCallTreeOperations(rootFrame, trace); + + expect(getCallTreeOperationBadges(operation!)).toEqual([ + { text: "7ms", tone: "duration" }, + { text: "3 rows", tone: "rows" }, + { text: source, tone: "source" }, + ]); + }, + ); + + it("keeps old select traces without source on the existing badge shape", () => { + const rootFrame: TraceFrame = { + id: "frame-1", + kind: "selector", + name: "readTasks", + arg: undefined, + startedAt: 100, + durationMs: 50, + status: "success", + children: [], + commandIds: ["cmd-1"], + mutationIds: [], + }; + const selectEvent: SelectCommandEvent = { + id: "cmd-1", + frameId: "frame-1", + kind: "select", + tableName: "tasks", + index: "byProject", + where: [], + bounds: [], + startedAt: 110, + durationMs: 7, + status: "success", + resultCount: 3, + }; + const trace: RootTrace = { + id: "trace-1", + kind: "selector", + name: "readTasks", + arg: undefined, + startedAt: 100, + durationMs: 50, + status: "success", + frames: [rootFrame], + commandEvents: [selectEvent], + mutationEvents: [], + }; + + const [operation] = getCallTreeOperations(rootFrame, trace); + + expect(getCallTreeOperationBadges(operation!)).toEqual([ + { text: "7ms", tone: "duration" }, + { text: "3 rows", tone: "rows" }, + ]); + }); + it("adds a cached badge for memoized selector frames", () => { const cachedFrame: TraceFrame = { id: "frame-cached", diff --git a/packages/hyperdb-devtool/src/components.tsx b/packages/hyperdb-devtool/src/components.tsx index c8e5679..ba30b45 100644 --- a/packages/hyperdb-devtool/src/components.tsx +++ b/packages/hyperdb-devtool/src/components.tsx @@ -63,6 +63,8 @@ export type HyperDBDevtoolsPanelProps = { onClose?: () => void; }; +type TraceDetailsTab = "overview" | "data" | "mutations" | "tree"; + type DevtoolsPanelInnerProps = HyperDBDevtoolsPanelProps & { discoverRegisteredDBs?: boolean; }; @@ -458,7 +460,15 @@ const ButtonElement = ( const SpanElement = ( props: React.HTMLAttributes & { - tone?: "green" | "blue" | "red" | "amber" | "duration" | "rows" | "cached"; + tone?: + | "green" + | "blue" + | "red" + | "amber" + | "duration" + | "rows" + | "cached" + | "source"; }, ) => { const { tone, ...domProps } = props; @@ -1613,14 +1623,17 @@ const TreeLabel = styled("span")` color: var(--hdb-text); `; -const treeBadgeColor = (tone: "duration" | "rows" | "cached"): string => { +type CallTreeBadgeTone = "duration" | "rows" | "cached" | "source"; + +const treeBadgeColor = (tone: CallTreeBadgeTone): string => { if (tone === "rows") return "var(--hdb-accent)"; if (tone === "cached") return "var(--hdb-blue)"; + if (tone === "source") return "var(--hdb-warn)"; return "var(--hdb-border)"; }; const TreeBadge = styled(SpanElement)<{ - tone: "duration" | "rows" | "cached"; + tone: CallTreeBadgeTone; }>` flex: 0 0 auto; display: inline-flex; @@ -2079,8 +2092,8 @@ export const getMutationDisplay = ( export const getCallTreeOperationBadges = ( operation: CallTreeOperation, -): { text: string; tone: "duration" | "rows" | "cached" }[] => { - const badges: { text: string; tone: "duration" | "rows" | "cached" }[] = [ +): { text: string; tone: CallTreeBadgeTone }[] => { + const badges: { text: string; tone: CallTreeBadgeTone }[] = [ { text: callTreeOperationDuration(operation), tone: "duration" }, ]; const recordCount = callTreeOperationRecordCount(operation); @@ -2089,6 +2102,10 @@ export const getCallTreeOperationBadges = ( badges.push({ text: formatRowCount(recordCount), tone: "rows" }); } + if (operation.kind === "select" && operation.event.source !== undefined) { + badges.push({ text: operation.event.source, tone: "source" }); + } + if (operation.kind === "frame" && operation.frame.cached) { badges.push({ text: "cached", tone: "cached" }); } @@ -2609,10 +2626,17 @@ const CallTree = React.memo(({ trace }: { trace: RootTrace }) => { CallTree.displayName = "CallTree"; const TraceDetails = React.memo( - ({ trace, onBack }: { trace: RootTrace; onBack?: () => void }) => { - const [tab, setTab] = useState<"overview" | "data" | "mutations" | "tree">( - "overview", - ); + ({ + trace, + tab, + onTabChange, + onBack, + }: { + trace: RootTrace; + tab: TraceDetailsTab; + onTabChange: (tab: TraceDetailsTab) => void; + onBack?: () => void; + }) => { const contentRef = useRef(null); const tone = statusTone(trace.status); @@ -2640,20 +2664,37 @@ const TraceDetails = React.memo( - - setTab("overview")}> + + onTabChange("overview")} + > Overview - setTab("data")}> + onTabChange("data")} + > Queries setTab("mutations")} + onClick={() => onTabChange("mutations")} > Mutations - setTab("tree")}> + onTabChange("tree")} + > Call Tree @@ -2692,6 +2733,8 @@ const DevtoolsPanelInner = ({ const [sortField, setSortField] = useState(readStoredSortField); const [sortDir, setSortDir] = useState(readStoredSortDir); + const [selectedTraceTab, setSelectedTraceTab] = + useState("overview"); const [optionsOpen, setOptionsOpen] = useState(false); const [isNarrow, setIsNarrow] = useState(false); const isDraggingRef = useRef(false); @@ -3066,11 +3109,18 @@ const DevtoolsPanelInner = ({ setSelectedTraceId(undefined)} /> ) : ( - + ) ) : isNarrow ? null : ( No traces diff --git a/packages/hyperdb-doc/src/content/docs/integrations/devtools.md b/packages/hyperdb-doc/src/content/docs/integrations/devtools.md index 45b1ee2..2b052e5 100644 --- a/packages/hyperdb-doc/src/content/docs/integrations/devtools.md +++ b/packages/hyperdb-doc/src/content/docs/integrations/devtools.md @@ -53,6 +53,14 @@ selector frames, the index ranges scanned, and the mutations performed. Cache hits are recorded too, so you can see when a selector was reused instead of recomputed. +The details pane keeps the active tab while you move between traces, so you can +compare Overview, Queries, Mutations, or Call Tree output without reselecting +the same tab each time. + +When a selector reads through `HybridDB`, select nodes in the call tree also show +where the returned rows came from: `in-mem` for the memory cache, or `persist` +for the primary persistent database. + ### Per-DB tracer Set a tracer when constructing a [`DB`](/runtime/db/). Pass the shared store, the diff --git a/packages/hyperdb-doc/src/content/docs/integrations/react.md b/packages/hyperdb-doc/src/content/docs/integrations/react.md index 975ae8f..d7ecb15 100644 --- a/packages/hyperdb-doc/src/content/docs/integrations/react.md +++ b/packages/hyperdb-doc/src/content/docs/integrations/react.md @@ -80,6 +80,10 @@ For asynchronous drivers (IndexedDB, async SQLite). Same shape, but the result arrives asynchronously, so it returns `defaultValue` (or `undefined`) until the first run resolves, and re-runs on relevant changes. +Each run starts synchronously. If the selector completes from memory or cache, +the result is applied in the same tick; if a command yields a promise, that run +continues asynchronously. + ```tsx const tasks = useAsyncSelector({ selector: projectTasks, diff --git a/packages/hyperdb-doc/src/content/docs/runtime/db.md b/packages/hyperdb-doc/src/content/docs/runtime/db.md index 1277cce..0bb7df9 100644 --- a/packages/hyperdb-doc/src/content/docs/runtime/db.md +++ b/packages/hyperdb-doc/src/content/docs/runtime/db.md @@ -101,6 +101,47 @@ Because hooks run within the transaction, anything they write commits atomically with the change that triggered them, and a throw rolls the whole thing back. +## `HybridDB` + +`HybridDB` wraps two DBs: a persistent primary DB and an in-memory cache DB. It +is for datasets that are too large to hydrate eagerly, or apps where startup +time matters more than synchronous reads. + +On read, `HybridDB` checks the in-memory cache first. If the requested index +range is not cached, it runs the same scan against the persistent primary, +upserts the returned rows into memory, and records that range as cached for +later reads. Empty misses are cached too. Limited B-tree reads cache the covered +prefix or suffix when the runtime can prove the returned rows are enough to +answer the same limited query from memory. + +```ts +import { DB, HybridDB, SubscribableDB, execAsync } from "@will-be-done/hyperdb"; +import { BptreeInmemDriver } from "@will-be-done/hyperdb/drivers/inmemory"; +import { AsyncSqlDriver } from "@will-be-done/hyperdb/drivers/sqlite"; + +const primary = new DB(new AsyncSqlDriver(sqlite, sqliteDb)); +const cache = new DB(new BptreeInmemDriver()); + +const db = new SubscribableDB(new HybridDB(primary, cache)); + +await execAsync(db.loadTables([tasksTable])); +``` + +The trade-off is async reads. A selector may fall through to disk, so use +`selectAsync`, `asyncDispatch`, `useAsyncSelector`, and `useAsyncDispatch` with a +hybrid runtime. Once a working set is cached, repeated reads are served from the +in-memory tier. + +Writes go to both tiers in the same operation. That means cached rows stay +current immediately, while uncached ranges still load lazily on first access. +Transactions open transactions against both tiers; scan coverage discovered +inside a transaction is published to the outer cache only after commit. + +HybridDB serializes cache fills, write-through mutations, coverage updates, and +transaction lifetimes per instance. This keeps async selector misses and actions +from overlapping against the in-memory cache tier while a primary read or +transaction is still in flight. + ## Executing commands Selectors and actions are generators. The dispatch and select helpers run them diff --git a/packages/hyperdb-doc/src/styles/theme.css b/packages/hyperdb-doc/src/styles/theme.css index 431f6c9..f7d095d 100644 --- a/packages/hyperdb-doc/src/styles/theme.css +++ b/packages/hyperdb-doc/src/styles/theme.css @@ -3,7 +3,9 @@ Single solid brand color (violet). No gradients. ========================================================================= */ -@import url("https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700;800&display=swap"); +/* TODO: use local */ + +/* @import url("https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700;800&display=swap"); */ :root { /* One solid brand color used for buttons, logo, and accents */ diff --git a/packages/hyperdb/README.md b/packages/hyperdb/README.md index b72fe25..7d52633 100644 --- a/packages/hyperdb/README.md +++ b/packages/hyperdb/README.md @@ -29,9 +29,15 @@ state libraries start to strain: same schema, selectors, and actions run against a persistent store on the server (SQLite today). The runtime reads only the rows a selector touches instead of hydrating the whole dataset into memory. +- **Lazy persistent reads when you need them.** `HybridDB` pairs a persistent + primary store with an in-memory cache: reads check memory first, fall through + to persistent storage on a miss, then cache the covered index range for next + time. - **Synchronous on the frontend.** Against the in-memory driver, selectors and actions execute **synchronously** (no `await`, no microtask hop), so a click - updates the store and the UI in the same tick. + updates the store and the UI in the same tick. `useAsyncSelector` keeps this + fast path when a run completes from memory, then promotes to async only if a + command yields a promise. - **JavaScript selectors and actions.** Selectors and actions are ordinary JS: loops, conditionals, function calls. You get fast indexed lookups underneath, not a query language to learn. @@ -188,15 +194,15 @@ export function App() { ## Entry points -| Import path | Contents | -| ---------------------------------------- | ------------------------------------------------------------------------ | -| `@will-be-done/hyperdb` | Core: `defineTable`, `v`, `selectFrom`, builders, `DB`, `SubscribableDB` | -| `@will-be-done/hyperdb/react` | React hooks and `DBProvider` | -| `@will-be-done/hyperdb/tracing` | Tracing store and tracer configuration | -| `@will-be-done/hyperdb/drivers/inmemory` | `BptreeInmemDriver` | -| `@will-be-done/hyperdb/drivers/sqlite` | `SqlDriver`, `AsyncSqlDriver` | -| `@will-be-done/hyperdb/drivers/idb` | `openIndexedDBDriver`, `IdbDriver` | -| `@will-be-done/hyperdb-devtool/react` | `HyperDBDevtools`, `HyperDBDevtoolsPanel` (separate package) | +| Import path | Contents | +| ---------------------------------------- | ------------------------------------------------------------------------------------ | +| `@will-be-done/hyperdb` | Core: `defineTable`, `v`, `selectFrom`, builders, `DB`, `HybridDB`, `SubscribableDB` | +| `@will-be-done/hyperdb/react` | React hooks and `DBProvider` | +| `@will-be-done/hyperdb/tracing` | Tracing store and tracer configuration | +| `@will-be-done/hyperdb/drivers/inmemory` | `BptreeInmemDriver` | +| `@will-be-done/hyperdb/drivers/sqlite` | `SqlDriver`, `AsyncSqlDriver` | +| `@will-be-done/hyperdb/drivers/idb` | `openIndexedDBDriver`, `IdbDriver` | +| `@will-be-done/hyperdb-devtool/react` | `HyperDBDevtools`, `HyperDBDevtoolsPanel` (separate package) | ## Learn more diff --git a/packages/hyperdb/src/hyperdb/commands/runner.ts b/packages/hyperdb/src/hyperdb/commands/runner.ts index a615572..ac72588 100644 --- a/packages/hyperdb/src/hyperdb/commands/runner.ts +++ b/packages/hyperdb/src/hyperdb/commands/runner.ts @@ -20,6 +20,7 @@ import type { Op } from "../runtime/ops"; import { anonymousTraceMeta, getTracerForDB, + withCurrentSelectEventTrait, withTraceContextTrait, type HyperDBTracer, type TraceContext, @@ -251,7 +252,10 @@ export function* runCommandGenerator( : undefined; try { - const rows = yield* scopedDB.intervalScan( + const selectDB = selectEvent + ? withCurrentSelectEventTrait(scopedDB, selectEvent) + : scopedDB; + const rows = yield* selectDB.intervalScan( table, index, selectQuery.where, diff --git a/packages/hyperdb/src/hyperdb/commands/selector/selector.ts b/packages/hyperdb/src/hyperdb/commands/selector/selector.ts index 6c770c4..5e1a5fa 100644 --- a/packages/hyperdb/src/hyperdb/commands/selector/selector.ts +++ b/packages/hyperdb/src/hyperdb/commands/selector/selector.ts @@ -1,6 +1,6 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import type { SubscribableDB, Op } from "../../runtime/subscribable-db"; -import { execAsync, execSync } from "../../core/executor"; +import { execAsync, execMaybeAsync, execSync } from "../../core/executor"; import type { HyperDB } from "../../core/contracts"; import { deepFreeze } from "../../deep-freeze"; import type { Row } from "../../core/primitives"; @@ -384,6 +384,34 @@ export async function runSelectorAsync( return result; } +export function runSelectorMaybeAsync( + db: HyperDB, + gen: () => Generator, + selectRangeCmds: SelectRangeCmd[] = [], + options: RunSelectorOptions = {}, +): TReturn | Promise { + selectRangeCmds.splice(0, selectRangeCmds.length); + + const visited = makeVisited(options); + const result = execMaybeAsync( + runCommandGenerator(db, gen(), { ...options, selectRangeCmds, visited }), + ); + + if (result instanceof Promise) { + return result.then((value) => { + if (options.childMemo && visited) { + pruneChildMemo(options.childMemo, visited); + } + return value; + }); + } + + if (options.childMemo && visited) { + pruneChildMemo(options.childMemo, visited); + } + return result; +} + export function initSelector( db: SubscribableDB, gen: () => Generator, diff --git a/packages/hyperdb/src/hyperdb/core/executor.test.ts b/packages/hyperdb/src/hyperdb/core/executor.test.ts index 9528d75..ff0b442 100644 --- a/packages/hyperdb/src/hyperdb/core/executor.test.ts +++ b/packages/hyperdb/src/hyperdb/core/executor.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it } from "vitest"; import { noop, unwrap, type DBCmd } from "../commands/async"; -import { execAsync, execSync } from "./executor"; +import { execAsync, execMaybeAsync, execSync } from "./executor"; function* unexpectedCommand(): Generator { yield { type: "unexpected" } as unknown as DBCmd; @@ -64,4 +64,31 @@ describe("executor", () => { await expect(execAsync(command())).resolves.toBe("done"); }); + + it("returns synchronously from execMaybeAsync when no async command is yielded", () => { + function* command(): Generator { + yield* noop(); + return "done"; + } + + expect(execMaybeAsync(command())).toBe("done"); + }); + + it("continues execMaybeAsync asynchronously from the first async command", async () => { + const steps: string[] = []; + + function* command(): Generator { + steps.push("start"); + const value = yield* unwrap(Promise.resolve("async")); + steps.push(value); + return "done"; + } + + const result = execMaybeAsync(command()); + + expect(result).toBeInstanceOf(Promise); + expect(steps).toEqual(["start"]); + await expect(result).resolves.toBe("done"); + expect(steps).toEqual(["start", "async"]); + }); }); diff --git a/packages/hyperdb/src/hyperdb/core/executor.ts b/packages/hyperdb/src/hyperdb/core/executor.ts index 698b80f..59c5c45 100644 --- a/packages/hyperdb/src/hyperdb/core/executor.ts +++ b/packages/hyperdb/src/hyperdb/core/executor.ts @@ -4,6 +4,17 @@ function unexpectedCmdError(cmd: unknown): Error { return new Error(`Unexpected DBCmd yielded: ${String(cmd)}`); } +const isPromiseLike = (value: unknown): value is PromiseLike => { + if ( + value === null || + (typeof value !== "object" && typeof value !== "function") + ) { + return false; + } + + return typeof (value as { then?: unknown }).then === "function"; +}; + export function execSync(cmd: Generator): T { let result = cmd.next(); @@ -20,6 +31,25 @@ export function execSync(cmd: Generator): T { return result.value as T; } +async function execAsyncFrom( + cmd: Generator, + value: unknown, +): Promise { + let result = cmd.next(await value); + + while (!result.done) { + if (isUnwrapCmd(result.value)) { + result = cmd.next(await result.value.data); + } else if (isNoopCmd(result.value)) { + result = cmd.next(); + } else { + throw unexpectedCmdError(result.value); + } + } + + return result.value as T; +} + export async function execAsync(cmd: Generator): Promise { let result = cmd.next(); @@ -35,3 +65,23 @@ export async function execAsync(cmd: Generator): Promise { return result.value as T; } + +export function execMaybeAsync(cmd: Generator): T | Promise { + let result = cmd.next(); + + while (!result.done) { + if (isUnwrapCmd(result.value)) { + const data = result.value.data; + if (isPromiseLike(data)) { + return execAsyncFrom(cmd, data); + } + result = cmd.next(data); + } else if (isNoopCmd(result.value)) { + result = cmd.next(); + } else { + throw unexpectedCmdError(result.value); + } + } + + return result.value as T; +} diff --git a/packages/hyperdb/src/hyperdb/core/tracer.ts b/packages/hyperdb/src/hyperdb/core/tracer.ts index 4667e3b..121d125 100644 --- a/packages/hyperdb/src/hyperdb/core/tracer.ts +++ b/packages/hyperdb/src/hyperdb/core/tracer.ts @@ -12,6 +12,7 @@ export type TraceQueryOrder = "asc" | "desc"; export type CommandEventKind = "select"; export type MutationEventKind = "insert" | "upsert" | "delete"; export type TraceStartOn = "devtoolOpen" | "load"; +export type SelectScanSource = "in-mem" | "persist"; export type TraceOptions = { enabled: boolean; @@ -66,6 +67,7 @@ export type SelectCommandEvent = { bounds: SelectTraceInput["bounds"]; limit?: SelectOptions["limit"]; order?: TraceQueryOrder; + source?: SelectScanSource; resultCount?: number; result?: unknown[]; startedAt: number; @@ -284,4 +286,55 @@ export const withTraceContextTrait = < return db.withTraits(traceContextTrait(context)) as TDB; }; +export const currentSelectEventTraitType = "hyperdb.currentSelectEvent"; + +export type CurrentSelectEventTrait = Trait & { + type: typeof currentSelectEventTraitType; + event: SelectCommandEvent; +}; + +export const currentSelectEventTrait = ( + event: SelectCommandEvent, +): CurrentSelectEventTrait => ({ + type: currentSelectEventTraitType, + event, +}); + +export const isCurrentSelectEventTrait = ( + trait: Trait, +): trait is CurrentSelectEventTrait => + trait.type === currentSelectEventTraitType && "event" in trait; + +export const getCurrentSelectEventFromTraits = ( + traits: Trait[], +): SelectCommandEvent | undefined => { + for (let index = traits.length - 1; index >= 0; index -= 1) { + const trait = traits[index]; + if (trait && isCurrentSelectEventTrait(trait)) { + return trait.event; + } + } +}; + +export const getCurrentSelectEventForDB = (db: { + getTraits(): Trait[]; +}): SelectCommandEvent | undefined => + getCurrentSelectEventFromTraits(db.getTraits()); + +export const withCurrentSelectEventTrait = < + TDB extends { + getTraits(): Trait[]; + withTraits(...trait: Trait[]): unknown; + }, +>( + db: TDB, + event: SelectCommandEvent, +): TDB => { + if (getCurrentSelectEventForDB(db) === event) { + return db; + } + + return db.withTraits(currentSelectEventTrait(event)) as TDB; +}; + export type SerializableTraceValue = string | number | boolean | null | Value; diff --git a/packages/hyperdb/src/hyperdb/index.ts b/packages/hyperdb/src/hyperdb/index.ts index 4473d49..6fe85ea 100644 --- a/packages/hyperdb/src/hyperdb/index.ts +++ b/packages/hyperdb/src/hyperdb/index.ts @@ -5,6 +5,7 @@ export * from "./commands/selector/selector"; export { noop } from "./commands/async"; export * from "./core/query/bounds"; export * from "./runtime/subscribable-db"; +export * from "./runtime/hybrid-db"; export * from "./schema/table"; export * from "./schema/values"; export * from "./tracing"; diff --git a/packages/hyperdb/src/hyperdb/runtime/hybrid-db-intervals.test.ts b/packages/hyperdb/src/hyperdb/runtime/hybrid-db-intervals.test.ts new file mode 100644 index 0000000..8c51a09 --- /dev/null +++ b/packages/hyperdb/src/hyperdb/runtime/hybrid-db-intervals.test.ts @@ -0,0 +1,374 @@ +import { describe, expect, it, vi } from "vitest"; +import { DB } from "./db"; +import { HybridDB } from "./hybrid-db"; +import { + boundToInterval, + canServeLimitedResultFromCache, + createHybridIntervalCache, + hasOverlap, + intervalFromClauses, + intervalsForLimitedRows, + isEmptyInterval, + mergeCoverage, + mergeCoverageMaps, + mergeIntervals, + subtractIntervals, + subtractOne, + type NormalizedInterval, +} from "./hybrid-db-intervals"; +import { MIN, MAX, type Row } from "../core/primitives"; +import { BptreeInmemDriver } from "../drivers/inmemory/bptree-inmem-driver"; +import { defineTable } from "../schema/table"; +import { v } from "../schema/values"; +import { AsyncDB } from "../test-utils/async-db"; + +type Task = { + id: string; + title: string; + value: number; + projectId: string; +}; + +const intervalTable = defineTable("hybridIntervalTasks", { + id: v.string(), + title: v.string(), + value: v.number(), + projectId: v.string(), +}) + .index("byValue", ["value"]) + .index("byProjectValue", ["projectId", "value"]) + .index("byTitle", ["title"], { type: "hash" }) + .index("byIdHash", ["id"], { type: "hash" }); + +const createTask = (value: number, title = `Task ${value}`): Task => ({ + id: String(value).padStart(3, "0"), + title, + value, + projectId: value <= 3 ? "a" : "b", +}); + +const createDBs = async () => { + const primary = new DB(new BptreeInmemDriver()); + const cache = new DB(new BptreeInmemDriver()); + const db = new AsyncDB(new HybridDB(primary, cache)); + + await db.loadTables([intervalTable]); + + return { db, primary, cache }; +}; + +const i = ( + lower: NormalizedInterval["lower"], + upper: NormalizedInterval["upper"], + lowerInclusive = true, + upperInclusive = true, +): NormalizedInterval => ({ + lower, + lowerInclusive, + upper, + upperInclusive, +}); + +describe("HybridDB interval helpers", () => { + it("normalizes tuple scan bounds into closed and open intervals", () => { + expect(boundToInterval({ gte: [1], lt: [3] }, 2)).toEqual( + i([1, MIN], [3, MIN], true, false), + ); + expect(boundToInterval({ gt: [1], lte: [3] }, 2)).toEqual( + i([1, MAX], [3, MAX], false, true), + ); + expect(boundToInterval({}, 2)).toEqual( + i([MIN, MIN], [MAX, MAX], true, true), + ); + }); + + it("builds interval targets from table indexes", () => { + expect( + intervalFromClauses(intervalTable, "byValue", [ + { gte: [{ col: "value", val: 1 }], lte: [{ col: "value", val: 2 }] }, + ]), + ).toEqual({ + key: "hybridIntervalTasks:byValue", + intervals: [i([1, MIN], [2, MAX])], + indexCols: ["value", "id"], + supportsPartialLimitCoverage: true, + }); + + expect( + intervalFromClauses(intervalTable, "byTitle", [ + { eq: [{ col: "title", val: "same" }] }, + ]), + ).toEqual({ + key: "hybridIntervalTasks:byTitle", + intervals: [i(["same"], ["same"])], + indexCols: ["title"], + supportsPartialLimitCoverage: false, + }); + + expect( + intervalFromClauses(intervalTable, "byIdHash", [ + { eq: [{ col: "id", val: "001" }] }, + ]).supportsPartialLimitCoverage, + ).toBe(true); + }); + + it("rejects invalid interval targets and drops impossible intervals", () => { + expect(() => intervalFromClauses(intervalTable, "byValue", [])).toThrow( + "scan clauses must be provided", + ); + expect(() => intervalFromClauses(intervalTable, "missing", [{}])).toThrow( + "Index not found: missing for table: hybridIntervalTasks", + ); + + expect( + intervalFromClauses(intervalTable, "byValue", [ + { gt: [{ col: "value", val: 2 }], lte: [{ col: "value", val: 2 }] }, + ]).intervals, + ).toEqual([]); + }); + + it("detects empty intervals at reversed and exclusive equal boundaries", () => { + expect(isEmptyInterval(i([3], [2]))).toBe(true); + expect(isEmptyInterval(i([2], [2], false, true))).toBe(true); + expect(isEmptyInterval(i([2], [2], true, false))).toBe(true); + expect(isEmptyInterval(i([2], [2], true, true))).toBe(false); + }); + + it("merges overlapping and touching intervals without mutating input order", () => { + const intervals = [ + i([5], [6]), + i([1], [2], true, false), + i([2], [3], true, true), + i([3], [4], false, true), + ]; + + expect(mergeIntervals(intervals)).toEqual([ + i([1], [4], true, true), + i([5], [6]), + ]); + expect(intervals[0]).toEqual(i([5], [6])); + + expect( + mergeIntervals([i([1], [2], true, false), i([2], [3], false, true)]), + ).toEqual([i([1], [2], true, false), i([2], [3], false, true)]); + }); + + it("handles overlap boundaries with inclusive and exclusive endpoints", () => { + expect(hasOverlap(i([1], [2], true, false), i([2], [3]))).toBe(false); + expect(hasOverlap(i([1], [2]), i([2], [3]))).toBe(true); + expect(hasOverlap(i([1], [2]), i([2], [3], false, true))).toBe(false); + expect(hasOverlap(i([3], [4]), i([1], [2]))).toBe(false); + }); + + it("subtracts one interval while preserving boundary inclusivity", () => { + expect(subtractOne(i([1], [5]), i([2], [4]))).toEqual([ + i([1], [2], true, false), + i([4], [5], false, true), + ]); + + expect(subtractOne(i([1], [5]), i([2], [4], false, false))).toEqual([ + i([1], [2]), + i([4], [5]), + ]); + + expect(subtractOne(i([1], [2], true, false), i([2], [3]))).toEqual([ + i([1], [2], true, false), + ]); + }); + + it("subtracts multiple covers and merges the remaining gaps", () => { + expect( + subtractIntervals([i([1], [10])], [i([2], [3]), i([5], [7])]), + ).toEqual([ + i([1], [2], true, false), + i([3], [5], false, false), + i([7], [10], false, true), + ]); + + expect( + subtractIntervals([i([1], [2]), i([2], [3])], [i([1], [3])]), + ).toEqual([]); + }); + + it("derives cached coverage from ascending limited rows across disjoint intervals", () => { + const queryIntervals = [i([1], [2]), i([5], [8])]; + const rows = [ + { id: "001", value: 1 }, + { id: "002", value: 2 }, + { id: "006", value: 6 }, + ] satisfies Row[]; + + expect( + intervalsForLimitedRows(queryIntervals, rows, ["value"], "asc"), + ).toEqual([i([1], [2]), i([5], [6])]); + expect( + intervalsForLimitedRows(queryIntervals, [], ["value"], "asc"), + ).toEqual([]); + }); + + it("derives cached coverage from descending limited rows across disjoint intervals", () => { + const queryIntervals = [i([1], [2]), i([5], [8])]; + const rows = [ + { id: "008", value: 8 }, + { id: "007", value: 7 }, + { id: "002", value: 2 }, + ] satisfies Row[]; + + expect( + intervalsForLimitedRows(queryIntervals, rows, ["value"], "desc"), + ).toEqual([i([2], [2]), i([5], [8])]); + }); + + it("decides when a limited result can be satisfied before the first uncovered gap", () => { + expect( + canServeLimitedResultFromCache( + [ + { id: "001", value: 1 }, + { id: "002", value: 2 }, + ], + 2, + [i([3], [10])], + ["value"], + "asc", + ), + ).toBe(true); + + expect( + canServeLimitedResultFromCache( + [ + { id: "001", value: 1 }, + { id: "003", value: 3 }, + ], + 2, + [i([3], [10])], + ["value"], + "asc", + ), + ).toBe(false); + + expect( + canServeLimitedResultFromCache( + [ + { id: "010", value: 10 }, + { id: "006", value: 6 }, + ], + 2, + [i([1], [5])], + ["value"], + "desc", + ), + ).toBe(true); + + expect( + canServeLimitedResultFromCache( + [{ id: "010", value: 10 }], + 2, + [i([1], [5])], + ["value"], + "desc", + ), + ).toBe(false); + }); + + it("merges interval coverage maps", () => { + const parent = createHybridIntervalCache(); + const child = createHybridIntervalCache(); + + mergeCoverage(parent, "tasks:byValue", [i([1], [2])]); + mergeCoverage(child, "tasks:byValue", [i([2], [4], false, true)]); + mergeCoverage(child, "tasks:byTitle", [i(["a"], ["a"])]); + mergeCoverageMaps(parent, child); + mergeCoverage(parent, "tasks:byValue", []); + + expect(parent.get("tasks:byValue")).toEqual([i([1], [4])]); + expect(parent.get("tasks:byTitle")).toEqual([i(["a"], ["a"])]); + }); +}); + +describe("HybridDB interval cache edge cases", () => { + it("caches empty unlimited misses so repeated empty reads stay in memory", async () => { + const { db, primary } = await createDBs(); + const primaryScanSpy = vi.spyOn(primary, "intervalScan"); + + await expect( + db.intervalScan(intervalTable, "byValue", [ + { eq: [{ col: "value", val: 404 }] }, + ]), + ).resolves.toEqual([]); + expect(primaryScanSpy).toHaveBeenCalledTimes(1); + + primaryScanSpy.mockClear(); + await expect( + db.intervalScan(intervalTable, "byValue", [ + { eq: [{ col: "value", val: 404 }] }, + ]), + ).resolves.toEqual([]); + expect(primaryScanSpy).not.toHaveBeenCalled(); + }); + + it("serves descending limited btree reads from cached coverage", async () => { + const { db, primary } = await createDBs(); + const primaryScanSpy = vi.spyOn(primary, "intervalScan"); + const tasks = Array.from({ length: 5 }, (_, index) => + createTask(index + 1), + ); + await new AsyncDB(primary).insert(intervalTable, tasks); + + await expect( + db.intervalScan( + intervalTable, + "byValue", + [{ gte: [{ col: "value", val: 1 }] }], + { order: "desc", limit: 2 }, + ), + ).resolves.toEqual([tasks[4], tasks[3]]); + expect(primaryScanSpy).toHaveBeenCalledTimes(1); + + primaryScanSpy.mockClear(); + await expect( + db.intervalScan( + intervalTable, + "byValue", + [{ gte: [{ col: "value", val: 1 }] }], + { order: "desc", limit: 2 }, + ), + ).resolves.toEqual([tasks[4], tasks[3]]); + expect(primaryScanSpy).not.toHaveBeenCalled(); + + await expect( + db.intervalScan(intervalTable, "byValue", [ + { gte: [{ col: "value", val: 1 }], lte: [{ col: "value", val: 3 }] }, + ]), + ).resolves.toEqual(tasks.slice(0, 3)); + expect(primaryScanSpy).toHaveBeenCalledTimes(1); + }); + + it("clears cached interval coverage when tables are reloaded", async () => { + const { db, primary } = await createDBs(); + const primaryScanSpy = vi.spyOn(primary, "intervalScan"); + + await expect( + db.intervalScan(intervalTable, "byValue", [ + { eq: [{ col: "value", val: 404 }] }, + ]), + ).resolves.toEqual([]); + expect(primaryScanSpy).toHaveBeenCalledTimes(1); + + primaryScanSpy.mockClear(); + await expect( + db.intervalScan(intervalTable, "byValue", [ + { eq: [{ col: "value", val: 404 }] }, + ]), + ).resolves.toEqual([]); + expect(primaryScanSpy).not.toHaveBeenCalled(); + + primaryScanSpy.mockClear(); + await db.loadTables([intervalTable]); + await expect( + db.intervalScan(intervalTable, "byValue", [ + { eq: [{ col: "value", val: 404 }] }, + ]), + ).resolves.toEqual([]); + expect(primaryScanSpy).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/hyperdb/src/hyperdb/runtime/hybrid-db-intervals.ts b/packages/hyperdb/src/hyperdb/runtime/hybrid-db-intervals.ts new file mode 100644 index 0000000..691eb73 --- /dev/null +++ b/packages/hyperdb/src/hyperdb/runtime/hybrid-db-intervals.ts @@ -0,0 +1,424 @@ +import type { DBCmd } from "../commands/async"; +import { convertWhereToBound } from "../core/query/bounds"; +import { compareTuple, normalizeTupleBounds } from "../core/query/tuple"; +import type { HyperDB } from "../core/contracts"; +import { + MAX, + MIN, + type Row, + type SelectOptions, + type Tuple, + type TupleScanOptions, + type Value, + type WhereClause, +} from "../core/primitives"; +import type { SelectCommandEvent, SelectScanSource } from "../core/tracer"; +import type { + ExtractIndexes, + ExtractSchema, + TableDefinition, +} from "../schema/table"; + +export type NormalizedInterval = { + lower: Tuple; + lowerInclusive: boolean; + upper: Tuple; + upperInclusive: boolean; +}; + +export type IntervalTarget = { + key: string; + intervals: NormalizedInterval[]; + indexCols: string[]; + supportsPartialLimitCoverage: boolean; +}; + +export type HybridIntervalCache = Map; + +export const createHybridIntervalCache = (): HybridIntervalCache => new Map(); + +/* + * HybridDB cache coverage is tracked per table index as normalized tuple + * intervals. The current cache plan is intentionally conservative: + * + * - fully covered scans are served from the in-memory cache; + * - sub-ranges of previously loaded ranges are covered; + * - overlapping and touching coverage is merged; + * - disjoint multi-clause scans use cache when every target interval is covered; + * - unlimited empty misses still mark the requested range as covered; + * - limited btree scans cache the proven prefix or suffix up to the last + * returned row, including duplicate index values via an appended id column; + * - full hash/equality lookups are cached, while partial limited coverage is + * only trusted for btree indexes or indexes whose first column is id; + * - transaction coverage is merged into the parent DB only on commit; + * - loadTables clears coverage because table contents may have changed. + * + * Not supported yet: mixed cache/persistent plans. If an unlimited query is + * only partially covered, HybridDB currently scans the whole query from the + * primary store instead of fetching only uncovered intervals and merging them + * with cached rows. + */ + +const cacheKey = (table: TableDefinition, indexName: string) => + `${table.tableName}:${indexName}`; + +const minTuple = (count: number): Tuple => + Array.from({ length: count }, () => MIN); + +const maxTuple = (count: number): Tuple => + Array.from({ length: count }, () => MAX); + +const rowToTuple = (row: Row, indexCols: string[]): Tuple => + indexCols.map((col) => row[col] as Value); + +export const boundToInterval = ( + bound: TupleScanOptions, + tupleCount: number, +): NormalizedInterval => { + const normalized = normalizeTupleBounds(bound, tupleCount); + return { + lower: normalized.gte ?? normalized.gt ?? minTuple(tupleCount), + lowerInclusive: normalized.gte !== undefined || normalized.gt === undefined, + upper: normalized.lte ?? normalized.lt ?? maxTuple(tupleCount), + upperInclusive: normalized.lte !== undefined || normalized.lt === undefined, + }; +}; + +export const intervalFromClauses = ( + table: TableDefinition, + indexName: string, + clauses: WhereClause[], +): IntervalTarget => { + if (clauses.length === 0) { + throw new Error("scan clauses must be provided"); + } + + const indexConfig = table.indexes[indexName]; + if (!indexConfig) { + throw new Error( + `Index not found: ${indexName} for table: ${table.tableName}`, + ); + } + + const indexCols = indexConfig.cols as string[]; + const sortCols = + indexConfig.type === "btree" && indexCols[indexCols.length - 1] !== "id" + ? [...indexCols, "id"] + : indexCols; + const intervals = convertWhereToBound(indexCols, clauses) + .map((bound) => boundToInterval(bound, sortCols.length)) + .filter((interval) => !isEmptyInterval(interval)); + + return { + key: cacheKey(table, indexName), + intervals: mergeIntervals(intervals), + indexCols: sortCols, + supportsPartialLimitCoverage: + indexConfig.type === "btree" || indexCols[0] === "id", + }; +}; + +export const isEmptyInterval = (interval: NormalizedInterval) => { + const cmp = compareTuple(interval.lower, interval.upper); + return ( + cmp > 0 || + (cmp === 0 && (!interval.lowerInclusive || !interval.upperInclusive)) + ); +}; + +const compareLower = (a: NormalizedInterval, b: NormalizedInterval) => { + const cmp = compareTuple(a.lower, b.lower); + if (cmp !== 0) return cmp; + if (a.lowerInclusive === b.lowerInclusive) return 0; + return a.lowerInclusive ? -1 : 1; +}; + +const compareUpper = (a: NormalizedInterval, b: NormalizedInterval) => { + const cmp = compareTuple(a.upper, b.upper); + if (cmp !== 0) return cmp; + if (a.upperInclusive === b.upperInclusive) return 0; + return a.upperInclusive ? 1 : -1; +}; + +const canMerge = (a: NormalizedInterval, b: NormalizedInterval) => { + const cmp = compareTuple(a.upper, b.lower); + return cmp > 0 || (cmp === 0 && (a.upperInclusive || b.lowerInclusive)); +}; + +const mergeTwo = ( + a: NormalizedInterval, + b: NormalizedInterval, +): NormalizedInterval => { + const upperCmp = compareUpper(a, b); + return { + lower: a.lower, + lowerInclusive: a.lowerInclusive, + upper: upperCmp >= 0 ? a.upper : b.upper, + upperInclusive: upperCmp >= 0 ? a.upperInclusive : b.upperInclusive, + }; +}; + +export const mergeIntervals = ( + intervals: NormalizedInterval[], +): NormalizedInterval[] => { + const sorted = [...intervals].sort(compareLower); + const merged: NormalizedInterval[] = []; + + for (const interval of sorted) { + const last = merged.at(-1); + if (!last || !canMerge(last, interval)) { + merged.push(interval); + } else { + merged[merged.length - 1] = mergeTwo(last, interval); + } + } + + return merged; +}; + +export const hasOverlap = (a: NormalizedInterval, b: NormalizedInterval) => { + const leftCmp = compareTuple(a.upper, b.lower); + if ( + leftCmp < 0 || + (leftCmp === 0 && (!a.upperInclusive || !b.lowerInclusive)) + ) { + return false; + } + + const rightCmp = compareTuple(b.upper, a.lower); + if ( + rightCmp < 0 || + (rightCmp === 0 && (!b.upperInclusive || !a.lowerInclusive)) + ) { + return false; + } + + return true; +}; + +export const subtractOne = ( + target: NormalizedInterval, + cover: NormalizedInterval, +): NormalizedInterval[] => { + if (!hasOverlap(target, cover)) return [target]; + + const result: NormalizedInterval[] = []; + const lowerCmp = compareTuple(target.lower, cover.lower); + if ( + lowerCmp < 0 || + (lowerCmp === 0 && target.lowerInclusive && !cover.lowerInclusive) + ) { + result.push({ + lower: target.lower, + lowerInclusive: target.lowerInclusive, + upper: cover.lower, + upperInclusive: !cover.lowerInclusive, + }); + } + + const upperCmp = compareTuple(cover.upper, target.upper); + if ( + upperCmp < 0 || + (upperCmp === 0 && !cover.upperInclusive && target.upperInclusive) + ) { + result.push({ + lower: cover.upper, + lowerInclusive: !cover.upperInclusive, + upper: target.upper, + upperInclusive: target.upperInclusive, + }); + } + + return result.filter((interval) => !isEmptyInterval(interval)); +}; + +export const subtractIntervals = ( + targets: NormalizedInterval[], + covers: NormalizedInterval[], +) => { + let remaining = targets; + for (const cover of covers) { + remaining = remaining.flatMap((target) => subtractOne(target, cover)); + } + return mergeIntervals(remaining); +}; + +export const intervalsForLimitedRows = ( + queryIntervals: NormalizedInterval[], + rows: Row[], + indexCols: string[], + order: SelectOptions["order"], +) => { + if (rows.length === 0) return []; + + const lastTuple = rowToTuple(rows[rows.length - 1], indexCols); + + if (order === "desc") { + const intervals = [...queryIntervals].sort((a, b) => -compareUpper(a, b)); + const covered: NormalizedInterval[] = []; + for (const interval of intervals) { + if (compareTuple(interval.upper, lastTuple) < 0) break; + + if (compareTuple(interval.lower, lastTuple) >= 0) { + covered.push(interval); + } else { + covered.push({ + lower: lastTuple, + lowerInclusive: true, + upper: interval.upper, + upperInclusive: interval.upperInclusive, + }); + break; + } + } + return mergeIntervals(covered); + } + + const intervals = [...queryIntervals].sort(compareLower); + const covered: NormalizedInterval[] = []; + for (const interval of intervals) { + if (compareTuple(interval.lower, lastTuple) > 0) break; + + if (compareTuple(interval.upper, lastTuple) <= 0) { + covered.push(interval); + } else { + covered.push({ + lower: interval.lower, + lowerInclusive: interval.lowerInclusive, + upper: lastTuple, + upperInclusive: true, + }); + break; + } + } + return mergeIntervals(covered); +}; + +export const mergeCoverage = ( + cachedIntervals: HybridIntervalCache, + key: string, + intervals: NormalizedInterval[], +) => { + if (intervals.length === 0) return; + cachedIntervals.set( + key, + mergeIntervals([...(cachedIntervals.get(key) ?? []), ...intervals]), + ); +}; + +export const mergeCoverageMaps = ( + parent: HybridIntervalCache, + child: HybridIntervalCache, +) => { + for (const [key, intervals] of child) { + mergeCoverage(parent, key, intervals); + } +}; + +export const canServeLimitedResultFromCache = ( + cacheRows: Row[], + limit: number | undefined, + uncovered: NormalizedInterval[], + indexCols: string[], + order: SelectOptions["order"], +) => { + if ( + limit === undefined || + cacheRows.length < limit || + uncovered.length === 0 + ) { + return false; + } + + const lastTuple = rowToTuple(cacheRows[cacheRows.length - 1], indexCols); + if (order === "desc") { + const firstGap = [...uncovered].sort((a, b) => -compareUpper(a, b))[0]; + const cmp = compareTuple(lastTuple, firstGap.upper); + return cmp > 0 || (cmp === 0 && !firstGap.upperInclusive); + } + + const firstGap = [...uncovered].sort(compareLower)[0]; + const cmp = compareTuple(lastTuple, firstGap.lower); + return cmp < 0 || (cmp === 0 && !firstGap.lowerInclusive); +}; + +const setSelectSource = ( + event: SelectCommandEvent | undefined, + source: SelectScanSource, +): void => { + if (event) { + event.source = source; + } +}; + +export function* hybridIntervalScan( + primary: HyperDB, + cache: HyperDB, + cachedIntervals: HybridIntervalCache, + selectEvent: SelectCommandEvent | undefined, + table: TTable, + indexName: keyof ExtractIndexes, + clauses: WhereClause[], + selectOptions?: SelectOptions, +): Generator[]> { + if (selectOptions?.limit === 0) return []; + + const target = intervalFromClauses(table, indexName as string, clauses); + const cached = cachedIntervals.get(target.key) ?? []; + const uncovered = subtractIntervals(target.intervals, cached); + + if (uncovered.length === 0) { + setSelectSource(selectEvent, "in-mem"); + return yield* cache.intervalScan(table, indexName, clauses, selectOptions); + } + + if (selectOptions?.limit !== undefined) { + setSelectSource(selectEvent, "in-mem"); + const cacheRows = yield* cache.intervalScan( + table, + indexName, + clauses, + selectOptions, + ); + if ( + canServeLimitedResultFromCache( + cacheRows as Row[], + selectOptions.limit, + uncovered, + target.indexCols, + selectOptions.order, + ) + ) { + return cacheRows; + } + } + + setSelectSource(selectEvent, "persist"); + const primaryRows = yield* primary.intervalScan( + table, + indexName, + clauses, + selectOptions, + ); + + if (primaryRows.length > 0) { + yield* cache.upsert(table, primaryRows); + } + + const fullyLoaded = + selectOptions?.limit === undefined || + primaryRows.length < selectOptions.limit; + const loadedIntervals = fullyLoaded + ? target.intervals + : target.supportsPartialLimitCoverage + ? intervalsForLimitedRows( + target.intervals, + primaryRows as Row[], + target.indexCols, + selectOptions.order, + ) + : []; + mergeCoverage(cachedIntervals, target.key, loadedIntervals); + + return primaryRows; +} diff --git a/packages/hyperdb/src/hyperdb/runtime/hybrid-db.test.ts b/packages/hyperdb/src/hyperdb/runtime/hybrid-db.test.ts new file mode 100644 index 0000000..7d6bc73 --- /dev/null +++ b/packages/hyperdb/src/hyperdb/runtime/hybrid-db.test.ts @@ -0,0 +1,504 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { DB } from "./db"; +import { HybridDB } from "./hybrid-db"; +import { SubscribableDB } from "./subscribable-db"; +import { AsyncDB } from "../test-utils/async-db"; +import { BptreeInmemDriver } from "../drivers/inmemory/bptree-inmem-driver"; +import { defineTable } from "../schema/table"; +import { v } from "../schema/values"; +import { select, selectAsync } from "../commands/selector/selector"; +import { selectFrom } from "../commands/selector/builder"; +import { unwrap } from "../commands/async"; +import { + hyperDBTraceStore, + traceRootsRuntimeTable, + type RootTrace, + type SelectScanSource, +} from "../tracing"; + +type Task = { + id: string; + title: string; + value: number; + projectId: string; +}; + +const tasksTable = defineTable("hybridTasks", { + id: v.string(), + title: v.string(), + value: v.number(), + projectId: v.string(), +}) + .index("byValue", ["value"]) + .index("byTitle", ["title"], { type: "hash" }) + .index("byProjectValue", ["projectId", "value"]); + +const createTask = (value: number, title = `Task ${value}`): Task => ({ + id: String(value).padStart(3, "0"), + title, + value, + projectId: value <= 3 ? "a" : "b", +}); + +const createDBs = async () => { + const primary = new DB(new BptreeInmemDriver()); + const cache = new DB(new BptreeInmemDriver()); + const primaryScanSpy = vi.spyOn(primary, "intervalScan"); + const cacheScanSpy = vi.spyOn(cache, "intervalScan"); + const hybrid = new HybridDB(primary, cache); + const db = new AsyncDB(hybrid); + + await db.loadTables([tasksTable]); + + return { db, hybrid, primary, cache, primaryScanSpy, cacheScanSpy }; +}; + +const deferred = () => { + let resolve!: () => void; + let reject!: (error: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + + return { promise, resolve, reject }; +}; + +const waitOneTurn = () => new Promise((resolve) => setTimeout(resolve, 0)); + +const selectCommittedTraces = (limit = 20): RootTrace[] => + select( + hyperDBTraceStore.getDB(), + (function* () { + const rows = yield* selectFrom(traceRootsRuntimeTable, "byCreatedSeq") + .order("desc") + .limit(limit); + return hyperDBTraceStore.resolveTraceRows(rows); + })(), + ); + +let deactivateTraceStore: (() => void) | undefined; + +const lastSelectSource = (): SelectScanSource | undefined => { + hyperDBTraceStore.flushTraceCommits(); + return selectCommittedTraces()[0]?.commandEvents[0]?.source; +}; + +const selectByValue = ( + minValue: number, + maxValue?: number, + limit?: number, +): Generator => + (function* () { + let query = selectFrom(tasksTable, "byValue").where((q) => { + const minQuery = q.gte("value", minValue); + return maxValue === undefined + ? minQuery + : minQuery.lte("value", maxValue); + }); + + if (limit !== undefined) { + query = query.limit(limit); + } + + return yield* query; + })(); + +describe("HybridDB", () => { + beforeEach(() => { + deactivateTraceStore = hyperDBTraceStore.activate(); + hyperDBTraceStore.setMaxTraces(200); + hyperDBTraceStore.clear(); + }); + + afterEach(() => { + hyperDBTraceStore.clear(); + deactivateTraceStore?.(); + deactivateTraceStore = undefined; + }); + + it("loads range misses from primary and serves repeated reads from cache", async () => { + const { db, primary, primaryScanSpy, cacheScanSpy } = await createDBs(); + const tasks = [createTask(1), createTask(2), createTask(3)]; + await new AsyncDB(primary).insert(tasksTable, tasks); + + const first = await db.intervalScan(tasksTable, "byValue", [ + { gte: [{ col: "value", val: 1 }], lte: [{ col: "value", val: 3 }] }, + ]); + expect(first).toEqual(tasks); + expect(primaryScanSpy).toHaveBeenCalledTimes(1); + + primaryScanSpy.mockClear(); + cacheScanSpy.mockClear(); + + const second = await db.intervalScan(tasksTable, "byValue", [ + { gte: [{ col: "value", val: 1 }], lte: [{ col: "value", val: 3 }] }, + ]); + expect(second).toEqual(tasks); + expect(primaryScanSpy).not.toHaveBeenCalled(); + expect(cacheScanSpy).toHaveBeenCalledTimes(1); + }); + + it("caches limited btree reads up to the unique returned row", async () => { + const { db, primary, primaryScanSpy } = await createDBs(); + const tasks = Array.from({ length: 6 }, (_, index) => + createTask(index + 1), + ); + await new AsyncDB(primary).insert(tasksTable, tasks); + + await expect( + db.intervalScan( + tasksTable, + "byValue", + [{ gte: [{ col: "value", val: 1 }] }], + { limit: 2 }, + ), + ).resolves.toEqual(tasks.slice(0, 2)); + expect(primaryScanSpy).toHaveBeenCalledTimes(1); + + primaryScanSpy.mockClear(); + await expect( + db.intervalScan( + tasksTable, + "byValue", + [{ gte: [{ col: "value", val: 1 }] }], + { limit: 2 }, + ), + ).resolves.toEqual(tasks.slice(0, 2)); + expect(primaryScanSpy).not.toHaveBeenCalled(); + + await expect( + db.intervalScan(tasksTable, "byValue", [ + { gte: [{ col: "value", val: 3 }], lte: [{ col: "value", val: 3 }] }, + ]), + ).resolves.toEqual([tasks[2]]); + expect(primaryScanSpy).toHaveBeenCalledTimes(1); + }); + + it("records persist for the first traced HybridDB scan", async () => { + const { hybrid, primary } = await createDBs(); + const tasks = [createTask(1), createTask(2), createTask(3)]; + await new AsyncDB(primary).insert(tasksTable, tasks); + + await expect(selectAsync(hybrid, selectByValue(1, 3))).resolves.toEqual( + tasks, + ); + + expect(lastSelectSource()).toBe("persist"); + }); + + it("records in-mem for repeated traced HybridDB scans served from cache", async () => { + const { hybrid, primary } = await createDBs(); + const tasks = [createTask(1), createTask(2), createTask(3)]; + await new AsyncDB(primary).insert(tasksTable, tasks); + + await selectAsync(hybrid, selectByValue(1, 3)); + hyperDBTraceStore.clear(); + + await expect(selectAsync(hybrid, selectByValue(1, 3))).resolves.toEqual( + tasks, + ); + + expect(lastSelectSource()).toBe("in-mem"); + }); + + it("records sources through SubscribableDB-wrapped HybridDB scans", async () => { + const { hybrid, primary } = await createDBs(); + const db = new SubscribableDB(hybrid); + const tasks = [createTask(1), createTask(2), createTask(3)]; + await new AsyncDB(primary).insert(tasksTable, tasks); + + await expect(selectAsync(db, selectByValue(1, 3))).resolves.toEqual(tasks); + + expect(lastSelectSource()).toBe("persist"); + }); + + it("records persist when a limited cache probe falls back to primary", async () => { + const { hybrid, primary } = await createDBs(); + const tasks = Array.from({ length: 4 }, (_, index) => + createTask(index + 1), + ); + await new AsyncDB(primary).insert(tasksTable, tasks); + + await selectAsync(hybrid, selectByValue(1, undefined, 2)); + hyperDBTraceStore.clear(); + + await expect( + selectAsync(hybrid, selectByValue(1, undefined, 3)), + ).resolves.toEqual(tasks.slice(0, 3)); + + expect(lastSelectSource()).toBe("persist"); + }); + + it("does not record a scan source for limit 0 traced HybridDB scans", async () => { + const { hybrid, primary } = await createDBs(); + await new AsyncDB(primary).insert(tasksTable, [createTask(1)]); + + await expect( + selectAsync(hybrid, selectByValue(1, undefined, 0)), + ).resolves.toEqual([]); + + expect(lastSelectSource()).toBeUndefined(); + }); + + it("uses the appended id cursor for limited duplicate btree values", async () => { + const { db, primary, primaryScanSpy } = await createDBs(); + const tasks: Task[] = [ + { ...createTask(1, "A"), id: "a" }, + { ...createTask(1, "B"), id: "b" }, + { ...createTask(1, "C"), id: "c" }, + ]; + await new AsyncDB(primary).insert(tasksTable, tasks); + + await expect( + db.intervalScan( + tasksTable, + "byValue", + [{ gte: [{ col: "value", val: 1 }], lte: [{ col: "value", val: 1 }] }], + { limit: 2 }, + ), + ).resolves.toEqual(tasks.slice(0, 2)); + + primaryScanSpy.mockClear(); + await expect( + db.intervalScan( + tasksTable, + "byValue", + [{ gte: [{ col: "value", val: 1 }], lte: [{ col: "value", val: 1 }] }], + { limit: 2 }, + ), + ).resolves.toEqual(tasks.slice(0, 2)); + expect(primaryScanSpy).not.toHaveBeenCalled(); + + await expect( + db.intervalScan(tasksTable, "byValue", [ + { gte: [{ col: "value", val: 1 }], lte: [{ col: "value", val: 1 }] }, + ]), + ).resolves.toEqual(tasks); + expect(primaryScanSpy).toHaveBeenCalledTimes(1); + }); + + it("serves hash equality lookups from cache after the first read", async () => { + const { db, primary, primaryScanSpy } = await createDBs(); + const tasks = [createTask(1, "same"), createTask(2, "same")]; + await new AsyncDB(primary).insert(tasksTable, tasks); + + await expect( + db.intervalScan(tasksTable, "byTitle", [ + { eq: [{ col: "title", val: "same" }] }, + ]), + ).resolves.toEqual(tasks); + expect(primaryScanSpy).toHaveBeenCalledTimes(1); + + primaryScanSpy.mockClear(); + await expect( + db.intervalScan(tasksTable, "byTitle", [ + { eq: [{ col: "title", val: "same" }] }, + ]), + ).resolves.toEqual(tasks); + expect(primaryScanSpy).not.toHaveBeenCalled(); + }); + + it("does not mark a limited non-id hash lookup as fully cached", async () => { + const { db, primary, primaryScanSpy } = await createDBs(); + const tasks = [createTask(1, "same"), createTask(2, "same")]; + await new AsyncDB(primary).insert(tasksTable, tasks); + + await expect( + db.intervalScan( + tasksTable, + "byTitle", + [{ eq: [{ col: "title", val: "same" }] }], + { limit: 1 }, + ), + ).resolves.toEqual([tasks[0]]); + + primaryScanSpy.mockClear(); + await expect( + db.intervalScan(tasksTable, "byTitle", [ + { eq: [{ col: "title", val: "same" }] }, + ]), + ).resolves.toEqual(tasks); + expect(primaryScanSpy).toHaveBeenCalledTimes(1); + }); + + it("serializes read miss cache fill against transactions", async () => { + const { db, primary } = await createDBs(); + const tasks = [createTask(1), createTask(2)]; + await new AsyncDB(primary).insert(tasksTable, tasks); + + const scanStarted = deferred(); + const resumeScan = deferred(); + const originalPrimaryScan = DB.prototype.intervalScan.bind(primary); + vi.spyOn(primary, "intervalScan").mockImplementation( + function* (table, indexName, clauses, selectOptions) { + scanStarted.resolve(); + yield* unwrap(resumeScan.promise); + return yield* originalPrimaryScan( + table, + indexName, + clauses, + selectOptions, + ); + }, + ); + + const readPromise = db.intervalScan(tasksTable, "byValue", [ + { gte: [{ col: "value", val: 1 }], lte: [{ col: "value", val: 2 }] }, + ]); + await scanStarted.promise; + + let txResolved = false; + const txPromise = db.beginTx().then((tx) => { + txResolved = true; + return tx; + }); + await waitOneTurn(); + expect(txResolved).toBe(false); + + resumeScan.resolve(); + await expect(readPromise).resolves.toEqual(tasks); + + const tx = await txPromise; + expect(txResolved).toBe(true); + await tx.rollback(); + }); + + it("shares the concurrency lock across withTraits wrappers", async () => { + const { hybrid, primary } = await createDBs(); + const db = new AsyncDB(hybrid); + const traitedDb = new AsyncDB(hybrid.withTraits({ type: "traited" })); + const tasks = [createTask(1), createTask(2)]; + await new AsyncDB(primary).insert(tasksTable, tasks); + + const scanStarted = deferred(); + const resumeScan = deferred(); + const originalPrimaryScan = DB.prototype.intervalScan.bind(primary); + vi.spyOn(primary, "intervalScan").mockImplementation( + function* (table, indexName, clauses, selectOptions) { + scanStarted.resolve(); + yield* unwrap(resumeScan.promise); + return yield* originalPrimaryScan( + table, + indexName, + clauses, + selectOptions, + ); + }, + ); + + const readPromise = traitedDb.intervalScan(tasksTable, "byValue", [ + { gte: [{ col: "value", val: 1 }], lte: [{ col: "value", val: 2 }] }, + ]); + await scanStarted.promise; + + let txResolved = false; + const txPromise = db.beginTx().then((tx) => { + txResolved = true; + return tx; + }); + await waitOneTurn(); + expect(txResolved).toBe(false); + + resumeScan.resolve(); + await expect(readPromise).resolves.toEqual(tasks); + + const tx = await txPromise; + expect(txResolved).toBe(true); + await tx.rollback(); + }); + + it("writes through to primary and cache synchronously", async () => { + const { db, primary, cache } = await createDBs(); + const task = createTask(1); + + await db.insert(tasksTable, [task]); + await expect( + new AsyncDB(primary).intervalScan(tasksTable, "byValue", [ + { eq: [{ col: "value", val: 1 }] }, + ]), + ).resolves.toEqual([task]); + await expect( + new AsyncDB(cache).intervalScan(tasksTable, "byValue", [ + { eq: [{ col: "value", val: 1 }] }, + ]), + ).resolves.toEqual([task]); + + const updated = { ...task, title: "updated" }; + await db.upsert(tasksTable, [updated]); + await expect( + new AsyncDB(primary).intervalScan(tasksTable, "byTitle", [ + { eq: [{ col: "title", val: "updated" }] }, + ]), + ).resolves.toEqual([updated]); + await expect( + new AsyncDB(cache).intervalScan(tasksTable, "byTitle", [ + { eq: [{ col: "title", val: "updated" }] }, + ]), + ).resolves.toEqual([updated]); + + await db.delete(tasksTable, [task.id]); + await expect( + new AsyncDB(primary).intervalScan(tasksTable, "byValue", [ + { eq: [{ col: "value", val: 1 }] }, + ]), + ).resolves.toEqual([]); + await expect( + new AsyncDB(cache).intervalScan(tasksTable, "byValue", [ + { eq: [{ col: "value", val: 1 }] }, + ]), + ).resolves.toEqual([]); + }); + + it("keeps transaction reads and writes consistent before commit", async () => { + const { db, primary, primaryScanSpy } = await createDBs(); + const tasks = [createTask(1), createTask(2)]; + await new AsyncDB(primary).insert(tasksTable, tasks); + + const tx = await db.beginTx(); + await expect( + tx.intervalScan(tasksTable, "byValue", [ + { gte: [{ col: "value", val: 1 }], lte: [{ col: "value", val: 2 }] }, + ]), + ).resolves.toEqual(tasks); + + const inserted = createTask(3); + await tx.insert(tasksTable, [inserted]); + await expect( + tx.intervalScan(tasksTable, "byValue", [ + { gte: [{ col: "value", val: 1 }], lte: [{ col: "value", val: 3 }] }, + ]), + ).resolves.toEqual([...tasks, inserted]); + + await tx.commit(); + primaryScanSpy.mockClear(); + await expect( + db.intervalScan(tasksTable, "byValue", [ + { gte: [{ col: "value", val: 1 }], lte: [{ col: "value", val: 2 }] }, + ]), + ).resolves.toEqual(tasks); + expect(primaryScanSpy).not.toHaveBeenCalled(); + }); + + it("does not publish transaction cache coverage on rollback", async () => { + const { db, primary, primaryScanSpy } = await createDBs(); + const tasks = [createTask(1), createTask(2)]; + await new AsyncDB(primary).insert(tasksTable, tasks); + + const tx = await db.beginTx(); + await expect( + tx.intervalScan(tasksTable, "byValue", [ + { gte: [{ col: "value", val: 1 }], lte: [{ col: "value", val: 2 }] }, + ]), + ).resolves.toEqual(tasks); + await tx.rollback(); + + primaryScanSpy.mockClear(); + await expect( + db.intervalScan(tasksTable, "byValue", [ + { gte: [{ col: "value", val: 1 }], lte: [{ col: "value", val: 2 }] }, + ]), + ).resolves.toEqual(tasks); + expect(primaryScanSpy).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/hyperdb/src/hyperdb/runtime/hybrid-db.ts b/packages/hyperdb/src/hyperdb/runtime/hybrid-db.ts new file mode 100644 index 0000000..a3ecb9e --- /dev/null +++ b/packages/hyperdb/src/hyperdb/runtime/hybrid-db.ts @@ -0,0 +1,402 @@ +import type { DBCmd } from "../commands/async"; +import { unwrap } from "../commands/async"; +import type { HyperDB, HyperDBTx } from "../core/contracts"; +import { + type SelectOptions, + type Trait, + type WhereClause, +} from "../core/primitives"; +import { + getCurrentSelectEventForDB, + type HyperDBTracerOption, +} from "../core/tracer"; +import { DEFAULT_CODEC_OPTIONS, type CodecOptions } from "../storage/codec"; +import type { + ExtractIndexes, + ExtractSchema, + TableDefinition, +} from "../schema/table"; +import AwaitLock from "../utils/await-lock"; +import { refVar, type RefVar } from "../utils"; +import { + createHybridIntervalCache, + hybridIntervalScan, + mergeCoverageMaps, + type HybridIntervalCache, +} from "./hybrid-db-intervals"; + +type HybridDBState = { + cachedIntervals: HybridIntervalCache; + lock: AwaitLock; +}; + +type HybridDBTxState = { + cachedIntervals: HybridIntervalCache; + committed: RefVar; + rollbacked: RefVar; + txCounter: RefVar; + releaseLock: () => void; +}; + +const createHybridDBState = (): HybridDBState => ({ + cachedIntervals: createHybridIntervalCache(), + lock: new AwaitLock(), +}); + +const createHybridDBTxState = (releaseLock: () => void): HybridDBTxState => ({ + cachedIntervals: createHybridIntervalCache(), + committed: refVar(false), + rollbacked: refVar(false), + txCounter: refVar(1), + releaseLock, +}); + +export type HybridDBOptions = { + traits?: Trait[]; +}; + +function* acquireHybridLock( + state: HybridDBState, +): Generator void> { + if (!state.lock.tryAcquire()) { + yield* unwrap(state.lock.acquireAsync()); + } + + let released = false; + return () => { + if (released) return; + released = true; + state.lock.release(); + }; +} + +function* withHybridLock( + state: HybridDBState, + run: () => Generator, +): Generator { + const release = yield* acquireHybridLock(state); + try { + return yield* run(); + } finally { + release(); + } +} + +export class HybridDB implements HyperDB { + primary: HyperDB; + cache: HyperDB; + traits: Trait[] = []; + private state: HybridDBState; + + constructor(primary: HyperDB, cache: HyperDB, options: HybridDBOptions = {}) { + this.primary = primary; + this.cache = cache; + this.traits = options.traits ?? []; + this.state = createHybridDBState(); + } + + withTraits(...traits: Trait[]): HyperDB { + const db = new HybridDB(this.primary, this.cache, { + traits: [...this.traits, ...traits], + }); + db.state = this.state; + return db; + } + + getTraits(): Trait[] { + return [...this.traits, ...this.primary.getTraits()]; + } + + getId(): string { + return this.primary.getId(); + } + + getDBName(): string | undefined { + return this.primary.getDBName?.(); + } + + getTracer(): HyperDBTracerOption | undefined { + return this.primary.getTracer?.(); + } + + getOptions(): CodecOptions { + return this.primary.getOptions?.() ?? DEFAULT_CODEC_OPTIONS; + } + + *loadTables(tables: TableDefinition[]): Generator { + yield* withHybridLock( + this.state, + function* () { + yield* this.primary.loadTables(tables); + yield* this.cache.loadTables(tables); + this.state.cachedIntervals.clear(); + }.bind(this), + ); + } + + *beginTx(): Generator { + const release = yield* acquireHybridLock(this.state); + let primaryTx: HyperDBTx | undefined; + try { + primaryTx = yield* this.primary.beginTx(); + const cacheTx = yield* this.cache.beginTx(); + return new HybridDBTx(this, primaryTx, cacheTx, release); + } catch (error) { + if (primaryTx) { + try { + yield* primaryTx.rollback(); + } catch { + // Preserve the original beginTx error. + } + } + release(); + throw error; + } + } + + *intervalScan< + TTable extends TableDefinition, + K extends keyof ExtractIndexes, + >( + table: TTable, + indexName: K, + clauses: WhereClause[], + selectOptions?: SelectOptions, + ): Generator[]> { + return yield* withHybridLock( + this.state, + function* () { + return yield* hybridIntervalScan( + this.primary, + this.cache, + this.state.cachedIntervals, + getCurrentSelectEventForDB(this), + table, + indexName, + clauses, + selectOptions, + ); + }.bind(this), + ); + } + + *insert( + table: TTable, + records: ExtractSchema[], + ): Generator { + yield* withHybridLock( + this.state, + function* () { + yield* this.primary.insert(table, records); + yield* this.cache.insert(table, records); + }.bind(this), + ); + } + + *upsert( + table: TTable, + records: ExtractSchema[], + ): Generator { + yield* withHybridLock( + this.state, + function* () { + yield* this.primary.upsert(table, records); + yield* this.cache.upsert(table, records); + }.bind(this), + ); + } + + *delete( + table: TTable, + ids: string[], + ): Generator { + yield* withHybridLock( + this.state, + function* () { + yield* this.primary.delete(table, ids); + yield* this.cache.delete(table, ids); + }.bind(this), + ); + } + + mergeTxCoverage(intervals: HybridIntervalCache): void { + mergeCoverageMaps(this.state.cachedIntervals, intervals); + } +} + +class HybridDBTx implements HyperDBTx { + private hybridDB: HybridDB; + private primaryTx: HyperDBTx; + private cacheTx: HyperDBTx; + private state: HybridDBTxState; + private traits: Trait[]; + + constructor( + hybridDB: HybridDB, + primaryTx: HyperDBTx, + cacheTx: HyperDBTx, + releaseLock: () => void, + state: HybridDBTxState = createHybridDBTxState(releaseLock), + traits: Trait[] = [], + ) { + this.hybridDB = hybridDB; + this.primaryTx = primaryTx; + this.cacheTx = cacheTx; + this.state = state; + this.traits = traits; + } + + withTraits(...traits: Trait[]): HyperDBTx { + return new HybridDBTx( + this.hybridDB, + this.primaryTx, + this.cacheTx, + this.state.releaseLock, + this.state, + [...this.traits, ...traits], + ); + } + + getTraits(): Trait[] { + return [...this.traits, ...this.hybridDB.getTraits()]; + } + + getId(): string { + return this.hybridDB.getId(); + } + + getDBName(): string | undefined { + return this.hybridDB.getDBName?.(); + } + + getTracer(): HyperDBTracerOption | undefined { + return this.hybridDB.getTracer?.(); + } + + getOptions(): CodecOptions { + return this.hybridDB.getOptions?.() ?? DEFAULT_CODEC_OPTIONS; + } + + *loadTables(): Generator { + throw new Error("Not supported"); + } + + *beginTx(): Generator { + this.throwIfDone(); + this.state.txCounter.val++; + return this; + } + + *intervalScan< + TTable extends TableDefinition, + K extends keyof ExtractIndexes, + >( + table: TTable, + indexName: K, + clauses: WhereClause[], + selectOptions?: SelectOptions, + ): Generator[]> { + this.throwIfDone(); + return yield* hybridIntervalScan( + this.primaryTx, + this.cacheTx, + this.state.cachedIntervals, + getCurrentSelectEventForDB(this), + table, + indexName, + clauses, + selectOptions, + ); + } + + *insert( + table: TTable, + records: ExtractSchema[], + ): Generator { + this.throwIfDone(); + yield* this.primaryTx.insert(table, records); + yield* this.cacheTx.insert(table, records); + } + + *upsert( + table: TTable, + records: ExtractSchema[], + ): Generator { + this.throwIfDone(); + yield* this.primaryTx.upsert(table, records); + yield* this.cacheTx.upsert(table, records); + } + + *delete( + table: TTable, + ids: string[], + ): Generator { + this.throwIfDone(); + yield* this.primaryTx.delete(table, ids); + yield* this.cacheTx.delete(table, ids); + } + + *commit(): Generator { + this.throwIfDone(); + this.state.txCounter.val--; + if (this.state.txCounter.val !== 0) return; + + let primaryCommitted = false; + try { + yield* this.primaryTx.commit(); + primaryCommitted = true; + yield* this.cacheTx.commit(); + this.hybridDB.mergeTxCoverage(this.state.cachedIntervals); + this.state.committed.val = true; + } catch (error) { + if (!primaryCommitted) { + try { + yield* this.primaryTx.rollback(); + } catch { + // Preserve the original commit error. + } + } + try { + yield* this.cacheTx.rollback(); + } catch { + // Preserve the original commit error. + } + throw error; + } finally { + this.state.releaseLock(); + } + } + + *rollback(): Generator { + this.throwIfDone(); + let rollbackError: unknown; + try { + try { + yield* this.primaryTx.rollback(); + } catch (error) { + rollbackError = error; + } + try { + yield* this.cacheTx.rollback(); + } catch (error) { + rollbackError ??= error; + } + this.state.rollbacked.val = true; + if (rollbackError) { + throw rollbackError; + } + } finally { + this.state.releaseLock(); + } + } + + private throwIfDone() { + if (this.state.committed.val) { + throw new Error("Cannot modify a committed tx"); + } + + if (this.state.rollbacked.val) { + throw new Error("Cannot modify a rollbacked tx"); + } + } +} diff --git a/packages/hyperdb/src/hyperdb/runtime/subscribable-db.ts b/packages/hyperdb/src/hyperdb/runtime/subscribable-db.ts index 839a76d..048e48d 100644 --- a/packages/hyperdb/src/hyperdb/runtime/subscribable-db.ts +++ b/packages/hyperdb/src/hyperdb/runtime/subscribable-db.ts @@ -175,12 +175,10 @@ export class SubscribableDBTx implements HyperDBTx { ): Generator[]> { this.throwIfDone(); - return yield* this.txDb.intervalScan( - table, - indexName, - clauses, - selectOptions, - ); + const txDb = + this.traits.length > 0 ? this.txDb.withTraits(...this.traits) : this.txDb; + + return yield* txDb.intervalScan(table, indexName, clauses, selectOptions); } *insert>( @@ -497,12 +495,18 @@ export class SubscribableDB implements HyperDB { return this.state.revision.val; } + private delegateDB(): HyperDB { + return this.traits.length > 0 + ? this.db.withTraits(...this.traits) + : this.db; + } + loadTables(tables: TableDefinition[]): Generator { return this.db.loadTables(tables); } *beginTx(): Generator { - return new SubscribableDBTx(this, yield* this.db.beginTx()); + return new SubscribableDBTx(this, yield* this.delegateDB().beginTx()); } withTraits(...traits: Trait[]): HyperDB { @@ -585,7 +589,7 @@ export class SubscribableDB implements HyperDB { return []; } - return yield* this.db.intervalScan( + return yield* this.delegateDB().intervalScan( table, indexName, clauses, diff --git a/packages/hyperdb/src/hyperdb/tracing/context.ts b/packages/hyperdb/src/hyperdb/tracing/context.ts index 145e72b..24c5f0f 100644 --- a/packages/hyperdb/src/hyperdb/tracing/context.ts +++ b/packages/hyperdb/src/hyperdb/tracing/context.ts @@ -1,9 +1,18 @@ export { + currentSelectEventTrait, + currentSelectEventTraitType, + getCurrentSelectEventForDB, + getCurrentSelectEventFromTraits, getTraceContextForDB, getTraceContextFromTraits, + isCurrentSelectEventTrait, isTraceContextTrait, traceContextTrait, traceContextTraitType, + withCurrentSelectEventTrait, withTraceContextTrait, } from "../core/tracer"; -export type { TraceContextTrait } from "../core/tracer"; +export type { + CurrentSelectEventTrait, + TraceContextTrait, +} from "../core/tracer"; diff --git a/packages/hyperdb/src/hyperdb/tracing/store.ts b/packages/hyperdb/src/hyperdb/tracing/store.ts index 246cca2..76636e6 100644 --- a/packages/hyperdb/src/hyperdb/tracing/store.ts +++ b/packages/hyperdb/src/hyperdb/tracing/store.ts @@ -39,6 +39,7 @@ export type { MutationEventKind, RootTrace, SelectCommandEvent, + SelectScanSource, SerializableTraceValue, TraceContext, TraceError, diff --git a/packages/hyperdb/src/react/hooks.test.ts b/packages/hyperdb/src/react/hooks.test.ts index d636c8e..4b580f1 100644 --- a/packages/hyperdb/src/react/hooks.test.ts +++ b/packages/hyperdb/src/react/hooks.test.ts @@ -19,6 +19,7 @@ const mocks = { setResult: vi.fn(), initCachedSelector: vi.fn(), runSelectorAsync: vi.fn(), + runSelectorMaybeAsync: vi.fn(), isNeedToRerunRange: vi.fn(), stableSerializeSelectorArgs: vi.fn(), }; @@ -87,6 +88,7 @@ describe("useAsyncSelector", () => { mocks.setResult.mockReset(); mocks.initCachedSelector.mockReset(); mocks.runSelectorAsync.mockReset(); + mocks.runSelectorMaybeAsync.mockReset(); mocks.isNeedToRerunRange.mockReset(); mocks.stableSerializeSelectorArgs.mockReset(); mocks.stableSerializeSelectorArgs.mockReturnValue("args-key"); @@ -95,6 +97,7 @@ describe("useAsyncSelector", () => { useDB: () => mocks.db, initCachedSelector: (...args) => mocks.initCachedSelector(...args), runSelectorAsync: (...args) => mocks.runSelectorAsync(...args), + runSelectorMaybeAsync: (...args) => mocks.runSelectorMaybeAsync(...args), isNeedToRerunRange: (...args) => mocks.isNeedToRerunRange(...args), stableSerializeSelectorArgs: (...args) => mocks.stableSerializeSelectorArgs(...args), @@ -158,16 +161,18 @@ describe("useAsyncSelector", () => { const secondCmd = { table: "tasks", range: "second" }; let runCount = 0; - mocks.runSelectorAsync.mockImplementation((_db, _gen, cmds: unknown[]) => { - runCount++; - if (runCount === 1) { - cmds.push(firstCmd); - return first.promise; - } + mocks.runSelectorMaybeAsync.mockImplementation( + (_db, _gen, cmds: unknown[]) => { + runCount++; + if (runCount === 1) { + cmds.push(firstCmd); + return first.promise; + } - cmds.push(secondCmd); - return second.promise; - }); + cmds.push(secondCmd); + return second.promise; + }, + ); useAsyncSelector({ selector: function* selector() { @@ -176,20 +181,20 @@ describe("useAsyncSelector", () => { args: {}, }); - expect(mocks.runSelectorAsync).toHaveBeenCalledTimes(1); + expect(mocks.runSelectorMaybeAsync).toHaveBeenCalledTimes(1); expect(mocks.db.subscribe).toHaveBeenCalledTimes(1); mocks.db.emit([{ id: "op-1" }]); mocks.db.emit([{ id: "op-2" }]); mocks.db.emit([{ id: "op-3" }]); - expect(mocks.runSelectorAsync).toHaveBeenCalledTimes(1); + expect(mocks.runSelectorMaybeAsync).toHaveBeenCalledTimes(1); first.resolve("stale"); await flushPromises(); expect(mocks.setResult).not.toHaveBeenCalled(); - expect(mocks.runSelectorAsync).toHaveBeenCalledTimes(2); + expect(mocks.runSelectorMaybeAsync).toHaveBeenCalledTimes(2); second.resolve("latest"); await flushPromises(); @@ -207,17 +212,19 @@ describe("useAsyncSelector", () => { [secondCmd], ignoredOps, ); - expect(mocks.runSelectorAsync).toHaveBeenCalledTimes(2); + expect(mocks.runSelectorMaybeAsync).toHaveBeenCalledTimes(2); }); it("ignores a pending async selector result after unmount cleanup", async () => { const pending = deferred(); const cmd = { table: "tasks", range: "pending" }; - mocks.runSelectorAsync.mockImplementation((_db, _gen, cmds: unknown[]) => { - cmds.push(cmd); - return pending.promise; - }); + mocks.runSelectorMaybeAsync.mockImplementation( + (_db, _gen, cmds: unknown[]) => { + cmds.push(cmd); + return pending.promise; + }, + ); useAsyncSelector({ selector: function* selector() { @@ -254,16 +261,43 @@ describe("useAsyncSelector", () => { expect(result).toEqual([]); expect(mocks.stableSerializeSelectorArgs).not.toHaveBeenCalled(); - expect(mocks.runSelectorAsync).not.toHaveBeenCalled(); + expect(mocks.runSelectorMaybeAsync).not.toHaveBeenCalled(); expect(mocks.db.subscribe).not.toHaveBeenCalled(); }); + it("applies fully synchronous async selector runs without waiting for a promise turn", () => { + const cmd = { table: "tasks", range: "sync" }; + const selector = vi.fn(function* selector(_args: { projectId: string }) { + return ["unused"]; + }); + + mocks.runSelectorMaybeAsync.mockImplementation( + (_db, gen, cmds: unknown[]) => { + cmds.push(cmd); + gen(); + return ["task-1"]; + }, + ); + + useAsyncSelector({ + selector, + args: { projectId: "project-1" }, + defaultValue: [], + }); + + expect(selector).toHaveBeenCalledWith({ projectId: "project-1" }); + expect(mocks.runSelectorMaybeAsync).toHaveBeenCalledTimes(1); + expect(mocks.runSelectorAsync).not.toHaveBeenCalled(); + expect(mocks.setResult).toHaveBeenCalledWith(["task-1"]); + expect(mocks.refs[0].current).toEqual([cmd]); + }); + it("runs object-form async selectors with args", async () => { const selector = vi.fn(function* selector(_args: { projectId: string }) { return ["unused"]; }); - mocks.runSelectorAsync.mockImplementation((_db, gen) => { + mocks.runSelectorMaybeAsync.mockImplementation((_db, gen) => { gen(); return Promise.resolve(["task-1"]); }); @@ -277,7 +311,7 @@ describe("useAsyncSelector", () => { await flushPromises(); expect(selector).toHaveBeenCalledWith({ projectId: "project-1" }); - expect(mocks.runSelectorAsync).toHaveBeenCalledTimes(1); + expect(mocks.runSelectorMaybeAsync).toHaveBeenCalledTimes(1); expect(mocks.db.subscribe).toHaveBeenCalledTimes(1); expect(mocks.setResult).toHaveBeenCalledWith(["task-1"]); }); @@ -286,7 +320,7 @@ describe("useAsyncSelector", () => { const selector = vi.fn(function* selector(_args: { projectId: string }) { return ["unused"]; }); - mocks.runSelectorAsync.mockReturnValue(new Promise(() => {})); + mocks.runSelectorMaybeAsync.mockReturnValue(new Promise(() => {})); mocks.stableSerializeSelectorArgs .mockReturnValueOnce("project-1") .mockReturnValueOnce("project-2"); @@ -305,6 +339,6 @@ describe("useAsyncSelector", () => { expect(mocks.setResult).toHaveBeenCalledWith(["loading-1"]); expect(mocks.setResult).toHaveBeenCalledWith(["loading-2"]); - expect(mocks.runSelectorAsync).toHaveBeenCalledTimes(2); + expect(mocks.runSelectorMaybeAsync).toHaveBeenCalledTimes(2); }); }); diff --git a/packages/hyperdb/src/react/hooks.ts b/packages/hyperdb/src/react/hooks.ts index 526a563..c9ac1cd 100644 --- a/packages/hyperdb/src/react/hooks.ts +++ b/packages/hyperdb/src/react/hooks.ts @@ -9,6 +9,7 @@ import { import { initCachedSelector, runSelectorAsync, + runSelectorMaybeAsync, select, type AnyObjectSelector, type SelectorArgs, @@ -60,6 +61,11 @@ const createDisabledStore = (defaultValue: TReturn) => ({ getSnapshot: () => defaultValue, }); +const isPromiseLike = (value: T | PromiseLike): value is PromiseLike => + value !== null && + (typeof value === "object" || typeof value === "function") && + typeof (value as { then?: unknown }).then === "function"; + const defaultHookDeps = { useCallback, useEffect, @@ -70,6 +76,7 @@ const defaultHookDeps = { useDB, initCachedSelector, runSelectorAsync, + runSelectorMaybeAsync, select, isNeedToRerunRange, stableSerializeSelectorArgs, @@ -170,37 +177,65 @@ export function useAsyncSelector( let isRunning = false; let rerunRequested = false; - const run = async () => { + const run = () => { if (isRunning) { rerunRequested = true; return; } isRunning = true; + try { do { rerunRequested = false; const cmds: SelectRangeCmd[] = []; - // TODO: we can detetect if CachedDB has already cached value in range, - // and don't spawn async/await promise that may dramatically improve performance - const value = await hookDeps.runSelectorAsync( + const value = hookDeps.runSelectorMaybeAsync( db, genRef.current, cmds, ); - if (cancelled) return; + + if (isPromiseLike(value)) { + void Promise.resolve(value) + .then((resolvedValue) => { + if (cancelled || rerunRequested) { + return; + } + + selectRangeCmdsRef.current = cmds; + setResult(resolvedValue); + }) + .catch((error: unknown) => { + void Promise.reject(error); + }) + .finally(() => { + isRunning = false; + if (rerunRequested && !cancelled) { + run(); + } + }); + return; + } + + if (cancelled) { + isRunning = false; + return; + } if (rerunRequested) continue; selectRangeCmdsRef.current = cmds; setResult(value); } while (rerunRequested); - } finally { + + isRunning = false; + } catch (error) { isRunning = false; + void Promise.reject(error); } }; - void run(); + run(); const unsubscribe = db.subscribe((ops) => { if (isRunning) { @@ -216,7 +251,7 @@ export function useAsyncSelector( return; } - void run(); + run(); }); return () => {