Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/).
- Extracted HTTP response helpers to `http.ts`
- Centralized error handling in `errors.ts`

#### @coji/durably-react

- Internal code organization improvements (no API changes)
- Extracted shared subscription logic to `shared/` directory
- Created `useSubscription` hook for unified state management
- Extracted `useAutoResume` and `useJobSubscription` hooks
- Unified event subscriber patterns between browser and client modes

## [0.6.0] - 2026-01-02

### Breaking Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/durably-react/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@coji/durably-react",
"version": "0.6.0",
"version": "0.6.1",
"description": "React bindings for Durably - step-oriented resumable batch execution",
"type": "module",
"main": "./dist/index.js",
Expand Down
32 changes: 1 addition & 31 deletions packages/durably-react/src/client/create-durably-client.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,8 @@
import type { JobDefinition } from '@coji/durably'
import type { InferInput, InferOutput } from '../types'
import { useJob, type UseJobClientResult } from './use-job'
import { useJobLogs, type UseJobLogsClientResult } from './use-job-logs'
import { useJobRun, type UseJobRunClientResult } from './use-job-run'

/**
* Extract input type from a JobDefinition or JobHandle
*/
type InferInput<T> =
T extends JobDefinition<string, infer TInput, unknown>
? TInput extends Record<string, unknown>
? TInput
: Record<string, unknown>
: T extends { trigger: (input: infer TInput) => unknown }
? TInput extends Record<string, unknown>
? TInput
: Record<string, unknown>
: Record<string, unknown>

/**
* Extract output type from a JobDefinition or JobHandle
*/
type InferOutput<T> =
T extends JobDefinition<string, unknown, infer TOutput>
? TOutput extends Record<string, unknown>
? TOutput
: Record<string, unknown>
: T extends {
trigger: (input: unknown) => Promise<{ output?: infer TOutput }>
}
? TOutput extends Record<string, unknown>
? TOutput
: Record<string, unknown>
: Record<string, unknown>

/**
* Type-safe hooks for a specific job
*/
Expand Down
21 changes: 1 addition & 20 deletions packages/durably-react/src/client/create-job-hooks.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,9 @@
import type { JobDefinition } from '@coji/durably'
import type { InferInput, InferOutput } from '../types'
import { useJob, type UseJobClientResult } from './use-job'
import { useJobLogs, type UseJobLogsClientResult } from './use-job-logs'
import { useJobRun, type UseJobRunClientResult } from './use-job-run'

/**
* Extract input type from a JobDefinition
*/
type InferInput<T> =
T extends JobDefinition<string, infer TInput, unknown>
? TInput extends Record<string, unknown>
? TInput
: Record<string, unknown>
: Record<string, unknown>

/**
* Extract output type from a JobDefinition
*/
type InferOutput<T> =
T extends JobDefinition<string, unknown, infer TOutput>
? TOutput extends Record<string, unknown>
? TOutput
: Record<string, unknown>
: Record<string, unknown>

/**
* Options for createJobHooks
*/
Expand Down
165 changes: 22 additions & 143 deletions packages/durably-react/src/client/use-sse-subscription.ts
Original file line number Diff line number Diff line change
@@ -1,158 +1,37 @@
import { useCallback, useEffect, useRef, useState } from 'react'
import type { DurablyEvent, LogEntry, Progress, RunStatus } from '../types'
import { useMemo } from 'react'
import { createSSEEventSubscriber } from '../shared/sse-event-subscriber'
import {
useSubscription,
type UseSubscriptionOptions,
type UseSubscriptionResult,
} from '../shared/use-subscription'
import type { SubscriptionState } from '../types'

export interface SSESubscriptionState<TOutput = unknown> {
status: RunStatus | null
output: TOutput | null
error: string | null
logs: LogEntry[]
progress: Progress | null
}
/** @deprecated Use SubscriptionState from '../types' instead */
export type SSESubscriptionState<TOutput = unknown> = SubscriptionState<TOutput>

export interface UseSSESubscriptionOptions {
/**
* Maximum number of logs to keep (0 = unlimited)
*/
maxLogs?: number
}
/** @deprecated Use UseSubscriptionOptions from '../shared/use-subscription' instead */
export type UseSSESubscriptionOptions = UseSubscriptionOptions

export interface UseSSESubscriptionResult<
TOutput = unknown,
> extends SSESubscriptionState<TOutput> {
/**
* Clear all logs
*/
clearLogs: () => void
/**
* Reset all state
*/
reset: () => void
}
/** @deprecated Use UseSubscriptionResult from '../shared/use-subscription' instead */
export type UseSSESubscriptionResult<TOutput = unknown> =
UseSubscriptionResult<TOutput>

/**
* Internal hook for subscribing to run events via SSE.
* Used by client-mode hooks (useJob, useJobRun, useJobLogs).
*
* @deprecated Consider using useSubscription with createSSEEventSubscriber directly.
*/
export function useSSESubscription<TOutput = unknown>(
api: string | null,
runId: string | null,
options?: UseSSESubscriptionOptions,
): UseSSESubscriptionResult<TOutput> {
const [status, setStatus] = useState<RunStatus | null>(null)
const [output, setOutput] = useState<TOutput | null>(null)
const [error, setError] = useState<string | null>(null)
const [logs, setLogs] = useState<LogEntry[]>([])
const [progress, setProgress] = useState<Progress | null>(null)

const eventSourceRef = useRef<EventSource | null>(null)
const runIdRef = useRef<string | null>(runId)
const prevRunIdRef = useRef<string | null>(null)

const maxLogs = options?.maxLogs ?? 0

// Reset state when runId changes
if (prevRunIdRef.current !== runId) {
prevRunIdRef.current = runId
// Only reset if this isn't the initial render (runIdRef already set)
if (runIdRef.current !== runId) {
setStatus(null)
setOutput(null)
setError(null)
setLogs([])
setProgress(null)
}
}
runIdRef.current = runId

// Subscribe to SSE events
useEffect(() => {
if (!api || !runId) return

const url = `${api}/subscribe?runId=${encodeURIComponent(runId)}`
const eventSource = new EventSource(url)
eventSourceRef.current = eventSource

eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data) as DurablyEvent
if (data.runId !== runIdRef.current) return

switch (data.type) {
case 'run:start':
setStatus('running')
break
case 'run:complete':
setStatus('completed')
setOutput(data.output as TOutput)
break
case 'run:fail':
setStatus('failed')
setError(data.error)
break
case 'run:cancel':
setStatus('cancelled')
break
case 'run:retry':
setStatus('pending')
setError(null)
break
case 'run:progress':
setProgress(data.progress)
break
case 'log:write':
setLogs((prev) => {
const newLog: LogEntry = {
id: crypto.randomUUID(),
runId: data.runId,
stepName: null,
level: data.level,
message: data.message,
data: data.data,
timestamp: new Date().toISOString(),
}
const newLogs = [...prev, newLog]
if (maxLogs > 0 && newLogs.length > maxLogs) {
return newLogs.slice(-maxLogs)
}
return newLogs
})
break
}
} catch {
// Ignore parse errors
}
}

eventSource.onerror = () => {
setError('Connection failed')
eventSource.close()
}

return () => {
eventSource.close()
eventSourceRef.current = null
}
}, [api, runId, maxLogs])

const clearLogs = useCallback(() => {
setLogs([])
}, [])

const reset = useCallback(() => {
setStatus(null)
setOutput(null)
setError(null)
setLogs([])
setProgress(null)
}, [])
const subscriber = useMemo(
() => (api ? createSSEEventSubscriber(api) : null),
[api],
)

return {
status,
output,
error,
logs,
progress,
clearLogs,
reset,
}
return useSubscription<TOutput>(subscriber, runId, options)
}
79 changes: 79 additions & 0 deletions packages/durably-react/src/hooks/use-auto-resume.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import type { JobHandle } from '@coji/durably'
import { useEffect } from 'react'
import type { RunStatus } from '../types'

export interface UseAutoResumeOptions {
/**
* Whether to automatically resume tracking pending/running runs
* @default true
*/
enabled?: boolean
/**
* Skip auto-resume if an initial run ID is provided
*/
skipIfInitialRunId?: boolean
/**
* Initial run ID (if provided, auto-resume is skipped)
*/
initialRunId?: string
}

export interface UseAutoResumeCallbacks {
/**
* Called when a run is found to resume
*/
onRunFound: (runId: string, status: RunStatus) => void
}

/**
* Hook that automatically finds and resumes tracking of pending/running runs.
* Extracted from useJob to separate the auto-resume concern.
*/
export function useAutoResume<
TName extends string,
TInput extends Record<string, unknown>,
TOutput,
>(
jobHandle: JobHandle<TName, TInput, TOutput> | null,
options: UseAutoResumeOptions,
callbacks: UseAutoResumeCallbacks,
): void {
const enabled = options.enabled !== false
const skipIfInitialRunId = options.skipIfInitialRunId !== false
const initialRunId = options.initialRunId

useEffect(() => {
if (!jobHandle) return
if (!enabled) return
if (skipIfInitialRunId && initialRunId) return

let cancelled = false

const findActiveRun = async () => {
// First check for running runs
const runningRuns = await jobHandle.getRuns({ status: 'running' })
if (cancelled) return

if (runningRuns.length > 0) {
const run = runningRuns[0]
callbacks.onRunFound(run.id, run.status as RunStatus)
return
}

// Then check for pending runs
const pendingRuns = await jobHandle.getRuns({ status: 'pending' })
if (cancelled) return

if (pendingRuns.length > 0) {
const run = pendingRuns[0]
callbacks.onRunFound(run.id, run.status as RunStatus)
}
}

findActiveRun()

return () => {
cancelled = true
}
}, [jobHandle, enabled, skipIfInitialRunId, initialRunId, callbacks])
}
Loading