diff --git a/CLAUDE.md b/CLAUDE.md index f8cadd9..838f571 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -45,6 +45,7 @@ Five tables: `durably_runs`, `durably_run_labels`, `durably_steps`, `durably_log - `leaseRenewIntervalMs`: 5000ms - `leaseMs`: 30000ms (lease duration; expired leases are reclaimed) - `preserveSteps`: false (deletes step output data when runs reach terminal state) +- `retainRuns`: undefined (no automatic cleanup; set e.g. `'30d'` to auto-delete terminal runs) ## Browser Constraints (by design) diff --git a/packages/durably/docs/llms.md b/packages/durably/docs/llms.md index 9086f2d..f5a4c65 100644 --- a/packages/durably/docs/llms.md +++ b/packages/durably/docs/llms.md @@ -36,6 +36,7 @@ const durably = createDurably({ leaseRenewIntervalMs: 5000, // Lease renewal interval (ms) leaseMs: 30000, // Lease duration (ms); expired leases are reclaimed preserveSteps: false, // Set to true to keep step output data after terminal state (default: false = cleanup) + retainRuns: '30d', // Auto-delete terminal runs older than 30 days (runs during worker polling; supports 'd', 'h', 'm' units) // Optional: type-safe labels with Zod schema // labels: z.object({ organizationId: z.string(), env: z.string() }), jobs: { @@ -229,6 +230,21 @@ await durably.cancel(runId) await durably.deleteRun(runId) ``` +### Purge Old Runs + +Batch-delete terminal runs (completed, failed, cancelled) older than a cutoff date. +Pending and leased runs are never deleted. + +```ts +// Delete terminal runs older than 30 days +const deleted = await durably.purgeRuns({ + olderThan: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000), + limit: 500, // optional batch size (default: 1000) +}) +``` + +For automatic cleanup, use the `retainRuns` option (see Quick Start). Cleanup runs during idle worker polling cycles, at most once per minute, in batches of 100. + ## Events Subscribe to job execution events: diff --git a/packages/durably/src/durably.ts b/packages/durably/src/durably.ts index f7878d1..fa84469 100644 --- a/packages/durably/src/durably.ts +++ b/packages/durably/src/durably.ts @@ -71,6 +71,12 @@ export interface DurablyOptions< * ``` */ jobs?: TJobs + /** + * Auto-delete terminal runs older than the specified duration. + * Only runs in terminal states (completed, failed, cancelled) are purged. + * @example '30d' (30 days), '24h' (24 hours), '60m' (60 minutes) + */ + retainRuns?: string } /** @@ -83,6 +89,25 @@ const DEFAULTS = { preserveSteps: false, } as const +function parseDuration(value: string): number { + const match = value.match(/^(\d+)(d|h|m)$/) + if (!match) { + throw new Error( + `Invalid duration format: "${value}". Use e.g. '30d', '24h', '60m'`, + ) + } + const num = Number.parseInt(match[1], 10) + const unit = match[2] + const multipliers: Record = { + d: 86400000, + h: 3600000, + m: 60000, + } + return num * multipliers[unit] +} + +const PURGE_INTERVAL_MS = 60_000 + const ulid = monotonicFactory() const BROWSER_SINGLETON_REGISTRY_KEY = '__durablyBrowserSingletonRegistry' const BROWSER_LOCAL_DIALECT_KEY = '__durablyBrowserLocalKey' @@ -307,6 +332,13 @@ export interface Durably< */ deleteRun(runId: string): Promise + /** + * Delete terminal runs older than the specified cutoff. + * Only runs in terminal states (completed, failed, cancelled) are purged. + * @returns Number of deleted runs + */ + purgeRuns(options: { olderThan: Date; limit?: number }): Promise + /** * Get a run by ID * @example @@ -376,6 +408,8 @@ interface DurablyState< migrated: boolean leaseMs: number leaseRenewIntervalMs: number + retainRunsMs: number | null + lastPurgeAt: number releaseBrowserSingleton: () => void } @@ -864,26 +898,38 @@ function createDurablyInstance< }) }, + async purgeRuns(options: { + olderThan: Date + limit?: number + }): Promise { + return storage.purgeRuns({ + olderThan: options.olderThan.toISOString(), + limit: options.limit, + }) + }, + async processOne(options?: { workerId?: string }): Promise { const workerId = options?.workerId ?? defaultWorkerId() const now = new Date().toISOString() await storage.releaseExpiredLeases(now) - const leasedRuns = await storage.getRuns({ status: 'leased' }) - const excludeConcurrencyKeys = leasedRuns - .filter( - (entry): entry is Run & { concurrencyKey: string } => - entry.concurrencyKey !== null && - entry.leaseExpiresAt !== null && - entry.leaseExpiresAt > now, - ) - .map((entry) => entry.concurrencyKey) - - const run = await storage.claimNext(workerId, now, state.leaseMs, { - excludeConcurrencyKeys, - }) + const run = await storage.claimNext(workerId, now, state.leaseMs) if (!run) { + // Auto-purge old terminal runs if retainRuns is configured. + // Runs after claimNext so purge never serializes with job claiming. + // lastPurgeAt starts at 0, so the first idle cycle purges immediately. + if ( + state.retainRunsMs !== null && + Date.now() - state.lastPurgeAt >= PURGE_INTERVAL_MS + ) { + const purgeNow = Date.now() + state.lastPurgeAt = purgeNow + const cutoff = new Date(purgeNow - state.retainRunsMs).toISOString() + storage.purgeRuns({ olderThan: cutoff, limit: 100 }).catch(() => { + // Purge failure is non-fatal — will retry on next interval + }) + } return false } @@ -978,6 +1024,7 @@ export function createDurably< options.leaseRenewIntervalMs ?? DEFAULTS.leaseRenewIntervalMs, leaseMs: options.leaseMs ?? DEFAULTS.leaseMs, preserveSteps: options.preserveSteps ?? DEFAULTS.preserveSteps, + retainRunsMs: options.retainRuns ? parseDuration(options.retainRuns) : null, } const db = new Kysely({ dialect: options.dialect }) @@ -1023,6 +1070,8 @@ export function createDurably< migrated: false, leaseMs: config.leaseMs, leaseRenewIntervalMs: config.leaseRenewIntervalMs, + retainRunsMs: config.retainRunsMs, + lastPurgeAt: 0, releaseBrowserSingleton, } diff --git a/packages/durably/src/migrations.ts b/packages/durably/src/migrations.ts index d4e0382..4c394ac 100644 --- a/packages/durably/src/migrations.ts +++ b/packages/durably/src/migrations.ts @@ -80,6 +80,13 @@ const migrations: Migration[] = [ .columns(['job_name', 'created_at']) .execute() + await db.schema + .createIndex('idx_durably_runs_status_completed') + .ifNotExists() + .on('durably_runs') + .columns(['status', 'completed_at']) + .execute() + // Create normalized labels table for indexed label filtering await db.schema .createTable('durably_run_labels') diff --git a/packages/durably/src/storage.ts b/packages/durably/src/storage.ts index f09f134..677201e 100644 --- a/packages/durably/src/storage.ts +++ b/packages/durably/src/storage.ts @@ -11,6 +11,9 @@ export type RunStatus = | 'failed' | 'cancelled' +/** Run statuses that represent terminal (non-active) states */ +const TERMINAL_STATUSES: RunStatus[] = ['completed', 'failed', 'cancelled'] + /** * Run data for creating a new run */ @@ -125,10 +128,6 @@ export interface ProgressData { message?: string } -export interface ClaimOptions { - excludeConcurrencyKeys?: string[] -} - export type DatabaseBackend = 'generic' | 'postgres' /** @@ -169,7 +168,6 @@ export interface Store< workerId: string, now: string, leaseMs: number, - options?: ClaimOptions, ): Promise | null> renewLease( runId: string, @@ -215,6 +213,9 @@ export interface Store< progress: ProgressData | null, ): Promise + // Purge + purgeRuns(options: { olderThan: string; limit?: number }): Promise + // Logs createLog(input: CreateLogInput): Promise getLogs(runId: string): Promise @@ -361,6 +362,20 @@ export function createKyselyStore( ): Store> { const withWriteLock = createWriteMutex() + /** Delete runs and all associated data (steps, logs, labels) in dependency order */ + async function cascadeDeleteRuns( + trx: Kysely, + ids: string[], + ): Promise { + await trx.deleteFrom('durably_steps').where('run_id', 'in', ids).execute() + await trx.deleteFrom('durably_logs').where('run_id', 'in', ids).execute() + await trx + .deleteFrom('durably_run_labels') + .where('run_id', 'in', ids) + .execute() + await trx.deleteFrom('durably_runs').where('id', 'in', ids).execute() + } + async function insertLabelRows( executor: Kysely, runId: string, @@ -648,19 +663,31 @@ export function createKyselyStore( async deleteRun(runId: string) { await db.transaction().execute(async (trx) => { - await trx - .deleteFrom('durably_steps') - .where('run_id', '=', runId) - .execute() - await trx - .deleteFrom('durably_logs') - .where('run_id', '=', runId) - .execute() - await trx - .deleteFrom('durably_run_labels') - .where('run_id', '=', runId) + await cascadeDeleteRuns(trx, [runId]) + }) + }, + + async purgeRuns(options: { + olderThan: string + limit?: number + }): Promise { + const limit = options.limit ?? 1000 + + return await db.transaction().execute(async (trx) => { + const rows = await trx + .selectFrom('durably_runs') + .select('id') + .where('status', 'in', TERMINAL_STATUSES) + .where('completed_at', '<', options.olderThan) + .orderBy('completed_at', 'asc') + .limit(limit) .execute() - await trx.deleteFrom('durably_runs').where('id', '=', runId).execute() + + if (rows.length === 0) return 0 + + const ids = rows.map((r) => r.id) + await cascadeDeleteRuns(trx, ids) + return ids.length }) }, @@ -668,10 +695,8 @@ export function createKyselyStore( workerId: string, now: string, leaseMs: number, - options?: ClaimOptions, ): Promise { const leaseExpiresAt = new Date(Date.parse(now) + leaseMs).toISOString() - const excludeConcurrencyKeys = options?.excludeConcurrencyKeys ?? [] const activeLeaseGuard = sql` ( concurrency_key IS NULL @@ -689,7 +714,7 @@ export function createKyselyStore( if (backend === 'postgres') { return await db.transaction().execute(async (trx) => { - const skipKeys = [...excludeConcurrencyKeys] + const skipKeys: string[] = [] // Loop: on concurrency-key conflict, exclude that key and retry // to find the next eligible candidate in the same transaction. @@ -790,15 +815,6 @@ export function createKyselyStore( .orderBy('id', 'asc') .limit(1) - if (excludeConcurrencyKeys.length > 0) { - subquery = subquery.where((eb) => - eb.or([ - eb('concurrency_key', 'is', null), - eb('concurrency_key', 'not in', excludeConcurrencyKeys), - ]), - ) - } - const row = await db .updateTable('durably_runs') .set({ @@ -1035,6 +1051,7 @@ export function createKyselyStore( 'enqueueMany', 'updateRun', 'deleteRun', + 'purgeRuns', 'claimNext', 'renewLease', 'releaseExpiredLeases', diff --git a/packages/durably/tests/node/migration-consolidated.test.ts b/packages/durably/tests/node/migration-consolidated.test.ts index d34758d..b1288c8 100644 --- a/packages/durably/tests/node/migration-consolidated.test.ts +++ b/packages/durably/tests/node/migration-consolidated.test.ts @@ -61,6 +61,7 @@ describe('migration consolidated schema', () => { expect(indexNames).toContain('idx_durably_runs_status_created') expect(indexNames).toContain('idx_durably_runs_status_lease_expires') expect(indexNames).toContain('idx_durably_runs_job_created') + expect(indexNames).toContain('idx_durably_runs_status_completed') // Steps indexes expect(indexNames).toContain('idx_durably_steps_run_index') diff --git a/packages/durably/tests/node/purge.test.ts b/packages/durably/tests/node/purge.test.ts new file mode 100644 index 0000000..25b8791 --- /dev/null +++ b/packages/durably/tests/node/purge.test.ts @@ -0,0 +1,4 @@ +import { createNodeDialect } from '../helpers/node-dialect' +import { createPurgeTests } from '../shared/purge.shared' + +createPurgeTests(createNodeDialect) diff --git a/packages/durably/tests/shared/purge.shared.ts b/packages/durably/tests/shared/purge.shared.ts new file mode 100644 index 0000000..ff051f1 --- /dev/null +++ b/packages/durably/tests/shared/purge.shared.ts @@ -0,0 +1,296 @@ +import type { Dialect } from 'kysely' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { z } from 'zod' +import { createDurably, defineJob, type Durably } from '../../src' + +export function createPurgeTests(createDialect: () => Dialect) { + describe('purgeRuns', () => { + let durably: Durably + + beforeEach(async () => { + durably = createDurably({ + dialect: createDialect(), + pollingIntervalMs: 50, + }) + await durably.migrate() + }) + + afterEach(async () => { + await durably.stop() + await durably.db.destroy() + }) + + const testJob = defineJob({ + name: 'purge-test-job', + input: z.object({}), + run: async () => {}, + }) + + const failingJob = defineJob({ + name: 'purge-failing-job', + input: z.object({}), + run: async () => { + throw new Error('fail') + }, + }) + + describe('durably.purgeRuns()', () => { + it('deletes completed runs older than cutoff', async () => { + const d = durably.register({ testJob }) + d.start() + + const run = await d.jobs.testJob.trigger({}) + + await vi.waitFor( + async () => { + const r = await d.getRun(run.id) + expect(r?.status).toBe('completed') + }, + { timeout: 5000 }, + ) + + // Purge with cutoff in the future — should delete the run + const futureDate = new Date(Date.now() + 60000) + const deleted = await d.purgeRuns({ olderThan: futureDate }) + + expect(deleted).toBe(1) + expect(await d.getRun(run.id)).toBeNull() + }) + + it('deletes failed runs older than cutoff', async () => { + const d = durably.register({ failingJob }) + d.start() + + const run = await d.jobs.failingJob.trigger({}) + + await vi.waitFor( + async () => { + const r = await d.getRun(run.id) + expect(r?.status).toBe('failed') + }, + { timeout: 5000 }, + ) + + const futureDate = new Date(Date.now() + 60000) + const deleted = await d.purgeRuns({ olderThan: futureDate }) + + expect(deleted).toBe(1) + expect(await d.getRun(run.id)).toBeNull() + }) + + it('deletes cancelled runs older than cutoff', async () => { + const d = durably.register({ testJob }) + + const run = await d.jobs.testJob.trigger({}) + await d.cancel(run.id) + + const futureDate = new Date(Date.now() + 60000) + const deleted = await d.purgeRuns({ olderThan: futureDate }) + + expect(deleted).toBe(1) + expect(await d.getRun(run.id)).toBeNull() + }) + + it('does NOT delete pending or leased runs', async () => { + const d = durably.register({ testJob }) + + // Create a pending run (don't start worker) + const run = await d.jobs.testJob.trigger({}) + + const futureDate = new Date(Date.now() + 60000) + const deleted = await d.purgeRuns({ olderThan: futureDate }) + + expect(deleted).toBe(0) + expect(await d.getRun(run.id)).not.toBeNull() + }) + + it('does NOT delete runs newer than cutoff', async () => { + const d = durably.register({ testJob }) + d.start() + + const run = await d.jobs.testJob.trigger({}) + + await vi.waitFor( + async () => { + const r = await d.getRun(run.id) + expect(r?.status).toBe('completed') + }, + { timeout: 5000 }, + ) + + // Purge with cutoff in the past — should not delete + const pastDate = new Date(Date.now() - 60000) + const deleted = await d.purgeRuns({ olderThan: pastDate }) + + expect(deleted).toBe(0) + expect(await d.getRun(run.id)).not.toBeNull() + }) + + it('respects the limit parameter', async () => { + const d = durably.register({ testJob }) + d.start() + + // Trigger 3 runs + await d.jobs.testJob.trigger({}) + await d.jobs.testJob.trigger({}) + await d.jobs.testJob.trigger({}) + + await vi.waitFor( + async () => { + const runs = await d.getRuns({ status: 'completed' }) + expect(runs.length).toBe(3) + }, + { timeout: 5000 }, + ) + + const futureDate = new Date(Date.now() + 60000) + const deleted = await d.purgeRuns({ olderThan: futureDate, limit: 2 }) + + expect(deleted).toBe(2) + const remaining = await d.getRuns() + expect(remaining.length).toBe(1) + }) + + it('returns 0 when no runs match', async () => { + const d = durably.register({ testJob }) + const deleted = await d.purgeRuns({ + olderThan: new Date(Date.now() + 60000), + }) + expect(deleted).toBe(0) + }) + + it('also deletes associated steps', async () => { + const jobWithSteps = defineJob({ + name: 'purge-steps-job', + input: z.object({}), + run: async (ctx) => { + await ctx.run('step1', () => 'result1') + await ctx.run('step2', () => 'result2') + }, + }) + + // preserveSteps: true so steps remain after completion + const d = createDurably({ + dialect: createDialect(), + pollingIntervalMs: 50, + preserveSteps: true, + }).register({ jobWithSteps }) + + await d.migrate() + d.start() + const run = await d.jobs.jobWithSteps.trigger({}) + + await vi.waitFor( + async () => { + const r = await d.getRun(run.id) + expect(r?.status).toBe('completed') + }, + { timeout: 5000 }, + ) + + // Verify steps exist before purge + const steps = await d.storage.getSteps(run.id) + expect(steps.length).toBeGreaterThan(0) + + const futureDate = new Date(Date.now() + 60000) + await d.purgeRuns({ olderThan: futureDate }) + + // Steps should be gone too + const stepsAfter = await d.storage.getSteps(run.id) + expect(stepsAfter.length).toBe(0) + + await d.stop() + await d.db.destroy() + }) + }) + + describe('retainRuns option', () => { + it('auto-purges old runs during worker polling', async () => { + const d = createDurably({ + dialect: createDialect(), + pollingIntervalMs: 50, + retainRuns: '1m', // 1 minute retention + }).register({ testJob }) + + await d.migrate() + d.start() + + const run = await d.jobs.testJob.trigger({}) + + await vi.waitFor( + async () => { + const r = await d.getRun(run.id) + expect(r?.status).toBe('completed') + }, + { timeout: 5000 }, + ) + + // Run just completed — should NOT be purged (it's not 1 minute old) + // Wait a polling cycle to ensure auto-purge ran + await new Promise((r) => setTimeout(r, 200)) + expect(await d.getRun(run.id)).not.toBeNull() + + await d.stop() + await d.db.destroy() + }) + + it('auto-purges runs with backdated completed_at', async () => { + const d = createDurably({ + dialect: createDialect(), + pollingIntervalMs: 50, + retainRuns: '1m', // 1 minute retention + }).register({ testJob }) + + await d.migrate() + d.start() + + const run = await d.jobs.testJob.trigger({}) + + await vi.waitFor( + async () => { + const r = await d.getRun(run.id) + expect(r?.status).toBe('completed') + }, + { timeout: 5000 }, + ) + + // Backdate completed_at to 2 minutes ago so it's older than retention + const twoMinutesAgo = new Date(Date.now() - 2 * 60 * 1000).toISOString() + await d.db + .updateTable('durably_runs') + .set({ completed_at: twoMinutesAgo }) + .where('id', '=', run.id) + .execute() + + // Wait for auto-purge to fire during idle polling + await vi.waitFor( + async () => { + expect(await d.getRun(run.id)).toBeNull() + }, + { timeout: 5000 }, + ) + + await d.stop() + await d.db.destroy() + }) + + it('throws on invalid retainRuns format', () => { + expect(() => + createDurably({ + dialect: createDialect(), + retainRuns: 'invalid', + }), + ).toThrow('Invalid duration format') + }) + + it('throws on invalid retainRuns unit', () => { + expect(() => + createDurably({ + dialect: createDialect(), + retainRuns: '30s', + }), + ).toThrow('Invalid duration format') + }) + }) + }) +} diff --git a/packages/durably/tests/shared/storage.shared.ts b/packages/durably/tests/shared/storage.shared.ts index 5db841a..7272870 100644 --- a/packages/durably/tests/shared/storage.shared.ts +++ b/packages/durably/tests/shared/storage.shared.ts @@ -403,54 +403,6 @@ export function createStorageTests(createDialect: () => Dialect) { expect(result).toBeNull() }) - it('claimNext respects concurrency key exclusion', async () => { - await durably.storage.enqueue({ - jobName: 'job', - input: {}, - concurrencyKey: 'key-a', - }) - const run2 = await durably.storage.enqueue({ - jobName: 'job', - input: {}, - concurrencyKey: 'key-b', - }) - - const claimed = await durably.storage.claimNext( - 'test-worker', - new Date().toISOString(), - 30_000, - { excludeConcurrencyKeys: ['key-a'] }, - ) - - expect(claimed).not.toBeNull() - expect(claimed!.id).toBe(run2.id) - expect(claimed!.concurrencyKey).toBe('key-b') - expect(claimed!.status).toBe('leased') - }) - - it('claimNext skips runs with null concurrency key when not excluded', async () => { - const run1 = await durably.storage.enqueue({ - jobName: 'job', - input: {}, - }) - await durably.storage.enqueue({ - jobName: 'job', - input: {}, - concurrencyKey: 'key-a', - }) - - // Excluding key-a should still return the run without a concurrency key - const claimed = await durably.storage.claimNext( - 'test-worker', - new Date().toISOString(), - 30_000, - { excludeConcurrencyKeys: ['key-a'] }, - ) - - expect(claimed).not.toBeNull() - expect(claimed!.id).toBe(run1.id) - }) - it('claimNext preserves started_at on re-claim of recovered run', async () => { // Create and claim a run const created = await durably.storage.enqueue({ diff --git a/website/api/create-durably.md b/website/api/create-durably.md index 6e4e2fb..b6e2caf 100644 --- a/website/api/create-durably.md +++ b/website/api/create-durably.md @@ -28,20 +28,22 @@ interface DurablyOptions< leaseRenewIntervalMs?: number leaseMs?: number preserveSteps?: boolean + retainRuns?: string labels?: z.ZodType jobs?: TJobs } ``` -| Option | Type | Default | Description | -| ---------------------- | ----------- | -------- | ------------------------------------------------------------------------------------- | -| `dialect` | `Dialect` | required | Kysely dialect (SQLite, libSQL, or PostgreSQL) | -| `pollingIntervalMs` | `number` | `1000` | How often to check for pending jobs (ms) | -| `leaseRenewIntervalMs` | `number` | `5000` | How often to renew the lease (ms) | -| `leaseMs` | `number` | `30000` | Lease duration — time until a job is considered stale (ms) | -| `labels` | `z.ZodType` | — | Zod schema for labels. Enables type-safe labels and runtime validation on `trigger()` | -| `preserveSteps` | `boolean` | `false` | Keep step output data when runs reach terminal state (completed/failed/cancelled) | -| `jobs` | `TJobs` | — | Job definitions to register. Shorthand for calling `.register()` after creation | +| Option | Type | Default | Description | +| ---------------------- | ----------- | -------- | ----------------------------------------------------------------------------------------------------------------- | +| `dialect` | `Dialect` | required | Kysely dialect (SQLite, libSQL, or PostgreSQL) | +| `pollingIntervalMs` | `number` | `1000` | How often to check for pending jobs (ms) | +| `leaseRenewIntervalMs` | `number` | `5000` | How often to renew the lease (ms) | +| `leaseMs` | `number` | `30000` | Lease duration — time until a job is considered stale (ms) | +| `labels` | `z.ZodType` | — | Zod schema for labels. Enables type-safe labels and runtime validation on `trigger()` | +| `preserveSteps` | `boolean` | `false` | Keep step output data when runs reach terminal state (completed/failed/cancelled) | +| `retainRuns` | `string` | — | Auto-delete terminal runs older than this duration (e.g. `'30d'`, `'12h'`, `'90m'`). Throws if format is invalid. | +| `jobs` | `TJobs` | — | Job definitions to register. Shorthand for calling `.register()` after creation | ## Returns @@ -148,6 +150,19 @@ await durably.deleteRun(runId: string): Promise Deletes a run and its associated steps and logs. +### `purgeRuns()` + +```ts +await durably.purgeRuns(options: { + olderThan: Date // cutoff — terminal runs with completedAt before this are deleted + limit?: number // max rows to delete per call (default: 1000) +}): Promise +``` + +Deletes terminal runs (completed, failed, cancelled) with `completedAt` older than the cutoff. Returns the number of deleted runs. Associated steps, logs, and labels are cascade-deleted. + +For automatic cleanup, use the [`retainRuns`](#options) option instead. + ### `getRun()` ```ts diff --git a/website/api/index.md b/website/api/index.md index ce4f57d..d5f67a0 100644 --- a/website/api/index.md +++ b/website/api/index.md @@ -65,6 +65,7 @@ const durably = createDurably({ pollingIntervalMs: 1000, // Check for jobs every 1s leaseRenewIntervalMs: 5000, // Lease renewal every 5s leaseMs: 30000, // Lease duration (stale after 30s) + retainRuns: '30d', // Auto-delete terminal runs older than 30 days // labels: z.object({ ... }), // Optional: type-safe labels jobs: { importCsv: importCsvJob }, }) @@ -216,6 +217,8 @@ function ImportButton() { | `stop()` | Stop worker gracefully | | `retrigger(runId)` | Retrigger completed/failed/cancelled run (validates input against current schema) | | `cancel(runId)` | Cancel pending or leased run | +| `deleteRun(runId)` | Delete a run and its associated steps, logs, and labels | +| `purgeRuns(options)` | Delete terminal runs older than a cutoff (for cleanup) | ### Step Context diff --git a/website/public/llms.txt b/website/public/llms.txt index 0dc9f71..bccad7c 100644 --- a/website/public/llms.txt +++ b/website/public/llms.txt @@ -36,6 +36,7 @@ const durably = createDurably({ leaseRenewIntervalMs: 5000, // Lease renewal interval (ms) leaseMs: 30000, // Lease duration (ms); expired leases are reclaimed preserveSteps: false, // Set to true to keep step output data after terminal state (default: false = cleanup) + retainRuns: '30d', // Auto-delete terminal runs older than 30 days (runs during worker polling; supports 'd', 'h', 'm' units) // Optional: type-safe labels with Zod schema // labels: z.object({ organizationId: z.string(), env: z.string() }), jobs: { @@ -229,6 +230,21 @@ await durably.cancel(runId) await durably.deleteRun(runId) ``` +### Purge Old Runs + +Batch-delete terminal runs (completed, failed, cancelled) older than a cutoff date. +Pending and leased runs are never deleted. + +```ts +// Delete terminal runs older than 30 days +const deleted = await durably.purgeRuns({ + olderThan: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000), + limit: 500, // optional batch size (default: 1000) +}) +``` + +For automatic cleanup, use the `retainRuns` option (see Quick Start). Cleanup runs during idle worker polling cycles, at most once per minute, in batches of 100. + ## Events Subscribe to job execution events: