From 4392b4848b9b9d86038351d41625f1134bdf0781 Mon Sep 17 00:00:00 2001 From: Odunlami Zacchaeus Date: Wed, 13 May 2026 11:11:43 +0100 Subject: [PATCH] feat: add configurable busy poll interval --- README.md | 13 +++--- src/client.ts | 36 ++++++++++----- src/types/config.ts | 9 +++- test/node-river/unit/client.test.ts | 70 +++++++++++++++++++++++++++++ 4 files changed, 110 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 06c24bf..b76d8a6 100644 --- a/README.md +++ b/README.md @@ -119,12 +119,13 @@ process.on('SIGTERM', async () => { ## Configuration -| Option | Type | Description | -| -------------- | ----------------------------------------- | ------------------------------------------------------------------- | -| `queues` | `Record` | Queues to poll and their concurrency limits. Required for `work()`. | -| `maxAttempts` | `number` | Default max attempts for inserted jobs. | -| `pollInterval` | `number` | Milliseconds between polls. Defaults to `1000`. | -| `clientId` | `string` | Unique ID for this client instance. Defaults to `hostname-pid`. | +| Option | Type | Description | +| ------------------ | ----------------------------------------- | --------------------------------------------------------------------------------------------- | +| `queues` | `Record` | Queues to poll and their concurrency limits. Required for `work()`. | +| `maxAttempts` | `number` | Default max attempts for inserted jobs. | +| `pollInterval` | `number` | Milliseconds between polls. Defaults to `1000`. | +| `busyPollInterval` | `number` | Milliseconds between polls after a full batch is found. Defaults to `pollInterval`. | +| `clientId` | `string` | Unique ID for this client instance. Defaults to `hostname-pid`. | ## License diff --git a/src/client.ts b/src/client.ts index 613d99a..354e02f 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,6 +1,5 @@ import os from 'os'; import process from 'process'; -import { clearTimeout, setTimeout } from 'timers'; import { Driver } from './drivers'; import { ClientConfiguration, InsertOpts, InsertResult, Job, JobArgs, Worker } from './types'; import { CLIENT_CONFIGURATION_DEFAULTS } from './utils'; @@ -14,7 +13,7 @@ import { CLIENT_CONFIGURATION_DEFAULTS } from './utils'; export default class RiverClient, Tx> { private readonly running: Map = new Map(); private stopped = false; - private pollTimer: ReturnType | null = null; + private pollTimer: ReturnType | null = null; private readonly driver: D; private readonly configuration: ClientConfiguration; private readonly workers: Map = new Map(); @@ -23,7 +22,7 @@ export default class RiverClient, Tx> { /** * Creates a new RiverClient instance. * @param driver - The driver implementation (e.g. PgDriver). - * @param configuration - Client options: queues, max attempts, poll interval, and client ID. + * @param configuration - Client options: queues, max attempts, poll intervals, and client ID. */ constructor(driver: D, configuration: ClientConfiguration) { this.driver = driver; @@ -44,7 +43,7 @@ export default class RiverClient, Tx> { */ async close(): Promise { this.stopped = true; - if (this.pollTimer) clearTimeout(this.pollTimer); + if (this.pollTimer) globalThis.clearTimeout(this.pollTimer); await this.driver.close(); } @@ -123,16 +122,28 @@ export default class RiverClient, Tx> { private poll(): void { if (this.stopped) return; - this.fetchAndWork().finally(() => { - this.pollTimer = setTimeout( - () => this.poll(), - this.configuration.pollInterval ?? CLIENT_CONFIGURATION_DEFAULTS.pollInterval, - ); - }); + this.fetchAndWork().then( + (hasFullBatch) => { + const pollInterval = + this.configuration.pollInterval ?? CLIENT_CONFIGURATION_DEFAULTS.pollInterval; + const delay = hasFullBatch + ? (this.configuration.busyPollInterval ?? pollInterval) + : pollInterval; + + this.pollTimer = globalThis.setTimeout(() => this.poll(), delay); + }, + () => { + this.pollTimer = globalThis.setTimeout( + () => this.poll(), + this.configuration.pollInterval ?? CLIENT_CONFIGURATION_DEFAULTS.pollInterval, + ); + }, + ); } - private async fetchAndWork(): Promise { + private async fetchAndWork(): Promise { const queues = Object.entries(this.configuration.queues ?? {}); + let hasFullBatch = false; for (const [queue, queueConfig] of queues) { const running = this.running.get(queue) ?? 0; @@ -140,6 +151,7 @@ export default class RiverClient, Tx> { if (slots <= 0) continue; const jobs = await this.driver.getAvailableJobs(queue, slots, this.clientId); + if (jobs.length === slots) hasFullBatch = true; for (const job of jobs) { this.running.set(job.queue, (this.running.get(job.queue) ?? 0) + 1); @@ -148,6 +160,8 @@ export default class RiverClient, Tx> { }); } } + + return hasFullBatch; } private async processJob(job: Job): Promise { diff --git a/src/types/config.ts b/src/types/config.ts index d432da1..f8998b8 100644 --- a/src/types/config.ts +++ b/src/types/config.ts @@ -17,10 +17,17 @@ export default interface ClientConfiguration { /** * How often (in milliseconds) the client polls the database for available jobs. - * Only applies when `work()` is called. + * Only applies when `work()` is called. Defaults to `1000`. */ pollInterval?: number; + /** + * How long to wait before polling again after a queue returns a full batch. + * Lower values reduce latency while a queue is busy, but can increase database + * query volume. Defaults to `pollInterval`. + */ + busyPollInterval?: number; + /** * Unique identifier for this client instance, recorded in each job's `attemptedBy` list. * Defaults to `hostname-pid` (e.g. `"web-server-12345"`). diff --git a/test/node-river/unit/client.test.ts b/test/node-river/unit/client.test.ts index 25c3478..a2b124d 100644 --- a/test/node-river/unit/client.test.ts +++ b/test/node-river/unit/client.test.ts @@ -43,6 +43,11 @@ function makeJob(overrides: Partial = {}): Job { }; } +async function flushPromises(): Promise { + await Promise.resolve(); + await Promise.resolve(); +} + describe('RiverClient worker', () => { let driver: Driver; let client: RiverClient, unknown>; @@ -58,6 +63,7 @@ describe('RiverClient worker', () => { afterEach(async () => { await client.close(); + jest.useRealTimers(); }); it('calls worker.work() and completeJob when the worker succeeds', async () => { @@ -155,4 +161,68 @@ describe('RiverClient worker', () => { expect(driver.completeJob).toHaveBeenCalledWith(1); expect(driver.completeJob).toHaveBeenCalledWith(2); }); + + it('re-polls after busyPollInterval when a queue returns a full batch', async () => { + jest.useFakeTimers(); + client = new RiverClient(driver, { + queues: { default: { concurrency: 2 } }, + pollInterval: 60_000, + busyPollInterval: 0, + }); + const jobs = [makeJob({ id: 1 }), makeJob({ id: 2 })]; + (driver.getAvailableJobs as jest.Mock).mockResolvedValue([]); + (driver.getAvailableJobs as jest.Mock).mockResolvedValueOnce(jobs); + + const worker: Worker = { work: jest.fn().mockResolvedValue(undefined) }; + client.addWorker('test_job', worker); + client.work(); + + await flushPromises(); + expect(driver.getAvailableJobs).toHaveBeenCalledTimes(1); + + await jest.advanceTimersByTimeAsync(0); + + expect(driver.getAvailableJobs).toHaveBeenCalledTimes(2); + }); + + it('defaults busy polling to the poll interval', async () => { + jest.useFakeTimers(); + const jobs = [makeJob({ id: 1 }), makeJob({ id: 2 })]; + (driver.getAvailableJobs as jest.Mock).mockResolvedValue([]); + (driver.getAvailableJobs as jest.Mock).mockResolvedValueOnce(jobs); + + const worker: Worker = { work: jest.fn().mockResolvedValue(undefined) }; + client.addWorker('test_job', worker); + client.work(); + + await flushPromises(); + expect(driver.getAvailableJobs).toHaveBeenCalledTimes(1); + + await jest.advanceTimersByTimeAsync(59_999); + expect(driver.getAvailableJobs).toHaveBeenCalledTimes(1); + + await jest.advanceTimersByTimeAsync(1); + + expect(driver.getAvailableJobs).toHaveBeenCalledTimes(2); + }); + + it('waits the poll interval when a queue returns fewer jobs than available slots', async () => { + jest.useFakeTimers(); + (driver.getAvailableJobs as jest.Mock).mockResolvedValue([]); + (driver.getAvailableJobs as jest.Mock).mockResolvedValueOnce([makeJob()]); + + const worker: Worker = { work: jest.fn().mockResolvedValue(undefined) }; + client.addWorker('test_job', worker); + client.work(); + + await flushPromises(); + expect(driver.getAvailableJobs).toHaveBeenCalledTimes(1); + + await jest.advanceTimersByTimeAsync(59_999); + expect(driver.getAvailableJobs).toHaveBeenCalledTimes(1); + + await jest.advanceTimersByTimeAsync(1); + + expect(driver.getAvailableJobs).toHaveBeenCalledTimes(2); + }); });