Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Five tables: `durably_runs`, `durably_run_labels`, `durably_steps`, `durably_log
- `leaseRenewIntervalMs`: 5000ms
- `leaseMs`: 30000ms (lease duration; expired leases are reclaimed)
- `preserveSteps`: false (deletes step output data when runs reach terminal state)
- `retainRuns`: undefined (no automatic cleanup; set e.g. `'30d'` to auto-delete terminal runs)

## Browser Constraints (by design)

Expand Down
16 changes: 16 additions & 0 deletions packages/durably/docs/llms.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const durably = createDurably({
leaseRenewIntervalMs: 5000, // Lease renewal interval (ms)
leaseMs: 30000, // Lease duration (ms); expired leases are reclaimed
preserveSteps: false, // Set to true to keep step output data after terminal state (default: false = cleanup)
retainRuns: '30d', // Auto-delete terminal runs older than 30 days (runs during worker polling; supports 'd', 'h', 'm' units)
// Optional: type-safe labels with Zod schema
// labels: z.object({ organizationId: z.string(), env: z.string() }),
jobs: {
Expand Down Expand Up @@ -229,6 +230,21 @@ await durably.cancel(runId)
await durably.deleteRun(runId)
```

### Purge Old Runs

Batch-delete terminal runs (completed, failed, cancelled) older than a cutoff date.
Pending and leased runs are never deleted.

```ts
// Delete terminal runs older than 30 days
const deleted = await durably.purgeRuns({
olderThan: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000),
limit: 500, // optional batch size (default: 1000)
})
```

For automatic cleanup, use the `retainRuns` option (see Quick Start). Cleanup runs during idle worker polling cycles, at most once per minute, in batches of 100.

## Events

Subscribe to job execution events:
Expand Down
75 changes: 62 additions & 13 deletions packages/durably/src/durably.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ export interface DurablyOptions<
* ```
*/
jobs?: TJobs
/**
* Auto-delete terminal runs older than the specified duration.
* Only runs in terminal states (completed, failed, cancelled) are purged.
* @example '30d' (30 days), '24h' (24 hours), '60m' (60 minutes)
*/
retainRuns?: string
}

/**
Expand All @@ -83,6 +89,25 @@ const DEFAULTS = {
preserveSteps: false,
} as const

function parseDuration(value: string): number {
const match = value.match(/^(\d+)(d|h|m)$/)
if (!match) {
throw new Error(
`Invalid duration format: "${value}". Use e.g. '30d', '24h', '60m'`,
)
}
const num = Number.parseInt(match[1], 10)
const unit = match[2]
const multipliers: Record<string, number> = {
d: 86400000,
h: 3600000,
m: 60000,
}
return num * multipliers[unit]
}

const PURGE_INTERVAL_MS = 60_000

const ulid = monotonicFactory()
const BROWSER_SINGLETON_REGISTRY_KEY = '__durablyBrowserSingletonRegistry'
const BROWSER_LOCAL_DIALECT_KEY = '__durablyBrowserLocalKey'
Expand Down Expand Up @@ -307,6 +332,13 @@ export interface Durably<
*/
deleteRun(runId: string): Promise<void>

/**
* Delete terminal runs older than the specified cutoff.
* Only runs in terminal states (completed, failed, cancelled) are purged.
* @returns Number of deleted runs
*/
purgeRuns(options: { olderThan: Date; limit?: number }): Promise<number>

/**
* Get a run by ID
* @example
Expand Down Expand Up @@ -376,6 +408,8 @@ interface DurablyState<
migrated: boolean
leaseMs: number
leaseRenewIntervalMs: number
retainRunsMs: number | null
lastPurgeAt: number
releaseBrowserSingleton: () => void
}

Expand Down Expand Up @@ -864,26 +898,38 @@ function createDurablyInstance<
})
},

async purgeRuns(options: {
olderThan: Date
limit?: number
}): Promise<number> {
return storage.purgeRuns({
olderThan: options.olderThan.toISOString(),
limit: options.limit,
})
},

async processOne(options?: { workerId?: string }): Promise<boolean> {
const workerId = options?.workerId ?? defaultWorkerId()
const now = new Date().toISOString()

await storage.releaseExpiredLeases(now)

const leasedRuns = await storage.getRuns({ status: 'leased' })
const excludeConcurrencyKeys = leasedRuns
.filter(
(entry): entry is Run<TLabels> & { concurrencyKey: string } =>
entry.concurrencyKey !== null &&
entry.leaseExpiresAt !== null &&
entry.leaseExpiresAt > now,
)
.map((entry) => entry.concurrencyKey)

const run = await storage.claimNext(workerId, now, state.leaseMs, {
excludeConcurrencyKeys,
})
const run = await storage.claimNext(workerId, now, state.leaseMs)
if (!run) {
// Auto-purge old terminal runs if retainRuns is configured.
// Runs after claimNext so purge never serializes with job claiming.
// lastPurgeAt starts at 0, so the first idle cycle purges immediately.
if (
state.retainRunsMs !== null &&
Date.now() - state.lastPurgeAt >= PURGE_INTERVAL_MS
) {
const purgeNow = Date.now()
state.lastPurgeAt = purgeNow
const cutoff = new Date(purgeNow - state.retainRunsMs).toISOString()
storage.purgeRuns({ olderThan: cutoff, limit: 100 }).catch(() => {
// Purge failure is non-fatal — will retry on next interval
})
}
return false
}

Expand Down Expand Up @@ -978,6 +1024,7 @@ export function createDurably<
options.leaseRenewIntervalMs ?? DEFAULTS.leaseRenewIntervalMs,
leaseMs: options.leaseMs ?? DEFAULTS.leaseMs,
preserveSteps: options.preserveSteps ?? DEFAULTS.preserveSteps,
retainRunsMs: options.retainRuns ? parseDuration(options.retainRuns) : null,
}

const db = new Kysely<Database>({ dialect: options.dialect })
Expand Down Expand Up @@ -1023,6 +1070,8 @@ export function createDurably<
migrated: false,
leaseMs: config.leaseMs,
leaseRenewIntervalMs: config.leaseRenewIntervalMs,
retainRunsMs: config.retainRunsMs,
lastPurgeAt: 0,
releaseBrowserSingleton,
}

Expand Down
7 changes: 7 additions & 0 deletions packages/durably/src/migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ const migrations: Migration[] = [
.columns(['job_name', 'created_at'])
.execute()

await db.schema
.createIndex('idx_durably_runs_status_completed')
.ifNotExists()
.on('durably_runs')
.columns(['status', 'completed_at'])
.execute()
Comment on lines +83 to +88
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't fold new DDL into migration 1.

Line 12 still leaves LATEST_SCHEMA_VERSION at 1, so databases that already recorded version 1 will skip this block forever. Existing installs won't get idx_durably_runs_status_completed, which means the new purge query loses its supporting index.

💡 Safer migration shape
-export const LATEST_SCHEMA_VERSION = 1
+export const LATEST_SCHEMA_VERSION = 2

 const migrations: Migration[] = [
   {
     version: 1,
     up: async (db) => {
-      await db.schema
-        .createIndex('idx_durably_runs_status_completed')
-        .ifNotExists()
-        .on('durably_runs')
-        .columns(['status', 'completed_at'])
-        .execute()
     },
   },
+  {
+    version: 2,
+    up: async (db) => {
+      await db.schema
+        .createIndex('idx_durably_runs_status_completed')
+        .ifNotExists()
+        .on('durably_runs')
+        .columns(['status', 'completed_at'])
+        .execute()
+    },
+  },
 ]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await db.schema
.createIndex('idx_durably_runs_status_completed')
.ifNotExists()
.on('durably_runs')
.columns(['status', 'completed_at'])
.execute()
export const LATEST_SCHEMA_VERSION = 2
const migrations: Migration[] = [
{
version: 1,
up: async (db) => {
// ... existing v1 migration logic ...
},
},
{
version: 2,
up: async (db) => {
await db.schema
.createIndex('idx_durably_runs_status_completed')
.ifNotExists()
.on('durably_runs')
.columns(['status', 'completed_at'])
.execute()
},
},
]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/durably/src/migrations.ts` around lines 83 - 88, The migration adds
an index but it's bundled into migration 1 while LATEST_SCHEMA_VERSION remains
1, so existing databases will skip it; update the migrations by moving the
createIndex call out of the existing migration-1 block into a new migration
function/entry (e.g., create a migration for version 2) and increment
LATEST_SCHEMA_VERSION accordingly so the new index
(idx_durably_runs_status_completed) runs for already-upgraded DBs; ensure the
new migration includes the same
.createIndex(...).ifNotExists().on('durably_runs').columns(['status','completed_at']).execute()
logic and any related rollback or idempotency safeguards.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentionally kept in v1. This library has no production databases yet — we explicitly consolidated all migrations into v1 earlier in this PR to avoid carrying dead migration versions. The ifNotExists guard makes it safe for any edge case.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coji, understood — thanks for the context. A clean consolidation into v1 while the library is still pre-production is a perfectly reasonable approach, and the ifNotExists guard ensures idempotency for any edge cases. I'll stand down on this one.


✏️ Learnings added
Learnt from: coji
Repo: coji/durably PR: 109
File: packages/durably/src/migrations.ts:83-88
Timestamp: 2026-03-16T07:56:24.962Z
Learning: In `packages/durably/src/migrations.ts`, all schema migrations (including new indexes like `idx_durably_runs_status_completed`) are intentionally consolidated into migration version 1. The library has no production databases yet, so there is no need to carry separate incremental migration versions. `LATEST_SCHEMA_VERSION` is kept at 1 by design. The `ifNotExists` guard on DDL statements is used as the idempotency safeguard. Do not flag folding new DDL into migration v1 as an issue for this repository.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: CR
Repo: coji/durably PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T13:21:21.863Z
Learning: Applies to **/*.{js,ts} : Use the dialect injection pattern - pass a Kysely dialect to `createDurably()` to abstract SQLite implementations

Learnt from: coji
Repo: coji/durably PR: 101
File: packages/durably/tests/node/db-semantics.postgres.test.ts:4-4
Timestamp: 2026-03-08T05:03:43.723Z
Learning: In `packages/durably/tests/node/db-semantics.postgres.test.ts`, `createPostgresSchemaResource()` is intentionally called with parentheses. It returns an object `{ createDialect, setup, cleanup }` (a SharedDialectResource), not a plain dialect factory function. The `createDbSemanticsTests` harness in `packages/durably/tests/shared/db-semantics.shared.ts` (around line 18) handles both patterns: a bare factory function OR a pre-invoked resource object with `createDialect`, optional `setup`, and optional `cleanup` methods.

Learnt from: CR
Repo: coji/durably PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T13:21:21.863Z
Learning: Applies to **/*.{js,ts,mjs} : Use ESM-only syntax; CommonJS is not supported. Always use top-level `await` for async initialization (e.g., `await durably.migrate()`). Do not wrap in async IIFE or Promise chains.

Learnt from: coji
Repo: coji/durably PR: 101
File: docs/rfcs/runtime-rearchitecture/ja/core-runtime.md:0-0
Timestamp: 2026-03-08T05:06:39.050Z
Learning: Reviewers should verify that step execution uses step.run() and that step output data is persisted. By default, preserveSteps is false (step history is deleted when runs reach a terminal state). If audit/debug history is required, ensure preserveSteps is set to true. Be aware that cleanupSteps used to exist with inverted semantics (cleanupSteps: true meant delete), and this was renamed to preserveSteps in Phase 1. Apply this guidance to all JS/TS code and docs that reference step persistence.


// Create normalized labels table for indexed label filtering
await db.schema
.createTable('durably_run_labels')
Expand Down
75 changes: 46 additions & 29 deletions packages/durably/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ export type RunStatus =
| 'failed'
| 'cancelled'

/** Run statuses that represent terminal (non-active) states */
const TERMINAL_STATUSES: RunStatus[] = ['completed', 'failed', 'cancelled']

/**
* Run data for creating a new run
*/
Expand Down Expand Up @@ -125,10 +128,6 @@ export interface ProgressData {
message?: string
}

export interface ClaimOptions {
excludeConcurrencyKeys?: string[]
}

export type DatabaseBackend = 'generic' | 'postgres'

/**
Expand Down Expand Up @@ -169,7 +168,6 @@ export interface Store<
workerId: string,
now: string,
leaseMs: number,
options?: ClaimOptions,
): Promise<Run<TLabels> | null>
renewLease(
runId: string,
Expand Down Expand Up @@ -215,6 +213,9 @@ export interface Store<
progress: ProgressData | null,
): Promise<void>

// Purge
purgeRuns(options: { olderThan: string; limit?: number }): Promise<number>

// Logs
createLog(input: CreateLogInput): Promise<Log>
getLogs(runId: string): Promise<Log[]>
Expand Down Expand Up @@ -361,6 +362,20 @@ export function createKyselyStore(
): Store<Record<string, string>> {
const withWriteLock = createWriteMutex()

/** Delete runs and all associated data (steps, logs, labels) in dependency order */
async function cascadeDeleteRuns(
trx: Kysely<Database>,
ids: string[],
): Promise<void> {
await trx.deleteFrom('durably_steps').where('run_id', 'in', ids).execute()
await trx.deleteFrom('durably_logs').where('run_id', 'in', ids).execute()
await trx
.deleteFrom('durably_run_labels')
.where('run_id', 'in', ids)
.execute()
await trx.deleteFrom('durably_runs').where('id', 'in', ids).execute()
}

async function insertLabelRows(
executor: Kysely<Database>,
runId: string,
Expand Down Expand Up @@ -648,30 +663,40 @@ export function createKyselyStore(

async deleteRun(runId: string) {
await db.transaction().execute(async (trx) => {
await trx
.deleteFrom('durably_steps')
.where('run_id', '=', runId)
.execute()
await trx
.deleteFrom('durably_logs')
.where('run_id', '=', runId)
.execute()
await trx
.deleteFrom('durably_run_labels')
.where('run_id', '=', runId)
await cascadeDeleteRuns(trx, [runId])
})
},

async purgeRuns(options: {
olderThan: string
limit?: number
}): Promise<number> {
const limit = options.limit ?? 1000

return await db.transaction().execute(async (trx) => {
const rows = await trx
.selectFrom('durably_runs')
.select('id')
.where('status', 'in', TERMINAL_STATUSES)
.where('completed_at', '<', options.olderThan)
.orderBy('completed_at', 'asc')
.limit(limit)
.execute()
await trx.deleteFrom('durably_runs').where('id', '=', runId).execute()

if (rows.length === 0) return 0

const ids = rows.map((r) => r.id)
await cascadeDeleteRuns(trx, ids)
return ids.length
})
},

async claimNext(
workerId: string,
now: string,
leaseMs: number,
options?: ClaimOptions,
): Promise<Run | null> {
const leaseExpiresAt = new Date(Date.parse(now) + leaseMs).toISOString()
const excludeConcurrencyKeys = options?.excludeConcurrencyKeys ?? []
const activeLeaseGuard = sql<boolean>`
(
concurrency_key IS NULL
Expand All @@ -689,7 +714,7 @@ export function createKyselyStore(

if (backend === 'postgres') {
return await db.transaction().execute(async (trx) => {
const skipKeys = [...excludeConcurrencyKeys]
const skipKeys: string[] = []

// Loop: on concurrency-key conflict, exclude that key and retry
// to find the next eligible candidate in the same transaction.
Expand Down Expand Up @@ -790,15 +815,6 @@ export function createKyselyStore(
.orderBy('id', 'asc')
.limit(1)

if (excludeConcurrencyKeys.length > 0) {
subquery = subquery.where((eb) =>
eb.or([
eb('concurrency_key', 'is', null),
eb('concurrency_key', 'not in', excludeConcurrencyKeys),
]),
)
}

const row = await db
.updateTable('durably_runs')
.set({
Expand Down Expand Up @@ -1035,6 +1051,7 @@ export function createKyselyStore(
'enqueueMany',
'updateRun',
'deleteRun',
'purgeRuns',
'claimNext',
'renewLease',
'releaseExpiredLeases',
Expand Down
1 change: 1 addition & 0 deletions packages/durably/tests/node/migration-consolidated.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ describe('migration consolidated schema', () => {
expect(indexNames).toContain('idx_durably_runs_status_created')
expect(indexNames).toContain('idx_durably_runs_status_lease_expires')
expect(indexNames).toContain('idx_durably_runs_job_created')
expect(indexNames).toContain('idx_durably_runs_status_completed')

// Steps indexes
expect(indexNames).toContain('idx_durably_steps_run_index')
Expand Down
4 changes: 4 additions & 0 deletions packages/durably/tests/node/purge.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { createNodeDialect } from '../helpers/node-dialect'
import { createPurgeTests } from '../shared/purge.shared'

createPurgeTests(createNodeDialect)
Loading