Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,13 @@ process.on('SIGTERM', async () => {

## Configuration

| Option | Type | Description |
| -------------- | ----------------------------------------- | ------------------------------------------------------------------- |
| `queues` | `Record<string, { concurrency: number }>` | Queues to poll and their concurrency limits. Required for `work()`. |
| `maxAttempts` | `number` | Default max attempts for inserted jobs. |
| `pollInterval` | `number` | Milliseconds between polls. Defaults to `1000`. |
| `clientId` | `string` | Unique ID for this client instance. Defaults to `hostname-pid`. |
| Option | Type | Description |
| ------------------ | ----------------------------------------- | --------------------------------------------------------------------------------------------- |
| `queues` | `Record<string, { concurrency: number }>` | Queues to poll and their concurrency limits. Required for `work()`. |
| `maxAttempts` | `number` | Default max attempts for inserted jobs. |
| `pollInterval` | `number` | Milliseconds between polls. Defaults to `1000`. |
| `busyPollInterval` | `number` | Milliseconds between polls after a full batch is found. Defaults to `pollInterval`. |
| `clientId` | `string` | Unique ID for this client instance. Defaults to `hostname-pid`. |

## License

Expand Down
36 changes: 25 additions & 11 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os from 'os';
import process from 'process';
import { clearTimeout, setTimeout } from 'timers';
import { Driver } from './drivers';
import { ClientConfiguration, InsertOpts, InsertResult, Job, JobArgs, Worker } from './types';
import { CLIENT_CONFIGURATION_DEFAULTS } from './utils';
Expand All @@ -14,7 +13,7 @@ import { CLIENT_CONFIGURATION_DEFAULTS } from './utils';
export default class RiverClient<D extends Driver<Tx>, Tx> {
private readonly running: Map<string, number> = new Map();
private stopped = false;
private pollTimer: ReturnType<typeof setTimeout> | null = null;
private pollTimer: ReturnType<typeof globalThis.setTimeout> | null = null;
private readonly driver: D;
private readonly configuration: ClientConfiguration;
private readonly workers: Map<string, Worker> = new Map();
Expand All @@ -23,7 +22,7 @@ export default class RiverClient<D extends Driver<Tx>, Tx> {
/**
* Creates a new RiverClient instance.
* @param driver - The driver implementation (e.g. PgDriver).
* @param configuration - Client options: queues, max attempts, poll interval, and client ID.
* @param configuration - Client options: queues, max attempts, poll intervals, and client ID.
*/
constructor(driver: D, configuration: ClientConfiguration) {
this.driver = driver;
Expand All @@ -44,7 +43,7 @@ export default class RiverClient<D extends Driver<Tx>, Tx> {
*/
async close(): Promise<void> {
this.stopped = true;
if (this.pollTimer) clearTimeout(this.pollTimer);
if (this.pollTimer) globalThis.clearTimeout(this.pollTimer);
await this.driver.close();
}

Expand Down Expand Up @@ -123,23 +122,36 @@ export default class RiverClient<D extends Driver<Tx>, Tx> {
private poll(): void {
if (this.stopped) return;

this.fetchAndWork().finally(() => {
this.pollTimer = setTimeout(
() => this.poll(),
this.configuration.pollInterval ?? CLIENT_CONFIGURATION_DEFAULTS.pollInterval,
);
});
this.fetchAndWork().then(
(hasFullBatch) => {
const pollInterval =
this.configuration.pollInterval ?? CLIENT_CONFIGURATION_DEFAULTS.pollInterval;
const delay = hasFullBatch
? (this.configuration.busyPollInterval ?? pollInterval)
: pollInterval;

this.pollTimer = globalThis.setTimeout(() => this.poll(), delay);
},
() => {
this.pollTimer = globalThis.setTimeout(
() => this.poll(),
this.configuration.pollInterval ?? CLIENT_CONFIGURATION_DEFAULTS.pollInterval,
);
},
);
}

private async fetchAndWork(): Promise<void> {
private async fetchAndWork(): Promise<boolean> {
const queues = Object.entries(this.configuration.queues ?? {});
let hasFullBatch = false;

for (const [queue, queueConfig] of queues) {
const running = this.running.get(queue) ?? 0;
const slots = queueConfig.concurrency - running;
if (slots <= 0) continue;

const jobs = await this.driver.getAvailableJobs(queue, slots, this.clientId);
if (jobs.length === slots) hasFullBatch = true;

for (const job of jobs) {
this.running.set(job.queue, (this.running.get(job.queue) ?? 0) + 1);
Expand All @@ -148,6 +160,8 @@ export default class RiverClient<D extends Driver<Tx>, Tx> {
});
}
}

return hasFullBatch;
}

private async processJob(job: Job): Promise<void> {
Expand Down
9 changes: 8 additions & 1 deletion src/types/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@ export default interface ClientConfiguration {

/**
* How often (in milliseconds) the client polls the database for available jobs.
* Only applies when `work()` is called.
* Only applies when `work()` is called. Defaults to `1000`.
*/
pollInterval?: number;

/**
* How long to wait before polling again after a queue returns a full batch.
* Lower values reduce latency while a queue is busy, but can increase database
* query volume. Defaults to `pollInterval`.
*/
busyPollInterval?: number;

/**
* Unique identifier for this client instance, recorded in each job's `attemptedBy` list.
* Defaults to `hostname-pid` (e.g. `"web-server-12345"`).
Expand Down
70 changes: 70 additions & 0 deletions test/node-river/unit/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ function makeJob(overrides: Partial<Job> = {}): Job {
};
}

async function flushPromises(): Promise<void> {
await Promise.resolve();
await Promise.resolve();
}

describe('RiverClient worker', () => {
let driver: Driver<unknown>;
let client: RiverClient<Driver<unknown>, unknown>;
Expand All @@ -58,6 +63,7 @@ describe('RiverClient worker', () => {

afterEach(async () => {
await client.close();
jest.useRealTimers();
});

it('calls worker.work() and completeJob when the worker succeeds', async () => {
Expand Down Expand Up @@ -155,4 +161,68 @@ describe('RiverClient worker', () => {
expect(driver.completeJob).toHaveBeenCalledWith(1);
expect(driver.completeJob).toHaveBeenCalledWith(2);
});

it('re-polls after busyPollInterval when a queue returns a full batch', async () => {
jest.useFakeTimers();
client = new RiverClient(driver, {
queues: { default: { concurrency: 2 } },
pollInterval: 60_000,
busyPollInterval: 0,
});
const jobs = [makeJob({ id: 1 }), makeJob({ id: 2 })];
(driver.getAvailableJobs as jest.Mock).mockResolvedValue([]);
(driver.getAvailableJobs as jest.Mock).mockResolvedValueOnce(jobs);

const worker: Worker = { work: jest.fn().mockResolvedValue(undefined) };
client.addWorker('test_job', worker);
client.work();

await flushPromises();
expect(driver.getAvailableJobs).toHaveBeenCalledTimes(1);

await jest.advanceTimersByTimeAsync(0);

expect(driver.getAvailableJobs).toHaveBeenCalledTimes(2);
});

it('defaults busy polling to the poll interval', async () => {
jest.useFakeTimers();
const jobs = [makeJob({ id: 1 }), makeJob({ id: 2 })];
(driver.getAvailableJobs as jest.Mock).mockResolvedValue([]);
(driver.getAvailableJobs as jest.Mock).mockResolvedValueOnce(jobs);

const worker: Worker = { work: jest.fn().mockResolvedValue(undefined) };
client.addWorker('test_job', worker);
client.work();

await flushPromises();
expect(driver.getAvailableJobs).toHaveBeenCalledTimes(1);

await jest.advanceTimersByTimeAsync(59_999);
expect(driver.getAvailableJobs).toHaveBeenCalledTimes(1);

await jest.advanceTimersByTimeAsync(1);

expect(driver.getAvailableJobs).toHaveBeenCalledTimes(2);
});

it('waits the poll interval when a queue returns fewer jobs than available slots', async () => {
jest.useFakeTimers();
(driver.getAvailableJobs as jest.Mock).mockResolvedValue([]);
(driver.getAvailableJobs as jest.Mock).mockResolvedValueOnce([makeJob()]);

const worker: Worker = { work: jest.fn().mockResolvedValue(undefined) };
client.addWorker('test_job', worker);
client.work();

await flushPromises();
expect(driver.getAvailableJobs).toHaveBeenCalledTimes(1);

await jest.advanceTimersByTimeAsync(59_999);
expect(driver.getAvailableJobs).toHaveBeenCalledTimes(1);

await jest.advanceTimersByTimeAsync(1);

expect(driver.getAvailableJobs).toHaveBeenCalledTimes(2);
});
});
Loading