diff --git a/CHANGELOG.md b/CHANGELOG.md index 6be03f8a..43f67dbd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/). - Extracted HTTP response helpers to `http.ts` - Centralized error handling in `errors.ts` +#### @coji/durably-react + +- Internal code organization improvements (no API changes) + - Extracted shared subscription logic to `shared/` directory + - Created `useSubscription` hook for unified state management + - Extracted `useAutoResume` and `useJobSubscription` hooks + - Unified event subscriber patterns between browser and client modes + ## [0.6.0] - 2026-01-02 ### Breaking Changes diff --git a/packages/durably-react/package.json b/packages/durably-react/package.json index 02e26b3f..570dc3b1 100644 --- a/packages/durably-react/package.json +++ b/packages/durably-react/package.json @@ -1,6 +1,6 @@ { "name": "@coji/durably-react", - "version": "0.6.0", + "version": "0.6.1", "description": "React bindings for Durably - step-oriented resumable batch execution", "type": "module", "main": "./dist/index.js", diff --git a/packages/durably-react/src/client/create-durably-client.ts b/packages/durably-react/src/client/create-durably-client.ts index df2b6665..1eaef017 100644 --- a/packages/durably-react/src/client/create-durably-client.ts +++ b/packages/durably-react/src/client/create-durably-client.ts @@ -1,38 +1,8 @@ -import type { JobDefinition } from '@coji/durably' +import type { InferInput, InferOutput } from '../types' import { useJob, type UseJobClientResult } from './use-job' import { useJobLogs, type UseJobLogsClientResult } from './use-job-logs' import { useJobRun, type UseJobRunClientResult } from './use-job-run' -/** - * Extract input type from a JobDefinition or JobHandle - */ -type InferInput = - T extends JobDefinition - ? TInput extends Record - ? TInput - : Record - : T extends { trigger: (input: infer TInput) => unknown } - ? TInput extends Record - ? TInput - : Record - : Record - -/** - * Extract output type from a JobDefinition or JobHandle - */ -type InferOutput = - T extends JobDefinition - ? TOutput extends Record - ? TOutput - : Record - : T extends { - trigger: (input: unknown) => Promise<{ output?: infer TOutput }> - } - ? TOutput extends Record - ? TOutput - : Record - : Record - /** * Type-safe hooks for a specific job */ diff --git a/packages/durably-react/src/client/create-job-hooks.ts b/packages/durably-react/src/client/create-job-hooks.ts index 772e2050..90d3f091 100644 --- a/packages/durably-react/src/client/create-job-hooks.ts +++ b/packages/durably-react/src/client/create-job-hooks.ts @@ -1,28 +1,9 @@ import type { JobDefinition } from '@coji/durably' +import type { InferInput, InferOutput } from '../types' import { useJob, type UseJobClientResult } from './use-job' import { useJobLogs, type UseJobLogsClientResult } from './use-job-logs' import { useJobRun, type UseJobRunClientResult } from './use-job-run' -/** - * Extract input type from a JobDefinition - */ -type InferInput = - T extends JobDefinition - ? TInput extends Record - ? TInput - : Record - : Record - -/** - * Extract output type from a JobDefinition - */ -type InferOutput = - T extends JobDefinition - ? TOutput extends Record - ? TOutput - : Record - : Record - /** * Options for createJobHooks */ diff --git a/packages/durably-react/src/client/use-sse-subscription.ts b/packages/durably-react/src/client/use-sse-subscription.ts index 48d6b74a..ed3ec762 100644 --- a/packages/durably-react/src/client/use-sse-subscription.ts +++ b/packages/durably-react/src/client/use-sse-subscription.ts @@ -1,158 +1,37 @@ -import { useCallback, useEffect, useRef, useState } from 'react' -import type { DurablyEvent, LogEntry, Progress, RunStatus } from '../types' +import { useMemo } from 'react' +import { createSSEEventSubscriber } from '../shared/sse-event-subscriber' +import { + useSubscription, + type UseSubscriptionOptions, + type UseSubscriptionResult, +} from '../shared/use-subscription' +import type { SubscriptionState } from '../types' -export interface SSESubscriptionState { - status: RunStatus | null - output: TOutput | null - error: string | null - logs: LogEntry[] - progress: Progress | null -} +/** @deprecated Use SubscriptionState from '../types' instead */ +export type SSESubscriptionState = SubscriptionState -export interface UseSSESubscriptionOptions { - /** - * Maximum number of logs to keep (0 = unlimited) - */ - maxLogs?: number -} +/** @deprecated Use UseSubscriptionOptions from '../shared/use-subscription' instead */ +export type UseSSESubscriptionOptions = UseSubscriptionOptions -export interface UseSSESubscriptionResult< - TOutput = unknown, -> extends SSESubscriptionState { - /** - * Clear all logs - */ - clearLogs: () => void - /** - * Reset all state - */ - reset: () => void -} +/** @deprecated Use UseSubscriptionResult from '../shared/use-subscription' instead */ +export type UseSSESubscriptionResult = + UseSubscriptionResult /** * Internal hook for subscribing to run events via SSE. * Used by client-mode hooks (useJob, useJobRun, useJobLogs). + * + * @deprecated Consider using useSubscription with createSSEEventSubscriber directly. */ export function useSSESubscription( api: string | null, runId: string | null, options?: UseSSESubscriptionOptions, ): UseSSESubscriptionResult { - const [status, setStatus] = useState(null) - const [output, setOutput] = useState(null) - const [error, setError] = useState(null) - const [logs, setLogs] = useState([]) - const [progress, setProgress] = useState(null) - - const eventSourceRef = useRef(null) - const runIdRef = useRef(runId) - const prevRunIdRef = useRef(null) - - const maxLogs = options?.maxLogs ?? 0 - - // Reset state when runId changes - if (prevRunIdRef.current !== runId) { - prevRunIdRef.current = runId - // Only reset if this isn't the initial render (runIdRef already set) - if (runIdRef.current !== runId) { - setStatus(null) - setOutput(null) - setError(null) - setLogs([]) - setProgress(null) - } - } - runIdRef.current = runId - - // Subscribe to SSE events - useEffect(() => { - if (!api || !runId) return - - const url = `${api}/subscribe?runId=${encodeURIComponent(runId)}` - const eventSource = new EventSource(url) - eventSourceRef.current = eventSource - - eventSource.onmessage = (event) => { - try { - const data = JSON.parse(event.data) as DurablyEvent - if (data.runId !== runIdRef.current) return - - switch (data.type) { - case 'run:start': - setStatus('running') - break - case 'run:complete': - setStatus('completed') - setOutput(data.output as TOutput) - break - case 'run:fail': - setStatus('failed') - setError(data.error) - break - case 'run:cancel': - setStatus('cancelled') - break - case 'run:retry': - setStatus('pending') - setError(null) - break - case 'run:progress': - setProgress(data.progress) - break - case 'log:write': - setLogs((prev) => { - const newLog: LogEntry = { - id: crypto.randomUUID(), - runId: data.runId, - stepName: null, - level: data.level, - message: data.message, - data: data.data, - timestamp: new Date().toISOString(), - } - const newLogs = [...prev, newLog] - if (maxLogs > 0 && newLogs.length > maxLogs) { - return newLogs.slice(-maxLogs) - } - return newLogs - }) - break - } - } catch { - // Ignore parse errors - } - } - - eventSource.onerror = () => { - setError('Connection failed') - eventSource.close() - } - - return () => { - eventSource.close() - eventSourceRef.current = null - } - }, [api, runId, maxLogs]) - - const clearLogs = useCallback(() => { - setLogs([]) - }, []) - - const reset = useCallback(() => { - setStatus(null) - setOutput(null) - setError(null) - setLogs([]) - setProgress(null) - }, []) + const subscriber = useMemo( + () => (api ? createSSEEventSubscriber(api) : null), + [api], + ) - return { - status, - output, - error, - logs, - progress, - clearLogs, - reset, - } + return useSubscription(subscriber, runId, options) } diff --git a/packages/durably-react/src/hooks/use-auto-resume.ts b/packages/durably-react/src/hooks/use-auto-resume.ts new file mode 100644 index 00000000..84d08943 --- /dev/null +++ b/packages/durably-react/src/hooks/use-auto-resume.ts @@ -0,0 +1,79 @@ +import type { JobHandle } from '@coji/durably' +import { useEffect } from 'react' +import type { RunStatus } from '../types' + +export interface UseAutoResumeOptions { + /** + * Whether to automatically resume tracking pending/running runs + * @default true + */ + enabled?: boolean + /** + * Skip auto-resume if an initial run ID is provided + */ + skipIfInitialRunId?: boolean + /** + * Initial run ID (if provided, auto-resume is skipped) + */ + initialRunId?: string +} + +export interface UseAutoResumeCallbacks { + /** + * Called when a run is found to resume + */ + onRunFound: (runId: string, status: RunStatus) => void +} + +/** + * Hook that automatically finds and resumes tracking of pending/running runs. + * Extracted from useJob to separate the auto-resume concern. + */ +export function useAutoResume< + TName extends string, + TInput extends Record, + TOutput, +>( + jobHandle: JobHandle | null, + options: UseAutoResumeOptions, + callbacks: UseAutoResumeCallbacks, +): void { + const enabled = options.enabled !== false + const skipIfInitialRunId = options.skipIfInitialRunId !== false + const initialRunId = options.initialRunId + + useEffect(() => { + if (!jobHandle) return + if (!enabled) return + if (skipIfInitialRunId && initialRunId) return + + let cancelled = false + + const findActiveRun = async () => { + // First check for running runs + const runningRuns = await jobHandle.getRuns({ status: 'running' }) + if (cancelled) return + + if (runningRuns.length > 0) { + const run = runningRuns[0] + callbacks.onRunFound(run.id, run.status as RunStatus) + return + } + + // Then check for pending runs + const pendingRuns = await jobHandle.getRuns({ status: 'pending' }) + if (cancelled) return + + if (pendingRuns.length > 0) { + const run = pendingRuns[0] + callbacks.onRunFound(run.id, run.status as RunStatus) + } + } + + findActiveRun() + + return () => { + cancelled = true + } + }, [jobHandle, enabled, skipIfInitialRunId, initialRunId, callbacks]) +} diff --git a/packages/durably-react/src/hooks/use-job-subscription.ts b/packages/durably-react/src/hooks/use-job-subscription.ts new file mode 100644 index 00000000..c4c008ae --- /dev/null +++ b/packages/durably-react/src/hooks/use-job-subscription.ts @@ -0,0 +1,213 @@ +import type { Durably } from '@coji/durably' +import { useCallback, useEffect, useReducer, useRef } from 'react' +import { + initialSubscriptionState, + subscriptionReducer, + type SubscriptionAction, +} from '../shared/subscription-reducer' +import type { SubscriptionState } from '../types' + +export interface UseJobSubscriptionOptions { + /** + * Automatically switch to tracking the latest running job when a new run starts. + * @default true + */ + followLatest?: boolean + /** + * Maximum number of logs to keep (0 = unlimited) + */ + maxLogs?: number +} + +export interface UseJobSubscriptionResult< + TOutput = unknown, +> extends SubscriptionState { + /** + * Current run ID being tracked + */ + currentRunId: string | null + /** + * Set the current run ID to track + */ + setCurrentRunId: (runId: string | null) => void + /** + * Clear all logs + */ + clearLogs: () => void + /** + * Reset all state including currentRunId + */ + reset: () => void +} + +// Extended state for job subscription (includes currentRunId) +interface JobSubscriptionState< + TOutput = unknown, +> extends SubscriptionState { + currentRunId: string | null +} + +// Extended actions for job subscription +type JobSubscriptionAction = + | SubscriptionAction + | { type: 'set_run_id'; runId: string | null } + | { + type: 'switch_to_run' + runId: string + } + +function jobSubscriptionReducer( + state: JobSubscriptionState, + action: JobSubscriptionAction, +): JobSubscriptionState { + switch (action.type) { + case 'set_run_id': + return { ...state, currentRunId: action.runId } + + case 'switch_to_run': + // Switch to a new run, resetting state + return { + ...initialSubscriptionState, + currentRunId: action.runId, + status: 'running', + } as JobSubscriptionState + + case 'reset': + return { + ...(initialSubscriptionState as SubscriptionState), + currentRunId: null, + } + + default: + // Delegate to base subscription reducer + return { + ...subscriptionReducer(state, action as SubscriptionAction), + currentRunId: state.currentRunId, + } + } +} + +/** + * Hook for subscribing to job events with followLatest support. + * This is a specialized version of useSubscription for job-level tracking. + */ +export function useJobSubscription( + durably: Durably | null, + jobName: string, + options?: UseJobSubscriptionOptions, +): UseJobSubscriptionResult { + const initialState: JobSubscriptionState = { + ...(initialSubscriptionState as SubscriptionState), + currentRunId: null, + } + + const [state, dispatch] = useReducer( + jobSubscriptionReducer, + initialState, + ) + + const currentRunIdRef = useRef(null) + currentRunIdRef.current = state.currentRunId + + const followLatest = options?.followLatest !== false + const maxLogs = options?.maxLogs ?? 0 + + useEffect(() => { + if (!durably) return + + const unsubscribes: (() => void)[] = [] + + unsubscribes.push( + durably.on('run:start', (event) => { + if (event.jobName !== jobName) return + + if (followLatest) { + // Switch to tracking the new run + dispatch({ type: 'switch_to_run', runId: event.runId }) + currentRunIdRef.current = event.runId + } else { + // Only update if this is our current run + if (event.runId !== currentRunIdRef.current) return + dispatch({ type: 'run:start' }) + } + }), + ) + + unsubscribes.push( + durably.on('run:complete', (event) => { + if (event.runId !== currentRunIdRef.current) return + dispatch({ type: 'run:complete', output: event.output as TOutput }) + }), + ) + + unsubscribes.push( + durably.on('run:fail', (event) => { + if (event.runId !== currentRunIdRef.current) return + dispatch({ type: 'run:fail', error: event.error }) + }), + ) + + unsubscribes.push( + durably.on('run:cancel', (event) => { + if (event.runId !== currentRunIdRef.current) return + dispatch({ type: 'run:cancel' }) + }), + ) + + unsubscribes.push( + durably.on('run:retry', (event) => { + if (event.runId !== currentRunIdRef.current) return + dispatch({ type: 'run:retry' }) + }), + ) + + unsubscribes.push( + durably.on('run:progress', (event) => { + if (event.runId !== currentRunIdRef.current) return + dispatch({ type: 'run:progress', progress: event.progress }) + }), + ) + + unsubscribes.push( + durably.on('log:write', (event) => { + if (event.runId !== currentRunIdRef.current) return + dispatch({ + type: 'log:write', + runId: event.runId, + stepName: event.stepName, + level: event.level, + message: event.message, + data: event.data, + maxLogs, + }) + }), + ) + + return () => { + for (const unsubscribe of unsubscribes) { + unsubscribe() + } + } + }, [durably, jobName, followLatest, maxLogs]) + + const setCurrentRunId = useCallback((runId: string | null) => { + dispatch({ type: 'set_run_id', runId }) + currentRunIdRef.current = runId + }, []) + + const clearLogs = useCallback(() => { + dispatch({ type: 'clear_logs' }) + }, []) + + const reset = useCallback(() => { + dispatch({ type: 'reset' }) + currentRunIdRef.current = null + }, []) + + return { + ...state, + setCurrentRunId, + clearLogs, + reset, + } +} diff --git a/packages/durably-react/src/hooks/use-job.ts b/packages/durably-react/src/hooks/use-job.ts index 8b86c1fa..d7dd05ea 100644 --- a/packages/durably-react/src/hooks/use-job.ts +++ b/packages/durably-react/src/hooks/use-job.ts @@ -1,7 +1,9 @@ import type { JobDefinition, JobHandle } from '@coji/durably' -import { useCallback, useEffect, useRef, useState } from 'react' +import { useCallback, useEffect, useMemo, useRef } from 'react' import { useDurably } from '../context' import type { LogEntry, Progress, RunStatus } from '../types' +import { useAutoResume } from './use-auto-resume' +import { useJobSubscription } from './use-job-subscription' export interface UseJobOptions { /** @@ -93,170 +95,53 @@ export function useJob< ): UseJobResult { const { durably } = useDurably() - const [status, setStatus] = useState(null) - const [output, setOutput] = useState(null) - const [error, setError] = useState(null) - const [logs, setLogs] = useState([]) - const [progress, setProgress] = useState(null) - const [currentRunId, setCurrentRunId] = useState( - options?.initialRunId ?? null, - ) - const jobHandleRef = useRef | null>(null) - // Use ref to track the latest runId for event filtering - const currentRunIdRef = useRef(currentRunId) - currentRunIdRef.current = currentRunId - // Register job and set up event listeners + // Register job useEffect(() => { if (!durably) return - // Register the job (use fixed key for simpler type handling) const d = durably.register({ _job: jobDefinition, }) - const jobHandle = d.jobs._job - jobHandleRef.current = jobHandle - - // Subscribe to each event type separately - const unsubscribes: (() => void)[] = [] - - unsubscribes.push( - durably.on('run:start', (event) => { - // Check if this is a run for our job - if (event.jobName !== jobDefinition.name) return - - // If followLatest is disabled, only update if this is our current run - if (options?.followLatest === false) { - if (event.runId !== currentRunIdRef.current) return - setStatus('running') - return - } - - // Switch to tracking the running job (followLatest: true, default) - setCurrentRunId(event.runId) - currentRunIdRef.current = event.runId - setStatus('running') - // Reset output/error when switching to a new run - setOutput(null) - setError(null) - setLogs([]) - setProgress(null) - }), - ) - - unsubscribes.push( - durably.on('run:complete', (event) => { - if (event.runId !== currentRunIdRef.current) return - setStatus('completed') - setOutput(event.output as TOutput) - }), - ) - - unsubscribes.push( - durably.on('run:fail', (event) => { - if (event.runId !== currentRunIdRef.current) return - setStatus('failed') - setError(event.error) - }), - ) - - unsubscribes.push( - durably.on('run:progress', (event) => { - if (event.runId !== currentRunIdRef.current) return - setProgress(event.progress) - }), - ) - - unsubscribes.push( - durably.on('log:write', (event) => { - if (event.runId !== currentRunIdRef.current) return - setLogs((prev) => [ - ...prev, - { - id: crypto.randomUUID(), - runId: event.runId, - stepName: event.stepName, - level: event.level, - message: event.message, - data: event.data, - timestamp: new Date().toISOString(), - }, - ]) - }), - ) - - // If we have an initialRunId, fetch its current state - if (options?.initialRunId && currentRunIdRef.current) { - jobHandle.getRun(currentRunIdRef.current).then((run) => { - if (run) { - setStatus(run.status as RunStatus) - if (run.status === 'completed' && run.output) { - setOutput(run.output as TOutput) - } - if (run.status === 'failed' && run.error) { - setError(run.error) - } - } - }) - } + jobHandleRef.current = d.jobs._job + }, [durably, jobDefinition]) - // Auto-resume: find any pending or running runs for this job (default: true) - if (options?.autoResume !== false && !options?.initialRunId) { - ;(async () => { - // First check for running runs - const runningRuns = await jobHandle.getRuns({ status: 'running' }) - if (runningRuns.length > 0) { - const run = runningRuns[0] - setCurrentRunId(run.id) - currentRunIdRef.current = run.id - setStatus(run.status as RunStatus) - return - } + // Use the extracted job subscription hook + const subscription = useJobSubscription( + durably, + jobDefinition.name, + { + followLatest: options?.followLatest, + }, + ) - // Then check for pending runs - const pendingRuns = await jobHandle.getRuns({ status: 'pending' }) - if (pendingRuns.length > 0) { - const run = pendingRuns[0] - setCurrentRunId(run.id) - currentRunIdRef.current = run.id - setStatus(run.status as RunStatus) - } - })() - } + // Auto-resume callbacks - stable reference + const autoResumeCallbacks = useMemo( + () => ({ + onRunFound: (runId: string, _status: RunStatus) => { + subscription.setCurrentRunId(runId) + }, + }), + [subscription.setCurrentRunId], + ) - return () => { - for (const unsubscribe of unsubscribes) { - unsubscribe() - } - } - }, [ - durably, - jobDefinition, - options?.initialRunId, - options?.autoResume, - options?.followLatest, - ]) + // Use the extracted auto-resume hook + useAutoResume( + jobHandleRef.current, + { + enabled: options?.autoResume, + initialRunId: options?.initialRunId, + }, + autoResumeCallbacks, + ) - // Update state when currentRunId changes (for initialRunId scenario) + // Handle initialRunId - set it to start tracking useEffect(() => { - if (!durably || !currentRunId) return + if (!durably || !options?.initialRunId) return - const jobHandle = jobHandleRef.current - if (jobHandle && options?.initialRunId) { - jobHandle.getRun(currentRunId).then((run) => { - if (run) { - setStatus(run.status as RunStatus) - if (run.status === 'completed' && run.output) { - setOutput(run.output as TOutput) - } - if (run.status === 'failed' && run.error) { - setError(run.error) - } - } - }) - } - }, [durably, currentRunId, options?.initialRunId]) + subscription.setCurrentRunId(options.initialRunId) + }, [durably, options?.initialRunId, subscription.setCurrentRunId]) const trigger = useCallback( async (input: TInput): Promise<{ runId: string }> => { @@ -265,20 +150,15 @@ export function useJob< throw new Error('Job not ready') } - // Reset state - setOutput(null) - setError(null) - setLogs([]) - setProgress(null) + // Reset state before triggering + subscription.reset() const run = await jobHandle.trigger(input) - setCurrentRunId(run.id) - currentRunIdRef.current = run.id - setStatus('pending') + subscription.setCurrentRunId(run.id) return { runId: run.id } }, - [], + [subscription], ) const triggerAndWait = useCallback( @@ -288,18 +168,13 @@ export function useJob< throw new Error('Job not ready') } - // Reset state - setOutput(null) - setError(null) - setLogs([]) - setProgress(null) + // Reset state before triggering + subscription.reset() const run = await jobHandle.trigger(input) - setCurrentRunId(run.id) - currentRunIdRef.current = run.id - setStatus('pending') + subscription.setCurrentRunId(run.id) - // Wait for completion + // Wait for completion by polling return new Promise((resolve, reject) => { const checkCompletion = async () => { const updatedRun = await jobHandle.getRun(run.id) @@ -322,32 +197,23 @@ export function useJob< checkCompletion() }) }, - [durably], + [durably, subscription], ) - const reset = useCallback(() => { - setStatus(null) - setOutput(null) - setError(null) - setLogs([]) - setProgress(null) - setCurrentRunId(null) - }, []) - return { trigger, triggerAndWait, - status, - output, - error, - logs, - progress, - isRunning: status === 'running', - isPending: status === 'pending', - isCompleted: status === 'completed', - isFailed: status === 'failed', - isCancelled: status === 'cancelled', - currentRunId, - reset, + status: subscription.status, + output: subscription.output, + error: subscription.error, + logs: subscription.logs, + progress: subscription.progress, + isRunning: subscription.status === 'running', + isPending: subscription.status === 'pending', + isCompleted: subscription.status === 'completed', + isFailed: subscription.status === 'failed', + isCancelled: subscription.status === 'cancelled', + currentRunId: subscription.currentRunId, + reset: subscription.reset, } } diff --git a/packages/durably-react/src/hooks/use-run-subscription.ts b/packages/durably-react/src/hooks/use-run-subscription.ts index d5ed8add..bf8472a4 100644 --- a/packages/durably-react/src/hooks/use-run-subscription.ts +++ b/packages/durably-react/src/hooks/use-run-subscription.ts @@ -1,156 +1,38 @@ import type { Durably } from '@coji/durably' -import { useEffect, useRef, useState } from 'react' -import type { LogEntry, Progress, RunStatus } from '../types' +import { useMemo } from 'react' +import { createDurablyEventSubscriber } from '../shared/durably-event-subscriber' +import { + useSubscription, + type UseSubscriptionOptions, + type UseSubscriptionResult, +} from '../shared/use-subscription' +import type { SubscriptionState } from '../types' -export interface RunSubscriptionState { - status: RunStatus | null - output: TOutput | null - error: string | null - logs: LogEntry[] - progress: Progress | null -} +/** @deprecated Use SubscriptionState from '../types' instead */ +export type RunSubscriptionState = SubscriptionState -export interface UseRunSubscriptionOptions { - /** - * Maximum number of logs to keep (0 = unlimited) - */ - maxLogs?: number -} +/** @deprecated Use UseSubscriptionOptions from '../shared/use-subscription' instead */ +export type UseRunSubscriptionOptions = UseSubscriptionOptions -export interface UseRunSubscriptionResult< - TOutput = unknown, -> extends RunSubscriptionState { - /** - * Clear all logs - */ - clearLogs: () => void - /** - * Reset all state - */ - reset: () => void -} +/** @deprecated Use UseSubscriptionResult from '../shared/use-subscription' instead */ +export type UseRunSubscriptionResult = + UseSubscriptionResult /** - * Internal hook for subscribing to run events. + * Internal hook for subscribing to run events via Durably.on(). * Shared by useJob, useJobRun, and useJobLogs. + * + * @deprecated Consider using useSubscription with createDurablyEventSubscriber directly. */ export function useRunSubscription( durably: Durably | null, runId: string | null, options?: UseRunSubscriptionOptions, ): UseRunSubscriptionResult { - const [status, setStatus] = useState(null) - const [output, setOutput] = useState(null) - const [error, setError] = useState(null) - const [logs, setLogs] = useState([]) - const [progress, setProgress] = useState(null) - - // Use ref to track the latest runId for event filtering - const runIdRef = useRef(runId) - runIdRef.current = runId - - const maxLogs = options?.maxLogs ?? 0 - - // Subscribe to events - useEffect(() => { - if (!durably || !runId) return - - const unsubscribes: (() => void)[] = [] - - unsubscribes.push( - durably.on('run:start', (event) => { - if (event.runId !== runIdRef.current) return - setStatus('running') - }), - ) - - unsubscribes.push( - durably.on('run:complete', (event) => { - if (event.runId !== runIdRef.current) return - setStatus('completed') - setOutput(event.output as TOutput) - }), - ) - - unsubscribes.push( - durably.on('run:fail', (event) => { - if (event.runId !== runIdRef.current) return - setStatus('failed') - setError(event.error) - }), - ) - - unsubscribes.push( - durably.on('run:cancel', (event) => { - if (event.runId !== runIdRef.current) return - setStatus('cancelled') - }), - ) - - unsubscribes.push( - durably.on('run:retry', (event) => { - if (event.runId !== runIdRef.current) return - setStatus('pending') - setError(null) - }), - ) - - unsubscribes.push( - durably.on('run:progress', (event) => { - if (event.runId !== runIdRef.current) return - setProgress(event.progress) - }), - ) - - unsubscribes.push( - durably.on('log:write', (event) => { - if (event.runId !== runIdRef.current) return - setLogs((prev) => { - const newLog: LogEntry = { - id: crypto.randomUUID(), - runId: event.runId, - stepName: event.stepName, - level: event.level, - message: event.message, - data: event.data, - timestamp: new Date().toISOString(), - } - const newLogs = [...prev, newLog] - // Apply maxLogs limit if set - if (maxLogs > 0 && newLogs.length > maxLogs) { - return newLogs.slice(-maxLogs) - } - return newLogs - }) - }), - ) - - return () => { - for (const unsubscribe of unsubscribes) { - unsubscribe() - } - } - }, [durably, runId, maxLogs]) - - const clearLogs = () => { - setLogs([]) - } - - const reset = () => { - setStatus(null) - setOutput(null) - setError(null) - setLogs([]) - setProgress(null) - } + const subscriber = useMemo( + () => (durably ? createDurablyEventSubscriber(durably) : null), + [durably], + ) - return { - status, - output, - error, - logs, - progress, - clearLogs, - reset, - } + return useSubscription(subscriber, runId, options) } diff --git a/packages/durably-react/src/shared/create-log-entry.ts b/packages/durably-react/src/shared/create-log-entry.ts new file mode 100644 index 00000000..96683bf3 --- /dev/null +++ b/packages/durably-react/src/shared/create-log-entry.ts @@ -0,0 +1,40 @@ +import type { LogEntry } from '../types' + +export interface CreateLogEntryParams { + runId: string + stepName: string | null + level: 'info' | 'warn' | 'error' + message: string + data: unknown +} + +/** + * Creates a LogEntry with auto-generated id and timestamp. + * Extracted to eliminate duplication between subscription hooks. + */ +export function createLogEntry(params: CreateLogEntryParams): LogEntry { + return { + id: crypto.randomUUID(), + runId: params.runId, + stepName: params.stepName, + level: params.level, + message: params.message, + data: params.data, + timestamp: new Date().toISOString(), + } +} + +/** + * Appends a log entry to the array, respecting maxLogs limit. + */ +export function appendLog( + logs: LogEntry[], + newLog: LogEntry, + maxLogs: number, +): LogEntry[] { + const newLogs = [...logs, newLog] + if (maxLogs > 0 && newLogs.length > maxLogs) { + return newLogs.slice(-maxLogs) + } + return newLogs +} diff --git a/packages/durably-react/src/shared/durably-event-subscriber.ts b/packages/durably-react/src/shared/durably-event-subscriber.ts new file mode 100644 index 00000000..0f27d96e --- /dev/null +++ b/packages/durably-react/src/shared/durably-event-subscriber.ts @@ -0,0 +1,81 @@ +import type { Durably } from '@coji/durably' +import type { EventSubscriber, SubscriptionEvent } from './event-subscriber' + +/** + * EventSubscriber implementation using Durably.on() for direct subscriptions. + * Used in browser environments where Durably instance is available. + */ +export function createDurablyEventSubscriber( + durably: Durably, +): EventSubscriber { + return { + subscribe( + runId: string, + onEvent: (event: SubscriptionEvent) => void, + ): () => void { + const unsubscribes: (() => void)[] = [] + + unsubscribes.push( + durably.on('run:start', (event) => { + if (event.runId !== runId) return + onEvent({ type: 'run:start' }) + }), + ) + + unsubscribes.push( + durably.on('run:complete', (event) => { + if (event.runId !== runId) return + onEvent({ type: 'run:complete', output: event.output as TOutput }) + }), + ) + + unsubscribes.push( + durably.on('run:fail', (event) => { + if (event.runId !== runId) return + onEvent({ type: 'run:fail', error: event.error }) + }), + ) + + unsubscribes.push( + durably.on('run:cancel', (event) => { + if (event.runId !== runId) return + onEvent({ type: 'run:cancel' }) + }), + ) + + unsubscribes.push( + durably.on('run:retry', (event) => { + if (event.runId !== runId) return + onEvent({ type: 'run:retry' }) + }), + ) + + unsubscribes.push( + durably.on('run:progress', (event) => { + if (event.runId !== runId) return + onEvent({ type: 'run:progress', progress: event.progress }) + }), + ) + + unsubscribes.push( + durably.on('log:write', (event) => { + if (event.runId !== runId) return + onEvent({ + type: 'log:write', + runId: event.runId, + stepName: event.stepName, + level: event.level, + message: event.message, + data: event.data, + }) + }), + ) + + return () => { + for (const unsubscribe of unsubscribes) { + unsubscribe() + } + } + }, + } +} diff --git a/packages/durably-react/src/shared/event-subscriber.ts b/packages/durably-react/src/shared/event-subscriber.ts new file mode 100644 index 00000000..16c35308 --- /dev/null +++ b/packages/durably-react/src/shared/event-subscriber.ts @@ -0,0 +1,39 @@ +import type { Progress } from '../types' + +/** + * Common event types emitted by both Durably.on and SSE subscriptions. + * This abstraction allows hooks to work with either event source. + */ +export type SubscriptionEvent = + | { type: 'run:start' } + | { type: 'run:complete'; output: TOutput } + | { type: 'run:fail'; error: string } + | { type: 'run:cancel' } + | { type: 'run:retry' } + | { type: 'run:progress'; progress: Progress } + | { + type: 'log:write' + runId: string + stepName: string | null + level: 'info' | 'warn' | 'error' + message: string + data: unknown + } + | { type: 'connection_error'; error: string } + +/** + * Common interface for subscribing to run events. + * Implemented by both DurablyEventSubscriber and SSEEventSubscriber. + */ +export interface EventSubscriber { + /** + * Subscribe to events for a specific run. + * @param runId The run ID to subscribe to + * @param onEvent Callback for each event + * @returns Cleanup function to unsubscribe + */ + subscribe( + runId: string, + onEvent: (event: SubscriptionEvent) => void, + ): () => void +} diff --git a/packages/durably-react/src/shared/sse-event-subscriber.ts b/packages/durably-react/src/shared/sse-event-subscriber.ts new file mode 100644 index 00000000..51d88b1b --- /dev/null +++ b/packages/durably-react/src/shared/sse-event-subscriber.ts @@ -0,0 +1,70 @@ +import type { DurablyEvent } from '../types' +import type { EventSubscriber, SubscriptionEvent } from './event-subscriber' + +/** + * EventSubscriber implementation using Server-Sent Events (SSE). + * Used in client environments that communicate with a Durably server via HTTP. + */ +export function createSSEEventSubscriber(apiBaseUrl: string): EventSubscriber { + return { + subscribe( + runId: string, + onEvent: (event: SubscriptionEvent) => void, + ): () => void { + const url = `${apiBaseUrl}/subscribe?runId=${encodeURIComponent(runId)}` + const eventSource = new EventSource(url) + + eventSource.onmessage = (messageEvent) => { + try { + const data = JSON.parse(messageEvent.data) as DurablyEvent + if (data.runId !== runId) return + + switch (data.type) { + case 'run:start': + onEvent({ type: 'run:start' }) + break + case 'run:complete': + onEvent({ + type: 'run:complete', + output: data.output as TOutput, + }) + break + case 'run:fail': + onEvent({ type: 'run:fail', error: data.error }) + break + case 'run:cancel': + onEvent({ type: 'run:cancel' }) + break + case 'run:retry': + onEvent({ type: 'run:retry' }) + break + case 'run:progress': + onEvent({ type: 'run:progress', progress: data.progress }) + break + case 'log:write': + onEvent({ + type: 'log:write', + runId: data.runId, + stepName: null, + level: data.level, + message: data.message, + data: data.data, + }) + break + } + } catch { + // Ignore parse errors + } + } + + eventSource.onerror = () => { + onEvent({ type: 'connection_error', error: 'Connection failed' }) + eventSource.close() + } + + return () => { + eventSource.close() + } + }, + } +} diff --git a/packages/durably-react/src/shared/subscription-reducer.ts b/packages/durably-react/src/shared/subscription-reducer.ts new file mode 100644 index 00000000..aed0de4e --- /dev/null +++ b/packages/durably-react/src/shared/subscription-reducer.ts @@ -0,0 +1,83 @@ +import type { Progress, SubscriptionState } from '../types' +import { appendLog, createLogEntry } from './create-log-entry' + +// Action types for subscription state transitions +export type SubscriptionAction = + | { type: 'run:start' } + | { type: 'run:complete'; output: TOutput } + | { type: 'run:fail'; error: string } + | { type: 'run:cancel' } + | { type: 'run:retry' } + | { type: 'run:progress'; progress: Progress } + | { + type: 'log:write' + runId: string + stepName: string | null + level: 'info' | 'warn' | 'error' + message: string + data: unknown + maxLogs: number + } + | { type: 'reset' } + | { type: 'clear_logs' } + | { type: 'connection_error'; error: string } + +export const initialSubscriptionState: SubscriptionState = { + status: null, + output: null, + error: null, + logs: [], + progress: null, +} + +/** + * Pure reducer for subscription state transitions. + * Extracted to eliminate duplication between useRunSubscription and useSSESubscription. + */ +export function subscriptionReducer( + state: SubscriptionState, + action: SubscriptionAction, +): SubscriptionState { + switch (action.type) { + case 'run:start': + return { ...state, status: 'running' } + + case 'run:complete': + return { ...state, status: 'completed', output: action.output } + + case 'run:fail': + return { ...state, status: 'failed', error: action.error } + + case 'run:cancel': + return { ...state, status: 'cancelled' } + + case 'run:retry': + return { ...state, status: 'pending', error: null } + + case 'run:progress': + return { ...state, progress: action.progress } + + case 'log:write': { + const newLog = createLogEntry({ + runId: action.runId, + stepName: action.stepName, + level: action.level, + message: action.message, + data: action.data, + }) + return { ...state, logs: appendLog(state.logs, newLog, action.maxLogs) } + } + + case 'reset': + return initialSubscriptionState as SubscriptionState + + case 'clear_logs': + return { ...state, logs: [] } + + case 'connection_error': + return { ...state, error: action.error } + + default: + return state + } +} diff --git a/packages/durably-react/src/shared/use-subscription.ts b/packages/durably-react/src/shared/use-subscription.ts new file mode 100644 index 00000000..9efbf69b --- /dev/null +++ b/packages/durably-react/src/shared/use-subscription.ts @@ -0,0 +1,112 @@ +import { useCallback, useEffect, useReducer, useRef } from 'react' +import type { SubscriptionState } from '../types' +import type { EventSubscriber } from './event-subscriber' +import { + initialSubscriptionState, + subscriptionReducer, +} from './subscription-reducer' + +export interface UseSubscriptionOptions { + /** + * Maximum number of logs to keep (0 = unlimited) + */ + maxLogs?: number +} + +export interface UseSubscriptionResult< + TOutput = unknown, +> extends SubscriptionState { + /** + * Clear all logs + */ + clearLogs: () => void + /** + * Reset all state + */ + reset: () => void +} + +/** + * Core subscription hook that works with any EventSubscriber implementation. + * This unifies the subscription logic between Durably.on and SSE. + */ +export function useSubscription( + subscriber: EventSubscriber | null, + runId: string | null, + options?: UseSubscriptionOptions, +): UseSubscriptionResult { + const [state, dispatch] = useReducer( + subscriptionReducer, + initialSubscriptionState as SubscriptionState, + ) + + const runIdRef = useRef(runId) + const prevRunIdRef = useRef(null) + + const maxLogs = options?.maxLogs ?? 0 + + // Reset state when runId changes + if (prevRunIdRef.current !== runId) { + prevRunIdRef.current = runId + if (runIdRef.current !== runId) { + dispatch({ type: 'reset' }) + } + } + runIdRef.current = runId + + useEffect(() => { + if (!subscriber || !runId) return + + const unsubscribe = subscriber.subscribe(runId, (event) => { + // Verify runId hasn't changed during async operation + if (runIdRef.current !== runId) return + + switch (event.type) { + case 'run:start': + case 'run:cancel': + case 'run:retry': + dispatch({ type: event.type }) + break + case 'run:complete': + dispatch({ type: 'run:complete', output: event.output }) + break + case 'run:fail': + dispatch({ type: 'run:fail', error: event.error }) + break + case 'run:progress': + dispatch({ type: 'run:progress', progress: event.progress }) + break + case 'log:write': + dispatch({ + type: 'log:write', + runId: event.runId, + stepName: event.stepName, + level: event.level, + message: event.message, + data: event.data, + maxLogs, + }) + break + case 'connection_error': + dispatch({ type: 'connection_error', error: event.error }) + break + } + }) + + return unsubscribe + }, [subscriber, runId, maxLogs]) + + const clearLogs = useCallback(() => { + dispatch({ type: 'clear_logs' }) + }, []) + + const reset = useCallback(() => { + dispatch({ type: 'reset' }) + }, []) + + return { + ...state, + clearLogs, + reset, + } +} diff --git a/packages/durably-react/src/types.ts b/packages/durably-react/src/types.ts index d1f7ce16..5a396c4e 100644 --- a/packages/durably-react/src/types.ts +++ b/packages/durably-react/src/types.ts @@ -1,5 +1,32 @@ // Shared type definitions for @coji/durably-react +import type { JobDefinition } from '@coji/durably' + +// Type inference utilities for extracting Input/Output types from JobDefinition +export type InferInput = + T extends JobDefinition + ? TInput extends Record + ? TInput + : Record + : T extends { trigger: (input: infer TInput) => unknown } + ? TInput extends Record + ? TInput + : Record + : Record + +export type InferOutput = + T extends JobDefinition + ? TOutput extends Record + ? TOutput + : Record + : T extends { + trigger: (input: unknown) => Promise<{ output?: infer TOutput }> + } + ? TOutput extends Record + ? TOutput + : Record + : Record + export type RunStatus = | 'pending' | 'running' @@ -23,6 +50,15 @@ export interface LogEntry { timestamp: string } +// Shared subscription state (used by both direct and SSE subscriptions) +export interface SubscriptionState { + status: RunStatus | null + output: TOutput | null + error: string | null + logs: LogEntry[] + progress: Progress | null +} + // SSE event types (sent from server) export type DurablyEvent = | { type: 'run:start'; runId: string; jobName: string; payload: unknown }