Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 45 additions & 16 deletions packages/durably/docs/llms.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pnpm add @coji/durably kysely zod sqlocal
import { createDurably } from '@coji/durably'
import { LibsqlDialect } from '@libsql/kysely-libsql'
import { createClient } from '@libsql/client'
import { z } from 'zod'

const client = createClient({ url: 'file:local.db' })
const dialect = new LibsqlDialect({ client })
Expand All @@ -33,6 +34,8 @@ const durably = createDurably({
pollingInterval: 1000, // Job polling interval (ms)
heartbeatInterval: 5000, // Heartbeat update interval (ms)
staleThreshold: 30000, // When to consider a job abandoned (ms)
// Optional: type-safe labels with Zod schema
// labels: z.object({ organizationId: z.string(), env: z.string() }),
Comment thread
coderabbitai[bot] marked this conversation as resolved.
})
```

Expand Down Expand Up @@ -450,40 +453,64 @@ interface StepContext {
}
}

interface Run<TOutput = unknown> {
// TLabels defaults to Record<string, string> when no labels schema is provided
interface Run<TLabels extends Record<string, string> = Record<string, string>> {
id: string
jobName: string
status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled'
input: unknown
labels: Record<string, string>
output?: TOutput
error?: string
progress?: { current: number; total?: number; message?: string }
labels: TLabels
output: unknown | null
error: string | null
progress: { current: number; total?: number; message?: string } | null
startedAt: string | null
completedAt: string | null
createdAt: string
updatedAt: string
}

interface JobHandle<TName, TInput, TOutput> {
interface TypedRun<
TOutput,
TLabels extends Record<string, string> = Record<string, string>,
> extends Omit<Run<TLabels>, 'output'> {
output: TOutput | null
}

interface JobHandle<
TName extends string,
TInput,
TOutput,
TLabels extends Record<string, string> = Record<string, string>,
> {
name: TName
trigger(input: TInput, options?: TriggerOptions): Promise<Run<TOutput>>
trigger(
input: TInput,
options?: TriggerOptions<TLabels>,
): Promise<TypedRun<TOutput, TLabels>>
triggerAndWait(
input: TInput,
options?: TriggerAndWaitOptions,
options?: TriggerAndWaitOptions<TLabels>,
): Promise<{ id: string; output: TOutput }>
batchTrigger(inputs: BatchTriggerInput<TInput>[]): Promise<Run<TOutput>[]>
getRun(id: string): Promise<Run<TOutput> | null>
getRuns(filter?: RunFilter): Promise<Run<TOutput>[]>
batchTrigger(
inputs: BatchTriggerInput<TInput, TLabels>[],
): Promise<TypedRun<TOutput, TLabels>[]>
Comment thread
coderabbitai[bot] marked this conversation as resolved.
getRun(id: string): Promise<TypedRun<TOutput, TLabels> | null>
getRuns(
filter?: Omit<RunFilter<TLabels>, 'jobName'>,
): Promise<TypedRun<TOutput, TLabels>[]>
}

interface TriggerOptions {
interface TriggerOptions<
TLabels extends Record<string, string> = Record<string, string>,
> {
idempotencyKey?: string
concurrencyKey?: string
labels?: Record<string, string>
labels?: TLabels
}

interface TriggerAndWaitOptions extends TriggerOptions {
interface TriggerAndWaitOptions<
TLabels extends Record<string, string> = Record<string, string>,
> extends TriggerOptions<TLabels> {
timeout?: number
onProgress?: (progress: ProgressData) => void | Promise<void>
onLog?: (log: LogData) => void | Promise<void>
Expand All @@ -502,10 +529,12 @@ interface LogData {
stepName?: string | null
}

interface RunFilter {
interface RunFilter<
TLabels extends Record<string, string> = Record<string, string>,
> {
status?: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled'
jobName?: string | string[]
labels?: Record<string, string>
labels?: Partial<TLabels>
limit?: number
offset?: number
}
Expand Down
77 changes: 53 additions & 24 deletions packages/durably/src/durably.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { Dialect } from 'kysely'
import { Kysely } from 'kysely'
import type { z } from 'zod'
import type { JobDefinition } from './define-job'
import {
type AnyEventInput,
Expand Down Expand Up @@ -30,11 +31,19 @@ import { type Worker, createWorker } from './worker'
/**
* Options for creating a Durably instance
*/
export interface DurablyOptions {
export interface DurablyOptions<
TLabels extends Record<string, string> = Record<string, string>,
> {
dialect: Dialect
pollingInterval?: number
heartbeatInterval?: number
staleThreshold?: number
/**
* Zod schema for labels. When provided:
* - Labels are type-checked at compile time
* - Labels are validated at runtime on trigger()
*/
labels?: z.ZodType<TLabels>
}

/**
Expand All @@ -52,32 +61,34 @@ const DEFAULTS = {
export interface DurablyPlugin {
name: string
// biome-ignore lint/suspicious/noExplicitAny: plugin needs to accept any Durably instance
install(durably: Durably<any>): void
install(durably: Durably<any, any>): void
}

/**
* Helper type to transform JobDefinition record to JobHandle record
*/
type TransformToHandles<
TJobs extends Record<string, JobDefinition<string, unknown, unknown>>,
TLabels extends Record<string, string> = Record<string, string>,
> = {
[K in keyof TJobs]: TJobs[K] extends JobDefinition<
infer TName,
infer TInput,
infer TOutput
>
? JobHandle<TName & string, TInput, TOutput>
? JobHandle<TName & string, TInput, TOutput, TLabels>
: never
}

/**
* Durably instance with type-safe jobs
*/
export interface Durably<
TJobs extends Record<string, JobHandle<string, unknown, unknown>> = Record<
TJobs extends Record<
string,
never
>,
JobHandle<string, unknown, unknown, Record<string, string>>
> = Record<string, never>,
TLabels extends Record<string, string> = Record<string, string>,
> {
/**
* Registered job handles (type-safe)
Expand Down Expand Up @@ -145,7 +156,7 @@ export interface Durably<
// biome-ignore lint/suspicious/noExplicitAny: flexible type constraint for job definitions
register<TNewJobs extends Record<string, JobDefinition<string, any, any>>>(
jobDefs: TNewJobs,
): Durably<TJobs & TransformToHandles<TNewJobs>>
): Durably<TJobs & TransformToHandles<TNewJobs, TLabels>, TLabels>

/**
* Start the worker polling loop
Expand Down Expand Up @@ -187,7 +198,9 @@ export interface Durably<
* const typedRun = await durably.getRun<MyRun>(runId)
* ```
*/
getRun<T extends Run = Run>(runId: string): Promise<T | null>
getRun<T extends Run<TLabels> = Run<TLabels>>(
runId: string,
): Promise<T | null>

/**
* Get runs with optional filtering
Expand All @@ -201,7 +214,9 @@ export interface Durably<
* const typedRuns = await durably.getRuns<MyRun>({ jobName: 'my-job' })
* ```
*/
getRuns<T extends Run = Run>(filter?: RunFilter): Promise<T[]>
getRuns<T extends Run<TLabels> = Run<TLabels>>(
filter?: RunFilter<TLabels>,
): Promise<T[]>

/**
* Register a plugin
Expand All @@ -214,7 +229,7 @@ export interface Durably<
*/
getJob<TName extends string = string>(
name: TName,
): JobHandle<TName, Record<string, unknown>, unknown> | undefined
): JobHandle<TName, Record<string, unknown>, unknown, TLabels> | undefined

/**
* Subscribe to events for a specific run
Expand All @@ -232,6 +247,7 @@ interface DurablyState {
eventEmitter: EventEmitter
jobRegistry: JobRegistry
worker: Worker
labelsSchema: z.ZodType | undefined
migrating: Promise<void> | null
migrated: boolean
}
Expand All @@ -240,11 +256,15 @@ interface DurablyState {
* Create a Durably instance implementation
*/
function createDurablyInstance<
TJobs extends Record<string, JobHandle<string, unknown, unknown>>,
>(state: DurablyState, jobs: TJobs): Durably<TJobs> {
TJobs extends Record<
string,
JobHandle<string, unknown, unknown, Record<string, string>>
>,
TLabels extends Record<string, string> = Record<string, string>,
>(state: DurablyState, jobs: TJobs): Durably<TJobs, TLabels> {
const { db, storage, eventEmitter, jobRegistry, worker } = state

const durably: Durably<TJobs> = {
const durably: Durably<TJobs, TLabels> = {
db,
storage,
jobs,
Expand All @@ -257,8 +277,8 @@ function createDurablyInstance<
// biome-ignore lint/suspicious/noExplicitAny: flexible type constraint for job definitions
register<TNewJobs extends Record<string, JobDefinition<string, any, any>>>(
jobDefs: TNewJobs,
): Durably<TJobs & TransformToHandles<TNewJobs>> {
const newHandles = {} as TransformToHandles<TNewJobs>
): Durably<TJobs & TransformToHandles<TNewJobs, TLabels>, TLabels> {
const newHandles = {} as TransformToHandles<TNewJobs, TLabels>

for (const key of Object.keys(jobDefs) as (keyof TNewJobs)[]) {
const jobDef = jobDefs[key]
Expand All @@ -267,14 +287,21 @@ function createDurablyInstance<
storage,
eventEmitter,
jobRegistry,
state.labelsSchema as z.ZodType<TLabels> | undefined,
)
newHandles[key] = handle as TransformToHandles<TNewJobs>[typeof key]
newHandles[key] = handle as TransformToHandles<
TNewJobs,
TLabels
>[typeof key]
}

// Create new instance with merged jobs
const mergedJobs = { ...jobs, ...newHandles } as TJobs &
TransformToHandles<TNewJobs>
return createDurablyInstance(state, mergedJobs)
TransformToHandles<TNewJobs, TLabels>
return createDurablyInstance<typeof mergedJobs, TLabels>(
state,
mergedJobs,
)
},

getRun: storage.getRun,
Expand All @@ -286,15 +313,16 @@ function createDurablyInstance<

getJob<TName extends string = string>(
name: TName,
): JobHandle<TName, Record<string, unknown>, unknown> | undefined {
): JobHandle<TName, Record<string, unknown>, unknown, TLabels> | undefined {
const registeredJob = jobRegistry.get(name)
if (!registeredJob) {
return undefined
}
return registeredJob.handle as JobHandle<
TName,
Record<string, unknown>,
unknown
unknown,
TLabels
>
},

Expand Down Expand Up @@ -525,9 +553,9 @@ function createDurablyInstance<
/**
* Create a Durably instance
*/
export function createDurably(
options: DurablyOptions,
): Durably<Record<string, never>> {
export function createDurably<
TLabels extends Record<string, string> = Record<string, string>,
>(options: DurablyOptions<TLabels>): Durably<Record<string, never>, TLabels> {
const config = {
pollingInterval: options.pollingInterval ?? DEFAULTS.pollingInterval,
heartbeatInterval: options.heartbeatInterval ?? DEFAULTS.heartbeatInterval,
Expand All @@ -546,9 +574,10 @@ export function createDurably(
eventEmitter,
jobRegistry,
worker,
labelsSchema: options.labels,
migrating: null,
migrated: false,
}

return createDurablyInstance(state, {})
return createDurablyInstance<Record<string, never>, TLabels>(state, {})
}
2 changes: 2 additions & 0 deletions packages/durably/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ export type {

// Job types
export type {
BatchTriggerInput,
JobHandle,
StepContext,
TriggerAndWaitOptions,
TriggerAndWaitResult,
TriggerOptions,
} from './job'

// Schema types (for advanced users)
Expand Down
Loading