diff --git a/packages/durably/docs/llms.md b/packages/durably/docs/llms.md index dcecdd0c..34b51347 100644 --- a/packages/durably/docs/llms.md +++ b/packages/durably/docs/llms.md @@ -24,6 +24,7 @@ pnpm add @coji/durably kysely zod sqlocal import { createDurably } from '@coji/durably' import { LibsqlDialect } from '@libsql/kysely-libsql' import { createClient } from '@libsql/client' +import { z } from 'zod' const client = createClient({ url: 'file:local.db' }) const dialect = new LibsqlDialect({ client }) @@ -33,6 +34,8 @@ const durably = createDurably({ pollingInterval: 1000, // Job polling interval (ms) heartbeatInterval: 5000, // Heartbeat update interval (ms) staleThreshold: 30000, // When to consider a job abandoned (ms) + // Optional: type-safe labels with Zod schema + // labels: z.object({ organizationId: z.string(), env: z.string() }), }) ``` @@ -450,40 +453,64 @@ interface StepContext { } } -interface Run { +// TLabels defaults to Record when no labels schema is provided +interface Run = Record> { id: string jobName: string status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' input: unknown - labels: Record - output?: TOutput - error?: string - progress?: { current: number; total?: number; message?: string } + labels: TLabels + output: unknown | null + error: string | null + progress: { current: number; total?: number; message?: string } | null startedAt: string | null completedAt: string | null createdAt: string updatedAt: string } -interface JobHandle { +interface TypedRun< + TOutput, + TLabels extends Record = Record, +> extends Omit, 'output'> { + output: TOutput | null +} + +interface JobHandle< + TName extends string, + TInput, + TOutput, + TLabels extends Record = Record, +> { name: TName - trigger(input: TInput, options?: TriggerOptions): Promise> + trigger( + input: TInput, + options?: TriggerOptions, + ): Promise> triggerAndWait( input: TInput, - options?: TriggerAndWaitOptions, + options?: TriggerAndWaitOptions, ): Promise<{ id: string; output: TOutput }> - batchTrigger(inputs: BatchTriggerInput[]): Promise[]> - getRun(id: string): Promise | null> - getRuns(filter?: RunFilter): Promise[]> + batchTrigger( + inputs: BatchTriggerInput[], + ): Promise[]> + getRun(id: string): Promise | null> + getRuns( + filter?: Omit, 'jobName'>, + ): Promise[]> } -interface TriggerOptions { +interface TriggerOptions< + TLabels extends Record = Record, +> { idempotencyKey?: string concurrencyKey?: string - labels?: Record + labels?: TLabels } -interface TriggerAndWaitOptions extends TriggerOptions { +interface TriggerAndWaitOptions< + TLabels extends Record = Record, +> extends TriggerOptions { timeout?: number onProgress?: (progress: ProgressData) => void | Promise onLog?: (log: LogData) => void | Promise @@ -502,10 +529,12 @@ interface LogData { stepName?: string | null } -interface RunFilter { +interface RunFilter< + TLabels extends Record = Record, +> { status?: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' jobName?: string | string[] - labels?: Record + labels?: Partial limit?: number offset?: number } diff --git a/packages/durably/src/durably.ts b/packages/durably/src/durably.ts index f771441f..771dd67e 100644 --- a/packages/durably/src/durably.ts +++ b/packages/durably/src/durably.ts @@ -1,5 +1,6 @@ import type { Dialect } from 'kysely' import { Kysely } from 'kysely' +import type { z } from 'zod' import type { JobDefinition } from './define-job' import { type AnyEventInput, @@ -30,11 +31,19 @@ import { type Worker, createWorker } from './worker' /** * Options for creating a Durably instance */ -export interface DurablyOptions { +export interface DurablyOptions< + TLabels extends Record = Record, +> { dialect: Dialect pollingInterval?: number heartbeatInterval?: number staleThreshold?: number + /** + * Zod schema for labels. When provided: + * - Labels are type-checked at compile time + * - Labels are validated at runtime on trigger() + */ + labels?: z.ZodType } /** @@ -52,7 +61,7 @@ const DEFAULTS = { export interface DurablyPlugin { name: string // biome-ignore lint/suspicious/noExplicitAny: plugin needs to accept any Durably instance - install(durably: Durably): void + install(durably: Durably): void } /** @@ -60,13 +69,14 @@ export interface DurablyPlugin { */ type TransformToHandles< TJobs extends Record>, + TLabels extends Record = Record, > = { [K in keyof TJobs]: TJobs[K] extends JobDefinition< infer TName, infer TInput, infer TOutput > - ? JobHandle + ? JobHandle : never } @@ -74,10 +84,11 @@ type TransformToHandles< * Durably instance with type-safe jobs */ export interface Durably< - TJobs extends Record> = Record< + TJobs extends Record< string, - never - >, + JobHandle> + > = Record, + TLabels extends Record = Record, > { /** * Registered job handles (type-safe) @@ -145,7 +156,7 @@ export interface Durably< // biome-ignore lint/suspicious/noExplicitAny: flexible type constraint for job definitions register>>( jobDefs: TNewJobs, - ): Durably> + ): Durably, TLabels> /** * Start the worker polling loop @@ -187,7 +198,9 @@ export interface Durably< * const typedRun = await durably.getRun(runId) * ``` */ - getRun(runId: string): Promise + getRun = Run>( + runId: string, + ): Promise /** * Get runs with optional filtering @@ -201,7 +214,9 @@ export interface Durably< * const typedRuns = await durably.getRuns({ jobName: 'my-job' }) * ``` */ - getRuns(filter?: RunFilter): Promise + getRuns = Run>( + filter?: RunFilter, + ): Promise /** * Register a plugin @@ -214,7 +229,7 @@ export interface Durably< */ getJob( name: TName, - ): JobHandle, unknown> | undefined + ): JobHandle, unknown, TLabels> | undefined /** * Subscribe to events for a specific run @@ -232,6 +247,7 @@ interface DurablyState { eventEmitter: EventEmitter jobRegistry: JobRegistry worker: Worker + labelsSchema: z.ZodType | undefined migrating: Promise | null migrated: boolean } @@ -240,11 +256,15 @@ interface DurablyState { * Create a Durably instance implementation */ function createDurablyInstance< - TJobs extends Record>, ->(state: DurablyState, jobs: TJobs): Durably { + TJobs extends Record< + string, + JobHandle> + >, + TLabels extends Record = Record, +>(state: DurablyState, jobs: TJobs): Durably { const { db, storage, eventEmitter, jobRegistry, worker } = state - const durably: Durably = { + const durably: Durably = { db, storage, jobs, @@ -257,8 +277,8 @@ function createDurablyInstance< // biome-ignore lint/suspicious/noExplicitAny: flexible type constraint for job definitions register>>( jobDefs: TNewJobs, - ): Durably> { - const newHandles = {} as TransformToHandles + ): Durably, TLabels> { + const newHandles = {} as TransformToHandles for (const key of Object.keys(jobDefs) as (keyof TNewJobs)[]) { const jobDef = jobDefs[key] @@ -267,14 +287,21 @@ function createDurablyInstance< storage, eventEmitter, jobRegistry, + state.labelsSchema as z.ZodType | undefined, ) - newHandles[key] = handle as TransformToHandles[typeof key] + newHandles[key] = handle as TransformToHandles< + TNewJobs, + TLabels + >[typeof key] } // Create new instance with merged jobs const mergedJobs = { ...jobs, ...newHandles } as TJobs & - TransformToHandles - return createDurablyInstance(state, mergedJobs) + TransformToHandles + return createDurablyInstance( + state, + mergedJobs, + ) }, getRun: storage.getRun, @@ -286,7 +313,7 @@ function createDurablyInstance< getJob( name: TName, - ): JobHandle, unknown> | undefined { + ): JobHandle, unknown, TLabels> | undefined { const registeredJob = jobRegistry.get(name) if (!registeredJob) { return undefined @@ -294,7 +321,8 @@ function createDurablyInstance< return registeredJob.handle as JobHandle< TName, Record, - unknown + unknown, + TLabels > }, @@ -525,9 +553,9 @@ function createDurablyInstance< /** * Create a Durably instance */ -export function createDurably( - options: DurablyOptions, -): Durably> { +export function createDurably< + TLabels extends Record = Record, +>(options: DurablyOptions): Durably, TLabels> { const config = { pollingInterval: options.pollingInterval ?? DEFAULTS.pollingInterval, heartbeatInterval: options.heartbeatInterval ?? DEFAULTS.heartbeatInterval, @@ -546,9 +574,10 @@ export function createDurably( eventEmitter, jobRegistry, worker, + labelsSchema: options.labels, migrating: null, migrated: false, } - return createDurablyInstance(state, {}) + return createDurablyInstance, TLabels>(state, {}) } diff --git a/packages/durably/src/index.ts b/packages/durably/src/index.ts index 9c222358..eb7db9e7 100644 --- a/packages/durably/src/index.ts +++ b/packages/durably/src/index.ts @@ -38,10 +38,12 @@ export type { // Job types export type { + BatchTriggerInput, JobHandle, StepContext, TriggerAndWaitOptions, TriggerAndWaitResult, + TriggerOptions, } from './job' // Schema types (for advanced users) diff --git a/packages/durably/src/job.ts b/packages/durably/src/job.ts index a5171985..443392df 100644 --- a/packages/durably/src/job.ts +++ b/packages/durably/src/job.ts @@ -62,16 +62,20 @@ export type JobFunction = ( /** * Trigger options for trigger() and batchTrigger() */ -export interface TriggerOptions { +export interface TriggerOptions< + TLabels extends Record = Record, +> { idempotencyKey?: string concurrencyKey?: string - labels?: Record + labels?: TLabels } /** * Options for triggerAndWait() (extends TriggerOptions with wait-specific options) */ -export interface TriggerAndWaitOptions extends TriggerOptions { +export interface TriggerAndWaitOptions< + TLabels extends Record = Record, +> extends TriggerOptions { /** Timeout in milliseconds */ timeout?: number /** Called when step.progress() is invoked during execution */ @@ -83,16 +87,20 @@ export interface TriggerAndWaitOptions extends TriggerOptions { /** * Typed run with output type */ -export interface TypedRun extends Omit { +export interface TypedRun< + TOutput, + TLabels extends Record = Record, +> extends Omit, 'output'> { output: TOutput | null } /** * Batch trigger input - either just the input or input with options */ -export type BatchTriggerInput = - | TInput - | { input: TInput; options?: TriggerOptions } +export type BatchTriggerInput< + TInput, + TLabels extends Record = Record, +> = TInput | { input: TInput; options?: TriggerOptions } /** * Result of triggerAndWait @@ -105,13 +113,21 @@ export interface TriggerAndWaitResult { /** * Job handle returned by defineJob */ -export interface JobHandle { +export interface JobHandle< + TName extends string, + TInput, + TOutput, + TLabels extends Record = Record, +> { readonly name: TName /** * Trigger a new run */ - trigger(input: TInput, options?: TriggerOptions): Promise> + trigger( + input: TInput, + options?: TriggerOptions, + ): Promise> /** * Trigger a new run and wait for completion @@ -119,7 +135,7 @@ export interface JobHandle { */ triggerAndWait( input: TInput, - options?: TriggerAndWaitOptions, + options?: TriggerAndWaitOptions, ): Promise> /** @@ -127,18 +143,20 @@ export interface JobHandle { * All inputs are validated before any runs are created */ batchTrigger( - inputs: BatchTriggerInput[], - ): Promise[]> + inputs: BatchTriggerInput[], + ): Promise[]> /** * Get a run by ID */ - getRun(id: string): Promise | null> + getRun(id: string): Promise | null> /** * Get runs with optional filter */ - getRuns(filter?: Omit): Promise[]> + getRuns( + filter?: Omit, 'jobName'>, + ): Promise[]> } /** @@ -148,9 +166,11 @@ export interface RegisteredJob { name: string inputSchema: z.ZodType outputSchema: z.ZodType | undefined + labelsSchema: z.ZodType | undefined fn: JobFunction jobDef: JobDefinition - handle: JobHandle + // biome-ignore lint/suspicious/noExplicitAny: handle may have any labels type + handle: JobHandle } /** @@ -197,18 +217,24 @@ export function createJobRegistry(): JobRegistry { /** * Create a job handle from a JobDefinition */ -export function createJobHandle( +export function createJobHandle< + TName extends string, + TInput, + TOutput, + TLabels extends Record = Record, +>( jobDef: JobDefinition, storage: Storage, eventEmitter: EventEmitter, registry: JobRegistry, -): JobHandle { + labelsSchema?: z.ZodType, +): JobHandle { // Check if same JobDefinition is already registered (idempotent) const existingJob = registry.get(jobDef.name) if (existingJob) { // If same JobDefinition (same reference), return existing handle if (existingJob.jobDef === jobDef) { - return existingJob.handle as JobHandle + return existingJob.handle as JobHandle } // Different JobDefinition with same name - error throw new Error( @@ -219,16 +245,21 @@ export function createJobHandle( const inputSchema = jobDef.input as z.ZodType const outputSchema = jobDef.output as z.ZodType | undefined - const handle: JobHandle = { + const handle: JobHandle = { name: jobDef.name, async trigger( input: TInput, - options?: TriggerOptions, - ): Promise> { + options?: TriggerOptions, + ): Promise> { // Validate input const validatedInput = validateJobInputOrThrow(inputSchema, input) + // Validate labels if schema provided + if (labelsSchema && options?.labels) { + validateJobInputOrThrow(labelsSchema, options.labels, 'labels') + } + // Create the run const run = await storage.createRun({ jobName: jobDef.name, @@ -247,12 +278,12 @@ export function createJobHandle( labels: run.labels, }) - return run as TypedRun + return run as TypedRun }, async triggerAndWait( input: TInput, - options?: TriggerAndWaitOptions, + options?: TriggerAndWaitOptions, ): Promise> { // Trigger the run const run = await this.trigger(input, options) @@ -357,8 +388,8 @@ export function createJobHandle( }, async batchTrigger( - inputs: (TInput | { input: TInput; options?: TriggerOptions })[], - ): Promise[]> { + inputs: (TInput | { input: TInput; options?: TriggerOptions })[], + ): Promise[]> { if (inputs.length === 0) { return [] } @@ -366,19 +397,29 @@ export function createJobHandle( // Normalize inputs to { input, options } format const normalized = inputs.map((item) => { if (item && typeof item === 'object' && 'input' in item) { - return item as { input: TInput; options?: TriggerOptions } + return item as { input: TInput; options?: TriggerOptions } } return { input: item as TInput, options: undefined } }) - // Validate all inputs first (before creating any runs) - const validated: { input: unknown; options?: TriggerOptions }[] = [] + // Validate all inputs and labels first (before creating any runs) + const validated: { + input: unknown + options?: TriggerOptions + }[] = [] for (let i = 0; i < normalized.length; i++) { const validatedInput = validateJobInputOrThrow( inputSchema, normalized[i].input, `at index ${i}`, ) + if (labelsSchema && normalized[i].options?.labels) { + validateJobInputOrThrow( + labelsSchema, + normalized[i].options?.labels, + `labels at index ${i}`, + ) + } validated.push({ input: validatedInput, options: normalized[i].options, @@ -407,25 +448,25 @@ export function createJobHandle( }) } - return runs as TypedRun[] + return runs as TypedRun[] }, - async getRun(id: string): Promise | null> { + async getRun(id: string): Promise | null> { const run = await storage.getRun(id) if (!run || run.jobName !== jobDef.name) { return null } - return run as TypedRun + return run as TypedRun }, async getRuns( - filter?: Omit, - ): Promise[]> { + filter?: Omit, 'jobName'>, + ): Promise[]> { const runs = await storage.getRuns({ ...filter, jobName: jobDef.name, }) - return runs as TypedRun[] + return runs as TypedRun[] }, } @@ -434,9 +475,10 @@ export function createJobHandle( name: jobDef.name, inputSchema, outputSchema, + labelsSchema, fn: jobDef.run as JobFunction, jobDef: jobDef as JobDefinition, - handle: handle as JobHandle, + handle, }) return handle diff --git a/packages/durably/src/storage.ts b/packages/durably/src/storage.ts index 880d3a95..304aba6d 100644 --- a/packages/durably/src/storage.ts +++ b/packages/durably/src/storage.ts @@ -7,18 +7,22 @@ const ulid = monotonicFactory() /** * Run data for creating a new run */ -export interface CreateRunInput { +export interface CreateRunInput< + TLabels extends Record = Record, +> { jobName: string input: unknown idempotencyKey?: string concurrencyKey?: string - labels?: Record + labels?: TLabels } /** * Run data returned from storage */ -export interface Run { +export interface Run< + TLabels extends Record = Record, +> { id: string jobName: string input: unknown @@ -30,7 +34,7 @@ export interface Run { progress: { current: number; total?: number; message?: string } | null output: unknown | null error: string | null - labels: Record + labels: TLabels heartbeatAt: string startedAt: string | null completedAt: string | null @@ -55,11 +59,14 @@ export interface UpdateRunInput { /** * Run filter options */ -export interface RunFilter { +export interface RunFilter< + TLabels extends Record = Record, +> { status?: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' /** Filter by job name(s). Pass a string for one, or an array for multiple (OR). */ jobName?: string | string[] - labels?: Record + /** Filter by labels (all specified labels must match) */ + labels?: { [K in keyof TLabels]?: TLabels[K] } /** Maximum number of runs to return */ limit?: number /** Number of runs to skip (for pagination) */ @@ -122,15 +129,19 @@ export interface Log { * A client-safe subset of Run, excluding internal fields like * heartbeatAt, idempotencyKey, concurrencyKey, and updatedAt. */ -export type ClientRun = Omit< - Run, +export type ClientRun< + TLabels extends Record = Record, +> = Omit< + Run, 'idempotencyKey' | 'concurrencyKey' | 'heartbeatAt' | 'updatedAt' > /** * Project a full Run to a ClientRun by stripping internal fields. */ -export function toClientRun(run: Run): ClientRun { +export function toClientRun< + TLabels extends Record = Record, +>(run: Run): ClientRun { const { idempotencyKey, concurrencyKey, @@ -423,8 +434,10 @@ export function createKyselyStorage(db: Kysely): Storage { } } if (filter?.labels) { - validateLabels(filter.labels) - for (const [key, value] of Object.entries(filter.labels)) { + const labels = filter.labels as Record + validateLabels(labels) + for (const [key, value] of Object.entries(labels)) { + if (value === undefined) continue query = query.where( sql`json_extract(durably_runs.labels, ${`$."${key}"`})`, '=', diff --git a/packages/durably/tests/node/types.test.ts b/packages/durably/tests/node/types.test.ts index ee4f3b03..4b5d1cdc 100644 --- a/packages/durably/tests/node/types.test.ts +++ b/packages/durably/tests/node/types.test.ts @@ -7,7 +7,15 @@ import { describe, expectTypeOf, it } from 'vitest' import { z } from 'zod' -import { defineJob } from '../../src' +import { + createDurably, + defineJob, + type Durably, + type JobHandle, + type Run, + type RunFilter, + type TriggerOptions, +} from '../../src' describe('Type inference', () => { describe('defineJob with branded types', () => { @@ -64,4 +72,68 @@ describe('Type inference', () => { expectTypeOf(job.name).toEqualTypeOf<'branded-helpers'>() }) }) + + describe('type-safe labels', () => { + type Labels = { organizationId: string; env: string } + + it('TriggerOptions accepts typed labels', () => { + expectTypeOf>().toMatchTypeOf<{ + labels?: Labels + }>() + }) + + it('Run has typed labels', () => { + expectTypeOf>().toMatchTypeOf<{ + labels: Labels + }>() + }) + + it('RunFilter accepts partial labels', () => { + expectTypeOf>().toMatchTypeOf<{ + labels?: { organizationId?: string; env?: string } + }>() + }) + + it('Durably.getRun returns Run with typed labels', () => { + type D = Durably, Labels> + expectTypeOf< + D['getRun'] + >().returns.resolves.toMatchTypeOf | null>() + }) + + it('Durably.getRuns accepts RunFilter with typed labels', () => { + type D = Durably, Labels> + expectTypeOf() + .parameter(0) + .toMatchTypeOf | undefined>() + }) + + it('defaults to Record without labels schema', () => { + expectTypeOf().toMatchTypeOf>>() + expectTypeOf().toMatchTypeOf< + RunFilter> + >() + }) + + it('JobHandle trigger accepts typed labels', () => { + type Handle = JobHandle<'test', { id: string }, void, Labels> + expectTypeOf() + .parameter(1) + .toMatchTypeOf | undefined>() + }) + + it('createDurably infers labels type from schema', () => { + const labelsSchema = z.object({ + organizationId: z.string(), + env: z.string(), + }) + + // When labels schema is provided, the return type should have Labels + const fn = (opts: { dialect: never; labels: typeof labelsSchema }) => + createDurably(opts) + expectTypeOf(fn).returns.toMatchTypeOf< + Durably, Labels> + >() + }) + }) }) diff --git a/packages/durably/tests/shared/run-api.shared.ts b/packages/durably/tests/shared/run-api.shared.ts index 88c6f044..26f7b118 100644 --- a/packages/durably/tests/shared/run-api.shared.ts +++ b/packages/durably/tests/shared/run-api.shared.ts @@ -507,6 +507,101 @@ export function createRunApiTests(createDialect: () => Dialect) { }) }) + describe('labels schema validation', () => { + it('rejects invalid labels on trigger()', async () => { + const d = createDurably({ + dialect: createDialect(), + pollingInterval: 50, + labels: z.object({ + organizationId: z.string(), + env: z.string(), + }), + }) + await d.migrate() + + const registered = d.register({ + job: defineJob({ + name: 'labels-validation-test', + input: z.object({}), + run: async () => {}, + }), + }) + + await expect( + registered.jobs.job.trigger( + {}, + // @ts-expect-error -- missing 'env' + { labels: { organizationId: 'org_1' } }, + ), + ).rejects.toThrow('labels') + + await d.db.destroy() + }) + + it('rejects invalid labels on batchTrigger()', async () => { + const d = createDurably({ + dialect: createDialect(), + pollingInterval: 50, + labels: z.object({ + organizationId: z.string(), + env: z.string(), + }), + }) + await d.migrate() + + const registered = d.register({ + job: defineJob({ + name: 'labels-batch-validation-test', + input: z.object({}), + run: async () => {}, + }), + }) + + await expect( + registered.jobs.job.batchTrigger([ + { + input: {}, + // @ts-expect-error -- missing 'env' + options: { labels: { organizationId: 'org_1' } }, + }, + ]), + ).rejects.toThrow('labels') + + await d.db.destroy() + }) + + it('accepts valid labels on trigger()', async () => { + const d = createDurably({ + dialect: createDialect(), + pollingInterval: 50, + labels: z.object({ + organizationId: z.string(), + env: z.string(), + }), + }) + await d.migrate() + + const registered = d.register({ + job: defineJob({ + name: 'labels-valid-test', + input: z.object({}), + run: async () => {}, + }), + }) + + const run = await registered.jobs.job.trigger( + {}, + { labels: { organizationId: 'org_1', env: 'prod' } }, + ) + expect(run.labels).toEqual({ + organizationId: 'org_1', + env: 'prod', + }) + + await d.db.destroy() + }) + }) + describe('step.progress()', () => { it('saves progress with current value', async () => { const d = durably.register({ diff --git a/website/api/create-durably.md b/website/api/create-durably.md index 66664bd8..6440a360 100644 --- a/website/api/create-durably.md +++ b/website/api/create-durably.md @@ -5,26 +5,32 @@ Creates a new Durably instance. ## Signature ```ts -function createDurably(options: DurablyOptions): Durably +function createDurably( + options: DurablyOptions, +): Durably<{}, TLabels> ``` ## Options ```ts -interface DurablyOptions { +interface DurablyOptions< + TLabels extends Record = Record, +> { dialect: Dialect pollingInterval?: number heartbeatInterval?: number staleThreshold?: number + labels?: z.ZodType } ``` -| Option | Type | Default | Description | -| ------------------- | --------- | -------- | ----------------------------------------- | -| `dialect` | `Dialect` | required | Kysely SQLite dialect | -| `pollingInterval` | `number` | `1000` | How often to check for pending jobs (ms) | -| `heartbeatInterval` | `number` | `5000` | How often to update heartbeat (ms) | -| `staleThreshold` | `number` | `30000` | Time until a job is considered stale (ms) | +| Option | Type | Default | Description | +| ------------------- | ----------- | -------- | ------------------------------------------------------------------------------------- | +| `dialect` | `Dialect` | required | Kysely SQLite dialect | +| `pollingInterval` | `number` | `1000` | How often to check for pending jobs (ms) | +| `heartbeatInterval` | `number` | `5000` | How often to update heartbeat (ms) | +| `staleThreshold` | `number` | `30000` | Time until a job is considered stale (ms) | +| `labels` | `z.ZodType` | — | Zod schema for labels. Enables type-safe labels and runtime validation on `trigger()` | ## Returns @@ -122,7 +128,7 @@ Deletes a run and its associated steps and logs. ### `getRun()` ```ts -await durably.getRun(runId: string): Promise +await durably.getRun = Run>(runId: string): Promise ``` Gets a single run by ID. Supports generic type parameter for type-safe access. @@ -142,12 +148,12 @@ const typedRun = await durably.getRun(runId) ### `getRuns()` ```ts -await durably.getRuns(filter?: RunFilter): Promise +await durably.getRuns = Run>(filter?: RunFilter): Promise -interface RunFilter { +interface RunFilter = Record> { jobName?: string | string[] // single or multiple job names status?: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' - labels?: Record + labels?: Partial // filter by labels (all specified must match) limit?: number offset?: number } @@ -175,7 +181,7 @@ const runs = await durably.getRuns({ jobName: 'my-job' }) The `Run` object returned by `getRun()` and `getRuns()`: ```ts -interface Run { +interface Run = Record> { id: string jobName: string input: unknown @@ -187,7 +193,7 @@ interface Run { progress: { current: number; total?: number; message?: string } | null output: unknown | null error: string | null - labels: Record + labels: TLabels heartbeatAt: string startedAt: string | null completedAt: string | null @@ -196,25 +202,25 @@ interface Run { } ``` -| Field | Type | Description | -| ------------------ | ------------------------------------------------------------------ | ---------------------------------------------- | -| `id` | `string` | Unique run ID | -| `jobName` | `string` | Name of the job | -| `input` | `unknown` | Input payload passed to the job | -| `status` | `'pending' \| 'running' \| 'completed' \| 'failed' \| 'cancelled'` | Current run status | -| `idempotencyKey` | `string \| null` | Deduplication key | -| `concurrencyKey` | `string \| null` | Concurrency group key | -| `currentStepIndex` | `number` | Index of the current step being executed | -| `stepCount` | `number` | Total number of completed steps | -| `progress` | `{ current: number; total?: number; message?: string } \| null` | Latest progress report | -| `output` | `unknown \| null` | Return value of the job (when completed) | -| `error` | `string \| null` | Error message (when failed) | -| `labels` | `Record` | Arbitrary key/value labels for filtering | -| `heartbeatAt` | `string` | ISO timestamp of the last heartbeat | -| `startedAt` | `string \| null` | ISO timestamp when the run started | -| `completedAt` | `string \| null` | ISO timestamp when the run completed or failed | -| `createdAt` | `string` | ISO timestamp when the run was created | -| `updatedAt` | `string` | ISO timestamp of the last update | +| Field | Type | Description | +| ------------------ | ------------------------------------------------------------------ | --------------------------------------------------------------- | +| `id` | `string` | Unique run ID | +| `jobName` | `string` | Name of the job | +| `input` | `unknown` | Input payload passed to the job | +| `status` | `'pending' \| 'running' \| 'completed' \| 'failed' \| 'cancelled'` | Current run status | +| `idempotencyKey` | `string \| null` | Deduplication key | +| `concurrencyKey` | `string \| null` | Concurrency group key | +| `currentStepIndex` | `number` | Index of the current step being executed | +| `stepCount` | `number` | Total number of completed steps | +| `progress` | `{ current: number; total?: number; message?: string } \| null` | Latest progress report | +| `output` | `unknown \| null` | Return value of the job (when completed) | +| `error` | `string \| null` | Error message (when failed) | +| `labels` | `TLabels` (defaults to `Record`) | Key/value labels for filtering (type-safe when schema provided) | +| `heartbeatAt` | `string` | ISO timestamp of the last heartbeat | +| `startedAt` | `string \| null` | ISO timestamp when the run started | +| `completedAt` | `string \| null` | ISO timestamp when the run completed or failed | +| `createdAt` | `string` | ISO timestamp when the run was created | +| `updatedAt` | `string` | ISO timestamp of the last update | ### `getJob()` diff --git a/website/api/define-job.md b/website/api/define-job.md index 03473871..7d254604 100644 --- a/website/api/define-job.md +++ b/website/api/define-job.md @@ -76,25 +76,36 @@ Triggers a new job run. #### Trigger Options ```ts -interface TriggerOptions { +interface TriggerOptions> { idempotencyKey?: string concurrencyKey?: string - timeout?: number // For triggerAndWait only + labels?: TLabels +} + +interface TriggerAndWaitOptions< + TLabels = Record, +> extends TriggerOptions { + timeout?: number + onProgress?: (progress: ProgressData) => void | Promise + onLog?: (log: LogData) => void | Promise } ``` -| Option | Description | -| ---------------- | ----------------------------------------- | -| `idempotencyKey` | Prevents duplicate runs with the same key | -| `concurrencyKey` | Groups jobs for concurrency control | -| `timeout` | Timeout in ms for `triggerAndWait()` | +| Option | Description | +| ---------------- | --------------------------------------------------------------- | +| `idempotencyKey` | Prevents duplicate runs with the same key | +| `concurrencyKey` | Groups jobs for concurrency control | +| `labels` | Key/value labels for filtering (type-safe when schema provided) | +| `timeout` | Timeout in ms (`triggerAndWait` only) | +| `onProgress` | Progress callback (`triggerAndWait` only) | +| `onLog` | Log callback (`triggerAndWait` only) | ### `triggerAndWait()` ```ts await job.triggerAndWait( input: TInput, - options?: TriggerOptions + options?: TriggerAndWaitOptions ): Promise<{ id: string; output: TOutput }> ``` diff --git a/website/api/index.md b/website/api/index.md index a1cbac5a..eff7372f 100644 --- a/website/api/index.md +++ b/website/api/index.md @@ -65,6 +65,7 @@ const durably = createDurably({ pollingInterval: 1000, // Check for jobs every 1s heartbeatInterval: 5000, // Heartbeat every 5s staleThreshold: 30000, // Stale after 30s + // labels: z.object({ ... }), // Optional: type-safe labels }).register({ importCsv: importCsvJob, }) @@ -261,20 +262,20 @@ import { toClientRun } from '@coji/durably' Key fields on the `Run` object returned by `getRun()` and `getRuns()`: -| Field | Type | Description | -| ------------- | ------------------------------------------------------------------ | ---------------------------------------------- | -| `id` | `string` | Unique run ID | -| `jobName` | `string` | Name of the job | -| `input` | `unknown` | Input payload passed to the job | -| `status` | `'pending' \| 'running' \| 'completed' \| 'failed' \| 'cancelled'` | Current run status | -| `output` | `unknown \| null` | Return value of the job (when completed) | -| `error` | `string \| null` | Error message (when failed) | -| `progress` | `{ current: number; total?: number; message?: string } \| null` | Latest progress report | -| `labels` | `Record` | Arbitrary key/value labels for filtering | -| `startedAt` | `string \| null` | ISO timestamp when the run started | -| `completedAt` | `string \| null` | ISO timestamp when the run completed or failed | -| `createdAt` | `string` | ISO timestamp when the run was created | -| `updatedAt` | `string` | ISO timestamp of the last update | +| Field | Type | Description | +| ------------- | ------------------------------------------------------------------ | --------------------------------------------------------------- | +| `id` | `string` | Unique run ID | +| `jobName` | `string` | Name of the job | +| `input` | `unknown` | Input payload passed to the job | +| `status` | `'pending' \| 'running' \| 'completed' \| 'failed' \| 'cancelled'` | Current run status | +| `output` | `unknown \| null` | Return value of the job (when completed) | +| `error` | `string \| null` | Error message (when failed) | +| `progress` | `{ current: number; total?: number; message?: string } \| null` | Latest progress report | +| `labels` | `TLabels` (defaults to `Record`) | Key/value labels for filtering (type-safe when schema provided) | +| `startedAt` | `string \| null` | ISO timestamp when the run started | +| `completedAt` | `string \| null` | ISO timestamp when the run completed or failed | +| `createdAt` | `string` | ISO timestamp when the run was created | +| `updatedAt` | `string` | ISO timestamp of the last update | HTTP endpoints (`/runs`, `/run`) return `ClientRun` — the same fields minus `idempotencyKey`, `concurrencyKey`, `heartbeatAt`, and `updatedAt`. Use `toClientRun(run)` to apply the same projection in custom code. diff --git a/website/public/llms.txt b/website/public/llms.txt index 9bceba6f..86491b5e 100644 --- a/website/public/llms.txt +++ b/website/public/llms.txt @@ -24,6 +24,7 @@ pnpm add @coji/durably kysely zod sqlocal import { createDurably } from '@coji/durably' import { LibsqlDialect } from '@libsql/kysely-libsql' import { createClient } from '@libsql/client' +import { z } from 'zod' const client = createClient({ url: 'file:local.db' }) const dialect = new LibsqlDialect({ client }) @@ -33,6 +34,8 @@ const durably = createDurably({ pollingInterval: 1000, // Job polling interval (ms) heartbeatInterval: 5000, // Heartbeat update interval (ms) staleThreshold: 30000, // When to consider a job abandoned (ms) + // Optional: type-safe labels with Zod schema + // labels: z.object({ organizationId: z.string(), env: z.string() }), }) ``` @@ -450,40 +453,64 @@ interface StepContext { } } -interface Run { +// TLabels defaults to Record when no labels schema is provided +interface Run = Record> { id: string jobName: string status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' input: unknown - labels: Record - output?: TOutput - error?: string - progress?: { current: number; total?: number; message?: string } + labels: TLabels + output: unknown | null + error: string | null + progress: { current: number; total?: number; message?: string } | null startedAt: string | null completedAt: string | null createdAt: string updatedAt: string } -interface JobHandle { +interface TypedRun< + TOutput, + TLabels extends Record = Record, +> extends Omit, 'output'> { + output: TOutput | null +} + +interface JobHandle< + TName extends string, + TInput, + TOutput, + TLabels extends Record = Record, +> { name: TName - trigger(input: TInput, options?: TriggerOptions): Promise> + trigger( + input: TInput, + options?: TriggerOptions, + ): Promise> triggerAndWait( input: TInput, - options?: TriggerAndWaitOptions, + options?: TriggerAndWaitOptions, ): Promise<{ id: string; output: TOutput }> - batchTrigger(inputs: BatchTriggerInput[]): Promise[]> - getRun(id: string): Promise | null> - getRuns(filter?: RunFilter): Promise[]> + batchTrigger( + inputs: BatchTriggerInput[], + ): Promise[]> + getRun(id: string): Promise | null> + getRuns( + filter?: Omit, 'jobName'>, + ): Promise[]> } -interface TriggerOptions { +interface TriggerOptions< + TLabels extends Record = Record, +> { idempotencyKey?: string concurrencyKey?: string - labels?: Record + labels?: TLabels } -interface TriggerAndWaitOptions extends TriggerOptions { +interface TriggerAndWaitOptions< + TLabels extends Record = Record, +> extends TriggerOptions { timeout?: number onProgress?: (progress: ProgressData) => void | Promise onLog?: (log: LogData) => void | Promise @@ -502,10 +529,12 @@ interface LogData { stepName?: string | null } -interface RunFilter { +interface RunFilter< + TLabels extends Record = Record, +> { status?: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled' jobName?: string | string[] - labels?: Record + labels?: Partial limit?: number offset?: number }