Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions src/drivers/pg/pg-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { InsertOpts, InsertResult, Job, JobArgs, JobState } from '../../types';
import { bitmaskToJobStates, mapToUniqueKey } from '../../utils';
import Driver from '../driver';
import Options from './pg-options';
import { QueryExecutor } from './types';

// Implements the RiverQueue Driver interface using the 'pg' library.
export default class PgDriver implements Driver<PoolClient> {
Expand Down Expand Up @@ -43,18 +44,21 @@ export default class PgDriver implements Driver<PoolClient> {
}

async insert<T extends JobArgs>(args: T, opts: InsertOpts): Promise<InsertResult<T>> {
const client = await this.pool.connect();
try {
return await this.insertTx(client, args, opts);
} finally {
client.release();
}
return this.insertWithQueryExecutor(this.pool, args, opts);
}

async insertTx<T extends JobArgs>(
tx: PoolClient,
args: T,
opts: InsertOpts,
): Promise<InsertResult<T>> {
return this.insertWithQueryExecutor(tx, args, opts);
}

private async insertWithQueryExecutor<T extends JobArgs>(
executor: QueryExecutor,
args: T,
opts: InsertOpts,
): Promise<InsertResult<T>> {
if (!opts.maxAttempts) {
throw new Error('maxAttempts is required in InsertOpts');
Expand All @@ -78,10 +82,9 @@ export default class PgDriver implements Driver<PoolClient> {

query += ' LIMIT 1';

const result = await tx.query<Omit<Job, 'uniqueStates'> & { uniqueStates: Buffer | null }>(
query,
values,
);
const result = await executor.query<
Omit<Job, 'uniqueStates'> & { uniqueStates: Buffer | null }
>(query, values);

if (result.rows.length > 0) {
const row = result.rows[0];
Expand Down Expand Up @@ -146,10 +149,9 @@ export default class PgDriver implements Driver<PoolClient> {
tags,
unique_key as "uniqueKey",
unique_states as "uniqueStates"`;
const result = await tx.query<Omit<Job, 'uniqueStates'> & { uniqueStates: Buffer | null }>(
query,
values,
);
const result = await executor.query<
Omit<Job, 'uniqueStates'> & { uniqueStates: Buffer | null }
>(query, values);

const row = result.rows[0];
if (!row) return row;
Expand Down
3 changes: 3 additions & 0 deletions src/drivers/pg/types/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import QueryExecutor from './query-executor';

export { QueryExecutor };
8 changes: 8 additions & 0 deletions src/drivers/pg/types/query-executor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { Pool } from 'pg';

/**
* A pg object that can run SQL queries.
*/
type QueryExecutor = Pick<Pool, 'query'>;

export default QueryExecutor;
Loading