From f2223c00c13cf1308684f73e59681d45db6013cb Mon Sep 17 00:00:00 2001 From: coji Date: Thu, 12 Mar 2026 23:21:51 +0900 Subject: [PATCH 1/8] feat: add purgeRuns API and retainRuns auto-cleanup option - `durably.purgeRuns({ olderThan, limit })` for manual batch deletion of terminal runs (completed, failed, cancelled) with cascading cleanup of steps, logs, and labels - `retainRuns: '30d'` option on createDurably for automatic periodic purge (runs once per 60s during worker polling, batch size 100) - Migration v2: adds (status, completed_at) index for efficient purge queries - Duration parser supports 'd' (days), 'h' (hours), 'm' (minutes) - 11 new tests covering all purge scenarios Closes #88 Co-Authored-By: Claude Opus 4.6 --- packages/durably/docs/llms.md | 16 ++ packages/durably/src/durably.ts | 56 ++++ packages/durably/src/migrations.ts | 14 +- packages/durably/src/storage.ts | 45 +++ .../tests/node/migration-consolidated.test.ts | 3 +- packages/durably/tests/node/purge.test.ts | 4 + packages/durably/tests/shared/purge.shared.ts | 256 ++++++++++++++++++ website/public/llms.txt | 16 ++ 8 files changed, 408 insertions(+), 2 deletions(-) create mode 100644 packages/durably/tests/node/purge.test.ts create mode 100644 packages/durably/tests/shared/purge.shared.ts diff --git a/packages/durably/docs/llms.md b/packages/durably/docs/llms.md index 9086f2d..e40bac4 100644 --- a/packages/durably/docs/llms.md +++ b/packages/durably/docs/llms.md @@ -36,6 +36,7 @@ const durably = createDurably({ leaseRenewIntervalMs: 5000, // Lease renewal interval (ms) leaseMs: 30000, // Lease duration (ms); expired leases are reclaimed preserveSteps: false, // Set to true to keep step output data after terminal state (default: false = cleanup) + retainRuns: '30d', // Auto-delete terminal runs older than 30 days (optional; supports 'd', 'h', 'm' units) // Optional: type-safe labels with Zod schema // labels: z.object({ organizationId: z.string(), env: z.string() }), jobs: { @@ -229,6 +230,21 @@ await durably.cancel(runId) await durably.deleteRun(runId) ``` +### Purge Old Runs + +Batch-delete terminal runs (completed, failed, cancelled) older than a cutoff date. +Pending and leased runs are never deleted. + +```ts +// Delete terminal runs older than 30 days +const deleted = await durably.purgeRuns({ + olderThan: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000), + limit: 500, // optional batch size (default: 1000) +}) +``` + +For automatic cleanup, use the `retainRuns` option (see Quick Start). + ## Events Subscribe to job execution events: diff --git a/packages/durably/src/durably.ts b/packages/durably/src/durably.ts index f7878d1..8ad4fde 100644 --- a/packages/durably/src/durably.ts +++ b/packages/durably/src/durably.ts @@ -71,6 +71,12 @@ export interface DurablyOptions< * ``` */ jobs?: TJobs + /** + * Auto-delete terminal runs older than the specified duration. + * Only runs in terminal states (completed, failed, cancelled) are purged. + * @example '30d' (30 days), '24h' (24 hours), '60m' (60 minutes) + */ + retainRuns?: string } /** @@ -83,6 +89,23 @@ const DEFAULTS = { preserveSteps: false, } as const +function parseDuration(value: string): number { + const match = value.match(/^(\d+)(d|h|m)$/) + if (!match) { + throw new Error( + `Invalid duration format: "${value}". Use e.g. '30d', '24h', '60m'`, + ) + } + const num = Number.parseInt(match[1], 10) + const unit = match[2] + const multipliers: Record = { + d: 86400000, + h: 3600000, + m: 60000, + } + return num * multipliers[unit] +} + const ulid = monotonicFactory() const BROWSER_SINGLETON_REGISTRY_KEY = '__durablyBrowserSingletonRegistry' const BROWSER_LOCAL_DIALECT_KEY = '__durablyBrowserLocalKey' @@ -307,6 +330,13 @@ export interface Durably< */ deleteRun(runId: string): Promise + /** + * Delete terminal runs older than the specified cutoff. + * Only runs in terminal states (completed, failed, cancelled) are purged. + * @returns Number of deleted runs + */ + purgeRuns(options: { olderThan: Date; limit?: number }): Promise + /** * Get a run by ID * @example @@ -376,6 +406,8 @@ interface DurablyState< migrated: boolean leaseMs: number leaseRenewIntervalMs: number + retainRunsMs: number | null + lastPurgeAt: number releaseBrowserSingleton: () => void } @@ -864,12 +896,33 @@ function createDurablyInstance< }) }, + async purgeRuns(options: { + olderThan: Date + limit?: number + }): Promise { + return storage.purgeRuns({ + olderThan: options.olderThan.toISOString(), + limit: options.limit, + }) + }, + async processOne(options?: { workerId?: string }): Promise { const workerId = options?.workerId ?? defaultWorkerId() const now = new Date().toISOString() await storage.releaseExpiredLeases(now) + // Auto-purge old terminal runs if retainRuns is configured + const PURGE_INTERVAL_MS = 60_000 + if ( + state.retainRunsMs !== null && + Date.now() - state.lastPurgeAt >= PURGE_INTERVAL_MS + ) { + state.lastPurgeAt = Date.now() + const cutoff = new Date(Date.now() - state.retainRunsMs).toISOString() + await storage.purgeRuns({ olderThan: cutoff, limit: 100 }) + } + const leasedRuns = await storage.getRuns({ status: 'leased' }) const excludeConcurrencyKeys = leasedRuns .filter( @@ -978,6 +1031,7 @@ export function createDurably< options.leaseRenewIntervalMs ?? DEFAULTS.leaseRenewIntervalMs, leaseMs: options.leaseMs ?? DEFAULTS.leaseMs, preserveSteps: options.preserveSteps ?? DEFAULTS.preserveSteps, + retainRunsMs: options.retainRuns ? parseDuration(options.retainRuns) : null, } const db = new Kysely({ dialect: options.dialect }) @@ -1023,6 +1077,8 @@ export function createDurably< migrated: false, leaseMs: config.leaseMs, leaseRenewIntervalMs: config.leaseRenewIntervalMs, + retainRunsMs: config.retainRunsMs, + lastPurgeAt: 0, releaseBrowserSingleton, } diff --git a/packages/durably/src/migrations.ts b/packages/durably/src/migrations.ts index d4e0382..22a1c0b 100644 --- a/packages/durably/src/migrations.ts +++ b/packages/durably/src/migrations.ts @@ -9,7 +9,7 @@ interface Migration { up: (db: Kysely) => Promise } -export const LATEST_SCHEMA_VERSION = 1 +export const LATEST_SCHEMA_VERSION = 2 const migrations: Migration[] = [ { @@ -166,6 +166,18 @@ const migrations: Migration[] = [ .execute() }, }, + { + version: 2, + up: async (db) => { + // Index for efficient purge queries (retainRuns feature) + await db.schema + .createIndex('idx_durably_runs_status_completed') + .ifNotExists() + .on('durably_runs') + .columns(['status', 'completed_at']) + .execute() + }, + }, ] /** diff --git a/packages/durably/src/storage.ts b/packages/durably/src/storage.ts index f09f134..fe1a23e 100644 --- a/packages/durably/src/storage.ts +++ b/packages/durably/src/storage.ts @@ -215,6 +215,9 @@ export interface Store< progress: ProgressData | null, ): Promise + // Purge + purgeRuns(options: { olderThan: string; limit?: number }): Promise + // Logs createLog(input: CreateLogInput): Promise getLogs(runId: string): Promise @@ -664,6 +667,47 @@ export function createKyselyStore( }) }, + async purgeRuns(options: { + olderThan: string + limit?: number + }): Promise { + const limit = options.limit ?? 1000 + const terminalStatuses: RunStatus[] = ['completed', 'failed', 'cancelled'] + + return await db.transaction().execute(async (trx) => { + // Find IDs of terminal runs older than cutoff + const rows = await trx + .selectFrom('durably_runs') + .select('id') + .where('status', 'in', terminalStatuses) + .where('completed_at', '<', options.olderThan) + .orderBy('completed_at', 'asc') + .limit(limit) + .execute() + + if (rows.length === 0) return 0 + + const ids = rows.map((r) => r.id) + + // Cascade delete in dependency order + await trx + .deleteFrom('durably_steps') + .where('run_id', 'in', ids) + .execute() + await trx + .deleteFrom('durably_logs') + .where('run_id', 'in', ids) + .execute() + await trx + .deleteFrom('durably_run_labels') + .where('run_id', 'in', ids) + .execute() + await trx.deleteFrom('durably_runs').where('id', 'in', ids).execute() + + return ids.length + }) + }, + async claimNext( workerId: string, now: string, @@ -1035,6 +1079,7 @@ export function createKyselyStore( 'enqueueMany', 'updateRun', 'deleteRun', + 'purgeRuns', 'claimNext', 'renewLease', 'releaseExpiredLeases', diff --git a/packages/durably/tests/node/migration-consolidated.test.ts b/packages/durably/tests/node/migration-consolidated.test.ts index d34758d..ab21b4c 100644 --- a/packages/durably/tests/node/migration-consolidated.test.ts +++ b/packages/durably/tests/node/migration-consolidated.test.ts @@ -38,7 +38,7 @@ describe('migration consolidated schema', () => { SELECT version FROM durably_schema_versions ORDER BY version DESC LIMIT 1 `.execute(durably.db) expect(versions.rows[0]?.version).toBe(LATEST_SCHEMA_VERSION) - expect(LATEST_SCHEMA_VERSION).toBe(1) + expect(LATEST_SCHEMA_VERSION).toBe(2) }) it('creates all expected indexes', async () => { @@ -61,6 +61,7 @@ describe('migration consolidated schema', () => { expect(indexNames).toContain('idx_durably_runs_status_created') expect(indexNames).toContain('idx_durably_runs_status_lease_expires') expect(indexNames).toContain('idx_durably_runs_job_created') + expect(indexNames).toContain('idx_durably_runs_status_completed') // Steps indexes expect(indexNames).toContain('idx_durably_steps_run_index') diff --git a/packages/durably/tests/node/purge.test.ts b/packages/durably/tests/node/purge.test.ts new file mode 100644 index 0000000..25b8791 --- /dev/null +++ b/packages/durably/tests/node/purge.test.ts @@ -0,0 +1,4 @@ +import { createNodeDialect } from '../helpers/node-dialect' +import { createPurgeTests } from '../shared/purge.shared' + +createPurgeTests(createNodeDialect) diff --git a/packages/durably/tests/shared/purge.shared.ts b/packages/durably/tests/shared/purge.shared.ts new file mode 100644 index 0000000..d42c611 --- /dev/null +++ b/packages/durably/tests/shared/purge.shared.ts @@ -0,0 +1,256 @@ +import type { Dialect } from 'kysely' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { z } from 'zod' +import { createDurably, defineJob, type Durably } from '../../src' + +export function createPurgeTests(createDialect: () => Dialect) { + describe('purgeRuns', () => { + let durably: Durably + + beforeEach(async () => { + durably = createDurably({ + dialect: createDialect(), + pollingIntervalMs: 50, + }) + await durably.migrate() + }) + + afterEach(async () => { + await durably.stop() + await durably.db.destroy() + }) + + const testJob = defineJob({ + name: 'purge-test-job', + input: z.object({}), + run: async () => {}, + }) + + const failingJob = defineJob({ + name: 'purge-failing-job', + input: z.object({}), + run: async () => { + throw new Error('fail') + }, + }) + + describe('durably.purgeRuns()', () => { + it('deletes completed runs older than cutoff', async () => { + const d = durably.register({ testJob }) + d.start() + + const run = await d.jobs.testJob.trigger({}) + + await vi.waitFor( + async () => { + const r = await d.getRun(run.id) + expect(r?.status).toBe('completed') + }, + { timeout: 5000 }, + ) + + // Purge with cutoff in the future — should delete the run + const futureDate = new Date(Date.now() + 60000) + const deleted = await d.purgeRuns({ olderThan: futureDate }) + + expect(deleted).toBe(1) + expect(await d.getRun(run.id)).toBeNull() + }) + + it('deletes failed runs older than cutoff', async () => { + const d = durably.register({ failingJob }) + d.start() + + const run = await d.jobs.failingJob.trigger({}) + + await vi.waitFor( + async () => { + const r = await d.getRun(run.id) + expect(r?.status).toBe('failed') + }, + { timeout: 5000 }, + ) + + const futureDate = new Date(Date.now() + 60000) + const deleted = await d.purgeRuns({ olderThan: futureDate }) + + expect(deleted).toBe(1) + expect(await d.getRun(run.id)).toBeNull() + }) + + it('deletes cancelled runs older than cutoff', async () => { + const d = durably.register({ testJob }) + + const run = await d.jobs.testJob.trigger({}) + await d.cancel(run.id) + + const futureDate = new Date(Date.now() + 60000) + const deleted = await d.purgeRuns({ olderThan: futureDate }) + + expect(deleted).toBe(1) + expect(await d.getRun(run.id)).toBeNull() + }) + + it('does NOT delete pending or leased runs', async () => { + const d = durably.register({ testJob }) + + // Create a pending run (don't start worker) + const run = await d.jobs.testJob.trigger({}) + + const futureDate = new Date(Date.now() + 60000) + const deleted = await d.purgeRuns({ olderThan: futureDate }) + + expect(deleted).toBe(0) + expect(await d.getRun(run.id)).not.toBeNull() + }) + + it('does NOT delete runs newer than cutoff', async () => { + const d = durably.register({ testJob }) + d.start() + + const run = await d.jobs.testJob.trigger({}) + + await vi.waitFor( + async () => { + const r = await d.getRun(run.id) + expect(r?.status).toBe('completed') + }, + { timeout: 5000 }, + ) + + // Purge with cutoff in the past — should not delete + const pastDate = new Date(Date.now() - 60000) + const deleted = await d.purgeRuns({ olderThan: pastDate }) + + expect(deleted).toBe(0) + expect(await d.getRun(run.id)).not.toBeNull() + }) + + it('respects the limit parameter', async () => { + const d = durably.register({ testJob }) + d.start() + + // Trigger 3 runs + await d.jobs.testJob.trigger({}) + await d.jobs.testJob.trigger({}) + await d.jobs.testJob.trigger({}) + + await vi.waitFor( + async () => { + const runs = await d.getRuns({ status: 'completed' }) + expect(runs.length).toBe(3) + }, + { timeout: 5000 }, + ) + + const futureDate = new Date(Date.now() + 60000) + const deleted = await d.purgeRuns({ olderThan: futureDate, limit: 2 }) + + expect(deleted).toBe(2) + const remaining = await d.getRuns() + expect(remaining.length).toBe(1) + }) + + it('returns 0 when no runs match', async () => { + const d = durably.register({ testJob }) + const deleted = await d.purgeRuns({ + olderThan: new Date(Date.now() + 60000), + }) + expect(deleted).toBe(0) + }) + + it('also deletes associated steps', async () => { + const jobWithSteps = defineJob({ + name: 'purge-steps-job', + input: z.object({}), + run: async (ctx) => { + await ctx.run('step1', () => 'result1') + await ctx.run('step2', () => 'result2') + }, + }) + + // preserveSteps: true so steps remain after completion + const d = createDurably({ + dialect: createDialect(), + pollingIntervalMs: 50, + preserveSteps: true, + }).register({ jobWithSteps }) + + await d.migrate() + d.start() + const run = await d.jobs.jobWithSteps.trigger({}) + + await vi.waitFor( + async () => { + const r = await d.getRun(run.id) + expect(r?.status).toBe('completed') + }, + { timeout: 5000 }, + ) + + // Verify steps exist before purge + const steps = await d.storage.getSteps(run.id) + expect(steps.length).toBeGreaterThan(0) + + const futureDate = new Date(Date.now() + 60000) + await d.purgeRuns({ olderThan: futureDate }) + + // Steps should be gone too + const stepsAfter = await d.storage.getSteps(run.id) + expect(stepsAfter.length).toBe(0) + + await d.stop() + await d.db.destroy() + }) + }) + + describe('retainRuns option', () => { + it('auto-purges old runs during worker polling', async () => { + const d = createDurably({ + dialect: createDialect(), + pollingIntervalMs: 50, + retainRuns: '1m', // 1 minute retention + }).register({ testJob }) + + await d.migrate() + d.start() + + const run = await d.jobs.testJob.trigger({}) + + await vi.waitFor( + async () => { + const r = await d.getRun(run.id) + expect(r?.status).toBe('completed') + }, + { timeout: 5000 }, + ) + + // Run just completed — should NOT be purged (it's not 1 minute old) + // Wait a polling cycle to ensure auto-purge ran + await new Promise((r) => setTimeout(r, 200)) + expect(await d.getRun(run.id)).not.toBeNull() + + await d.stop() + await d.db.destroy() + }) + + it('throws on invalid retainRuns format', () => { + expect(() => + createDurably({ + dialect: createDialect(), + retainRuns: 'invalid', + }), + ).toThrow('Invalid duration format') + }) + + it('throws on invalid retainRuns unit', () => { + expect(() => + createDurably({ + dialect: createDialect(), + retainRuns: '30s', + }), + ).toThrow('Invalid duration format') + }) + }) + }) +} diff --git a/website/public/llms.txt b/website/public/llms.txt index 0dc9f71..e44493e 100644 --- a/website/public/llms.txt +++ b/website/public/llms.txt @@ -36,6 +36,7 @@ const durably = createDurably({ leaseRenewIntervalMs: 5000, // Lease renewal interval (ms) leaseMs: 30000, // Lease duration (ms); expired leases are reclaimed preserveSteps: false, // Set to true to keep step output data after terminal state (default: false = cleanup) + retainRuns: '30d', // Auto-delete terminal runs older than 30 days (optional; supports 'd', 'h', 'm' units) // Optional: type-safe labels with Zod schema // labels: z.object({ organizationId: z.string(), env: z.string() }), jobs: { @@ -229,6 +230,21 @@ await durably.cancel(runId) await durably.deleteRun(runId) ``` +### Purge Old Runs + +Batch-delete terminal runs (completed, failed, cancelled) older than a cutoff date. +Pending and leased runs are never deleted. + +```ts +// Delete terminal runs older than 30 days +const deleted = await durably.purgeRuns({ + olderThan: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000), + limit: 500, // optional batch size (default: 1000) +}) +``` + +For automatic cleanup, use the `retainRuns` option (see Quick Start). + ## Events Subscribe to job execution events: From bceeff6b2eb5339643a65a36a78c75b128fa179f Mon Sep 17 00:00:00 2001 From: coji Date: Thu, 12 Mar 2026 23:27:32 +0900 Subject: [PATCH 2/8] refactor: simplify purge implementation per review - Extract cascadeDeleteRuns() helper to eliminate duplication between deleteRun() and purgeRuns() - Extract TERMINAL_STATUSES constant to module scope - Move PURGE_INTERVAL_MS to module scope (was recreated per processOne call) - Make auto-purge fire-and-forget (void) so it doesn't block job claiming or lease renewal in the worker polling loop Co-Authored-By: Claude Opus 4.6 --- packages/durably/src/durably.ts | 9 ++++-- packages/durably/src/storage.ts | 52 +++++++++++++-------------------- 2 files changed, 26 insertions(+), 35 deletions(-) diff --git a/packages/durably/src/durably.ts b/packages/durably/src/durably.ts index 8ad4fde..ef0a678 100644 --- a/packages/durably/src/durably.ts +++ b/packages/durably/src/durably.ts @@ -106,6 +106,8 @@ function parseDuration(value: string): number { return num * multipliers[unit] } +const PURGE_INTERVAL_MS = 60_000 + const ulid = monotonicFactory() const BROWSER_SINGLETON_REGISTRY_KEY = '__durablyBrowserSingletonRegistry' const BROWSER_LOCAL_DIALECT_KEY = '__durablyBrowserLocalKey' @@ -912,15 +914,16 @@ function createDurablyInstance< await storage.releaseExpiredLeases(now) - // Auto-purge old terminal runs if retainRuns is configured - const PURGE_INTERVAL_MS = 60_000 + // Auto-purge old terminal runs if retainRuns is configured. + // Fire-and-forget: write lock serializes with other mutations, + // so purge doesn't block job claiming or lease renewal. if ( state.retainRunsMs !== null && Date.now() - state.lastPurgeAt >= PURGE_INTERVAL_MS ) { state.lastPurgeAt = Date.now() const cutoff = new Date(Date.now() - state.retainRunsMs).toISOString() - await storage.purgeRuns({ olderThan: cutoff, limit: 100 }) + void storage.purgeRuns({ olderThan: cutoff, limit: 100 }) } const leasedRuns = await storage.getRuns({ status: 'leased' }) diff --git a/packages/durably/src/storage.ts b/packages/durably/src/storage.ts index fe1a23e..31b72ca 100644 --- a/packages/durably/src/storage.ts +++ b/packages/durably/src/storage.ts @@ -4,6 +4,9 @@ import type { Database } from './schema' const ulid = monotonicFactory() +/** Run statuses that represent terminal (non-active) states */ +const TERMINAL_STATUSES: RunStatus[] = ['completed', 'failed', 'cancelled'] + export type RunStatus = | 'pending' | 'leased' @@ -364,6 +367,20 @@ export function createKyselyStore( ): Store> { const withWriteLock = createWriteMutex() + /** Delete runs and all associated data (steps, logs, labels) in dependency order */ + async function cascadeDeleteRuns( + trx: Kysely, + ids: string[], + ): Promise { + await trx.deleteFrom('durably_steps').where('run_id', 'in', ids).execute() + await trx.deleteFrom('durably_logs').where('run_id', 'in', ids).execute() + await trx + .deleteFrom('durably_run_labels') + .where('run_id', 'in', ids) + .execute() + await trx.deleteFrom('durably_runs').where('id', 'in', ids).execute() + } + async function insertLabelRows( executor: Kysely, runId: string, @@ -651,19 +668,7 @@ export function createKyselyStore( async deleteRun(runId: string) { await db.transaction().execute(async (trx) => { - await trx - .deleteFrom('durably_steps') - .where('run_id', '=', runId) - .execute() - await trx - .deleteFrom('durably_logs') - .where('run_id', '=', runId) - .execute() - await trx - .deleteFrom('durably_run_labels') - .where('run_id', '=', runId) - .execute() - await trx.deleteFrom('durably_runs').where('id', '=', runId).execute() + await cascadeDeleteRuns(trx, [runId]) }) }, @@ -672,14 +677,12 @@ export function createKyselyStore( limit?: number }): Promise { const limit = options.limit ?? 1000 - const terminalStatuses: RunStatus[] = ['completed', 'failed', 'cancelled'] return await db.transaction().execute(async (trx) => { - // Find IDs of terminal runs older than cutoff const rows = await trx .selectFrom('durably_runs') .select('id') - .where('status', 'in', terminalStatuses) + .where('status', 'in', TERMINAL_STATUSES) .where('completed_at', '<', options.olderThan) .orderBy('completed_at', 'asc') .limit(limit) @@ -688,22 +691,7 @@ export function createKyselyStore( if (rows.length === 0) return 0 const ids = rows.map((r) => r.id) - - // Cascade delete in dependency order - await trx - .deleteFrom('durably_steps') - .where('run_id', 'in', ids) - .execute() - await trx - .deleteFrom('durably_logs') - .where('run_id', 'in', ids) - .execute() - await trx - .deleteFrom('durably_run_labels') - .where('run_id', 'in', ids) - .execute() - await trx.deleteFrom('durably_runs').where('id', 'in', ids).execute() - + await cascadeDeleteRuns(trx, ids) return ids.length }) }, From 5e31d3210d18460ebbce32991f10a25a505c23d9 Mon Sep 17 00:00:00 2001 From: coji Date: Thu, 12 Mar 2026 23:29:15 +0900 Subject: [PATCH 3/8] refactor: consolidate purge index into migration v1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No backward compatibility needed — merge the (status, completed_at) index into the single v1 migration instead of keeping a separate v2. Co-Authored-By: Claude Opus 4.6 --- packages/durably/src/migrations.ts | 21 +++++++------------ .../tests/node/migration-consolidated.test.ts | 2 +- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/packages/durably/src/migrations.ts b/packages/durably/src/migrations.ts index 22a1c0b..4c394ac 100644 --- a/packages/durably/src/migrations.ts +++ b/packages/durably/src/migrations.ts @@ -9,7 +9,7 @@ interface Migration { up: (db: Kysely) => Promise } -export const LATEST_SCHEMA_VERSION = 2 +export const LATEST_SCHEMA_VERSION = 1 const migrations: Migration[] = [ { @@ -80,6 +80,13 @@ const migrations: Migration[] = [ .columns(['job_name', 'created_at']) .execute() + await db.schema + .createIndex('idx_durably_runs_status_completed') + .ifNotExists() + .on('durably_runs') + .columns(['status', 'completed_at']) + .execute() + // Create normalized labels table for indexed label filtering await db.schema .createTable('durably_run_labels') @@ -166,18 +173,6 @@ const migrations: Migration[] = [ .execute() }, }, - { - version: 2, - up: async (db) => { - // Index for efficient purge queries (retainRuns feature) - await db.schema - .createIndex('idx_durably_runs_status_completed') - .ifNotExists() - .on('durably_runs') - .columns(['status', 'completed_at']) - .execute() - }, - }, ] /** diff --git a/packages/durably/tests/node/migration-consolidated.test.ts b/packages/durably/tests/node/migration-consolidated.test.ts index ab21b4c..b1288c8 100644 --- a/packages/durably/tests/node/migration-consolidated.test.ts +++ b/packages/durably/tests/node/migration-consolidated.test.ts @@ -38,7 +38,7 @@ describe('migration consolidated schema', () => { SELECT version FROM durably_schema_versions ORDER BY version DESC LIMIT 1 `.execute(durably.db) expect(versions.rows[0]?.version).toBe(LATEST_SCHEMA_VERSION) - expect(LATEST_SCHEMA_VERSION).toBe(2) + expect(LATEST_SCHEMA_VERSION).toBe(1) }) it('creates all expected indexes', async () => { From 78de639449b4347d48dd3bcfebfa5b34dfea47ca Mon Sep 17 00:00:00 2001 From: coji Date: Thu, 12 Mar 2026 23:41:45 +0900 Subject: [PATCH 4/8] fix: address simplify review findings - Move TERMINAL_STATUSES after RunStatus type definition (ordering) - Replace void fire-and-forget with .catch() to prevent unhandled rejection - Use single Date.now() variable in purge block instead of calling 3 times - Add comment documenting intentional immediate purge on first cycle Co-Authored-By: Claude Opus 4.6 --- packages/durably/src/durably.ts | 10 +++++++--- packages/durably/src/storage.ts | 6 +++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/packages/durably/src/durably.ts b/packages/durably/src/durably.ts index ef0a678..841ff71 100644 --- a/packages/durably/src/durably.ts +++ b/packages/durably/src/durably.ts @@ -917,13 +917,17 @@ function createDurablyInstance< // Auto-purge old terminal runs if retainRuns is configured. // Fire-and-forget: write lock serializes with other mutations, // so purge doesn't block job claiming or lease renewal. + // lastPurgeAt starts at 0, so the first cycle purges immediately on startup. if ( state.retainRunsMs !== null && Date.now() - state.lastPurgeAt >= PURGE_INTERVAL_MS ) { - state.lastPurgeAt = Date.now() - const cutoff = new Date(Date.now() - state.retainRunsMs).toISOString() - void storage.purgeRuns({ olderThan: cutoff, limit: 100 }) + const purgeNow = Date.now() + state.lastPurgeAt = purgeNow + const cutoff = new Date(purgeNow - state.retainRunsMs).toISOString() + storage.purgeRuns({ olderThan: cutoff, limit: 100 }).catch(() => { + // Purge failure is non-fatal — will retry on next interval + }) } const leasedRuns = await storage.getRuns({ status: 'leased' }) diff --git a/packages/durably/src/storage.ts b/packages/durably/src/storage.ts index 31b72ca..e771f3a 100644 --- a/packages/durably/src/storage.ts +++ b/packages/durably/src/storage.ts @@ -4,9 +4,6 @@ import type { Database } from './schema' const ulid = monotonicFactory() -/** Run statuses that represent terminal (non-active) states */ -const TERMINAL_STATUSES: RunStatus[] = ['completed', 'failed', 'cancelled'] - export type RunStatus = | 'pending' | 'leased' @@ -14,6 +11,9 @@ export type RunStatus = | 'failed' | 'cancelled' +/** Run statuses that represent terminal (non-active) states */ +const TERMINAL_STATUSES: RunStatus[] = ['completed', 'failed', 'cancelled'] + /** * Run data for creating a new run */ From 23b217ff28942fa1eee20bfb982e1e82995d5d8a Mon Sep 17 00:00:00 2001 From: coji Date: Fri, 13 Mar 2026 00:22:59 +0900 Subject: [PATCH 5/8] perf: remove redundant getRuns({ status: 'leased' }) from processOne hot path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The claimNext SQL already contains an activeLeaseGuard subquery that checks NOT EXISTS for active leases with the same concurrency key. The JS-side getRuns → filter → excludeConcurrencyKeys flow was redundant and added a full table scan + JOIN + JSON parse per poll cycle. Co-Authored-By: Claude Opus 4.6 --- packages/durably/src/durably.ts | 14 +----- packages/durably/src/storage.ts | 18 +------ .../durably/tests/shared/storage.shared.ts | 48 ------------------- 3 files changed, 2 insertions(+), 78 deletions(-) diff --git a/packages/durably/src/durably.ts b/packages/durably/src/durably.ts index 841ff71..6db414a 100644 --- a/packages/durably/src/durably.ts +++ b/packages/durably/src/durably.ts @@ -930,19 +930,7 @@ function createDurablyInstance< }) } - const leasedRuns = await storage.getRuns({ status: 'leased' }) - const excludeConcurrencyKeys = leasedRuns - .filter( - (entry): entry is Run & { concurrencyKey: string } => - entry.concurrencyKey !== null && - entry.leaseExpiresAt !== null && - entry.leaseExpiresAt > now, - ) - .map((entry) => entry.concurrencyKey) - - const run = await storage.claimNext(workerId, now, state.leaseMs, { - excludeConcurrencyKeys, - }) + const run = await storage.claimNext(workerId, now, state.leaseMs) if (!run) { return false } diff --git a/packages/durably/src/storage.ts b/packages/durably/src/storage.ts index e771f3a..677201e 100644 --- a/packages/durably/src/storage.ts +++ b/packages/durably/src/storage.ts @@ -128,10 +128,6 @@ export interface ProgressData { message?: string } -export interface ClaimOptions { - excludeConcurrencyKeys?: string[] -} - export type DatabaseBackend = 'generic' | 'postgres' /** @@ -172,7 +168,6 @@ export interface Store< workerId: string, now: string, leaseMs: number, - options?: ClaimOptions, ): Promise | null> renewLease( runId: string, @@ -700,10 +695,8 @@ export function createKyselyStore( workerId: string, now: string, leaseMs: number, - options?: ClaimOptions, ): Promise { const leaseExpiresAt = new Date(Date.parse(now) + leaseMs).toISOString() - const excludeConcurrencyKeys = options?.excludeConcurrencyKeys ?? [] const activeLeaseGuard = sql` ( concurrency_key IS NULL @@ -721,7 +714,7 @@ export function createKyselyStore( if (backend === 'postgres') { return await db.transaction().execute(async (trx) => { - const skipKeys = [...excludeConcurrencyKeys] + const skipKeys: string[] = [] // Loop: on concurrency-key conflict, exclude that key and retry // to find the next eligible candidate in the same transaction. @@ -822,15 +815,6 @@ export function createKyselyStore( .orderBy('id', 'asc') .limit(1) - if (excludeConcurrencyKeys.length > 0) { - subquery = subquery.where((eb) => - eb.or([ - eb('concurrency_key', 'is', null), - eb('concurrency_key', 'not in', excludeConcurrencyKeys), - ]), - ) - } - const row = await db .updateTable('durably_runs') .set({ diff --git a/packages/durably/tests/shared/storage.shared.ts b/packages/durably/tests/shared/storage.shared.ts index 5db841a..7272870 100644 --- a/packages/durably/tests/shared/storage.shared.ts +++ b/packages/durably/tests/shared/storage.shared.ts @@ -403,54 +403,6 @@ export function createStorageTests(createDialect: () => Dialect) { expect(result).toBeNull() }) - it('claimNext respects concurrency key exclusion', async () => { - await durably.storage.enqueue({ - jobName: 'job', - input: {}, - concurrencyKey: 'key-a', - }) - const run2 = await durably.storage.enqueue({ - jobName: 'job', - input: {}, - concurrencyKey: 'key-b', - }) - - const claimed = await durably.storage.claimNext( - 'test-worker', - new Date().toISOString(), - 30_000, - { excludeConcurrencyKeys: ['key-a'] }, - ) - - expect(claimed).not.toBeNull() - expect(claimed!.id).toBe(run2.id) - expect(claimed!.concurrencyKey).toBe('key-b') - expect(claimed!.status).toBe('leased') - }) - - it('claimNext skips runs with null concurrency key when not excluded', async () => { - const run1 = await durably.storage.enqueue({ - jobName: 'job', - input: {}, - }) - await durably.storage.enqueue({ - jobName: 'job', - input: {}, - concurrencyKey: 'key-a', - }) - - // Excluding key-a should still return the run without a concurrency key - const claimed = await durably.storage.claimNext( - 'test-worker', - new Date().toISOString(), - 30_000, - { excludeConcurrencyKeys: ['key-a'] }, - ) - - expect(claimed).not.toBeNull() - expect(claimed!.id).toBe(run1.id) - }) - it('claimNext preserves started_at on re-claim of recovered run', async () => { // Create and claim a run const created = await durably.storage.enqueue({ From 1b7d71b9e7697ca27b87f533caa0238e00e71acb Mon Sep 17 00:00:00 2001 From: coji Date: Fri, 13 Mar 2026 00:26:34 +0900 Subject: [PATCH 6/8] docs: add purgeRuns and retainRuns to website API reference Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 1 + website/api/create-durably.md | 15 +++++++++++++++ website/api/index.md | 19 +++++++++++-------- 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index f8cadd9..838f571 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -45,6 +45,7 @@ Five tables: `durably_runs`, `durably_run_labels`, `durably_steps`, `durably_log - `leaseRenewIntervalMs`: 5000ms - `leaseMs`: 30000ms (lease duration; expired leases are reclaimed) - `preserveSteps`: false (deletes step output data when runs reach terminal state) +- `retainRuns`: undefined (no automatic cleanup; set e.g. `'30d'` to auto-delete terminal runs) ## Browser Constraints (by design) diff --git a/website/api/create-durably.md b/website/api/create-durably.md index 6e4e2fb..62f88e2 100644 --- a/website/api/create-durably.md +++ b/website/api/create-durably.md @@ -28,6 +28,7 @@ interface DurablyOptions< leaseRenewIntervalMs?: number leaseMs?: number preserveSteps?: boolean + retainRuns?: string labels?: z.ZodType jobs?: TJobs } @@ -41,6 +42,7 @@ interface DurablyOptions< | `leaseMs` | `number` | `30000` | Lease duration — time until a job is considered stale (ms) | | `labels` | `z.ZodType` | — | Zod schema for labels. Enables type-safe labels and runtime validation on `trigger()` | | `preserveSteps` | `boolean` | `false` | Keep step output data when runs reach terminal state (completed/failed/cancelled) | +| `retainRuns` | `string` | — | Auto-delete terminal runs older than this duration (e.g. `'30d'`, `'12h'`, `'90m'`) | | `jobs` | `TJobs` | — | Job definitions to register. Shorthand for calling `.register()` after creation | ## Returns @@ -148,6 +150,19 @@ await durably.deleteRun(runId: string): Promise Deletes a run and its associated steps and logs. +### `purgeRuns()` + +```ts +await durably.purgeRuns(options?: { + olderThan?: string // ISO timestamp cutoff (default: now) + limit?: number // max rows to delete per call (default: 1000) +}): Promise +``` + +Deletes terminal runs (completed, failed, cancelled) older than the cutoff. Returns the number of deleted runs. Associated steps, logs, and labels are cascade-deleted. + +For automatic cleanup, use the [`retainRuns`](#options) option instead. + ### `getRun()` ```ts diff --git a/website/api/index.md b/website/api/index.md index ce4f57d..b1642c5 100644 --- a/website/api/index.md +++ b/website/api/index.md @@ -65,6 +65,7 @@ const durably = createDurably({ pollingIntervalMs: 1000, // Check for jobs every 1s leaseRenewIntervalMs: 5000, // Lease renewal every 5s leaseMs: 30000, // Lease duration (stale after 30s) + retainRuns: '30d', // Auto-delete terminal runs older than 30 days // labels: z.object({ ... }), // Optional: type-safe labels jobs: { importCsv: importCsvJob }, }) @@ -208,14 +209,16 @@ function ImportButton() { ### Instance Methods -| Method | Description | -| -------------------- | --------------------------------------------------------------------------------- | -| `init()` | Migrate database and start worker | -| `register(jobs)` | Register job definitions | -| `on(event, handler)` | Subscribe to events | -| `stop()` | Stop worker gracefully | -| `retrigger(runId)` | Retrigger completed/failed/cancelled run (validates input against current schema) | -| `cancel(runId)` | Cancel pending or leased run | +| Method | Description | +| --------------------- | --------------------------------------------------------------------------------- | +| `init()` | Migrate database and start worker | +| `register(jobs)` | Register job definitions | +| `on(event, handler)` | Subscribe to events | +| `stop()` | Stop worker gracefully | +| `retrigger(runId)` | Retrigger completed/failed/cancelled run (validates input against current schema) | +| `cancel(runId)` | Cancel pending or leased run | +| `deleteRun(runId)` | Delete a run and its associated steps, logs, and labels | +| `purgeRuns(options?)` | Delete terminal runs older than a cutoff (for cleanup) | ### Step Context From cbb6cdb5fcc9f123af42fdcdc9fcd6a3abf9ad13 Mon Sep 17 00:00:00 2001 From: coji Date: Fri, 13 Mar 2026 00:28:32 +0900 Subject: [PATCH 7/8] =?UTF-8?q?docs:=20fix=20purgeRuns=20signature=20?= =?UTF-8?q?=E2=80=94=20olderThan=20is=20required=20Date,=20not=20optional?= =?UTF-8?q?=20string?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 --- website/api/create-durably.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/website/api/create-durably.md b/website/api/create-durably.md index 62f88e2..3ee1ae6 100644 --- a/website/api/create-durably.md +++ b/website/api/create-durably.md @@ -153,8 +153,8 @@ Deletes a run and its associated steps and logs. ### `purgeRuns()` ```ts -await durably.purgeRuns(options?: { - olderThan?: string // ISO timestamp cutoff (default: now) +await durably.purgeRuns(options: { + olderThan: Date // cutoff date — runs completed before this are deleted limit?: number // max rows to delete per call (default: 1000) }): Promise ``` From 85a366c5fa57c7362f6e752017f9fb1235b9b788 Mon Sep 17 00:00:00 2001 From: coji Date: Mon, 16 Mar 2026 16:53:15 +0900 Subject: [PATCH 8/8] fix: address CodeRabbit review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move auto-purge after claimNext so it never serializes with job claiming - Add positive auto-purge test with backdated completed_at - Clarify retainRuns runs only during worker polling in docs - Fix purgeRuns(options?) → purgeRuns(options) in API quick reference - Clarify olderThan matches completedAt, not just "completed" status Co-Authored-By: Claude Opus 4.6 --- packages/durably/docs/llms.md | 4 +- packages/durably/src/durably.ts | 30 +++++++------- packages/durably/tests/shared/purge.shared.ts | 40 +++++++++++++++++++ website/api/create-durably.md | 24 +++++------ website/api/index.md | 20 +++++----- website/public/llms.txt | 4 +- 6 files changed, 80 insertions(+), 42 deletions(-) diff --git a/packages/durably/docs/llms.md b/packages/durably/docs/llms.md index e40bac4..f5a4c65 100644 --- a/packages/durably/docs/llms.md +++ b/packages/durably/docs/llms.md @@ -36,7 +36,7 @@ const durably = createDurably({ leaseRenewIntervalMs: 5000, // Lease renewal interval (ms) leaseMs: 30000, // Lease duration (ms); expired leases are reclaimed preserveSteps: false, // Set to true to keep step output data after terminal state (default: false = cleanup) - retainRuns: '30d', // Auto-delete terminal runs older than 30 days (optional; supports 'd', 'h', 'm' units) + retainRuns: '30d', // Auto-delete terminal runs older than 30 days (runs during worker polling; supports 'd', 'h', 'm' units) // Optional: type-safe labels with Zod schema // labels: z.object({ organizationId: z.string(), env: z.string() }), jobs: { @@ -243,7 +243,7 @@ const deleted = await durably.purgeRuns({ }) ``` -For automatic cleanup, use the `retainRuns` option (see Quick Start). +For automatic cleanup, use the `retainRuns` option (see Quick Start). Cleanup runs during idle worker polling cycles, at most once per minute, in batches of 100. ## Events diff --git a/packages/durably/src/durably.ts b/packages/durably/src/durably.ts index 6db414a..fa84469 100644 --- a/packages/durably/src/durably.ts +++ b/packages/durably/src/durably.ts @@ -914,24 +914,22 @@ function createDurablyInstance< await storage.releaseExpiredLeases(now) - // Auto-purge old terminal runs if retainRuns is configured. - // Fire-and-forget: write lock serializes with other mutations, - // so purge doesn't block job claiming or lease renewal. - // lastPurgeAt starts at 0, so the first cycle purges immediately on startup. - if ( - state.retainRunsMs !== null && - Date.now() - state.lastPurgeAt >= PURGE_INTERVAL_MS - ) { - const purgeNow = Date.now() - state.lastPurgeAt = purgeNow - const cutoff = new Date(purgeNow - state.retainRunsMs).toISOString() - storage.purgeRuns({ olderThan: cutoff, limit: 100 }).catch(() => { - // Purge failure is non-fatal — will retry on next interval - }) - } - const run = await storage.claimNext(workerId, now, state.leaseMs) if (!run) { + // Auto-purge old terminal runs if retainRuns is configured. + // Runs after claimNext so purge never serializes with job claiming. + // lastPurgeAt starts at 0, so the first idle cycle purges immediately. + if ( + state.retainRunsMs !== null && + Date.now() - state.lastPurgeAt >= PURGE_INTERVAL_MS + ) { + const purgeNow = Date.now() + state.lastPurgeAt = purgeNow + const cutoff = new Date(purgeNow - state.retainRunsMs).toISOString() + storage.purgeRuns({ olderThan: cutoff, limit: 100 }).catch(() => { + // Purge failure is non-fatal — will retry on next interval + }) + } return false } diff --git a/packages/durably/tests/shared/purge.shared.ts b/packages/durably/tests/shared/purge.shared.ts index d42c611..ff051f1 100644 --- a/packages/durably/tests/shared/purge.shared.ts +++ b/packages/durably/tests/shared/purge.shared.ts @@ -234,6 +234,46 @@ export function createPurgeTests(createDialect: () => Dialect) { await d.db.destroy() }) + it('auto-purges runs with backdated completed_at', async () => { + const d = createDurably({ + dialect: createDialect(), + pollingIntervalMs: 50, + retainRuns: '1m', // 1 minute retention + }).register({ testJob }) + + await d.migrate() + d.start() + + const run = await d.jobs.testJob.trigger({}) + + await vi.waitFor( + async () => { + const r = await d.getRun(run.id) + expect(r?.status).toBe('completed') + }, + { timeout: 5000 }, + ) + + // Backdate completed_at to 2 minutes ago so it's older than retention + const twoMinutesAgo = new Date(Date.now() - 2 * 60 * 1000).toISOString() + await d.db + .updateTable('durably_runs') + .set({ completed_at: twoMinutesAgo }) + .where('id', '=', run.id) + .execute() + + // Wait for auto-purge to fire during idle polling + await vi.waitFor( + async () => { + expect(await d.getRun(run.id)).toBeNull() + }, + { timeout: 5000 }, + ) + + await d.stop() + await d.db.destroy() + }) + it('throws on invalid retainRuns format', () => { expect(() => createDurably({ diff --git a/website/api/create-durably.md b/website/api/create-durably.md index 3ee1ae6..b6e2caf 100644 --- a/website/api/create-durably.md +++ b/website/api/create-durably.md @@ -34,16 +34,16 @@ interface DurablyOptions< } ``` -| Option | Type | Default | Description | -| ---------------------- | ----------- | -------- | ------------------------------------------------------------------------------------- | -| `dialect` | `Dialect` | required | Kysely dialect (SQLite, libSQL, or PostgreSQL) | -| `pollingIntervalMs` | `number` | `1000` | How often to check for pending jobs (ms) | -| `leaseRenewIntervalMs` | `number` | `5000` | How often to renew the lease (ms) | -| `leaseMs` | `number` | `30000` | Lease duration — time until a job is considered stale (ms) | -| `labels` | `z.ZodType` | — | Zod schema for labels. Enables type-safe labels and runtime validation on `trigger()` | -| `preserveSteps` | `boolean` | `false` | Keep step output data when runs reach terminal state (completed/failed/cancelled) | -| `retainRuns` | `string` | — | Auto-delete terminal runs older than this duration (e.g. `'30d'`, `'12h'`, `'90m'`) | -| `jobs` | `TJobs` | — | Job definitions to register. Shorthand for calling `.register()` after creation | +| Option | Type | Default | Description | +| ---------------------- | ----------- | -------- | ----------------------------------------------------------------------------------------------------------------- | +| `dialect` | `Dialect` | required | Kysely dialect (SQLite, libSQL, or PostgreSQL) | +| `pollingIntervalMs` | `number` | `1000` | How often to check for pending jobs (ms) | +| `leaseRenewIntervalMs` | `number` | `5000` | How often to renew the lease (ms) | +| `leaseMs` | `number` | `30000` | Lease duration — time until a job is considered stale (ms) | +| `labels` | `z.ZodType` | — | Zod schema for labels. Enables type-safe labels and runtime validation on `trigger()` | +| `preserveSteps` | `boolean` | `false` | Keep step output data when runs reach terminal state (completed/failed/cancelled) | +| `retainRuns` | `string` | — | Auto-delete terminal runs older than this duration (e.g. `'30d'`, `'12h'`, `'90m'`). Throws if format is invalid. | +| `jobs` | `TJobs` | — | Job definitions to register. Shorthand for calling `.register()` after creation | ## Returns @@ -154,12 +154,12 @@ Deletes a run and its associated steps and logs. ```ts await durably.purgeRuns(options: { - olderThan: Date // cutoff date — runs completed before this are deleted + olderThan: Date // cutoff — terminal runs with completedAt before this are deleted limit?: number // max rows to delete per call (default: 1000) }): Promise ``` -Deletes terminal runs (completed, failed, cancelled) older than the cutoff. Returns the number of deleted runs. Associated steps, logs, and labels are cascade-deleted. +Deletes terminal runs (completed, failed, cancelled) with `completedAt` older than the cutoff. Returns the number of deleted runs. Associated steps, logs, and labels are cascade-deleted. For automatic cleanup, use the [`retainRuns`](#options) option instead. diff --git a/website/api/index.md b/website/api/index.md index b1642c5..d5f67a0 100644 --- a/website/api/index.md +++ b/website/api/index.md @@ -209,16 +209,16 @@ function ImportButton() { ### Instance Methods -| Method | Description | -| --------------------- | --------------------------------------------------------------------------------- | -| `init()` | Migrate database and start worker | -| `register(jobs)` | Register job definitions | -| `on(event, handler)` | Subscribe to events | -| `stop()` | Stop worker gracefully | -| `retrigger(runId)` | Retrigger completed/failed/cancelled run (validates input against current schema) | -| `cancel(runId)` | Cancel pending or leased run | -| `deleteRun(runId)` | Delete a run and its associated steps, logs, and labels | -| `purgeRuns(options?)` | Delete terminal runs older than a cutoff (for cleanup) | +| Method | Description | +| -------------------- | --------------------------------------------------------------------------------- | +| `init()` | Migrate database and start worker | +| `register(jobs)` | Register job definitions | +| `on(event, handler)` | Subscribe to events | +| `stop()` | Stop worker gracefully | +| `retrigger(runId)` | Retrigger completed/failed/cancelled run (validates input against current schema) | +| `cancel(runId)` | Cancel pending or leased run | +| `deleteRun(runId)` | Delete a run and its associated steps, logs, and labels | +| `purgeRuns(options)` | Delete terminal runs older than a cutoff (for cleanup) | ### Step Context diff --git a/website/public/llms.txt b/website/public/llms.txt index e44493e..bccad7c 100644 --- a/website/public/llms.txt +++ b/website/public/llms.txt @@ -36,7 +36,7 @@ const durably = createDurably({ leaseRenewIntervalMs: 5000, // Lease renewal interval (ms) leaseMs: 30000, // Lease duration (ms); expired leases are reclaimed preserveSteps: false, // Set to true to keep step output data after terminal state (default: false = cleanup) - retainRuns: '30d', // Auto-delete terminal runs older than 30 days (optional; supports 'd', 'h', 'm' units) + retainRuns: '30d', // Auto-delete terminal runs older than 30 days (runs during worker polling; supports 'd', 'h', 'm' units) // Optional: type-safe labels with Zod schema // labels: z.object({ organizationId: z.string(), env: z.string() }), jobs: { @@ -243,7 +243,7 @@ const deleted = await durably.purgeRuns({ }) ``` -For automatic cleanup, use the `retainRuns` option (see Quick Start). +For automatic cleanup, use the `retainRuns` option (see Quick Start). Cleanup runs during idle worker polling cycles, at most once per minute, in batches of 100. ## Events