Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
304 changes: 190 additions & 114 deletions src/drivers/pg/pg-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,42 @@ import { InsertOpts, InsertResult, Job, JobArgs, JobState } from '../../types';
import { bitmaskToJobStates, mapToUniqueKey } from '../../utils';
import Driver from '../driver';
import Options from './pg-options';
import { QueryExecutor } from './types';

// pg object that can run SQL queries.
type QueryExecutor = Pick<Pool, 'query'>;

// Row returned by pg before unique state bitmasks are decoded.
type PgRow = Omit<Job, 'uniqueStates'> & { uniqueStates: Buffer | null };

// Value type bound into pg insert query parameters.
type PgValue = string | number | Buffer;

// Column names and bound values for one pg insert row.
type PgInsert = {
columns: string[];
values: PgValue[];
};

// Shared RETURNING columns for pg job insert queries.
const JOB_RETURNING_COLUMNS = `
id,
state,
attempt,
max_attempts as "maxAttempts",
attempted_at as "attemptedAt",
created_at as "createdAt",
finalized_at as "finalizedAt",
scheduled_at as "scheduledAt",
priority,
args,
attempted_by as "attemptedBy",
errors,
kind,
metadata,
queue,
tags,
unique_key as "uniqueKey",
unique_states as "uniqueStates"`;

// Implements the RiverQueue Driver interface using the 'pg' library.
export default class PgDriver implements Driver<PoolClient> {
Expand Down Expand Up @@ -55,110 +90,6 @@ export default class PgDriver implements Driver<PoolClient> {
return this.insertWithQueryExecutor(tx, args, opts);
}

private async insertWithQueryExecutor<T extends JobArgs>(
executor: QueryExecutor,
args: T,
opts: InsertOpts,
): Promise<InsertResult<T>> {
if (!opts.maxAttempts) {
throw new Error('maxAttempts is required in InsertOpts');
}

let uniqueKey: Buffer | undefined;
if (opts.uniqueOpts) {
uniqueKey = mapToUniqueKey(args, opts);
}

if (uniqueKey) {
const stateList = opts.uniqueOpts?.byState || [];

let query = 'SELECT * FROM river_job WHERE unique_key = $1';
let values: (Buffer | string[])[] = [uniqueKey];

if (stateList.length > 0) {
query += ' AND state = ANY($2)';
values.push(stateList);
}

query += ' LIMIT 1';

const result = await executor.query<
Omit<Job, 'uniqueStates'> & { uniqueStates: Buffer | null }
>(query, values);

if (result.rows.length > 0) {
const row = result.rows[0];

return this.mapRowToInsertResult(row, true);
}
}

const { kind, ...restArgs } = args;
const columns = ['kind', 'args', 'queue', 'max_attempts'];
const values: (string | number | Buffer)[] = [
kind,
JSON.stringify(restArgs),
opts.queue,
opts.maxAttempts,
];

if (opts.tags) {
columns.push('tags');
values.push(JSON.stringify(opts.tags));
}

if (opts.priority) {
columns.push('priority');
values.push(opts.priority);
}

if (opts.metadata) {
columns.push('metadata');
values.push(JSON.stringify(opts.metadata));
}

if (opts.scheduledAt) {
columns.push('scheduled_at');
values.push(
opts.scheduledAt instanceof Date ? opts.scheduledAt.toISOString() : opts.scheduledAt,
);
}

if (uniqueKey) {
columns.push('unique_key');
values.push(uniqueKey);
}

const placeholders = columns.map((_, i) => `$${i + 1}`);
const query = `INSERT INTO river_job (${columns.join(', ')}) VALUES (${placeholders.join(', ')}) RETURNING
id,
state,
attempt,
max_attempts as "maxAttempts",
attempted_at as "attemptedAt",
created_at as "createdAt",
finalized_at as "finalizedAt",
scheduled_at as "scheduledAt",
priority,
args,
attempted_by as "attemptedBy",
errors,
kind,
metadata,
queue,
tags,
unique_key as "uniqueKey",
unique_states as "uniqueStates"`;
const result = await executor.query<
Omit<Job, 'uniqueStates'> & { uniqueStates: Buffer | null }
>(query, values);

const row = result.rows[0];
if (!row) return row;

return this.mapRowToInsertResult(row, false);
}

async insertMany<T extends JobArgs>(
jobs: { args: T; opts: InsertOpts }[],
): Promise<InsertResult<T>[]> {
Expand All @@ -167,10 +98,43 @@ export default class PgDriver implements Driver<PoolClient> {
await client.query('BEGIN');

const results: InsertResult<T>[] = [];
for (const job of jobs) {
results.push(await this.insertTx(client, job.args, job.opts));
let batch: { index: number; insert: PgInsert }[] = [];
let batchColumnKey: string | null = null;

const flushBatch = async (): Promise<void> => {
if (batch.length === 0) return;

const batchResults = await this.insertBatchTx<T>(
client,
batch.map((job) => job.insert),
);

batch.forEach((job, i) => {
results[job.index] = batchResults[i];
});

batch = [];
batchColumnKey = null;
};

for (const [index, job] of jobs.entries()) {
if (job.opts.uniqueOpts) {
await flushBatch();
results[index] = await this.insertTx(client, job.args, job.opts);
continue;
}

const insert = this.buildPgInsert(job.args, job.opts);
const columnKey = insert.columns.join('\0');
if (batchColumnKey && batchColumnKey !== columnKey) {
await flushBatch();
}

batch.push({ index, insert });
batchColumnKey = columnKey;
}

await flushBatch();
await client.query('COMMIT');

return results;
Expand Down Expand Up @@ -266,13 +230,125 @@ export default class PgDriver implements Driver<PoolClient> {
);
}

/**
* Helper to map a DB row to a Job<T> and InsertResult.
*/
private mapRowToInsertResult<T extends JobArgs>(
row: Omit<Job, 'uniqueStates'> & { uniqueStates: Buffer | null },
skipped: boolean,
): InsertResult<T> {
// Inserts one job using either the pool or a transaction client.
private async insertWithQueryExecutor<T extends JobArgs>(
executor: QueryExecutor,
args: T,
opts: InsertOpts,
): Promise<InsertResult<T>> {
if (!opts.maxAttempts) {
throw new Error('maxAttempts is required in InsertOpts');
}

let uniqueKey: Buffer | undefined;
if (opts.uniqueOpts) {
uniqueKey = mapToUniqueKey(args, opts);
}

if (uniqueKey) {
const stateList = opts.uniqueOpts?.byState || [];

let query = 'SELECT * FROM river_job WHERE unique_key = $1';
let values: (Buffer | string[])[] = [uniqueKey];

if (stateList.length > 0) {
query += ' AND state = ANY($2)';
values.push(stateList);
}

query += ' LIMIT 1';

const result = await executor.query<PgRow>(query, values);

if (result.rows.length > 0) {
const row = result.rows[0];

return this.mapRowToInsertResult(row, true);
}
}

const { columns, values } = this.buildPgInsert(args, opts, uniqueKey);

const placeholders = columns.map((_, i) => `$${i + 1}`);
const query = `INSERT INTO river_job (${columns.join(', ')}) VALUES (${placeholders.join(', ')}) RETURNING
${JOB_RETURNING_COLUMNS}`;
const result = await executor.query<PgRow>(query, values);

const row = result.rows[0];
if (!row) return row;

return this.mapRowToInsertResult(row, false);
}

// Builds the column list and parameter values for one job insert.
private buildPgInsert<T extends JobArgs>(
args: T,
opts: InsertOpts,
uniqueKey?: Buffer,
): PgInsert {
if (!opts.maxAttempts) {
throw new Error('maxAttempts is required in InsertOpts');
}

const { kind, ...restArgs } = args;
const columns = ['kind', 'args', 'queue', 'max_attempts'];
const values: PgValue[] = [kind, JSON.stringify(restArgs), opts.queue, opts.maxAttempts];

if (opts.tags) {
columns.push('tags');
values.push(JSON.stringify(opts.tags));
}

if (opts.priority) {
columns.push('priority');
values.push(opts.priority);
}

if (opts.metadata) {
columns.push('metadata');
values.push(JSON.stringify(opts.metadata));
}

if (opts.scheduledAt) {
columns.push('scheduled_at');
values.push(
opts.scheduledAt instanceof Date ? opts.scheduledAt.toISOString() : opts.scheduledAt,
);
}

if (uniqueKey) {
columns.push('unique_key');
values.push(uniqueKey);
}

return { columns, values };
}

// Inserts compatible non-unique jobs with one multi-row INSERT query.
private async insertBatchTx<T extends JobArgs>(
tx: QueryExecutor,
batch: PgInsert[],
): Promise<InsertResult<T>[]> {
const columns = batch[0].columns;
const values: PgValue[] = [];
const valueRows = batch.map((job) => {
const placeholders = job.values.map((value) => {
values.push(value);
return `$${values.length}`;
});

return `(${placeholders.join(', ')})`;
});

const query = `INSERT INTO river_job (${columns.join(', ')}) VALUES ${valueRows.join(', ')} RETURNING
${JOB_RETURNING_COLUMNS}`;
const result = await tx.query<PgRow>(query, values);

return result.rows.map((row) => this.mapRowToInsertResult<T>(row, false));
}

// Maps a pg row to a typed job insert result.
private mapRowToInsertResult<T extends JobArgs>(row: PgRow, skipped: boolean): InsertResult<T> {
return {
job: {
...row,
Expand Down
3 changes: 0 additions & 3 deletions src/drivers/pg/types/index.ts

This file was deleted.

8 changes: 0 additions & 8 deletions src/drivers/pg/types/query-executor.ts

This file was deleted.

Loading