feat: normalize labels into indexed table for efficient filtering #103
Labels were stored as JSON in durably_runs.labels and filtered via json_extract/json_each, which requires full table scans. At scale (500K+ runs for multi-tenant workloads), this becomes a bottleneck.
Add durably_run_labels(run_id, key, value) with (key, value) index for O(log n) lookups. Uses dual-write strategy: JSON column kept for reads/events, normalized table used for WHERE-clause filtering via EXISTS subqueries.
Migration v2 backfills from existing JSON data with SQLite/PostgreSQL-specific JSON functions.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
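As a rough illustration of the EXISTS-based filtering described in this commit message, the sketch below builds one correlated subquery per requested label. It is not code from the PR: `buildLabelFilter` and `SqlFragment` are hypothetical names, and the `durably_runs.id` join column is assumed from the `run_id` naming.

```typescript
// Hypothetical sketch, not the PR's implementation.
// Assumes durably_run_labels(run_id, key, value) joins to durably_runs.id.
interface SqlFragment {
  sql: string
  params: string[]
}

// Builds a WHERE-clause fragment that requires every given label to match.
function buildLabelFilter(labels: Record<string, string>): SqlFragment {
  const clauses: string[] = []
  const params: string[] = []
  for (const [key, value] of Object.entries(labels)) {
    // One correlated subquery per label; a (key, value) index can serve each one.
    clauses.push(
      'EXISTS (SELECT 1 FROM durably_run_labels l ' +
        'WHERE l.run_id = durably_runs.id AND l.key = ? AND l.value = ?)',
    )
    params.push(key, value)
  }
  // With no labels requested, return a clause that matches every run.
  return { sql: clauses.length > 0 ? clauses.join(' AND ') : '1 = 1', params }
}
```

Keeping values in bound parameters rather than interpolating them into the SQL string avoids quoting problems when label values contain special characters.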
No actionable comments were generated in the recent review. 🎉
📝 Walkthrough
Adds a normalized
Sequence Diagram(s)
sequenceDiagram
participant App as Application
participant State as DurablyState
participant Backend as BackendDetector
participant Migrations as Migrations
participant DB as Database
App->>State: initialize(options)
State->>Backend: detectBackend(options.dialect)
Backend-->>State: backend
State->>Migrations: runMigrations(db, backend)
Migrations->>DB: check schema version
alt needs v2
Migrations->>DB: create durably_run_labels table
Migrations->>DB: create indexes (run_id+key unique, key+value non-unique)
alt backend supports set-based backfill
Migrations->>DB: set-based insert of label rows
else
Migrations->>DB: iterate runs and insert label rows
end
end
DB-->>Migrations: complete
Migrations-->>State: done
State-->>App: ready
sequenceDiagram
participant Client as App/API
participant Store as Storage Layer
participant Mutex as In-Process WriteLock
participant DB as Database
participant Labels as durably_run_labels
participant Worker as Worker
Client->>Store: enqueue(run, labels)
Store->>Mutex: acquire write lock
Mutex-->>Store: lock granted
Store->>DB: insert into durably_runs -> run_id
Store->>Labels: insert normalized rows (run_id, key, value)
Labels-->>Store: rows inserted
Store->>Mutex: release write lock
Store-->>Client: enqueue result
Note over Worker,Store: concurrent mutating ops serialized by Mutex
Client->>Store: getRuns(filter with labels)
Store->>Labels: EXISTS query for matching (key,value)
alt EXISTS supported/used
Labels-->>Store: matching run_ids
Store->>DB: fetch runs by ids/filters
else JSON fallback
Store->>DB: JSON-based label filtering
end
DB-->>Store: runs
Store-->>Client: filtered runs
No need for separate v2 migration since there are no existing databases to migrate from. Remove backend parameter from runMigrations since backfill logic is no longer needed.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
🧹 Nitpick comments (1)
packages/durably/src/storage.ts (1)
486-500: Consider avoiding redundant JSON re-parsing.
The labels are already available from each `input.labels` in the original `inputs` array. Currently, the code JSON-stringifies labels into `run.labels` (line 471) and then re-parses them (line 490). While functionally correct, you could avoid this round-trip by tracking which input corresponds to each new run.
♻️ Optional refactor to avoid re-parsing

      // Insert all new runs in a single batch
    - const newRuns = runs.filter((r) => r.created_at === now)
    + const newRunIndices: number[] = []
    + for (let i = 0; i < runs.length; i++) {
    +   if (runs[i].created_at === now) newRunIndices.push(i)
    + }
    + const newRuns = newRunIndices.map((i) => runs[i])
      if (newRuns.length > 0) {
        await trx.insertInto('durably_runs').values(newRuns).execute()

        // Insert normalized labels for indexed filtering
        const labelRows: { run_id: string; key: string; value: string }[] = []
    -   for (const run of newRuns) {
    -     const labels = JSON.parse(run.labels) as Record<string, string>
    +   for (const idx of newRunIndices) {
    +     const labels = inputs[idx].labels ?? {}
          for (const [key, value] of Object.entries(labels)) {
    -       labelRows.push({ run_id: run.id, key, value })
    +       labelRows.push({ run_id: runs[idx].id, key, value })
          }
        }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/durably/src/storage.ts` around lines 486 - 500, The code reparses run.labels from newRuns to build labelRows; instead, carry the parsed labels from the original inputs to avoid the JSON round-trip: when creating the runs (the runRows/newRuns creation around run.labels assignment), attach or preserve the parsed labels (e.g., parsedLabels or use the inputs[index].labels) and then use that property instead of JSON.parse(run.labels) when building labelRows for inserting into durably_run_labels via trx.insertInto; reference symbols: newRuns, run.labels, inputs, labelRows, durably_run_labels, trx.
📒 Files selected for processing (6)
- CLAUDE.md
- packages/durably/src/durably.ts
- packages/durably/src/migrations.ts
- packages/durably/src/schema.ts
- packages/durably/src/storage.ts
- packages/durably/tests/node/migration-consolidated.test.ts
…bel filtering
- Extract shared insertLabelRows() helper for enqueue/enqueueMany
- Avoid JSON round-trip in enqueueMany by tracking labels in a Map
- Add JSON fallback to getRuns label filter (EXISTS OR json_extract) to handle atomicity gap without requiring transactions
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Tests that exercise concurrent writes between the worker and user operations (enqueue, enqueueMany, deleteRun, triggerAndWait). deleteRun currently fails with SQLITE_BUSY due to libsql's connection model opening separate connections for transactions.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
libsql opens separate SQLite connections for transactions, causing SQLITE_BUSY when concurrent writes happen from the same Kysely instance (e.g., worker processing + user enqueue/delete).
Add an async write mutex that serializes all mutating operations within a single process. Read operations are not locked. This preserves all existing transactions (deleteRun, recordStep, enqueueMany) while preventing write/write conflicts.
Also fix contention test type errors by adding output schemas.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
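One common way to build an in-process async write mutex like the one described here is to chain each writer onto the previous writer's promise. The sketch below shows that general technique only; `WriteMutex`, `runExclusive`, and `demo` are hypothetical names, and the PR's actual mutex may be structured differently.

```typescript
// Hypothetical sketch of a promise-chain mutex, not the PR's code.
class WriteMutex {
  // Always-resolving promise that settles when the last queued writer finishes.
  private tail: Promise<void> = Promise.resolve()

  // Runs `fn` only after every previously queued call has settled.
  runExclusive<T>(fn: () => Promise<T>): Promise<T> {
    const result = this.tail.then(fn)
    // Swallow the outcome on the chain so a failed writer does not block later ones.
    this.tail = result.then(
      () => undefined,
      () => undefined,
    )
    return result
  }
}

// Usage: two "writes" started together still run one after the other.
async function demo(): Promise<string[]> {
  const mutex = new WriteMutex()
  const log: string[] = []
  const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))
  await Promise.all([
    mutex.runExclusive(async () => {
      log.push('a:start')
      await sleep(20)
      log.push('a:end')
    }),
    mutex.runExclusive(async () => {
      log.push('b:start')
      await sleep(1)
      log.push('b:end')
    }),
  ])
  return log
}
```

A mutex of this kind only serializes writers inside one process, which matches the scope stated in the commit message; it does nothing for separate processes writing to the same database file.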
🧹 Nitpick comments (1)
packages/durably/src/storage.ts (1)
515-519: Consider batching all label inserts for efficiency.
Currently, this iterates and issues a separate `insertLabelRows` call per run. For large batch enqueues with many labeled runs, this could be optimized to a single insert.
♻️ Optional: Single batch insert for all labels

      // Insert normalized labels for indexed filtering
    - for (const [runId, labels] of newRunLabels) {
    -   await insertLabelRows(trx, runId, labels)
    - }
    + const allLabels = [...newRunLabels.entries()].flatMap(([runId, labels]) =>
    +   Object.entries(labels).map(([key, value]) => ({ run_id: runId, key, value }))
    + )
    + if (allLabels.length > 0) {
    +   await trx.insertInto('durably_run_labels').values(allLabels).execute()
    + }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/durably/src/storage.ts` around lines 515 - 519, The loop issues one insertLabelRows(trx, runId, labels) per run which is inefficient; instead collect all label rows from newRunLabels into a single array and perform one bulk insert within the transaction (either by extending insertLabelRows to accept a batched payload or adding a new insertLabelRowsBatch(trx, rows) helper). Build rows with runId, key, value (or the same shape insertLabelRows expects) and call the single bulk insert using the existing trx so all labels are inserted in one query.
📒 Files selected for processing (5)
- packages/durably/src/durably.ts
- packages/durably/src/migrations.ts
- packages/durably/src/storage.ts
- packages/durably/tests/node/libsql-write-contention.test.ts
- packages/durably/tests/node/migration-consolidated.test.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- packages/durably/src/durably.ts
- packages/durably/tests/node/migration-consolidated.test.ts
- Wrap enqueue() run+labels in transaction for atomicity
- Batch all label rows in enqueueMany() into single INSERT
- Fix orphaned JSDoc comment placement
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
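The batching step in this commit amounts to flattening every run's labels into one array of rows before issuing a single INSERT. The sketch below shows only that flattening as a pure function, under assumptions: `toLabelRows` and `LabelRow` are hypothetical names, and the row shape is taken from the `durably_run_labels(run_id, key, value)` columns.

```typescript
// Hypothetical sketch of the row-flattening step, not the PR's code.
interface LabelRow {
  run_id: string
  key: string
  value: string
}

// Turns a runId -> labels map into one flat array suitable for a single bulk insert.
function toLabelRows(labelsByRun: Map<string, Record<string, string>>): LabelRow[] {
  const rows: LabelRow[] = []
  labelsByRun.forEach((labels, runId) => {
    for (const [key, value] of Object.entries(labels)) {
      rows.push({ run_id: runId, key, value })
    }
  })
  return rows
}
```

Callers would still want to skip the INSERT when the returned array is empty, as the reviewer's suggested diff does with its `allLabels.length > 0` guard.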
Summary
- `durably_run_labels(run_id, key, value)` normalized table with `(key, value)` index for O(log n) label filtering instead of full-table-scan JSON extraction
- `getRuns()` label filtering uses `EXISTS` subqueries with JSON fallback (`json_extract` / `->>`) to ensure runs are never lost even if label row insert hasn't completed
- `deleteRun()` cleans up label rows in the same transaction
- `enqueue()` now wraps run insert + label row insert in a transaction (was two separate operations)
- `enqueueMany()` collects all label rows into a single INSERT instead of one per run

Motivation
For multi-tenant workloads (e.g., 10 tenants × 48 runs/day × 365 × 3 years = 525K runs), JSON-based label filtering requires scanning every row. The normalized table with a `(key, value)` index reduces this to O(log n).
libsql's transaction model opens separate SQLite connections, causing SQLITE_BUSY when concurrent writes happen (e.g., worker processing + user enqueue). The write mutex serializes mutations within a single process to prevent this.

Test plan
- `pnpm validate` passes (format, lint, typecheck, tests)

🤖 Generated with Claude Code