From 8638d98d5ede3ad4cb9163011ea58170a976589e Mon Sep 17 00:00:00 2001 From: Odunlami Zacchaeus Date: Wed, 13 May 2026 13:03:39 +0100 Subject: [PATCH 1/2] batch non-unique insertMany jobs --- src/drivers/pg/pg-driver.ts | 291 ++++++++++++++++--------- src/drivers/pg/types/index.ts | 3 - src/drivers/pg/types/query-executor.ts | 8 - 3 files changed, 184 insertions(+), 118 deletions(-) delete mode 100644 src/drivers/pg/types/index.ts delete mode 100644 src/drivers/pg/types/query-executor.ts diff --git a/src/drivers/pg/pg-driver.ts b/src/drivers/pg/pg-driver.ts index 9f6f19e..77a0375 100644 --- a/src/drivers/pg/pg-driver.ts +++ b/src/drivers/pg/pg-driver.ts @@ -5,7 +5,26 @@ import { InsertOpts, InsertResult, Job, JobArgs, JobState } from '../../types'; import { bitmaskToJobStates, mapToUniqueKey } from '../../utils'; import Driver from '../driver'; import Options from './pg-options'; -import { QueryExecutor } from './types'; + +const JOB_RETURNING_COLUMNS = ` + id, + state, + attempt, + max_attempts as "maxAttempts", + attempted_at as "attemptedAt", + created_at as "createdAt", + finalized_at as "finalizedAt", + scheduled_at as "scheduledAt", + priority, + args, + attempted_by as "attemptedBy", + errors, + kind, + metadata, + queue, + tags, + unique_key as "uniqueKey", + unique_states as "uniqueStates"`; // Implements the RiverQueue Driver interface using the 'pg' library. export default class PgDriver implements Driver { @@ -55,110 +74,6 @@ export default class PgDriver implements Driver { return this.insertWithQueryExecutor(tx, args, opts); } - private async insertWithQueryExecutor( - executor: QueryExecutor, - args: T, - opts: InsertOpts, - ): Promise> { - if (!opts.maxAttempts) { - throw new Error('maxAttempts is required in InsertOpts'); - } - - let uniqueKey: Buffer | undefined; - if (opts.uniqueOpts) { - uniqueKey = mapToUniqueKey(args, opts); - } - - if (uniqueKey) { - const stateList = opts.uniqueOpts?.byState || []; - - let query = 'SELECT * FROM river_job WHERE unique_key = $1'; - let values: (Buffer | string[])[] = [uniqueKey]; - - if (stateList.length > 0) { - query += ' AND state = ANY($2)'; - values.push(stateList); - } - - query += ' LIMIT 1'; - - const result = await executor.query< - Omit & { uniqueStates: Buffer | null } - >(query, values); - - if (result.rows.length > 0) { - const row = result.rows[0]; - - return this.mapRowToInsertResult(row, true); - } - } - - const { kind, ...restArgs } = args; - const columns = ['kind', 'args', 'queue', 'max_attempts']; - const values: (string | number | Buffer)[] = [ - kind, - JSON.stringify(restArgs), - opts.queue, - opts.maxAttempts, - ]; - - if (opts.tags) { - columns.push('tags'); - values.push(JSON.stringify(opts.tags)); - } - - if (opts.priority) { - columns.push('priority'); - values.push(opts.priority); - } - - if (opts.metadata) { - columns.push('metadata'); - values.push(JSON.stringify(opts.metadata)); - } - - if (opts.scheduledAt) { - columns.push('scheduled_at'); - values.push( - opts.scheduledAt instanceof Date ? opts.scheduledAt.toISOString() : opts.scheduledAt, - ); - } - - if (uniqueKey) { - columns.push('unique_key'); - values.push(uniqueKey); - } - - const placeholders = columns.map((_, i) => `$${i + 1}`); - const query = `INSERT INTO river_job (${columns.join(', ')}) VALUES (${placeholders.join(', ')}) RETURNING - id, - state, - attempt, - max_attempts as "maxAttempts", - attempted_at as "attemptedAt", - created_at as "createdAt", - finalized_at as "finalizedAt", - scheduled_at as "scheduledAt", - priority, - args, - attempted_by as "attemptedBy", - errors, - kind, - metadata, - queue, - tags, - unique_key as "uniqueKey", - unique_states as "uniqueStates"`; - const result = await executor.query< - Omit & { uniqueStates: Buffer | null } - >(query, values); - - const row = result.rows[0]; - if (!row) return row; - - return this.mapRowToInsertResult(row, false); - } - async insertMany( jobs: { args: T; opts: InsertOpts }[], ): Promise[]> { @@ -167,10 +82,46 @@ export default class PgDriver implements Driver { await client.query('BEGIN'); const results: InsertResult[] = []; - for (const job of jobs) { - results.push(await this.insertTx(client, job.args, job.opts)); + let batch: { + index: number; + parts: { columns: string[]; values: (string | number | Buffer)[] }; + }[] = []; + let batchColumnKey: string | null = null; + + const flushBatch = async (): Promise => { + if (batch.length === 0) return; + + const batchResults = await this.insertBatchTx( + client, + batch.map((job) => job.parts), + ); + + batch.forEach((job, i) => { + results[job.index] = batchResults[i]; + }); + + batch = []; + batchColumnKey = null; + }; + + for (const [index, job] of jobs.entries()) { + if (job.opts.uniqueOpts) { + await flushBatch(); + results[index] = await this.insertTx(client, job.args, job.opts); + continue; + } + + const parts = this.buildInsertParts(job.args, job.opts); + const columnKey = parts.columns.join('\0'); + if (batchColumnKey && batchColumnKey !== columnKey) { + await flushBatch(); + } + + batch.push({ index, parts }); + batchColumnKey = columnKey; } + await flushBatch(); await client.query('COMMIT'); return results; @@ -266,6 +217,132 @@ export default class PgDriver implements Driver { ); } + private async insertWithQueryExecutor( + executor: Pick, + args: T, + opts: InsertOpts, + ): Promise> { + if (!opts.maxAttempts) { + throw new Error('maxAttempts is required in InsertOpts'); + } + + let uniqueKey: Buffer | undefined; + if (opts.uniqueOpts) { + uniqueKey = mapToUniqueKey(args, opts); + } + + if (uniqueKey) { + const stateList = opts.uniqueOpts?.byState || []; + + let query = 'SELECT * FROM river_job WHERE unique_key = $1'; + let values: (Buffer | string[])[] = [uniqueKey]; + + if (stateList.length > 0) { + query += ' AND state = ANY($2)'; + values.push(stateList); + } + + query += ' LIMIT 1'; + + const result = await executor.query< + Omit & { uniqueStates: Buffer | null } + >(query, values); + + if (result.rows.length > 0) { + const row = result.rows[0]; + + return this.mapRowToInsertResult(row, true); + } + } + + const { columns, values } = this.buildInsertParts(args, opts, uniqueKey); + + const placeholders = columns.map((_, i) => `$${i + 1}`); + const query = `INSERT INTO river_job (${columns.join(', ')}) VALUES (${placeholders.join(', ')}) RETURNING + ${JOB_RETURNING_COLUMNS}`; + const result = await executor.query< + Omit & { uniqueStates: Buffer | null } + >(query, values); + + const row = result.rows[0]; + if (!row) return row; + + return this.mapRowToInsertResult(row, false); + } + + private buildInsertParts( + args: T, + opts: InsertOpts, + uniqueKey?: Buffer, + ): { columns: string[]; values: (string | number | Buffer)[] } { + if (!opts.maxAttempts) { + throw new Error('maxAttempts is required in InsertOpts'); + } + + const { kind, ...restArgs } = args; + const columns = ['kind', 'args', 'queue', 'max_attempts']; + const values: (string | number | Buffer)[] = [ + kind, + JSON.stringify(restArgs), + opts.queue, + opts.maxAttempts, + ]; + + if (opts.tags) { + columns.push('tags'); + values.push(JSON.stringify(opts.tags)); + } + + if (opts.priority) { + columns.push('priority'); + values.push(opts.priority); + } + + if (opts.metadata) { + columns.push('metadata'); + values.push(JSON.stringify(opts.metadata)); + } + + if (opts.scheduledAt) { + columns.push('scheduled_at'); + values.push( + opts.scheduledAt instanceof Date ? opts.scheduledAt.toISOString() : opts.scheduledAt, + ); + } + + if (uniqueKey) { + columns.push('unique_key'); + values.push(uniqueKey); + } + + return { columns, values }; + } + + private async insertBatchTx( + tx: PoolClient, + batch: { columns: string[]; values: (string | number | Buffer)[] }[], + ): Promise[]> { + const columns = batch[0].columns; + const values: (string | number | Buffer)[] = []; + const valueRows = batch.map((job) => { + const placeholders = job.values.map((value) => { + values.push(value); + return `$${values.length}`; + }); + + return `(${placeholders.join(', ')})`; + }); + + const query = `INSERT INTO river_job (${columns.join(', ')}) VALUES ${valueRows.join(', ')} RETURNING + ${JOB_RETURNING_COLUMNS}`; + const result = await tx.query & { uniqueStates: Buffer | null }>( + query, + values, + ); + + return result.rows.map((row) => this.mapRowToInsertResult(row, false)); + } + /** * Helper to map a DB row to a Job and InsertResult. */ diff --git a/src/drivers/pg/types/index.ts b/src/drivers/pg/types/index.ts deleted file mode 100644 index 06be84a..0000000 --- a/src/drivers/pg/types/index.ts +++ /dev/null @@ -1,3 +0,0 @@ -import QueryExecutor from './query-executor'; - -export { QueryExecutor }; diff --git a/src/drivers/pg/types/query-executor.ts b/src/drivers/pg/types/query-executor.ts deleted file mode 100644 index 8aca1e8..0000000 --- a/src/drivers/pg/types/query-executor.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { Pool } from 'pg'; - -/** - * A pg object that can run SQL queries. - */ -type QueryExecutor = Pick; - -export default QueryExecutor; From cd5fcd0cf25631391d24ccf5252091a4d62865a4 Mon Sep 17 00:00:00 2001 From: Odunlami Zacchaeus Date: Wed, 13 May 2026 15:47:03 +0100 Subject: [PATCH 2/2] batch non-unique insertMany jobs --- src/drivers/pg/pg-driver.ts | 75 ++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/src/drivers/pg/pg-driver.ts b/src/drivers/pg/pg-driver.ts index 77a0375..3b0a88d 100644 --- a/src/drivers/pg/pg-driver.ts +++ b/src/drivers/pg/pg-driver.ts @@ -6,6 +6,22 @@ import { bitmaskToJobStates, mapToUniqueKey } from '../../utils'; import Driver from '../driver'; import Options from './pg-options'; +// pg object that can run SQL queries. +type QueryExecutor = Pick; + +// Row returned by pg before unique state bitmasks are decoded. +type PgRow = Omit & { uniqueStates: Buffer | null }; + +// Value type bound into pg insert query parameters. +type PgValue = string | number | Buffer; + +// Column names and bound values for one pg insert row. +type PgInsert = { + columns: string[]; + values: PgValue[]; +}; + +// Shared RETURNING columns for pg job insert queries. const JOB_RETURNING_COLUMNS = ` id, state, @@ -82,10 +98,7 @@ export default class PgDriver implements Driver { await client.query('BEGIN'); const results: InsertResult[] = []; - let batch: { - index: number; - parts: { columns: string[]; values: (string | number | Buffer)[] }; - }[] = []; + let batch: { index: number; insert: PgInsert }[] = []; let batchColumnKey: string | null = null; const flushBatch = async (): Promise => { @@ -93,7 +106,7 @@ export default class PgDriver implements Driver { const batchResults = await this.insertBatchTx( client, - batch.map((job) => job.parts), + batch.map((job) => job.insert), ); batch.forEach((job, i) => { @@ -111,13 +124,13 @@ export default class PgDriver implements Driver { continue; } - const parts = this.buildInsertParts(job.args, job.opts); - const columnKey = parts.columns.join('\0'); + const insert = this.buildPgInsert(job.args, job.opts); + const columnKey = insert.columns.join('\0'); if (batchColumnKey && batchColumnKey !== columnKey) { await flushBatch(); } - batch.push({ index, parts }); + batch.push({ index, insert }); batchColumnKey = columnKey; } @@ -217,8 +230,9 @@ export default class PgDriver implements Driver { ); } + // Inserts one job using either the pool or a transaction client. private async insertWithQueryExecutor( - executor: Pick, + executor: QueryExecutor, args: T, opts: InsertOpts, ): Promise> { @@ -244,9 +258,7 @@ export default class PgDriver implements Driver { query += ' LIMIT 1'; - const result = await executor.query< - Omit & { uniqueStates: Buffer | null } - >(query, values); + const result = await executor.query(query, values); if (result.rows.length > 0) { const row = result.rows[0]; @@ -255,14 +267,12 @@ export default class PgDriver implements Driver { } } - const { columns, values } = this.buildInsertParts(args, opts, uniqueKey); + const { columns, values } = this.buildPgInsert(args, opts, uniqueKey); const placeholders = columns.map((_, i) => `$${i + 1}`); const query = `INSERT INTO river_job (${columns.join(', ')}) VALUES (${placeholders.join(', ')}) RETURNING ${JOB_RETURNING_COLUMNS}`; - const result = await executor.query< - Omit & { uniqueStates: Buffer | null } - >(query, values); + const result = await executor.query(query, values); const row = result.rows[0]; if (!row) return row; @@ -270,23 +280,19 @@ export default class PgDriver implements Driver { return this.mapRowToInsertResult(row, false); } - private buildInsertParts( + // Builds the column list and parameter values for one job insert. + private buildPgInsert( args: T, opts: InsertOpts, uniqueKey?: Buffer, - ): { columns: string[]; values: (string | number | Buffer)[] } { + ): PgInsert { if (!opts.maxAttempts) { throw new Error('maxAttempts is required in InsertOpts'); } const { kind, ...restArgs } = args; const columns = ['kind', 'args', 'queue', 'max_attempts']; - const values: (string | number | Buffer)[] = [ - kind, - JSON.stringify(restArgs), - opts.queue, - opts.maxAttempts, - ]; + const values: PgValue[] = [kind, JSON.stringify(restArgs), opts.queue, opts.maxAttempts]; if (opts.tags) { columns.push('tags'); @@ -318,12 +324,13 @@ export default class PgDriver implements Driver { return { columns, values }; } + // Inserts compatible non-unique jobs with one multi-row INSERT query. private async insertBatchTx( - tx: PoolClient, - batch: { columns: string[]; values: (string | number | Buffer)[] }[], + tx: QueryExecutor, + batch: PgInsert[], ): Promise[]> { const columns = batch[0].columns; - const values: (string | number | Buffer)[] = []; + const values: PgValue[] = []; const valueRows = batch.map((job) => { const placeholders = job.values.map((value) => { values.push(value); @@ -335,21 +342,13 @@ export default class PgDriver implements Driver { const query = `INSERT INTO river_job (${columns.join(', ')}) VALUES ${valueRows.join(', ')} RETURNING ${JOB_RETURNING_COLUMNS}`; - const result = await tx.query & { uniqueStates: Buffer | null }>( - query, - values, - ); + const result = await tx.query(query, values); return result.rows.map((row) => this.mapRowToInsertResult(row, false)); } - /** - * Helper to map a DB row to a Job and InsertResult. - */ - private mapRowToInsertResult( - row: Omit & { uniqueStates: Buffer | null }, - skipped: boolean, - ): InsertResult { + // Maps a pg row to a typed job insert result. + private mapRowToInsertResult(row: PgRow, skipped: boolean): InsertResult { return { job: { ...row,