From 3f00d9b5c3a1a9eef8a6b42bc8aba7db7769203f Mon Sep 17 00:00:00 2001 From: Odunlami Zacchaeus Date: Wed, 13 May 2026 12:18:12 +0100 Subject: [PATCH] fetch jobs from queues concurrently --- src/client.ts | 19 ++++++++++++++----- test/node-river/unit/client.test.ts | 28 ++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/src/client.ts b/src/client.ts index 354e02f..ba3ebb2 100644 --- a/src/client.ts +++ b/src/client.ts @@ -143,14 +143,23 @@ export default class RiverClient, Tx> { private async fetchAndWork(): Promise { const queues = Object.entries(this.configuration.queues ?? {}); - let hasFullBatch = false; - - for (const [queue, queueConfig] of queues) { + const queueFetches = queues.flatMap(([queue, queueConfig]) => { const running = this.running.get(queue) ?? 0; const slots = queueConfig.concurrency - running; - if (slots <= 0) continue; + if (slots <= 0) return []; + + return [ + this.driver.getAvailableJobs(queue, slots, this.clientId).then((jobs) => ({ + jobs, + slots, + })), + ]; + }); + + const queueResults = await Promise.all(queueFetches); + let hasFullBatch = false; - const jobs = await this.driver.getAvailableJobs(queue, slots, this.clientId); + for (const { jobs, slots } of queueResults) { if (jobs.length === slots) hasFullBatch = true; for (const job of jobs) { diff --git a/test/node-river/unit/client.test.ts b/test/node-river/unit/client.test.ts index a2b124d..c58c24c 100644 --- a/test/node-river/unit/client.test.ts +++ b/test/node-river/unit/client.test.ts @@ -140,6 +140,34 @@ describe('RiverClient worker', () => { expect(driver.getAvailableJobs).toHaveBeenCalledWith('default', 5, expect.any(String)); }); + it('polls multiple queues in parallel', async () => { + client = new RiverClient(driver, { + queues: { + default: { concurrency: 2 }, + urgent: { concurrency: 3 }, + }, + pollInterval: 60_000, + }); + + let resolveDefaultQueue: (jobs: Job[]) => void; + const defaultQueue = new Promise((resolve) => { + resolveDefaultQueue = resolve; + }); + + (driver.getAvailableJobs as jest.Mock).mockImplementation((queue: string) => { + if (queue === 'default') return defaultQueue; + return Promise.resolve([]); + }); + + client.work(); + await flushPromises(); + + expect(driver.getAvailableJobs).toHaveBeenCalledWith('default', 2, expect.any(String)); + expect(driver.getAvailableJobs).toHaveBeenCalledWith('urgent', 3, expect.any(String)); + + resolveDefaultQueue!([]); + }); + it('processes multiple jobs from the same poll concurrently', async () => { const jobs = [makeJob({ id: 1 }), makeJob({ id: 2 })]; (driver.getAvailableJobs as jest.Mock).mockResolvedValueOnce(jobs);