From 8baa97c71927675a769b7f35ca660b4ddd89b1aa Mon Sep 17 00:00:00 2001 From: Nico Prananta <311343+nicnocquee@users.noreply.github.com> Date: Mon, 15 Dec 2025 20:13:54 +0100 Subject: [PATCH 1/2] Added force kill on timeout feature --- apps/docs/content/docs/api/job-options.mdx | 4 + .../content/docs/usage/force-kill-timeout.mdx | 139 ++++++ apps/docs/content/docs/usage/job-timeout.mdx | 17 + ...add_force_kill_on_timeout_to_job_queue.sql | 6 + .../dataqueue/src/handler-validation.test.ts | 414 ++++++++++++++++++ packages/dataqueue/src/handler-validation.ts | 168 +++++++ packages/dataqueue/src/index.ts | 4 + packages/dataqueue/src/processor.test.ts | 55 +++ packages/dataqueue/src/processor.ts | 278 +++++++++++- packages/dataqueue/src/queue.ts | 27 +- packages/dataqueue/src/types.ts | 51 +++ 11 files changed, 1137 insertions(+), 26 deletions(-) create mode 100644 apps/docs/content/docs/usage/force-kill-timeout.mdx create mode 100644 packages/dataqueue/migrations/1765809419000_add_force_kill_on_timeout_to_job_queue.sql create mode 100644 packages/dataqueue/src/handler-validation.test.ts create mode 100644 packages/dataqueue/src/handler-validation.ts diff --git a/apps/docs/content/docs/api/job-options.mdx b/apps/docs/content/docs/api/job-options.mdx index 70de6c0..9ff534c 100644 --- a/apps/docs/content/docs/api/job-options.mdx +++ b/apps/docs/content/docs/api/job-options.mdx @@ -16,6 +16,9 @@ The `JobOptions` interface defines the options for creating a new job in the que - `runAt?`: _Date | null_ — When to run the job (default: now). - `timeoutMs?`: _number_ — Timeout for this job in milliseconds. If not set, uses the processor default or unlimited. +- `forceKillOnTimeout?`: _boolean_ — If true, the job will be forcefully terminated (using Worker Threads) when timeout is reached. If false (default), the job will only receive an AbortSignal and must handle the abort gracefully. + + **⚠️ Runtime Requirements**: This option requires **Node.js** and will **not work** in Bun or other runtimes without worker thread support. See [Force Kill on Timeout](/docs/usage/force-kill-timeout) for details. - `tags?`: _string[]_ — Tags for this job. Used for grouping, searching, or batch operations. ## Example @@ -28,6 +31,7 @@ const job = { priority: 10, runAt: new Date(Date.now() + 60000), // run in 1 minute timeoutMs: 30000, // 30 seconds + forceKillOnTimeout: false, // Use graceful shutdown (default) tags: ['welcome', 'user'], // tags for grouping/searching }; ``` diff --git a/apps/docs/content/docs/usage/force-kill-timeout.mdx b/apps/docs/content/docs/usage/force-kill-timeout.mdx new file mode 100644 index 0000000..cd28877 --- /dev/null +++ b/apps/docs/content/docs/usage/force-kill-timeout.mdx @@ -0,0 +1,139 @@ +--- +title: Force Kill on Timeout +--- + +When you set `forceKillOnTimeout: true` on a job, the handler will be forcefully terminated (using Worker Threads) when the timeout is reached, rather than just receiving an AbortSignal. + +## Runtime Requirements + +**⚠️ IMPORTANT**: `forceKillOnTimeout` requires **Node.js** and uses the `worker_threads` module. It will **not work** in Bun or other runtimes that don't support Node.js worker threads. + +- ✅ **Node.js**: Fully supported (Node.js v10.5.0+) +- ❌ **Bun**: Not supported - use `forceKillOnTimeout: false` (default) and ensure your handler checks `signal.aborted` + +If you're using Bun or another runtime without worker thread support, use the default graceful shutdown approach (`forceKillOnTimeout: false`) and make sure your handlers check `signal.aborted` to exit gracefully when timed out. + +## Handler Serialization Requirements + +**IMPORTANT**: When using `forceKillOnTimeout`, your handler must be **serializable**. This means the handler function can be converted to a string and executed in a separate worker thread. + +### ✅ Serializable Handlers + +These handlers will work with `forceKillOnTimeout`: + +```typescript +// Standalone function +const handler = async (payload, signal) => { + await doSomething(payload); +}; + +// Function that imports dependencies inside +const handler = async (payload, signal) => { + const { api } = await import('./api'); + await api.call(payload); +}; + +// Function with local variables +const handler = async (payload, signal) => { + const localVar = 'value'; + await process(payload, localVar); +}; +``` + +### ❌ Non-Serializable Handlers + +These handlers will **NOT** work with `forceKillOnTimeout`: + +```typescript +// ❌ Closure over external variable +const db = getDatabase(); +const handler = async (payload, signal) => { + await db.query(payload); // 'db' is captured from closure +}; + +// ❌ Uses 'this' context +class MyHandler { + async handle(payload, signal) { + await this.doSomething(payload); // 'this' won't work + } +} + +// ❌ Closure over imported module +import { someService } from './services'; +const handler = async (payload, signal) => { + await someService.process(payload); // 'someService' is from closure +}; +``` + +## Validating Handler Serialization + +You can validate that your handlers are serializable before using them: + +```typescript +import { + validateHandlerSerializable, + testHandlerSerialization, +} from '@nicnocquee/dataqueue'; + +const handler = async (payload, signal) => { + await doSomething(payload); +}; + +// Quick validation (synchronous) +const result = validateHandlerSerializable(handler, 'myJob'); +if (!result.isSerializable) { + console.error('Handler is not serializable:', result.error); +} + +// Thorough test (asynchronous, actually tries to serialize) +const testResult = await testHandlerSerialization(handler, 'myJob'); +if (!testResult.isSerializable) { + console.error('Handler failed serialization test:', testResult.error); +} +``` + +## Best Practices + +1. **Use standalone functions**: Define handlers as standalone functions, not closures +2. **Import dependencies inside**: If you need external dependencies, import them inside the handler function +3. **Avoid 'this' context**: Don't use class methods as handlers unless they're bound +4. **Test early**: Use `validateHandlerSerializable` during development to catch issues early +5. **When in doubt, use graceful shutdown**: If your handler can't be serialized, use `forceKillOnTimeout: false` (default) and ensure your handler checks `signal.aborted` + +## Example: Converting a Non-Serializable Handler + +**Before** (not serializable): + +```typescript +import { db } from './db'; + +export const jobHandlers = { + processData: async (payload, signal) => { + // ❌ 'db' is captured from closure + await db.query('SELECT * FROM data WHERE id = $1', [payload.id]); + }, +}; +``` + +**After** (serializable): + +```typescript +export const jobHandlers = { + processData: async (payload, signal) => { + // ✅ Import inside the handler + const { db } = await import('./db'); + await db.query('SELECT * FROM data WHERE id = $1', [payload.id]); + }, +}; +``` + +## Runtime Validation + +The library automatically validates handlers when `forceKillOnTimeout` is enabled. If a handler cannot be serialized, you'll get a clear error message: + +``` +Handler for job type "myJob" uses 'this' context which cannot be serialized. +Use a regular function or avoid 'this' references when forceKillOnTimeout is enabled. +``` + +This validation happens when the job is processed, so you'll catch serialization issues early in development. diff --git a/apps/docs/content/docs/usage/job-timeout.mdx b/apps/docs/content/docs/usage/job-timeout.mdx index a80e394..36ab614 100644 --- a/apps/docs/content/docs/usage/job-timeout.mdx +++ b/apps/docs/content/docs/usage/job-timeout.mdx @@ -27,3 +27,20 @@ const handler = async (payload, signal) => { ``` If the job times out, the signal will be aborted and your handler should exit early. If your handler does not check for `signal.aborted`, it will keep running in the background even after the job is marked as failed due to timeout. For best results, always make your handlers abortable if they might run for a long time. + +## Force Kill on Timeout + +If you need to forcefully terminate jobs that don't respond to the abort signal, you can use `forceKillOnTimeout: true`. This will run the handler in a Worker Thread and forcefully terminate it when the timeout is reached. + +**⚠️ Runtime Requirements**: `forceKillOnTimeout` requires **Node.js** and will **not work** in Bun or other runtimes without worker thread support. See [Force Kill on Timeout](/docs/usage/force-kill-timeout) for details. + +**Important**: When using `forceKillOnTimeout`, your handler must be serializable. See [Force Kill on Timeout](/docs/usage/force-kill-timeout) for details. + +```typescript +await queue.addJob({ + jobType: 'longRunningTask', + payload: { data: '...' }, + timeoutMs: 5000, + forceKillOnTimeout: true, // Forcefully terminate if timeout is reached +}); +``` diff --git a/packages/dataqueue/migrations/1765809419000_add_force_kill_on_timeout_to_job_queue.sql b/packages/dataqueue/migrations/1765809419000_add_force_kill_on_timeout_to_job_queue.sql new file mode 100644 index 0000000..26db5fd --- /dev/null +++ b/packages/dataqueue/migrations/1765809419000_add_force_kill_on_timeout_to_job_queue.sql @@ -0,0 +1,6 @@ +-- Up Migration: Add force_kill_on_timeout to job_queue +ALTER TABLE job_queue ADD COLUMN force_kill_on_timeout BOOLEAN DEFAULT FALSE; + +-- Down Migration: Remove force_kill_on_timeout from job_queue +ALTER TABLE job_queue DROP COLUMN IF EXISTS force_kill_on_timeout; + diff --git a/packages/dataqueue/src/handler-validation.test.ts b/packages/dataqueue/src/handler-validation.test.ts new file mode 100644 index 0000000..588814a --- /dev/null +++ b/packages/dataqueue/src/handler-validation.test.ts @@ -0,0 +1,414 @@ +import { describe, expect, it } from 'vitest'; +import { + validateHandlerSerializable, + testHandlerSerialization, +} from './handler-validation.js'; +import { JobHandler } from './types.js'; + +// Define test payload map +interface TestPayloadMap { + simple: { data: string }; + complex: { id: number; name: string }; +} + +describe('validateHandlerSerializable', () => { + it('should validate a simple standalone handler as serializable', () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + await Promise.resolve(); + }; + + const result = validateHandlerSerializable(handler, 'simple'); + expect(result.isSerializable).toBe(true); + expect(result.error).toBeUndefined(); + }); + + it('should validate a handler with local variables as serializable', () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + const localVar = 'test'; + const anotherVar = 123; + await Promise.resolve(localVar + anotherVar); + }; + + const result = validateHandlerSerializable(handler, 'simple'); + expect(result.isSerializable).toBe(true); + expect(result.error).toBeUndefined(); + }); + + it('should validate a handler that imports dependencies inside as serializable', () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + const { default: something } = await import('path'); + await Promise.resolve(); + }; + + const result = validateHandlerSerializable(handler, 'simple'); + expect(result.isSerializable).toBe(true); + expect(result.error).toBeUndefined(); + }); + + it('should reject a handler that uses "this" context', () => { + // Create a handler that uses 'this' by mocking toString to show 'this.' in the body + const handler: JobHandler = async ( + payload, + signal, + ) => { + await Promise.resolve(); + }; + + // Mock toString to simulate a handler that uses 'this' + const originalToString = handler.toString.bind(handler); + (handler as any).toString = () => + 'async (payload) => { return this.value; }'; + + const result = validateHandlerSerializable(handler, 'simple'); + expect(result.isSerializable).toBe(false); + expect(result.error).toContain("uses 'this' context"); + expect(result.error).toContain('cannot be serialized'); + + // Restore + (handler as any).toString = originalToString; + }); + + it('should reject a handler with native code', () => { + // Create a handler that might contain native code + // This is tricky to test directly, but we can test the detection logic + const handler: JobHandler = async ( + payload, + signal, + ) => { + // Native methods like Array.prototype methods might show as native code + Array.isArray([]); + await Promise.resolve(); + }; + + // Mock toString to return native code indicator + const originalToString = handler.toString.bind(handler); + (handler as any).toString = () => '[native code]'; + + const result = validateHandlerSerializable(handler, 'simple'); + expect(result.isSerializable).toBe(false); + expect(result.error).toContain('contains native code'); + expect(result.error).toContain('cannot be serialized'); + + // Restore + (handler as any).toString = originalToString; + }); + + it('should reject a handler that cannot be parsed', () => { + // Create a handler with invalid syntax when stringified + const handler: JobHandler = async ( + payload, + signal, + ) => { + await Promise.resolve(); + }; + + // Mock toString to return invalid function code + const originalToString = handler.toString.bind(handler); + (handler as any).toString = () => + 'async (payload) => { invalid syntax here !@#$'; + + const result = validateHandlerSerializable(handler, 'simple'); + expect(result.isSerializable).toBe(false); + expect(result.error).toContain('cannot be serialized'); + + // Restore + (handler as any).toString = originalToString; + }); + + it('should warn about potential closures but still mark as serializable', () => { + // This handler has a pattern that might indicate closures + // but the validation can't be 100% sure, so it warns + const handler: JobHandler = async ( + payload, + signal, + ) => { + const local = 'test'; + await Promise.resolve(local); + }; + + const result = validateHandlerSerializable(handler, 'simple'); + // The current implementation might return a warning, but it's still considered serializable + // This test checks the behavior - if it warns, that's OK + if (result.error) { + expect(result.error).toContain('Warning'); + expect(result.error).toContain('may have closures'); + } + }); + + it('should work without jobType parameter', () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + await Promise.resolve(); + }; + + const result = validateHandlerSerializable(handler); + expect(result.isSerializable).toBe(true); + }); + + it('should provide helpful error messages with jobType', () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + await Promise.resolve(); + }; + + // Mock toString to simulate a handler that uses 'this' + const originalToString = handler.toString.bind(handler); + (handler as any).toString = () => + 'async (payload) => { return this.value; }'; + + const result = validateHandlerSerializable(handler, 'myJobType'); + expect(result.isSerializable).toBe(false); + expect(result.error).toContain('myJobType'); + expect(result.error).toContain("uses 'this' context"); + + // Restore + (handler as any).toString = originalToString; + }); + + it('should handle errors during validation gracefully', () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + await Promise.resolve(); + }; + + // Mock toString to throw an error + const originalToString = handler.toString.bind(handler); + (handler as any).toString = () => { + throw new Error('toString failed'); + }; + + const result = validateHandlerSerializable(handler, 'simple'); + expect(result.isSerializable).toBe(false); + expect(result.error).toContain('Failed to validate handler serialization'); + + // Restore + (handler as any).toString = originalToString; + }); +}); + +describe('testHandlerSerialization', () => { + it('should validate a simple handler as serializable', async () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + await Promise.resolve(); + }; + + const result = await testHandlerSerialization(handler, 'simple'); + expect(result.isSerializable).toBe(true); + expect(result.error).toBeUndefined(); + }); + + it('should reject a handler that fails basic validation', async () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + await Promise.resolve(); + }; + + // Mock toString to simulate a handler that uses 'this' + const originalToString = handler.toString.bind(handler); + (handler as any).toString = () => + 'async (payload) => { return this.value; }'; + + const result = await testHandlerSerialization(handler, 'simple'); + expect(result.isSerializable).toBe(false); + expect(result.error).toContain("uses 'this' context"); + + // Restore + (handler as any).toString = originalToString; + }); + + it('should handle handlers that complete quickly', async () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + return Promise.resolve(); + }; + + const result = await testHandlerSerialization(handler, 'simple'); + expect(result.isSerializable).toBe(true); + }); + + it('should handle handlers that take time but still validate as serializable', async () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + // Handler that takes longer than the test timeout (100ms) + await new Promise((resolve) => setTimeout(resolve, 200)); + }; + + const result = await testHandlerSerialization(handler, 'simple'); + // Should still be considered serializable even if it times out during test + expect(result.isSerializable).toBe(true); + }); + + it('should handle handlers that throw errors during execution', async () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + throw new Error('Handler error'); + }; + + const result = await testHandlerSerialization(handler, 'simple'); + // Execution errors are OK - we just want to know if it can be deserialized + // The handler is still considered serializable + expect(result.isSerializable).toBe(true); + }); + + it('should handle serialization errors', async () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + await Promise.resolve(); + }; + + // Mock toString to return invalid code + const originalToString = handler.toString.bind(handler); + (handler as any).toString = () => 'invalid function code !@#$'; + + const result = await testHandlerSerialization(handler, 'simple'); + expect(result.isSerializable).toBe(false); + expect(result.error).toBeDefined(); + + // Restore + (handler as any).toString = originalToString; + }); + + it('should work without jobType parameter', async () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + await Promise.resolve(); + }; + + const result = await testHandlerSerialization(handler); + expect(result.isSerializable).toBe(true); + }); + + it('should handle complex payload types', async () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + const { id, name } = payload; + await Promise.resolve(`${id}: ${name}`); + }; + + const result = await testHandlerSerialization(handler, 'complex'); + expect(result.isSerializable).toBe(true); + }); + + it('should handle handlers that use signal parameter', async () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + if (signal.aborted) { + return; + } + await Promise.resolve(); + }; + + const result = await testHandlerSerialization(handler, 'simple'); + expect(result.isSerializable).toBe(true); + }); +}); + +describe('handler validation edge cases', () => { + it('should handle arrow functions correctly', () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + await Promise.resolve(); + }; + + const result = validateHandlerSerializable(handler, 'simple'); + expect(result.isSerializable).toBe(true); + }); + + it('should handle regular function declarations', () => { + async function handler( + payload: TestPayloadMap['simple'], + signal: AbortSignal, + ): Promise { + await Promise.resolve(); + } + + const result = validateHandlerSerializable(handler, 'simple'); + expect(result.isSerializable).toBe(true); + }); + + it('should handle handlers with multiple statements', () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + const step1 = 'first'; + const step2 = 'second'; + const step3 = step1 + step2; + await Promise.resolve(step3); + }; + + const result = validateHandlerSerializable(handler, 'simple'); + expect(result.isSerializable).toBe(true); + }); + + it('should handle handlers with conditional logic', () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + if (signal.aborted) { + return; + } + if (payload.data === 'test') { + await Promise.resolve('matched'); + } else { + await Promise.resolve('not matched'); + } + }; + + const result = validateHandlerSerializable(handler, 'simple'); + expect(result.isSerializable).toBe(true); + }); + + it('should handle handlers with try-catch blocks', () => { + const handler: JobHandler = async ( + payload, + signal, + ) => { + try { + await Promise.resolve(); + } catch (error) { + throw error; + } + }; + + const result = validateHandlerSerializable(handler, 'simple'); + expect(result.isSerializable).toBe(true); + }); +}); diff --git a/packages/dataqueue/src/handler-validation.ts b/packages/dataqueue/src/handler-validation.ts new file mode 100644 index 0000000..40679f7 --- /dev/null +++ b/packages/dataqueue/src/handler-validation.ts @@ -0,0 +1,168 @@ +import { JobHandler } from './types.js'; + +/** + * Validates that a job handler can be serialized for use with forceKillOnTimeout. + * + * This function checks if a handler can be safely serialized and executed in a worker thread. + * Use this function during development to catch serialization issues early. + * + * @param handler - The job handler function to validate + * @param jobType - Optional job type name for better error messages + * @returns An object with `isSerializable` boolean and optional `error` message + * + * @example + * ```ts + * const handler = async (payload, signal) => { + * await doSomething(payload); + * }; + * + * const result = validateHandlerSerializable(handler, 'myJob'); + * if (!result.isSerializable) { + * console.error('Handler is not serializable:', result.error); + * } + * ``` + */ +export function validateHandlerSerializable< + PayloadMap, + T extends keyof PayloadMap & string, +>( + handler: JobHandler, + jobType?: string, +): { isSerializable: boolean; error?: string } { + try { + const handlerString = handler.toString(); + const typeLabel = jobType ? `job type "${jobType}"` : 'handler'; + + // Check for common patterns that indicate non-serializable handlers + // 1. Arrow functions that capture 'this' (indicated by 'this' in the function body but not in parameters) + if ( + handlerString.includes('this.') && + !handlerString.match(/\([^)]*this[^)]*\)/) + ) { + return { + isSerializable: false, + error: + `Handler for ${typeLabel} uses 'this' context which cannot be serialized. ` + + `Use a regular function or avoid 'this' references when forceKillOnTimeout is enabled.`, + }; + } + + // 2. Check if handler string looks like it might have closures + // This is a heuristic - we can't perfectly detect closures, but we can warn about common patterns + if (handlerString.includes('[native code]')) { + return { + isSerializable: false, + error: + `Handler for ${typeLabel} contains native code which cannot be serialized. ` + + `Ensure your handler is a plain function when forceKillOnTimeout is enabled.`, + }; + } + + // 3. Try to create a function from the string to validate it's parseable + // This will catch syntax errors early + try { + new Function('return ' + handlerString); + } catch (parseError) { + return { + isSerializable: false, + error: + `Handler for ${typeLabel} cannot be serialized: ${parseError instanceof Error ? parseError.message : String(parseError)}. ` + + `When using forceKillOnTimeout, handlers must be serializable functions without closures over external variables.`, + }; + } + + // 4. Check for common closure patterns (heuristic) + // Look for variable references that might be from outer scope + // This is not perfect but can catch some common issues + const hasPotentialClosure = + /const\s+\w+\s*=\s*[^;]+;\s*async\s*\(/.test(handlerString) || + /let\s+\w+\s*=\s*[^;]+;\s*async\s*\(/.test(handlerString); + + if (hasPotentialClosure) { + // This is just a warning, not a hard error, since we can't be 100% sure + // The actual serialization will fail at runtime if there's a real issue + return { + isSerializable: true, // Still serializable, but might have issues + error: + `Warning: Handler for ${typeLabel} may have closures over external variables. ` + + `Test thoroughly with forceKillOnTimeout enabled. If the handler fails to execute in a worker thread, ` + + `ensure all dependencies are imported within the handler function.`, + }; + } + + return { isSerializable: true }; + } catch (error) { + return { + isSerializable: false, + error: `Failed to validate handler serialization${jobType ? ` for job type "${jobType}"` : ''}: ${error instanceof Error ? error.message : String(error)}`, + }; + } +} + +/** + * Test if a handler can be serialized and executed in a worker thread. + * This is a more thorough check that actually attempts to serialize and deserialize the handler. + * + * @param handler - The job handler function to test + * @param jobType - Optional job type name for better error messages + * @returns Promise that resolves to validation result + * + * @example + * ```ts + * const handler = async (payload, signal) => { + * await doSomething(payload); + * }; + * + * const result = await testHandlerSerialization(handler, 'myJob'); + * if (!result.isSerializable) { + * console.error('Handler failed serialization test:', result.error); + * } + * ``` + */ +export async function testHandlerSerialization< + PayloadMap, + T extends keyof PayloadMap & string, +>( + handler: JobHandler, + jobType?: string, +): Promise<{ isSerializable: boolean; error?: string }> { + // First do the basic validation + const basicValidation = validateHandlerSerializable(handler, jobType); + if (!basicValidation.isSerializable) { + return basicValidation; + } + + // Then try to actually serialize and deserialize in a worker-like context + try { + const handlerString = handler.toString(); + const handlerFn = new Function('return ' + handlerString)(); + + // Try to call it with dummy parameters to see if it executes + // We use a very short timeout to avoid hanging + const testPromise = handlerFn({}, new AbortController().signal); + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error('Handler test timeout')), 100), + ); + + try { + await Promise.race([testPromise, timeoutPromise]); + } catch (execError) { + // Execution errors are OK - we just want to know if it can be deserialized + // The actual job execution will handle real errors + if ( + execError instanceof Error && + execError.message === 'Handler test timeout' + ) { + // Handler is taking too long, but that's OK for serialization test + return { isSerializable: true }; + } + } + + return { isSerializable: true }; + } catch (error) { + return { + isSerializable: false, + error: `Handler failed serialization test: ${error instanceof Error ? error.message : String(error)}`, + }; + } +} diff --git a/packages/dataqueue/src/index.ts b/packages/dataqueue/src/index.ts index 9e942d7..5184c8f 100644 --- a/packages/dataqueue/src/index.ts +++ b/packages/dataqueue/src/index.ts @@ -123,3 +123,7 @@ const withLogContext = }; export * from './types.js'; +export { + validateHandlerSerializable, + testHandlerSerialization, +} from './handler-validation.js'; diff --git a/packages/dataqueue/src/processor.test.ts b/packages/dataqueue/src/processor.test.ts index f13edbd..9b05e9d 100644 --- a/packages/dataqueue/src/processor.test.ts +++ b/packages/dataqueue/src/processor.test.ts @@ -475,4 +475,59 @@ describe('per-job timeout', () => { const completed = await queue.getJob(pool, jobId); expect(completed?.status).toBe('completed'); }); + + it('should forcefully terminate job when forceKillOnTimeout is true', async () => { + // Create a handler that ignores the abort signal (simulating a handler that doesn't check signal.aborted) + // Note: We use a real function (not vi.fn) because vi.fn doesn't serialize properly for worker threads + const handler: JobHandler<{ test: {} }, 'test'> = async ( + _payload, + _signal, + ) => { + // This handler will run indefinitely, ignoring the abort signal + await new Promise((resolve) => { + setTimeout(resolve, 1000); // Will never complete in time + }); + }; + const handlers: { test: JobHandler<{ test: {} }, 'test'> } = { + test: handler, + }; + const jobId = await queue.addJob<{ test: {} }, 'test'>(pool, { + jobType: 'test', + payload: {}, + timeoutMs: 50, // 50ms timeout + forceKillOnTimeout: true, // Force kill on timeout + }); + const job = await queue.getJob<{ test: {} }, 'test'>(pool, jobId); + expect(job).not.toBeNull(); + expect(job?.forceKillOnTimeout).toBe(true); + await processJobWithHandlers(pool, job!, handlers); + const failed = await queue.getJob(pool, jobId); + expect(failed?.status).toBe('failed'); + expect(failed?.errorHistory?.[0]?.message).toContain('timed out'); + expect(failed?.failureReason).toBe(FailureReason.Timeout); + }); + + it('should complete job with forceKillOnTimeout if handler finishes before timeout', async () => { + // Note: We use a real function (not vi.fn) because vi.fn doesn't serialize properly for worker threads + const handler: JobHandler<{ test: {} }, 'test'> = async ( + _payload, + _signal, + ) => { + await new Promise((r) => setTimeout(r, 20)); + }; + const handlers: { test: JobHandler<{ test: {} }, 'test'> } = { + test: handler, + }; + const jobId = await queue.addJob<{ test: {} }, 'test'>(pool, { + jobType: 'test', + payload: {}, + timeoutMs: 200, // 200ms + forceKillOnTimeout: true, + }); + const job = await queue.getJob<{ test: {} }, 'test'>(pool, jobId); + expect(job).not.toBeNull(); + await processJobWithHandlers(pool, job!, handlers); + const completed = await queue.getJob(pool, jobId); + expect(completed?.status).toBe('completed'); + }); }); diff --git a/packages/dataqueue/src/processor.ts b/packages/dataqueue/src/processor.ts index 6f2228c..9be8690 100644 --- a/packages/dataqueue/src/processor.ts +++ b/packages/dataqueue/src/processor.ts @@ -1,4 +1,5 @@ import { Pool } from 'pg'; +import { Worker } from 'worker_threads'; import { JobRecord, ProcessorOptions, @@ -16,6 +17,242 @@ import { } from './queue.js'; import { log, setLogContext } from './log-context.js'; +/** + * Validates that a handler can be serialized for worker thread execution. + * Throws an error with helpful message if serialization fails. + */ +function validateHandlerSerializable< + PayloadMap, + T extends keyof PayloadMap & string, +>(handler: JobHandler, jobType: string): void { + try { + const handlerString = handler.toString(); + + // Check for common patterns that indicate non-serializable handlers + // 1. Arrow functions that capture 'this' (indicated by 'this' in the function body but not in parameters) + if ( + handlerString.includes('this.') && + !handlerString.match(/\([^)]*this[^)]*\)/) + ) { + throw new Error( + `Handler for job type "${jobType}" uses 'this' context which cannot be serialized. ` + + `Use a regular function or avoid 'this' references when forceKillOnTimeout is enabled.`, + ); + } + + // 2. Check if handler string looks like it might have closures + // This is a heuristic - we can't perfectly detect closures, but we can warn about common patterns + if (handlerString.includes('[native code]')) { + throw new Error( + `Handler for job type "${jobType}" contains native code which cannot be serialized. ` + + `Ensure your handler is a plain function when forceKillOnTimeout is enabled.`, + ); + } + + // 3. Try to create a function from the string to validate it's parseable + // This will catch syntax errors early + try { + new Function('return ' + handlerString); + } catch (parseError) { + throw new Error( + `Handler for job type "${jobType}" cannot be serialized: ${parseError instanceof Error ? parseError.message : String(parseError)}. ` + + `When using forceKillOnTimeout, handlers must be serializable functions without closures over external variables.`, + ); + } + } catch (error) { + if (error instanceof Error) { + throw error; + } + throw new Error( + `Failed to validate handler serialization for job type "${jobType}": ${String(error)}`, + ); + } +} + +/** + * Run a handler in a worker thread for force-kill capability. + * + * **IMPORTANT**: The handler must be serializable for this to work. This means: + * - The handler should be a standalone function or arrow function + * - It should not capture variables from outer scopes (closures) that reference external dependencies + * - It should not use 'this' context unless it's a bound method + * - All dependencies must be importable in the worker thread context + * + * If your handler doesn't meet these requirements, use the default graceful shutdown + * (forceKillOnTimeout: false) and ensure your handler checks signal.aborted. + * + * @throws {Error} If the handler cannot be serialized + */ +async function runHandlerInWorker< + PayloadMap, + T extends keyof PayloadMap & string, +>( + handler: JobHandler, + payload: PayloadMap[T], + timeoutMs: number, + jobType: string, +): Promise { + // Validate handler can be serialized before attempting to run in worker + validateHandlerSerializable(handler, jobType); + + return new Promise((resolve, reject) => { + // Use inline worker code for better compatibility + // Note: This requires the handler to be serializable (no closures with external dependencies) + // Wrap in IIFE to allow return statements + const workerCode = ` + (function() { + const { parentPort, workerData } = require('worker_threads'); + const { handlerCode, payload, timeoutMs } = workerData; + + // Create an AbortController for the handler + const controller = new AbortController(); + const signal = controller.signal; + + // Set up timeout + const timeoutId = setTimeout(() => { + controller.abort(); + parentPort.postMessage({ type: 'timeout' }); + }, timeoutMs); + + try { + // Execute the handler + // Note: This uses Function constructor which requires the handler to be serializable. + // The handler should be validated before reaching this point. + let handlerFn; + try { + // Wrap handlerCode in parentheses to ensure it's treated as an expression + // This handles both arrow functions and regular functions + const wrappedCode = handlerCode.trim().startsWith('async') || handlerCode.trim().startsWith('function') + ? handlerCode + : '(' + handlerCode + ')'; + handlerFn = new Function('return ' + wrappedCode)(); + } catch (parseError) { + clearTimeout(timeoutId); + parentPort.postMessage({ + type: 'error', + error: { + message: 'Handler cannot be deserialized in worker thread. ' + + 'Ensure your handler is a standalone function without closures over external variables. ' + + 'Original error: ' + (parseError instanceof Error ? parseError.message : String(parseError)), + stack: parseError instanceof Error ? parseError.stack : undefined, + name: 'SerializationError', + }, + }); + return; + } + + // Ensure handlerFn is actually a function + if (typeof handlerFn !== 'function') { + clearTimeout(timeoutId); + parentPort.postMessage({ + type: 'error', + error: { + message: 'Handler deserialization did not produce a function. ' + + 'Ensure your handler is a valid function when forceKillOnTimeout is enabled.', + name: 'SerializationError', + }, + }); + return; + } + + handlerFn(payload, signal) + .then(() => { + clearTimeout(timeoutId); + parentPort.postMessage({ type: 'success' }); + }) + .catch((error) => { + clearTimeout(timeoutId); + parentPort.postMessage({ + type: 'error', + error: { + message: error.message, + stack: error.stack, + name: error.name, + }, + }); + }); + } catch (error) { + clearTimeout(timeoutId); + parentPort.postMessage({ + type: 'error', + error: { + message: error.message, + stack: error.stack, + name: error.name, + }, + }); + } + })(); + `; + + const worker = new Worker(workerCode, { + eval: true, + workerData: { + handlerCode: handler.toString(), + payload, + timeoutMs, + }, + }); + + let resolved = false; + + worker.on('message', (message: { type: string; error?: any }) => { + if (resolved) return; + resolved = true; + + if (message.type === 'success') { + resolve(); + } else if (message.type === 'timeout') { + const timeoutError = new Error( + `Job timed out after ${timeoutMs} ms and was forcefully terminated`, + ); + // @ts-ignore + timeoutError.failureReason = FailureReason.Timeout; + reject(timeoutError); + } else if (message.type === 'error') { + const error = new Error(message.error.message); + error.stack = message.error.stack; + error.name = message.error.name; + reject(error); + } + }); + + worker.on('error', (error) => { + if (resolved) return; + resolved = true; + reject(error); + }); + + worker.on('exit', (code) => { + if (resolved) return; + if (code !== 0) { + resolved = true; + reject(new Error(`Worker stopped with exit code ${code}`)); + } + }); + + // Force terminate worker on timeout + setTimeout(() => { + if (!resolved) { + resolved = true; + worker + .terminate() + .then(() => { + const timeoutError = new Error( + `Job timed out after ${timeoutMs} ms and was forcefully terminated`, + ); + // @ts-ignore + timeoutError.failureReason = FailureReason.Timeout; + reject(timeoutError); + }) + .catch((err) => { + reject(err); + }); + } + }, timeoutMs + 100); // Small buffer to ensure timeout is handled + }); +} + /** * Process a single job using the provided handler map */ @@ -46,27 +283,34 @@ export async function processJobWithHandlers< // Per-job timeout logic const timeoutMs = job.timeoutMs ?? undefined; + const forceKillOnTimeout = job.forceKillOnTimeout ?? false; let timeoutId: NodeJS.Timeout | undefined; const controller = new AbortController(); try { - const jobPromise = handler(job.payload, controller.signal); - if (timeoutMs && timeoutMs > 0) { - await Promise.race([ - jobPromise, - new Promise((_, reject) => { - timeoutId = setTimeout(() => { - controller.abort(); - const timeoutError = new Error( - `Job timed out after ${timeoutMs} ms`, - ); - // @ts-ignore - timeoutError.failureReason = FailureReason.Timeout; - reject(timeoutError); - }, timeoutMs); - }), - ]); + // If forceKillOnTimeout is true, run handler in a worker thread + if (forceKillOnTimeout && timeoutMs && timeoutMs > 0) { + await runHandlerInWorker(handler, job.payload, timeoutMs, job.jobType); } else { - await jobPromise; + // Default: graceful shutdown with AbortController + const jobPromise = handler(job.payload, controller.signal); + if (timeoutMs && timeoutMs > 0) { + await Promise.race([ + jobPromise, + new Promise((_, reject) => { + timeoutId = setTimeout(() => { + controller.abort(); + const timeoutError = new Error( + `Job timed out after ${timeoutMs} ms`, + ); + // @ts-ignore + timeoutError.failureReason = FailureReason.Timeout; + reject(timeoutError); + }, timeoutMs); + }), + ]); + } else { + await jobPromise; + } } if (timeoutId) clearTimeout(timeoutId); await completeJob(pool, job.id); diff --git a/packages/dataqueue/src/queue.ts b/packages/dataqueue/src/queue.ts index f1e045d..87953b1 100644 --- a/packages/dataqueue/src/queue.ts +++ b/packages/dataqueue/src/queue.ts @@ -44,6 +44,7 @@ export const addJob = async ( priority = 0, runAt = null, timeoutMs = undefined, + forceKillOnTimeout = false, tags = undefined, }: JobOptions, ): Promise => { @@ -53,8 +54,8 @@ export const addJob = async ( if (runAt) { result = await client.query( `INSERT INTO job_queue - (job_type, payload, max_attempts, priority, run_at, timeout_ms, tags) - VALUES ($1, $2, $3, $4, $5, $6, $7) + (job_type, payload, max_attempts, priority, run_at, timeout_ms, force_kill_on_timeout, tags) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id`, [ jobType, @@ -63,6 +64,7 @@ export const addJob = async ( priority, runAt, timeoutMs ?? null, + forceKillOnTimeout ?? false, tags ?? null, ], ); @@ -72,8 +74,8 @@ export const addJob = async ( } else { result = await client.query( `INSERT INTO job_queue - (job_type, payload, max_attempts, priority, timeout_ms, tags) - VALUES ($1, $2, $3, $4, $5, $6) + (job_type, payload, max_attempts, priority, timeout_ms, force_kill_on_timeout, tags) + VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id`, [ jobType, @@ -81,6 +83,7 @@ export const addJob = async ( maxAttempts, priority, timeoutMs ?? null, + forceKillOnTimeout ?? false, tags ?? null, ], ); @@ -112,7 +115,7 @@ export const getJob = async ( const client = await pool.connect(); try { const result = await client.query( - `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason" FROM job_queue WHERE id = $1`, + `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason" FROM job_queue WHERE id = $1`, [id], ); @@ -129,6 +132,7 @@ export const getJob = async ( ...job, payload: job.payload, timeoutMs: job.timeoutMs, + forceKillOnTimeout: job.forceKillOnTimeout, failureReason: job.failureReason, }; } catch (error) { @@ -154,7 +158,7 @@ export const getJobsByStatus = async < const client = await pool.connect(); try { const result = await client.query( - `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason" FROM job_queue WHERE status = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3`, + `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason" FROM job_queue WHERE status = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3`, [status, limit, offset], ); @@ -164,6 +168,7 @@ export const getJobsByStatus = async < ...job, payload: job.payload, timeoutMs: job.timeoutMs, + forceKillOnTimeout: job.forceKillOnTimeout, failureReason: job.failureReason, })); } catch (error) { @@ -230,7 +235,7 @@ export const getNextBatch = async < LIMIT $2 FOR UPDATE SKIP LOCKED ) - RETURNING id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason" + RETURNING id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason" `, params, ); @@ -249,6 +254,7 @@ export const getNextBatch = async < ...job, payload: job.payload, timeoutMs: job.timeoutMs, + forceKillOnTimeout: job.forceKillOnTimeout, })); } catch (error) { log(`Error getting next batch: ${error}`); @@ -526,7 +532,7 @@ export const getAllJobs = async < const client = await pool.connect(); try { const result = await client.query( - `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason" FROM job_queue ORDER BY created_at DESC LIMIT $1 OFFSET $2`, + `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason" FROM job_queue ORDER BY created_at DESC LIMIT $1 OFFSET $2`, [limit, offset], ); log(`Found ${result.rows.length} jobs (all)`); @@ -534,6 +540,7 @@ export const getAllJobs = async < ...job, payload: job.payload, timeoutMs: job.timeoutMs, + forceKillOnTimeout: job.forceKillOnTimeout, })); } catch (error) { log(`Error getting all jobs: ${error}`); @@ -674,6 +681,7 @@ export const getJobsByTags = async < ...job, payload: job.payload, timeoutMs: job.timeoutMs, + forceKillOnTimeout: job.forceKillOnTimeout, failureReason: job.failureReason, })); } catch (error) { @@ -699,7 +707,7 @@ export const getJobs = async ( ): Promise[]> => { const client = await pool.connect(); try { - let query = `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags FROM job_queue`; + let query = `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", force_kill_on_timeout AS "forceKillOnTimeout", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags FROM job_queue`; const params: any[] = []; let where: string[] = []; let paramIdx = 1; @@ -796,6 +804,7 @@ export const getJobs = async ( ...job, payload: job.payload, timeoutMs: job.timeoutMs, + forceKillOnTimeout: job.forceKillOnTimeout, failureReason: job.failureReason, })); } catch (error) { diff --git a/packages/dataqueue/src/types.ts b/packages/dataqueue/src/types.ts index bb1326f..58d7c7d 100644 --- a/packages/dataqueue/src/types.ts +++ b/packages/dataqueue/src/types.ts @@ -13,6 +13,52 @@ export interface JobOptions> { * Timeout for this job in milliseconds. If not set, uses the processor default or unlimited. */ timeoutMs?: number; + /** + * If true, the job will be forcefully terminated (using Worker Threads) when timeout is reached. + * If false (default), the job will only receive an AbortSignal and must handle the abort gracefully. + * + * **⚠️ RUNTIME REQUIREMENTS**: This option requires **Node.js** and uses the `worker_threads` module. + * It will **not work** in Bun or other runtimes that don't support Node.js worker threads. + * + * **IMPORTANT**: When `forceKillOnTimeout` is true, the handler must be serializable. This means: + * - The handler should be a standalone function (not a closure over external variables) + * - It should not capture variables from outer scopes that reference external dependencies + * - It should not use 'this' context unless it's a bound method + * - All dependencies must be importable in the worker thread context + * + * **Examples of serializable handlers:** + * ```ts + * // ✅ Good - standalone function + * const handler = async (payload, signal) => { + * await doSomething(payload); + * }; + * + * // ✅ Good - function that imports dependencies + * const handler = async (payload, signal) => { + * const { api } = await import('./api'); + * await api.call(payload); + * }; + * + * // ❌ Bad - closure over external variable + * const db = getDatabase(); + * const handler = async (payload, signal) => { + * await db.query(payload); // 'db' is captured from closure + * }; + * + * // ❌ Bad - uses 'this' context + * class MyHandler { + * async handle(payload, signal) { + * await this.doSomething(payload); // 'this' won't work + * } + * } + * ``` + * + * If your handler doesn't meet these requirements, use `forceKillOnTimeout: false` (default) + * and ensure your handler checks `signal.aborted` to exit gracefully. + * + * Note: forceKillOnTimeout requires timeoutMs to be set. + */ + forceKillOnTimeout?: boolean; /** * Tags for this job. Used for grouping, searching, or batch operations. */ @@ -69,6 +115,11 @@ export interface JobRecord> { * Timeout for this job in milliseconds (null means no timeout). */ timeoutMs?: number | null; + /** + * If true, the job will be forcefully terminated (using Worker Threads) when timeout is reached. + * If false (default), the job will only receive an AbortSignal and must handle the abort gracefully. + */ + forceKillOnTimeout?: boolean | null; /** * The reason for the last failure, if any. */ From 733a9fc79cb23a81ce12c21282bbf6d2d720a002 Mon Sep 17 00:00:00 2001 From: Nico Prananta <311343+nicnocquee@users.noreply.github.com> Date: Mon, 15 Dec 2025 21:12:09 +0100 Subject: [PATCH 2/2] Fix format --- apps/docs/content/docs/api/job-options.mdx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/docs/content/docs/api/job-options.mdx b/apps/docs/content/docs/api/job-options.mdx index 9ff534c..38ac295 100644 --- a/apps/docs/content/docs/api/job-options.mdx +++ b/apps/docs/content/docs/api/job-options.mdx @@ -17,8 +17,9 @@ The `JobOptions` interface defines the options for creating a new job in the que - `timeoutMs?`: _number_ — Timeout for this job in milliseconds. If not set, uses the processor default or unlimited. - `forceKillOnTimeout?`: _boolean_ — If true, the job will be forcefully terminated (using Worker Threads) when timeout is reached. If false (default), the job will only receive an AbortSignal and must handle the abort gracefully. - + **⚠️ Runtime Requirements**: This option requires **Node.js** and will **not work** in Bun or other runtimes without worker thread support. See [Force Kill on Timeout](/docs/usage/force-kill-timeout) for details. + - `tags?`: _string[]_ — Tags for this job. Used for grouping, searching, or batch operations. ## Example