From d52bb991bb4c80220860bb26bbf3c3f472a2a1c7 Mon Sep 17 00:00:00 2001 From: Nico Prananta <311343+nicnocquee@users.noreply.github.com> Date: Mon, 15 Dec 2025 20:30:22 +0100 Subject: [PATCH 1/3] Add editJob functionality and update documentation - Introduced `editJob` method to allow editing of pending jobs with optional fields. - Updated `EditJobOptions` type to define editable fields. - Enhanced job event tracking by recording an 'edited' event with metadata. - Added comprehensive documentation for `editJob` usage and examples. - Updated existing tests to cover editing functionality and ensure correct behavior for pending and non-pending jobs. --- apps/docs/content/docs/api/job-queue.mdx | 48 ++++++ apps/docs/content/docs/usage/edit-jobs.mdx | 175 ++++++++++++++++++++ apps/docs/content/docs/usage/meta.json | 1 + packages/dataqueue/src/index.test.ts | 138 ++++++++++++++++ packages/dataqueue/src/index.ts | 9 ++ packages/dataqueue/src/queue.test.ts | 179 +++++++++++++++++++++ packages/dataqueue/src/queue.ts | 94 ++++++++++- packages/dataqueue/src/types.ts | 21 +++ 8 files changed, 664 insertions(+), 1 deletion(-) create mode 100644 apps/docs/content/docs/usage/edit-jobs.mdx diff --git a/apps/docs/content/docs/api/job-queue.mdx b/apps/docs/content/docs/api/job-queue.mdx index 7f1573e..08e5f6a 100644 --- a/apps/docs/content/docs/api/job-queue.mdx +++ b/apps/docs/content/docs/api/job-queue.mdx @@ -146,6 +146,53 @@ cancelJob(jobId: number): Promise Cancels a job given its ID. +### editJob + +```ts +editJob(jobId: number, updates: EditJobOptions): Promise +``` + +Edits a pending job given its ID. Only works for jobs with status 'pending'. Silently fails for other statuses (processing, completed, failed, cancelled). + +#### EditJobOptions + +```ts +interface EditJobOptions { + payload?: any; + maxAttempts?: number; + priority?: number; + runAt?: Date | null; + timeoutMs?: number; + tags?: string[]; +} +``` + +All fields are optional - only provided fields will be updated. Note that `jobType` cannot be changed. + +#### Example + +```ts +// Edit a pending job's payload and priority +await jobQueue.editJob(jobId, { + payload: { to: 'newemail@example.com', subject: 'Updated' }, + priority: 10, +}); + +// Edit only the scheduled run time +await jobQueue.editJob(jobId, { + runAt: new Date(Date.now() + 60000), // Run in 1 minute +}); + +// Edit multiple fields at once +await jobQueue.editJob(jobId, { + payload: { to: 'updated@example.com' }, + priority: 5, + maxAttempts: 10, + timeoutMs: 30000, + tags: ['urgent', 'priority'], +}); +``` + ### cancelAllUpcomingJobs ```ts @@ -209,6 +256,7 @@ enum JobEventType { Failed = 'failed', Cancelled = 'cancelled', Retried = 'retried', + Edited = 'edited', } ``` diff --git a/apps/docs/content/docs/usage/edit-jobs.mdx b/apps/docs/content/docs/usage/edit-jobs.mdx new file mode 100644 index 0000000..dc1ca63 --- /dev/null +++ b/apps/docs/content/docs/usage/edit-jobs.mdx @@ -0,0 +1,175 @@ +--- +title: Edit Jobs +--- + +You can edit a pending job by its ID to update its properties before it is processed. Only jobs with status 'pending' can be edited. Attempting to edit a job with any other status (processing, completed, failed, cancelled) will silently fail. + +## Basic Usage + +```typescript title="@/app/api/edit-job/route.ts" +import { NextRequest, NextResponse } from 'next/server'; +import { getJobQueue } from '@/lib/queue'; + +export async function POST(request: NextRequest) { + try { + const { jobId, updates } = await request.json(); + // [!code highlight:2] + const jobQueue = getJobQueue(); + await jobQueue.editJob(jobId, updates); + return NextResponse.json({ message: 'Job updated' }); + } catch (error) { + console.error('Error editing job:', error); + return NextResponse.json( + { message: 'Failed to edit job' }, + { status: 500 }, + ); + } +} +``` + +## Editable Fields + +All fields in `EditJobOptions` are optional - only the fields you provide will be updated. The following fields can be edited: + +- `payload` - The job payload data +- `priority` - Job priority (higher runs first) +- `maxAttempts` - Maximum number of attempts +- `runAt` - When to run the job (Date or null) +- `timeoutMs` - Timeout for the job in milliseconds +- `tags` - Tags for grouping, searching, or batch operations + +**Note:** `jobType` cannot be changed. If you need to change the job type, you should cancel the job and create a new one. + +## Examples + +### Edit Payload + +```typescript +// Update the payload of a pending job +await jobQueue.editJob(jobId, { + payload: { to: 'newemail@example.com', subject: 'Updated Subject' }, +}); +``` + +### Edit Priority + +```typescript +// Increase the priority of a job +await jobQueue.editJob(jobId, { + priority: 10, +}); +``` + +### Edit Scheduled Time + +```typescript +// Reschedule a job to run in 1 hour +await jobQueue.editJob(jobId, { + runAt: new Date(Date.now() + 60 * 60 * 1000), +}); + +// Schedule a job to run immediately (or as soon as possible) +await jobQueue.editJob(jobId, { + runAt: null, +}); +``` + +### Edit Multiple Fields + +```typescript +// Update multiple fields at once +await jobQueue.editJob(jobId, { + payload: { to: 'updated@example.com', subject: 'New Subject' }, + priority: 5, + maxAttempts: 10, + timeoutMs: 30000, + tags: ['urgent', 'priority'], +}); +``` + +### Partial Updates + +```typescript +// Only update what you need - other fields remain unchanged +await jobQueue.editJob(jobId, { + priority: 10, + // payload, maxAttempts, runAt, timeoutMs, and tags remain unchanged +}); +``` + +### Clear Tags or Timeout + +```typescript +// Remove tags by setting to undefined +await jobQueue.editJob(jobId, { + tags: undefined, +}); + +// Remove timeout by setting to undefined +await jobQueue.editJob(jobId, { + timeoutMs: undefined, +}); +``` + +## When to Use Edit vs Cancel vs Retry + +- **Edit**: Use when you want to modify a pending job's properties before it runs +- **Cancel**: Use when you want to completely remove a pending job from the queue +- **Retry**: Use when you want to retry a failed job (sets status back to pending) + +## Error Handling + +The `editJob` function silently fails if you try to edit a non-pending job. This means: + +- No error is thrown +- The job remains unchanged +- The operation completes successfully (but does nothing) + +To check if an edit was successful, you can: + +```typescript +const job = await jobQueue.getJob(jobId); +if (job?.status === 'pending') { + // Job is still pending, edit might have succeeded + // Check if the fields you wanted to update actually changed + if (job.priority === newPriority) { + console.log('Edit successful'); + } +} else { + console.log('Job is not pending, edit was ignored'); +} +``` + +## Event Tracking + +When a job is edited, an 'edited' event is recorded in the job's event history. The event metadata contains the fields that were updated: + +```typescript +const events = await jobQueue.getJobEvents(jobId); +const editEvent = events.find((e) => e.eventType === 'edited'); +if (editEvent) { + console.log('Updated fields:', editEvent.metadata); + // { payload: {...}, priority: 10, ... } +} +``` + +## Best Practices + +1. **Check job status before editing**: If you're unsure whether a job is pending, check its status first: + +```typescript +const job = await jobQueue.getJob(jobId); +if (job?.status === 'pending') { + await jobQueue.editJob(jobId, updates); +} else { + console.log('Job is not pending, cannot edit'); +} +``` + +2. **Use partial updates**: Only update the fields you need to change. This is more efficient and reduces the chance of accidentally overwriting other fields. + +3. **Validate updates**: Ensure the updated values are valid for your job handlers. For example, if your handler expects a specific payload structure, make sure the updated payload matches. + +4. **Consider race conditions**: If a job might be picked up for processing while you're editing it, be aware that the edit might not take effect if the job transitions to 'processing' status between your check and the edit operation. + +5. **Monitor events**: Use job events to track when and what was edited for audit purposes. diff --git a/apps/docs/content/docs/usage/meta.json b/apps/docs/content/docs/usage/meta.json index 841b91a..76ae8ef 100644 --- a/apps/docs/content/docs/usage/meta.json +++ b/apps/docs/content/docs/usage/meta.json @@ -13,6 +13,7 @@ "reclaim-jobs", "cleanup-jobs", "cancel-jobs", + "edit-jobs", "job-events", "get-jobs" ], diff --git a/packages/dataqueue/src/index.test.ts b/packages/dataqueue/src/index.test.ts index 4f3f7cb..d38ee22 100644 --- a/packages/dataqueue/src/index.test.ts +++ b/packages/dataqueue/src/index.test.ts @@ -302,4 +302,142 @@ describe('index integration', () => { expect(job1?.status).toBe('cancelled'); expect(job2?.status).toBe('pending'); }); + + it('should edit a pending job via JobQueue API', async () => { + const jobId = await jobQueue.addJob({ + jobType: 'email', + payload: { to: 'original@example.com' }, + priority: 0, + maxAttempts: 3, + }); + + await jobQueue.editJob(jobId, { + payload: { to: 'updated@example.com' }, + priority: 10, + maxAttempts: 5, + }); + + const job = await jobQueue.getJob(jobId); + expect(job?.payload).toEqual({ to: 'updated@example.com' }); + expect(job?.priority).toBe(10); + expect(job?.maxAttempts).toBe(5); + expect(job?.status).toBe('pending'); + }); + + it('should edit a job and then process it correctly', async () => { + const handler = vi.fn(async (payload: { to: string }, _signal) => { + expect(payload.to).toBe('updated@example.com'); + }); + const jobId = await jobQueue.addJob({ + jobType: 'test', + payload: { to: 'original@example.com' }, + }); + + // Edit the job before processing + await jobQueue.editJob(jobId, { + payload: { to: 'updated@example.com' }, + }); + + const processor = jobQueue.createProcessor( + { + email: vi.fn(async () => {}), + sms: vi.fn(async () => {}), + test: handler, + }, + { pollInterval: 100 }, + ); + processor.start(); + await new Promise((r) => setTimeout(r, 300)); + processor.stop(); + + expect(handler).toHaveBeenCalledWith( + { to: 'updated@example.com' }, + expect.any(Object), + ); + const job = await jobQueue.getJob(jobId); + expect(job?.status).toBe('completed'); + }); + + it('should silently fail when editing non-pending jobs', async () => { + // Try to edit a completed job + const jobId1 = await jobQueue.addJob({ + jobType: 'email', + payload: { to: 'original@example.com' }, + }); + const processor = jobQueue.createProcessor( + { + email: vi.fn(async () => {}), + }, + { pollInterval: 100 }, + ); + processor.start(); + await new Promise((r) => setTimeout(r, 300)); + processor.stop(); + + const originalJob = await jobQueue.getJob(jobId1); + expect(originalJob?.status).toBe('completed'); + + await jobQueue.editJob(jobId1, { + payload: { to: 'updated@example.com' }, + }); + + const job = await jobQueue.getJob(jobId1); + expect(job?.status).toBe('completed'); + expect(job?.payload).toEqual({ to: 'original@example.com' }); + + // Try to edit a processing job + // Use a handler that takes longer to ensure job stays in processing state + const slowHandler = vi.fn( + async (payload: { to: string }, _signal) => { + await new Promise((r) => setTimeout(r, 200)); + }, + ); + const processor2 = jobQueue.createProcessor( + { + email: slowHandler, + }, + { pollInterval: 100 }, + ); + const jobId2 = await jobQueue.addJob({ + jobType: 'email', + payload: { to: 'processing@example.com' }, + }); + processor2.start(); + // Wait a bit for job to be picked up + await new Promise((r) => setTimeout(r, 150)); + // Job should be processing now + const processingJob = await jobQueue.getJob(jobId2); + if (processingJob?.status === 'processing') { + await jobQueue.editJob(jobId2, { + payload: { to: 'updated@example.com' }, + }); + + const job2 = await jobQueue.getJob(jobId2); + // If still processing, payload should be unchanged + if (job2?.status === 'processing') { + expect(job2?.payload).toEqual({ to: 'processing@example.com' }); + } + } + processor2.stop(); + }); + + it('should record edited event when editing via JobQueue API', async () => { + const jobId = await jobQueue.addJob({ + jobType: 'email', + payload: { to: 'original@example.com' }, + }); + + await jobQueue.editJob(jobId, { + payload: { to: 'updated@example.com' }, + priority: 10, + }); + + const events = await jobQueue.getJobEvents(jobId); + const editEvent = events.find((e) => e.eventType === 'edited'); + expect(editEvent).not.toBeUndefined(); + expect(editEvent?.metadata).toMatchObject({ + payload: { to: 'updated@example.com' }, + priority: 10, + }); + }); }); diff --git a/packages/dataqueue/src/index.ts b/packages/dataqueue/src/index.ts index 9e942d7..33e2bfc 100644 --- a/packages/dataqueue/src/index.ts +++ b/packages/dataqueue/src/index.ts @@ -11,6 +11,7 @@ import { getJobEvents, getJobsByTags, getJobs, + editJob, } from './queue.js'; import { createProcessor } from './processor.js'; import { @@ -19,6 +20,7 @@ import { JobOptions, ProcessorOptions, JobHandlers, + JobType, } from './types.js'; import { setLogContext } from './log-context.js'; import { createPool } from './db-util.js'; @@ -78,6 +80,13 @@ export const initJobQueue = ( (jobId: number) => cancelJob(pool, jobId), config.verbose ?? false, ), + editJob: withLogContext( + >( + jobId: number, + updates: import('./types.js').EditJobOptions, + ) => editJob(pool, jobId, updates as any), + config.verbose ?? false, + ), cancelAllUpcomingJobs: withLogContext( (filters?: { jobType?: string; diff --git a/packages/dataqueue/src/queue.test.ts b/packages/dataqueue/src/queue.test.ts index b1e615e..0e86fc6 100644 --- a/packages/dataqueue/src/queue.test.ts +++ b/packages/dataqueue/src/queue.test.ts @@ -160,6 +160,185 @@ describe('queue integration', () => { expect(completedJob?.status).toBe('completed'); }); + it('should edit a pending job with all fields', async () => { + const jobId = await queue.addJob<{ email: { to: string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'original@example.com' }, + priority: 0, + maxAttempts: 3, + timeoutMs: 10000, + tags: ['original'], + }); + const originalJob = await queue.getJob(pool, jobId); + const originalUpdatedAt = originalJob?.updatedAt; + + // Wait a bit to ensure updated_at changes + await new Promise((r) => setTimeout(r, 10)); + + await queue.editJob<{ email: { to: string } }, 'email'>(pool, jobId, { + payload: { to: 'updated@example.com' }, + priority: 10, + maxAttempts: 5, + runAt: new Date(Date.now() + 60000), + timeoutMs: 20000, + tags: ['updated', 'priority'], + }); + + const updatedJob = await queue.getJob(pool, jobId); + expect(updatedJob?.payload).toEqual({ to: 'updated@example.com' }); + expect(updatedJob?.priority).toBe(10); + expect(updatedJob?.maxAttempts).toBe(5); + expect(updatedJob?.timeoutMs).toBe(20000); + expect(updatedJob?.tags).toEqual(['updated', 'priority']); + expect(updatedJob?.status).toBe('pending'); + expect(updatedJob?.updatedAt.getTime()).toBeGreaterThan( + originalUpdatedAt?.getTime() || 0, + ); + }); + + it('should edit a pending job with partial fields', async () => { + const jobId = await queue.addJob<{ email: { to: string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'original@example.com' }, + priority: 0, + maxAttempts: 3, + }); + + // Only update payload + await queue.editJob<{ email: { to: string } }, 'email'>(pool, jobId, { + payload: { to: 'updated@example.com' }, + }); + + const updatedJob = await queue.getJob(pool, jobId); + expect(updatedJob?.payload).toEqual({ to: 'updated@example.com' }); + expect(updatedJob?.priority).toBe(0); // Unchanged + expect(updatedJob?.maxAttempts).toBe(3); // Unchanged + + // Only update priority + await queue.editJob<{ email: { to: string } }, 'email'>(pool, jobId, { + priority: 5, + }); + + const updatedJob2 = await queue.getJob(pool, jobId); + expect(updatedJob2?.payload).toEqual({ to: 'updated@example.com' }); // Still updated + expect(updatedJob2?.priority).toBe(5); // Now updated + }); + + it('should silently fail when editing a non-pending job', async () => { + const jobId = await queue.addJob<{ email: { to: string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'original@example.com' }, + }); + await queue.completeJob(pool, jobId); + + // Try to edit a completed job - should silently fail + await queue.editJob<{ email: { to: string } }, 'email'>(pool, jobId, { + payload: { to: 'updated@example.com' }, + }); + + const job = await queue.getJob(pool, jobId); + expect(job?.status).toBe('completed'); + expect(job?.payload).toEqual({ to: 'original@example.com' }); // Unchanged + + // Try to edit a processing job + const jobId2 = await queue.addJob<{ email: { to: string } }, 'email'>( + pool, + { + jobType: 'email', + payload: { to: 'processing@example.com' }, + }, + ); + await queue.getNextBatch(pool, 'worker-edit', 1); + await queue.editJob<{ email: { to: string } }, 'email'>(pool, jobId2, { + payload: { to: 'updated@example.com' }, + }); + + const job2 = await queue.getJob(pool, jobId2); + expect(job2?.status).toBe('processing'); + expect(job2?.payload).toEqual({ to: 'processing@example.com' }); // Unchanged + }); + + it('should record edited event when editing a job', async () => { + const jobId = await queue.addJob<{ email: { to: string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'original@example.com' }, + }); + + await queue.editJob<{ email: { to: string } }, 'email'>(pool, jobId, { + payload: { to: 'updated@example.com' }, + priority: 10, + }); + + const res = await pool.query( + 'SELECT * FROM job_events WHERE job_id = $1 ORDER BY created_at ASC', + [jobId], + ); + const events = res.rows.map((row) => objectKeysToCamelCase(row) as JobEvent); + const editEvent = events.find((e) => e.eventType === JobEventType.Edited); + expect(editEvent).not.toBeUndefined(); + expect(editEvent?.metadata).toMatchObject({ + payload: { to: 'updated@example.com' }, + priority: 10, + }); + }); + + it('should update updated_at timestamp when editing', async () => { + const jobId = await queue.addJob<{ email: { to: string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'original@example.com' }, + }); + const originalJob = await queue.getJob(pool, jobId); + const originalUpdatedAt = originalJob?.updatedAt; + + // Wait a bit to ensure timestamp difference + await new Promise((r) => setTimeout(r, 10)); + + await queue.editJob<{ email: { to: string } }, 'email'>(pool, jobId, { + priority: 5, + }); + + const updatedJob = await queue.getJob(pool, jobId); + expect(updatedJob?.updatedAt.getTime()).toBeGreaterThan( + originalUpdatedAt?.getTime() || 0, + ); + }); + + it('should handle editing with null values', async () => { + const futureDate = new Date(Date.now() + 60000); + const jobId = await queue.addJob<{ email: { to: string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'original@example.com' }, + runAt: futureDate, + timeoutMs: 10000, + tags: ['original'], + }); + + await queue.editJob<{ email: { to: string } }, 'email'>(pool, jobId, { + runAt: null, + timeoutMs: null, + tags: null, + }); + + const updatedJob = await queue.getJob(pool, jobId); + expect(updatedJob?.runAt).not.toBeNull(); // runAt null means use default (now) + expect(updatedJob?.timeoutMs).toBeNull(); + expect(updatedJob?.tags).toBeNull(); + }); + + it('should do nothing when editing with no fields', async () => { + const jobId = await queue.addJob<{ email: { to: string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'original@example.com' }, + }); + const originalJob = await queue.getJob(pool, jobId); + + await queue.editJob<{ email: { to: string } }, 'email'>(pool, jobId, {}); + + const job = await queue.getJob(pool, jobId); + expect(job?.payload).toEqual(originalJob?.payload); + expect(job?.priority).toBe(originalJob?.priority); + }); + it('should cancel all upcoming jobs', async () => { // Add three pending jobs const jobId1 = await queue.addJob<{ email: { to: string } }, 'email'>( diff --git a/packages/dataqueue/src/queue.ts b/packages/dataqueue/src/queue.ts index f1e045d..7cf9a44 100644 --- a/packages/dataqueue/src/queue.ts +++ b/packages/dataqueue/src/queue.ts @@ -112,7 +112,7 @@ export const getJob = async ( const client = await pool.connect(); try { const result = await client.query( - `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason" FROM job_queue WHERE id = $1`, + `SELECT id, job_type AS "jobType", payload, status, max_attempts AS "maxAttempts", attempts, priority, run_at AS "runAt", timeout_ms AS "timeoutMs", created_at AS "createdAt", updated_at AS "updatedAt", started_at AS "startedAt", completed_at AS "completedAt", last_failed_at AS "lastFailedAt", locked_at AS "lockedAt", locked_by AS "lockedBy", error_history AS "errorHistory", failure_reason AS "failureReason", next_attempt_at AS "nextAttemptAt", last_failed_at AS "lastFailedAt", last_retried_at AS "lastRetriedAt", last_cancelled_at AS "lastCancelledAt", pending_reason AS "pendingReason", tags FROM job_queue WHERE id = $1`, [id], ); @@ -413,6 +413,98 @@ export const cancelJob = async (pool: Pool, jobId: number): Promise => { } }; +/** + * Edit a pending job (only if still pending) + */ +export const editJob = async ( + pool: Pool, + jobId: number, + updates: { + payload?: PayloadMap[T]; + maxAttempts?: number; + priority?: number; + runAt?: Date | null; + timeoutMs?: number; + tags?: string[]; + }, +): Promise => { + const client = await pool.connect(); + try { + const updateFields: string[] = []; + const params: any[] = []; + let paramIdx = 1; + + // Build dynamic UPDATE query based on provided fields + if (updates.payload !== undefined) { + updateFields.push(`payload = $${paramIdx++}`); + params.push(updates.payload); + } + if (updates.maxAttempts !== undefined) { + updateFields.push(`max_attempts = $${paramIdx++}`); + params.push(updates.maxAttempts); + } + if (updates.priority !== undefined) { + updateFields.push(`priority = $${paramIdx++}`); + params.push(updates.priority); + } + if (updates.runAt !== undefined) { + if (updates.runAt === null) { + // null means run now (use current timestamp) + updateFields.push(`run_at = NOW()`); + } else { + updateFields.push(`run_at = $${paramIdx++}`); + params.push(updates.runAt); + } + } + if (updates.timeoutMs !== undefined) { + updateFields.push(`timeout_ms = $${paramIdx++}`); + params.push(updates.timeoutMs ?? null); + } + if (updates.tags !== undefined) { + updateFields.push(`tags = $${paramIdx++}`); + params.push(updates.tags ?? null); + } + + // If no fields to update, return early + if (updateFields.length === 0) { + log(`No fields to update for job ${jobId}`); + return; + } + + // Always update updated_at timestamp + updateFields.push(`updated_at = NOW()`); + + // Add jobId as the last parameter for WHERE clause + params.push(jobId); + + const query = ` + UPDATE job_queue + SET ${updateFields.join(', ')} + WHERE id = $${paramIdx} AND status = 'pending' + `; + + await client.query(query, params); + + // Record edit event with metadata containing updated fields + const metadata: any = {}; + if (updates.payload !== undefined) metadata.payload = updates.payload; + if (updates.maxAttempts !== undefined) + metadata.maxAttempts = updates.maxAttempts; + if (updates.priority !== undefined) metadata.priority = updates.priority; + if (updates.runAt !== undefined) metadata.runAt = updates.runAt; + if (updates.timeoutMs !== undefined) metadata.timeoutMs = updates.timeoutMs; + if (updates.tags !== undefined) metadata.tags = updates.tags; + + await recordJobEvent(pool, jobId, JobEventType.Edited, metadata); + log(`Edited job ${jobId}: ${JSON.stringify(metadata)}`); + } catch (error) { + log(`Error editing job ${jobId}: ${error}`); + throw error; + } finally { + client.release(); + } +}; + /** * Cancel all upcoming jobs (pending and scheduled in the future) with optional filters */ diff --git a/packages/dataqueue/src/types.ts b/packages/dataqueue/src/types.ts index bb1326f..18dea64 100644 --- a/packages/dataqueue/src/types.ts +++ b/packages/dataqueue/src/types.ts @@ -19,6 +19,15 @@ export interface JobOptions> { tags?: string[]; } +/** + * Options for editing a pending job. + * All fields are optional and only provided fields will be updated. + * Note: jobType cannot be changed. + */ +export type EditJobOptions> = Partial< + Omit, 'jobType'> +>; + export enum JobEventType { Added = 'added', Processing = 'processing', @@ -26,6 +35,7 @@ export enum JobEventType { Failed = 'failed', Cancelled = 'cancelled', Retried = 'retried', + Edited = 'edited', } export interface JobEvent { @@ -269,6 +279,17 @@ export interface JobQueue { * - This will set the job status to 'cancelled' and clear the locked_at and locked_by. */ cancelJob: (jobId: number) => Promise; + /** + * Edit a pending job given its ID. + * - Only works for jobs with status 'pending'. Silently fails for other statuses. + * - All fields in EditJobOptions are optional - only provided fields will be updated. + * - jobType cannot be changed. + * - Records an 'edited' event with the updated fields in metadata. + */ + editJob: >( + jobId: number, + updates: EditJobOptions, + ) => Promise; /** * Reclaim stuck jobs. * - If a process (e.g., API route or worker) crashes after marking a job as 'processing' but before completing it, the job can remain stuck in the 'processing' state indefinitely. This can happen if the process is killed or encounters an unhandled error after updating the job status but before marking it as 'completed' or 'failed'. From e5c9dee062c65f8b31b03c6c2306b45fa10695aa Mon Sep 17 00:00:00 2001 From: Nico Prananta <311343+nicnocquee@users.noreply.github.com> Date: Mon, 15 Dec 2025 20:53:31 +0100 Subject: [PATCH 2/3] Add editAllPendingJobs functionality and update documentation - Introduced `editAllPendingJobs` method to allow batch editing of pending jobs based on specified filters. - Updated `EditJobOptions` type to include optional fields for batch updates. - Enhanced documentation with detailed usage examples for `editAllPendingJobs`, including filtering by job type, priority, tags, and scheduled run time. - Added tests to verify the functionality of batch editing and ensure correct behavior for pending jobs. - Recorded edit events for each affected job, capturing updated fields in metadata. --- apps/docs/content/docs/api/job-queue.mdx | 77 +++++ apps/docs/content/docs/usage/edit-jobs.mdx | 93 ++++++ packages/dataqueue/src/index.test.ts | 81 +++++ packages/dataqueue/src/index.ts | 17 + packages/dataqueue/src/queue.test.ts | 345 ++++++++++++++++++++- packages/dataqueue/src/queue.ts | 173 ++++++++++- packages/dataqueue/src/types.ts | 30 +- 7 files changed, 812 insertions(+), 4 deletions(-) diff --git a/apps/docs/content/docs/api/job-queue.mdx b/apps/docs/content/docs/api/job-queue.mdx index 08e5f6a..44ede56 100644 --- a/apps/docs/content/docs/api/job-queue.mdx +++ b/apps/docs/content/docs/api/job-queue.mdx @@ -193,6 +193,83 @@ await jobQueue.editJob(jobId, { }); ``` +### editAllPendingJobs + +```ts +editAllPendingJobs( + filters?: { + jobType?: string; + priority?: number; + runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date }; + tags?: { values: string[]; mode?: 'all' | 'any' | 'none' | 'exact' }; + }, + updates: EditJobOptions +): Promise +``` + +Edits all pending jobs that match the filters. Only works for jobs with status 'pending'. Non-pending jobs are not affected. Returns the number of jobs that were edited. + +#### Parameters + +- `filters` (optional): Filters to select which jobs to edit. If not provided, all pending jobs are edited. + - `jobType`: Filter by job type + - `priority`: Filter by priority + - `runAt`: Filter by scheduled run time (supports `gt`, `gte`, `lt`, `lte`, `eq` operators or exact Date match) + - `tags`: Filter by tags with mode ('all', 'any', 'none', 'exact') +- `updates`: The fields to update (same as `EditJobOptions`). All fields are optional - only provided fields will be updated. + +#### Returns + +The number of jobs that were successfully edited. + +#### Examples + +```ts +// Edit all pending jobs +const editedCount = await jobQueue.editAllPendingJobs(undefined, { + priority: 10, +}); + +// Edit all pending email jobs +const editedCount = await jobQueue.editAllPendingJobs( + { jobType: 'email' }, + { + priority: 5, + }, +); + +// Edit all pending jobs with 'urgent' tag +const editedCount = await jobQueue.editAllPendingJobs( + { tags: { values: ['urgent'], mode: 'any' } }, + { + priority: 10, + maxAttempts: 5, + }, +); + +// Edit all pending jobs scheduled in the future +const editedCount = await jobQueue.editAllPendingJobs( + { runAt: { gte: new Date() } }, + { + priority: 10, + }, +); + +// Edit with combined filters +const editedCount = await jobQueue.editAllPendingJobs( + { + jobType: 'email', + tags: { values: ['urgent'], mode: 'any' }, + }, + { + priority: 10, + maxAttempts: 5, + }, +); +``` + +**Note:** Only pending jobs are edited. Jobs with other statuses (processing, completed, failed, cancelled) are not affected. Edit events are recorded for each affected job, just like single job edits. + ### cancelAllUpcomingJobs ```ts diff --git a/apps/docs/content/docs/usage/edit-jobs.mdx b/apps/docs/content/docs/usage/edit-jobs.mdx index dc1ca63..3cfed1d 100644 --- a/apps/docs/content/docs/usage/edit-jobs.mdx +++ b/apps/docs/content/docs/usage/edit-jobs.mdx @@ -111,6 +111,99 @@ await jobQueue.editJob(jobId, { }); ``` +## Batch Editing + +You can edit multiple pending jobs at once using `editAllPendingJobs`. This is useful when you need to update many jobs that match certain criteria. The function returns the number of jobs that were edited. + +### Basic Batch Edit + +```typescript +// Edit all pending jobs +const editedCount = await jobQueue.editAllPendingJobs(undefined, { + priority: 10, +}); +console.log(`Edited ${editedCount} jobs`); +``` + +### Filter by Job Type + +```typescript +// Edit all pending email jobs +const editedCount = await jobQueue.editAllPendingJobs( + { jobType: 'email' }, + { + priority: 5, + }, +); +``` + +### Filter by Priority + +```typescript +// Edit all pending jobs with priority 1 +const editedCount = await jobQueue.editAllPendingJobs( + { priority: 1 }, + { + priority: 5, + }, +); +``` + +### Filter by Tags + +```typescript +// Edit all pending jobs with 'urgent' tag +const editedCount = await jobQueue.editAllPendingJobs( + { tags: { values: ['urgent'], mode: 'any' } }, + { + priority: 10, + }, +); +``` + +### Filter by Scheduled Time + +```typescript +// Edit all pending jobs scheduled in the future +const editedCount = await jobQueue.editAllPendingJobs( + { runAt: { gte: new Date() } }, + { + priority: 10, + }, +); + +// Edit all pending jobs scheduled before a specific date +const editedCount = await jobQueue.editAllPendingJobs( + { runAt: { lt: new Date('2024-12-31') } }, + { + priority: 5, + }, +); +``` + +### Combined Filters + +```typescript +// Edit all pending email jobs with 'urgent' tag +const editedCount = await jobQueue.editAllPendingJobs( + { + jobType: 'email', + tags: { values: ['urgent'], mode: 'any' }, + }, + { + priority: 10, + maxAttempts: 5, + }, +); +``` + +### Batch Edit Notes + +- Only pending jobs are edited. Jobs with other statuses (processing, completed, failed, cancelled) are not affected. +- The function returns the number of jobs that were successfully edited. +- Edit events are recorded for each affected job, just like single job edits. +- If no fields are provided in the updates object, the function returns 0 and no jobs are modified. + ## When to Use Edit vs Cancel vs Retry - **Edit**: Use when you want to modify a pending job's properties before it runs diff --git a/packages/dataqueue/src/index.test.ts b/packages/dataqueue/src/index.test.ts index d38ee22..8e25fef 100644 --- a/packages/dataqueue/src/index.test.ts +++ b/packages/dataqueue/src/index.test.ts @@ -228,6 +228,87 @@ describe('index integration', () => { expect(job2?.status).toBe('cancelled'); }); + it('should edit all pending jobs via JobQueue API', async () => { + // Add three pending jobs + const jobId1 = await jobQueue.addJob({ + jobType: 'email', + payload: { to: 'batch1@example.com' }, + priority: 0, + }); + const jobId2 = await jobQueue.addJob({ + jobType: 'email', + payload: { to: 'batch2@example.com' }, + priority: 0, + }); + const jobId3 = await jobQueue.addJob({ + jobType: 'email', + payload: { to: 'batch3@example.com' }, + priority: 0, + }); + // Add a completed job + const jobId4 = await jobQueue.addJob({ + jobType: 'email', + payload: { to: 'done@example.com' }, + priority: 0, + }); + await pool.query( + `UPDATE job_queue SET status = 'completed' WHERE id = $1`, + [jobId4], + ); + + // Edit all pending jobs + const editedCount = await jobQueue.editAllPendingJobs(undefined, { + priority: 10, + }); + expect(editedCount).toBeGreaterThanOrEqual(3); + + // Check that all pending jobs are updated + const job1 = await jobQueue.getJob(jobId1); + const job2 = await jobQueue.getJob(jobId2); + const job3 = await jobQueue.getJob(jobId3); + expect(job1?.priority).toBe(10); + expect(job2?.priority).toBe(10); + expect(job3?.priority).toBe(10); + + // Completed job should remain unchanged + const completedJob = await jobQueue.getJob(jobId4); + expect(completedJob?.priority).toBe(0); + }); + + it('should edit pending jobs with filters via JobQueue API', async () => { + const emailJobId1 = await jobQueue.addJob({ + jobType: 'email', + payload: { to: 'email1@example.com' }, + priority: 0, + }); + const emailJobId2 = await jobQueue.addJob({ + jobType: 'email', + payload: { to: 'email2@example.com' }, + priority: 0, + }); + const smsJobId = await jobQueue.addJob({ + jobType: 'sms', + payload: { to: 'sms@example.com' }, + priority: 0, + }); + + // Edit only email jobs + const editedCount = await jobQueue.editAllPendingJobs( + { jobType: 'email' }, + { + priority: 5, + }, + ); + expect(editedCount).toBeGreaterThanOrEqual(2); + + const emailJob1 = await jobQueue.getJob(emailJobId1); + const emailJob2 = await jobQueue.getJob(emailJobId2); + const smsJob = await jobQueue.getJob(smsJobId); + expect(emailJob1?.priority).toBe(5); + expect(emailJob2?.priority).toBe(5); + expect(smsJob?.priority).toBe(0); + }); + it('should cancel all upcoming jobs by runAt', async () => { const runAt = new Date(Date.now() + 60 * 60 * 1000); // 1 hour in future const jobId1 = await jobQueue.addJob({ diff --git a/packages/dataqueue/src/index.ts b/packages/dataqueue/src/index.ts index 33e2bfc..f848331 100644 --- a/packages/dataqueue/src/index.ts +++ b/packages/dataqueue/src/index.ts @@ -12,6 +12,7 @@ import { getJobsByTags, getJobs, editJob, + editAllPendingJobs, } from './queue.js'; import { createProcessor } from './processor.js'; import { @@ -87,6 +88,22 @@ export const initJobQueue = ( ) => editJob(pool, jobId, updates as any), config.verbose ?? false, ), + editAllPendingJobs: withLogContext( + >( + filters: + | { + jobType?: string; + priority?: number; + runAt?: + | Date + | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date }; + tags?: { values: string[]; mode?: import('./types.js').TagQueryMode }; + } + | undefined, + updates: import('./types.js').EditJobOptions, + ) => editAllPendingJobs(pool, filters, updates as any), + config.verbose ?? false, + ), cancelAllUpcomingJobs: withLogContext( (filters?: { jobType?: string; diff --git a/packages/dataqueue/src/queue.test.ts b/packages/dataqueue/src/queue.test.ts index 0e86fc6..3f10859 100644 --- a/packages/dataqueue/src/queue.test.ts +++ b/packages/dataqueue/src/queue.test.ts @@ -273,7 +273,9 @@ describe('queue integration', () => { 'SELECT * FROM job_events WHERE job_id = $1 ORDER BY created_at ASC', [jobId], ); - const events = res.rows.map((row) => objectKeysToCamelCase(row) as JobEvent); + const events = res.rows.map( + (row) => objectKeysToCamelCase(row) as JobEvent, + ); const editEvent = events.find((e) => e.eventType === JobEventType.Edited); expect(editEvent).not.toBeUndefined(); expect(editEvent?.metadata).toMatchObject({ @@ -339,6 +341,347 @@ describe('queue integration', () => { expect(job?.priority).toBe(originalJob?.priority); }); + it('should edit all pending jobs without filters', async () => { + // Add three pending jobs + const jobId1 = await queue.addJob<{ email: { to: string } }, 'email'>( + pool, + { + jobType: 'email', + payload: { to: 'batch1@example.com' }, + priority: 0, + }, + ); + const jobId2 = await queue.addJob<{ email: { to: string } }, 'email'>( + pool, + { + jobType: 'email', + payload: { to: 'batch2@example.com' }, + priority: 0, + }, + ); + const jobId3 = await queue.addJob<{ email: { to: string } }, 'email'>( + pool, + { + jobType: 'email', + payload: { to: 'batch3@example.com' }, + priority: 0, + }, + ); + // Add a completed job + const jobId4 = await queue.addJob<{ email: { to: string } }, 'email'>( + pool, + { + jobType: 'email', + payload: { to: 'done@example.com' }, + }, + ); + await queue.completeJob(pool, jobId4); + + // Edit all pending jobs + const editedCount = await queue.editAllPendingJobs< + { email: { to: string } }, + 'email' + >(pool, undefined, { + priority: 10, + }); + expect(editedCount).toBeGreaterThanOrEqual(3); + + // Check that all pending jobs are updated + const job1 = await queue.getJob(pool, jobId1); + const job2 = await queue.getJob(pool, jobId2); + const job3 = await queue.getJob(pool, jobId3); + expect(job1?.priority).toBe(10); + expect(job2?.priority).toBe(10); + expect(job3?.priority).toBe(10); + + // Completed job should remain unchanged + const completedJob = await queue.getJob(pool, jobId4); + expect(completedJob?.priority).toBe(0); + }); + + it('should edit pending jobs filtered by jobType', async () => { + const emailJobId1 = await queue.addJob<{ email: { to: string } }, 'email'>( + pool, + { + jobType: 'email', + payload: { to: 'email1@example.com' }, + priority: 0, + }, + ); + const emailJobId2 = await queue.addJob<{ email: { to: string } }, 'email'>( + pool, + { + jobType: 'email', + payload: { to: 'email2@example.com' }, + priority: 0, + }, + ); + const smsJobId = await queue.addJob<{ sms: { to: string } }, 'sms'>(pool, { + jobType: 'sms', + payload: { to: 'sms@example.com' }, + priority: 0, + }); + + // Edit only email jobs + const editedCount = await queue.editAllPendingJobs< + { email: { to: string }; sms: { to: string } }, + 'email' + >( + pool, + { jobType: 'email' }, + { + priority: 5, + }, + ); + expect(editedCount).toBeGreaterThanOrEqual(2); + + const emailJob1 = await queue.getJob(pool, emailJobId1); + const emailJob2 = await queue.getJob(pool, emailJobId2); + const smsJob = await queue.getJob(pool, smsJobId); + expect(emailJob1?.priority).toBe(5); + expect(emailJob2?.priority).toBe(5); + expect(smsJob?.priority).toBe(0); + }); + + it('should edit pending jobs filtered by priority', async () => { + const lowPriorityJobId1 = await queue.addJob< + { email: { to: string } }, + 'email' + >(pool, { + jobType: 'email', + payload: { to: 'low1@example.com' }, + priority: 1, + }); + const lowPriorityJobId2 = await queue.addJob< + { email: { to: string } }, + 'email' + >(pool, { + jobType: 'email', + payload: { to: 'low2@example.com' }, + priority: 1, + }); + const highPriorityJobId = await queue.addJob< + { email: { to: string } }, + 'email' + >(pool, { + jobType: 'email', + payload: { to: 'high@example.com' }, + priority: 10, + }); + + // Edit only low priority jobs + const editedCount = await queue.editAllPendingJobs< + { email: { to: string } }, + 'email' + >( + pool, + { priority: 1 }, + { + priority: 5, + }, + ); + expect(editedCount).toBeGreaterThanOrEqual(2); + + const lowJob1 = await queue.getJob(pool, lowPriorityJobId1); + const lowJob2 = await queue.getJob(pool, lowPriorityJobId2); + const highJob = await queue.getJob(pool, highPriorityJobId); + expect(lowJob1?.priority).toBe(5); + expect(lowJob2?.priority).toBe(5); + expect(highJob?.priority).toBe(10); + }); + + it('should edit pending jobs filtered by tags', async () => { + const taggedJobId1 = await queue.addJob<{ email: { to: string } }, 'email'>( + pool, + { + jobType: 'email', + payload: { to: 'tagged1@example.com' }, + tags: ['urgent', 'priority'], + }, + ); + const taggedJobId2 = await queue.addJob<{ email: { to: string } }, 'email'>( + pool, + { + jobType: 'email', + payload: { to: 'tagged2@example.com' }, + tags: ['urgent', 'priority'], + }, + ); + const untaggedJobId = await queue.addJob< + { email: { to: string } }, + 'email' + >(pool, { + jobType: 'email', + payload: { to: 'untagged@example.com' }, + tags: ['other'], + }); + + // Edit only jobs with 'urgent' tag + const editedCount = await queue.editAllPendingJobs< + { email: { to: string } }, + 'email' + >( + pool, + { tags: { values: ['urgent'], mode: 'any' } }, + { + priority: 10, + }, + ); + expect(editedCount).toBeGreaterThanOrEqual(2); + + const taggedJob1 = await queue.getJob(pool, taggedJobId1); + const taggedJob2 = await queue.getJob(pool, taggedJobId2); + const untaggedJob = await queue.getJob(pool, untaggedJobId); + expect(taggedJob1?.priority).toBe(10); + expect(taggedJob2?.priority).toBe(10); + expect(untaggedJob?.priority).toBe(0); + }); + + it('should edit pending jobs filtered by runAt', async () => { + const futureDate = new Date(Date.now() + 60000); + const pastDate = new Date(Date.now() - 60000); + + const futureJobId1 = await queue.addJob<{ email: { to: string } }, 'email'>( + pool, + { + jobType: 'email', + payload: { to: 'future1@example.com' }, + runAt: futureDate, + }, + ); + const futureJobId2 = await queue.addJob<{ email: { to: string } }, 'email'>( + pool, + { + jobType: 'email', + payload: { to: 'future2@example.com' }, + runAt: futureDate, + }, + ); + const pastJobId = await queue.addJob<{ email: { to: string } }, 'email'>( + pool, + { + jobType: 'email', + payload: { to: 'past@example.com' }, + runAt: pastDate, + }, + ); + + // Edit only jobs scheduled in the future + const editedCount = await queue.editAllPendingJobs< + { email: { to: string } }, + 'email' + >( + pool, + { runAt: { gte: new Date() } }, + { + priority: 10, + }, + ); + expect(editedCount).toBeGreaterThanOrEqual(2); + + const futureJob1 = await queue.getJob(pool, futureJobId1); + const futureJob2 = await queue.getJob(pool, futureJobId2); + const pastJob = await queue.getJob(pool, pastJobId); + expect(futureJob1?.priority).toBe(10); + expect(futureJob2?.priority).toBe(10); + expect(pastJob?.priority).toBe(0); + }); + + it('should not edit non-pending jobs', async () => { + // Create processingJobId first so it gets picked up by getNextBatch + const processingJobId = await queue.addJob< + { email: { to: string } }, + 'email' + >(pool, { + jobType: 'email', + payload: { to: 'processing@example.com' }, + priority: 0, + }); + const pendingJobId = await queue.addJob<{ email: { to: string } }, 'email'>( + pool, + { + jobType: 'email', + payload: { to: 'pending@example.com' }, + priority: 0, + }, + ); + // Mark as processing (this will pick up processingJobId since it was created first) + await queue.getNextBatch(pool, 'worker-batch', 1); + const completedJobId = await queue.addJob< + { email: { to: string } }, + 'email' + >(pool, { + jobType: 'email', + payload: { to: 'completed@example.com' }, + priority: 0, + }); + await queue.completeJob(pool, completedJobId); + + // Edit all pending jobs + const editedCount = await queue.editAllPendingJobs< + { email: { to: string } }, + 'email' + >(pool, undefined, { + priority: 10, + }); + + const pendingJob = await queue.getJob(pool, pendingJobId); + const processingJob = await queue.getJob(pool, processingJobId); + const completedJob = await queue.getJob(pool, completedJobId); + expect(pendingJob?.priority).toBe(10); + expect(processingJob?.priority).toBe(0); + expect(completedJob?.priority).toBe(0); + expect(editedCount).toBeGreaterThanOrEqual(1); + }); + + it('should record edit events for each edited job', async () => { + const jobId1 = await queue.addJob<{ email: { to: string } }, 'email'>( + pool, + { + jobType: 'email', + payload: { to: 'event1@example.com' }, + }, + ); + const jobId2 = await queue.addJob<{ email: { to: string } }, 'email'>( + pool, + { + jobType: 'email', + payload: { to: 'event2@example.com' }, + }, + ); + + await queue.editAllPendingJobs<{ email: { to: string } }, 'email'>( + pool, + undefined, + { + priority: 10, + }, + ); + + const events1 = await queue.getJobEvents(pool, jobId1); + const events2 = await queue.getJobEvents(pool, jobId2); + const editEvent1 = events1.find((e) => e.eventType === JobEventType.Edited); + const editEvent2 = events2.find((e) => e.eventType === JobEventType.Edited); + expect(editEvent1).not.toBeUndefined(); + expect(editEvent2).not.toBeUndefined(); + expect(editEvent1?.metadata).toMatchObject({ priority: 10 }); + expect(editEvent2?.metadata).toMatchObject({ priority: 10 }); + }); + + it('should return 0 when no fields to update', async () => { + await queue.addJob<{ email: { to: string } }, 'email'>(pool, { + jobType: 'email', + payload: { to: 'empty@example.com' }, + }); + + const editedCount = await queue.editAllPendingJobs< + { email: { to: string } }, + 'email' + >(pool, undefined, {}); + + expect(editedCount).toBe(0); + }); + it('should cancel all upcoming jobs', async () => { // Add three pending jobs const jobId1 = await queue.addJob<{ email: { to: string } }, 'email'>( diff --git a/packages/dataqueue/src/queue.ts b/packages/dataqueue/src/queue.ts index 7cf9a44..d4409c8 100644 --- a/packages/dataqueue/src/queue.ts +++ b/packages/dataqueue/src/queue.ts @@ -424,8 +424,8 @@ export const editJob = async ( maxAttempts?: number; priority?: number; runAt?: Date | null; - timeoutMs?: number; - tags?: string[]; + timeoutMs?: number | null; + tags?: string[] | null; }, ): Promise => { const client = await pool.connect(); @@ -505,6 +505,175 @@ export const editJob = async ( } }; +/** + * Edit all pending jobs matching the filters + */ +export const editAllPendingJobs = async ( + pool: Pool, + filters: { + jobType?: string; + priority?: number; + runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date }; + tags?: { values: string[]; mode?: TagQueryMode }; + } | undefined = undefined, + updates: { + payload?: PayloadMap[T]; + maxAttempts?: number; + priority?: number; + runAt?: Date | null; + timeoutMs?: number; + tags?: string[]; + }, +): Promise => { + const client = await pool.connect(); + try { + // Build SET clause from updates + const updateFields: string[] = []; + const params: any[] = []; + let paramIdx = 1; + + if (updates.payload !== undefined) { + updateFields.push(`payload = $${paramIdx++}`); + params.push(updates.payload); + } + if (updates.maxAttempts !== undefined) { + updateFields.push(`max_attempts = $${paramIdx++}`); + params.push(updates.maxAttempts); + } + if (updates.priority !== undefined) { + updateFields.push(`priority = $${paramIdx++}`); + params.push(updates.priority); + } + if (updates.runAt !== undefined) { + if (updates.runAt === null) { + // null means run now (use current timestamp) + updateFields.push(`run_at = NOW()`); + } else { + updateFields.push(`run_at = $${paramIdx++}`); + params.push(updates.runAt); + } + } + if (updates.timeoutMs !== undefined) { + updateFields.push(`timeout_ms = $${paramIdx++}`); + params.push(updates.timeoutMs ?? null); + } + if (updates.tags !== undefined) { + updateFields.push(`tags = $${paramIdx++}`); + params.push(updates.tags ?? null); + } + + // If no fields to update, return early + if (updateFields.length === 0) { + log(`No fields to update for batch edit`); + return 0; + } + + // Always update updated_at timestamp + updateFields.push(`updated_at = NOW()`); + + // Build WHERE clause from filters + let query = ` + UPDATE job_queue + SET ${updateFields.join(', ')} + WHERE status = 'pending'`; + + if (filters) { + if (filters.jobType) { + query += ` AND job_type = $${paramIdx++}`; + params.push(filters.jobType); + } + if (filters.priority !== undefined) { + query += ` AND priority = $${paramIdx++}`; + params.push(filters.priority); + } + if (filters.runAt) { + if (filters.runAt instanceof Date) { + query += ` AND run_at = $${paramIdx++}`; + params.push(filters.runAt); + } else if (typeof filters.runAt === 'object') { + const ops = filters.runAt; + if (ops.gt) { + query += ` AND run_at > $${paramIdx++}`; + params.push(ops.gt); + } + if (ops.gte) { + query += ` AND run_at >= $${paramIdx++}`; + params.push(ops.gte); + } + if (ops.lt) { + query += ` AND run_at < $${paramIdx++}`; + params.push(ops.lt); + } + if (ops.lte) { + query += ` AND run_at <= $${paramIdx++}`; + params.push(ops.lte); + } + if (ops.eq) { + query += ` AND run_at = $${paramIdx++}`; + params.push(ops.eq); + } + } + } + if ( + filters.tags && + filters.tags.values && + filters.tags.values.length > 0 + ) { + const mode = filters.tags.mode || 'all'; + const tagValues = filters.tags.values; + switch (mode) { + case 'exact': + query += ` AND tags = $${paramIdx++}`; + params.push(tagValues); + break; + case 'all': + query += ` AND tags @> $${paramIdx++}`; + params.push(tagValues); + break; + case 'any': + query += ` AND tags && $${paramIdx++}`; + params.push(tagValues); + break; + case 'none': + query += ` AND NOT (tags && $${paramIdx++})`; + params.push(tagValues); + break; + default: + query += ` AND tags @> $${paramIdx++}`; + params.push(tagValues); + } + } + } + query += '\nRETURNING id'; + + const result = await client.query(query, params); + const editedCount = result.rowCount || 0; + + // Record edit event with metadata containing updated fields for each job + const metadata: any = {}; + if (updates.payload !== undefined) metadata.payload = updates.payload; + if (updates.maxAttempts !== undefined) + metadata.maxAttempts = updates.maxAttempts; + if (updates.priority !== undefined) metadata.priority = updates.priority; + if (updates.runAt !== undefined) metadata.runAt = updates.runAt; + if (updates.timeoutMs !== undefined) metadata.timeoutMs = updates.timeoutMs; + if (updates.tags !== undefined) metadata.tags = updates.tags; + + // Record events for each affected job + for (const row of result.rows) { + await recordJobEvent(pool, row.id, JobEventType.Edited, metadata); + } + + log(`Edited ${editedCount} pending jobs: ${JSON.stringify(metadata)}`); + return editedCount; + } catch (error) { + log(`Error editing pending jobs: ${error}`); + throw error; + } finally { + client.release(); + } +}; + /** * Cancel all upcoming jobs (pending and scheduled in the future) with optional filters */ diff --git a/packages/dataqueue/src/types.ts b/packages/dataqueue/src/types.ts index 18dea64..58577ee 100644 --- a/packages/dataqueue/src/types.ts +++ b/packages/dataqueue/src/types.ts @@ -23,10 +23,14 @@ export interface JobOptions> { * Options for editing a pending job. * All fields are optional and only provided fields will be updated. * Note: jobType cannot be changed. + * timeoutMs and tags can be set to null to clear them. */ export type EditJobOptions> = Partial< Omit, 'jobType'> ->; +> & { + timeoutMs?: number | null; + tags?: string[] | null; +}; export enum JobEventType { Added = 'added', @@ -290,6 +294,30 @@ export interface JobQueue { jobId: number, updates: EditJobOptions, ) => Promise; + /** + * Edit all pending jobs that match the filters. + * - Only works for jobs with status 'pending'. Non-pending jobs are not affected. + * - All fields in EditJobOptions are optional - only provided fields will be updated. + * - jobType cannot be changed. + * - Records an 'edited' event with the updated fields in metadata for each affected job. + * - Returns the number of jobs that were edited. + * - The filters are: + * - jobType: The job type to edit. + * - priority: The priority of the job to edit. + * - runAt: The time the job is scheduled to run at (now supports gt/gte/lt/lte/eq). + * - tags: An object with 'values' (string[]) and 'mode' (TagQueryMode) for tag-based editing. + */ + editAllPendingJobs: >( + filters: + | { + jobType?: string; + priority?: number; + runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date }; + tags?: { values: string[]; mode?: TagQueryMode }; + } + | undefined, + updates: EditJobOptions, + ) => Promise; /** * Reclaim stuck jobs. * - If a process (e.g., API route or worker) crashes after marking a job as 'processing' but before completing it, the job can remain stuck in the 'processing' state indefinitely. This can happen if the process is killed or encounters an unhandled error after updating the job status but before marking it as 'completed' or 'failed'. From 0a2655a4a4ef6508423b968bac7075c2bbf772df Mon Sep 17 00:00:00 2001 From: Nico Prananta <311343+nicnocquee@users.noreply.github.com> Date: Mon, 15 Dec 2025 21:10:56 +0100 Subject: [PATCH 3/3] Refactor job payload structure in tests and update type definitions - Changed job payload key from `to` to `foo` in integration tests to align with updated job processing logic. - Enhanced type definitions for job filters and payloads in `editAllPendingJobs` and `JobQueue` interfaces for improved clarity and consistency. - Added new handler for `test` job type in processor to support additional processing scenarios. --- packages/dataqueue/src/index.test.ts | 25 ++++++++++++++---------- packages/dataqueue/src/index.ts | 5 ++++- packages/dataqueue/src/queue.ts | 29 +++++++++++++++++----------- packages/dataqueue/src/types.ts | 4 +++- 4 files changed, 40 insertions(+), 23 deletions(-) diff --git a/packages/dataqueue/src/index.test.ts b/packages/dataqueue/src/index.test.ts index 8e25fef..44c5d94 100644 --- a/packages/dataqueue/src/index.test.ts +++ b/packages/dataqueue/src/index.test.ts @@ -406,17 +406,17 @@ describe('index integration', () => { }); it('should edit a job and then process it correctly', async () => { - const handler = vi.fn(async (payload: { to: string }, _signal) => { - expect(payload.to).toBe('updated@example.com'); + const handler = vi.fn(async (payload: { foo: string }, _signal) => { + expect(payload.foo).toBe('updated@example.com'); }); const jobId = await jobQueue.addJob({ jobType: 'test', - payload: { to: 'original@example.com' }, + payload: { foo: 'original@example.com' }, }); // Edit the job before processing await jobQueue.editJob(jobId, { - payload: { to: 'updated@example.com' }, + payload: { foo: 'updated@example.com' }, }); const processor = jobQueue.createProcessor( @@ -432,7 +432,7 @@ describe('index integration', () => { processor.stop(); expect(handler).toHaveBeenCalledWith( - { to: 'updated@example.com' }, + { foo: 'updated@example.com' }, expect.any(Object), ); const job = await jobQueue.getJob(jobId); @@ -448,6 +448,8 @@ describe('index integration', () => { const processor = jobQueue.createProcessor( { email: vi.fn(async () => {}), + sms: vi.fn(async () => {}), + test: vi.fn(async () => {}), }, { pollInterval: 100 }, ); @@ -468,14 +470,17 @@ describe('index integration', () => { // Try to edit a processing job // Use a handler that takes longer to ensure job stays in processing state - const slowHandler = vi.fn( - async (payload: { to: string }, _signal) => { - await new Promise((r) => setTimeout(r, 200)); - }, - ); + const slowHandler = vi.fn(async (payload: { to: string }, _signal) => { + await new Promise((r) => setTimeout(r, 200)); + }); + const slowHandlerTest = vi.fn(async (payload: { foo: string }, _signal) => { + await new Promise((r) => setTimeout(r, 200)); + }); const processor2 = jobQueue.createProcessor( { email: slowHandler, + sms: slowHandler, + test: slowHandlerTest, }, { pollInterval: 100 }, ); diff --git a/packages/dataqueue/src/index.ts b/packages/dataqueue/src/index.ts index f848331..764f7d0 100644 --- a/packages/dataqueue/src/index.ts +++ b/packages/dataqueue/src/index.ts @@ -97,7 +97,10 @@ export const initJobQueue = ( runAt?: | Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date }; - tags?: { values: string[]; mode?: import('./types.js').TagQueryMode }; + tags?: { + values: string[]; + mode?: import('./types.js').TagQueryMode; + }; } | undefined, updates: import('./types.js').EditJobOptions, diff --git a/packages/dataqueue/src/queue.ts b/packages/dataqueue/src/queue.ts index d4409c8..df36a34 100644 --- a/packages/dataqueue/src/queue.ts +++ b/packages/dataqueue/src/queue.ts @@ -508,14 +508,21 @@ export const editJob = async ( /** * Edit all pending jobs matching the filters */ -export const editAllPendingJobs = async ( +export const editAllPendingJobs = async < + PayloadMap, + T extends keyof PayloadMap & string, +>( pool: Pool, - filters: { - jobType?: string; - priority?: number; - runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date }; - tags?: { values: string[]; mode?: TagQueryMode }; - } | undefined = undefined, + filters: + | { + jobType?: string; + priority?: number; + runAt?: + | Date + | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date }; + tags?: { values: string[]; mode?: TagQueryMode }; + } + | undefined = undefined, updates: { payload?: PayloadMap[T]; maxAttempts?: number; @@ -576,7 +583,7 @@ export const editAllPendingJobs = async { | { jobType?: string; priority?: number; - runAt?: Date | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date }; + runAt?: + | Date + | { gt?: Date; gte?: Date; lt?: Date; lte?: Date; eq?: Date }; tags?: { values: string[]; mode?: TagQueryMode }; } | undefined,