From 4de26039c39a493d8d6bf0216291973c0693d12b Mon Sep 17 00:00:00 2001 From: Junjun Deng Date: Sun, 22 Dec 2024 10:38:43 +0800 Subject: [PATCH 01/12] Add test job to the CI workflow --- .github/workflows/ci.yml | 37 +++++++++++++++++++++++++++++++++++++ LICENCE | 1 + README.md | 10 +++++----- package.json | 2 +- src/db/index.ts | 3 ++- 5 files changed, 46 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f93a651..a004a4c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,3 +34,40 @@ jobs: - name: Typecheck run: pnpm typecheck + + test: + runs-on: ubuntu-latest + + services: + mysql: + image: mysql:8.0 + env: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: queue + ports: + - 3306:3306 + options: >- + --health-cmd="mysqladmin ping" + --health-interval=10s + --health-timeout=5s + --health-retries=3 + + steps: + - uses: actions/checkout@v4 + - uses: pnpm/action-setup@v2 + - uses: actions/setup-node@v4 + with: + node-version: 22 + cache: "pnpm" + registry-url: https://registry.npmjs.org + + - shell: bash + run: corepack enable + + - shell: bash + run: pnpm install + + - name: Run tests + run: pnpm test + env: + DATABASE_URL: mysql://root:root@localhost:3306/queue diff --git a/LICENCE b/LICENCE index f0f108c..3e4facd 100644 --- a/LICENCE +++ b/LICENCE @@ -1,3 +1,4 @@ +Copyright 2024 lilac Copyright 2024 Mohamed Bassem Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/README.md b/README.md index 40b582e..8130738 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -# Liteque +# MySQL Lite Queue -![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/hoarder-app/liteque/ci.yml) ![NPM Version](https://img.shields.io/npm/v/liteque) +![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/rabrain/mysql-queue/ci.yml) ![NPM Version](https://img.shields.io/npm/v/mysql-queue) A simple typesafe mysql-based job queue for Node.js. @@ -14,10 +14,10 @@ $ npm install mysql-queue ## Usage ```ts -import { buildDBClient, Runner, SqliteQueue } from "liteque"; +import { buildDBClient, Runner, LiteQueue } from "mysql-queue"; import { z } from "zod"; -const db = buildDBClient(":memory:", true); +const db = buildDBClient("mysql://root:root@localhost:3306/queue", true); const requestSchema = z.object({ message: z.string(), @@ -25,7 +25,7 @@ const requestSchema = z.object({ const ZRequest = z.infer; // Init the queue -const queue = new SqliteQueue("requests", db, { +const queue = new LiteQueue("requests", db, { defaultJobArgs: { numRetries: 2, }, diff --git a/package.json b/package.json index 9bfe22f..41d5359 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "license": "MIT", "repository": { "type": "git", - "url": "git+https://github.com/hoarder-app/liteque.git" + "url": "git+https://github.com/rabrain/mysql-queue.git" }, "keywords": [ "queue" diff --git a/src/db/index.ts b/src/db/index.ts index 264875a..025fcb5 100644 --- a/src/db/index.ts +++ b/src/db/index.ts @@ -2,13 +2,14 @@ import { MySql2Database, type MySqlRawQueryResult, drizzle } from "drizzle-orm/m import { migrate } from "drizzle-orm/mysql2/migrator"; import mysql from "mysql2/promise"; import path from "node:path"; +import { env } from 'node:process'; import * as schema from "./schema"; export const affectedRows = (rawResult: MySqlRawQueryResult) => { return rawResult[0].affectedRows }; -const defaultURL = 'mysql://root:root@localhost:3306/queue' +const defaultURL = env['DATABASE_URL'] ?? 'mysql://root:root@localhost:3306/queue' export function buildDBClient(url?: string, runMigrations = false) { const connection = mysql.createPool(url ?? defaultURL); From 959da3656385bd7e92d74f7c6608ff7fee79f201 Mon Sep 17 00:00:00 2001 From: Junjun Deng Date: Sun, 22 Dec 2024 12:19:35 +0800 Subject: [PATCH 02/12] Migrate database before creating queues --- package.json | 2 +- src/db/index.ts | 14 ++++++++------ src/index.ts | 2 +- src/queue.test.ts | 8 ++++---- src/queue.ts | 6 +++--- src/runner.test.ts | 14 +++++++------- 6 files changed, 24 insertions(+), 22 deletions(-) diff --git a/package.json b/package.json index 41d5359..59d8eb1 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,7 @@ "name": "mysql-queue", "description": "A lite job queue for Node.js", "author": "lilac ", - "version": "0.1.0", + "version": "0.1.1", "type": "module", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/src/db/index.ts b/src/db/index.ts index 025fcb5..f7dfae4 100644 --- a/src/db/index.ts +++ b/src/db/index.ts @@ -11,18 +11,20 @@ export const affectedRows = (rawResult: MySqlRawQueryResult) => { const defaultURL = env['DATABASE_URL'] ?? 'mysql://root:root@localhost:3306/queue' -export function buildDBClient(url?: string, runMigrations = false) { +export async function prepareDB(url?: string) { + const db = connect(url); + await migrateDB(db); + return db; +} + +export function connect(url: string | undefined) { const connection = mysql.createPool(url ?? defaultURL); const db = drizzle(connection, { schema, mode: 'planetscale' }); - - if (runMigrations) { - migrateDB(db); - } return db; } export function migrateDB(db: MySql2Database) { - migrate(db, { + return migrate(db, { migrationsFolder: path.join(__dirname, '../drizzle') }); } diff --git a/src/index.ts b/src/index.ts index f2c5b04..e5503d7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,5 @@ export { LiteQueue } from "./queue"; -export { buildDBClient, migrateDB } from "./db"; +export { connect, prepareDB, migrateDB } from "./db"; export type { QueueOptions, RunnerOptions, RunnerFuncs } from "./options"; export { Runner } from "./runner"; diff --git a/src/queue.test.ts b/src/queue.test.ts index e75e5ea..a6c9fad 100644 --- a/src/queue.test.ts +++ b/src/queue.test.ts @@ -2,7 +2,7 @@ import { describe, expect, test } from "vitest"; import { - buildDBClient, + prepareDB, LiteQueue } from "./"; @@ -16,7 +16,7 @@ describe("LiteQueue", () => { test("idempotency keys", async () => { const queue = new LiteQueue( "queue1", - buildDBClient(undefined, true), + await prepareDB(), { defaultJobArgs: { numRetries: 0, @@ -43,7 +43,7 @@ describe("LiteQueue", () => { test("keep failed jobs", async () => { const queueKeep = new LiteQueue( "queue1", - buildDBClient(undefined, true), + await prepareDB(), { defaultJobArgs: { numRetries: 0, @@ -54,7 +54,7 @@ describe("LiteQueue", () => { const queueDontKeep = new LiteQueue( "queue2", - buildDBClient(undefined, true), + await prepareDB(), { defaultJobArgs: { numRetries: 0, diff --git a/src/queue.ts b/src/queue.ts index 0bba34e..7014649 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -1,7 +1,7 @@ import assert from "node:assert"; import { and, asc, count, eq, gt, lt, or, sql } from "drizzle-orm"; -import { affectedRows, buildDBClient } from "./db"; +import { affectedRows, connect } from "./db"; import { EnqueueOptions, QueueOptions } from "./options"; import { Job, tasksTable } from "./db/schema"; @@ -12,12 +12,12 @@ function generateAllocationId() { export class LiteQueue { queueName: string; - db: ReturnType; + db: ReturnType; options: QueueOptions; constructor( name: string, - db: ReturnType, + db: ReturnType, options: QueueOptions, ) { this.queueName = name; diff --git a/src/runner.test.ts b/src/runner.test.ts index 5b0ef90..c2596a8 100644 --- a/src/runner.test.ts +++ b/src/runner.test.ts @@ -5,7 +5,7 @@ import { describe, expect, test } from "vitest"; import { z } from "zod"; import { - buildDBClient, + prepareDB, DequeuedJob, DequeuedJobError, Runner, @@ -144,7 +144,7 @@ describe("SqiteQueueRunner", () => { test("should run jobs with correct concurrency", async () => { const queue = new LiteQueue( "queue1", - buildDBClient(undefined, true), + await prepareDB(), { defaultJobArgs: { numRetries: 0, @@ -206,7 +206,7 @@ describe("SqiteQueueRunner", () => { test("should retry errors", async () => { const queue = new LiteQueue( "queue1", - buildDBClient(undefined, true), + await prepareDB(), { defaultJobArgs: { numRetries: 2, @@ -244,7 +244,7 @@ describe("SqiteQueueRunner", () => { test("timeouts are respected", async () => { const queue = new LiteQueue( "queue1", - buildDBClient(undefined, true), + await prepareDB(), { defaultJobArgs: { numRetries: 1, @@ -280,7 +280,7 @@ describe("SqiteQueueRunner", () => { test("serialization errors", async () => { const queue = new LiteQueue( "queue1", - buildDBClient(undefined, true), + await prepareDB(), { defaultJobArgs: { numRetries: 1, @@ -325,7 +325,7 @@ describe("SqiteQueueRunner", () => { test("concurrent runners", async () => { const queue = new LiteQueue( "queue1", - buildDBClient(undefined, true), + await prepareDB(), { defaultJobArgs: { numRetries: 0, @@ -387,7 +387,7 @@ describe("SqiteQueueRunner", () => { }); test("large test", async () => { - const db = buildDBClient(undefined, true); + const db = await prepareDB(); const queue1 = new LiteQueue("queue1", db, { defaultJobArgs: { numRetries: 0, From aec703f8a5e1c3cb9c3bf3f2b1bb201a704fc10c Mon Sep 17 00:00:00 2001 From: Junjun Deng Date: Sun, 22 Dec 2024 12:19:35 +0800 Subject: [PATCH 03/12] Change columns to varchar --- src/db/schema.ts | 10 ++++++---- ...stiff_martin_li.sql => 0000_cynical_joystick.sql} | 8 ++++---- src/drizzle/meta/0000_snapshot.json | 12 ++++++------ src/drizzle/meta/_journal.json | 4 ++-- 4 files changed, 18 insertions(+), 16 deletions(-) rename src/drizzle/{0000_stiff_martin_li.sql => 0000_cynical_joystick.sql} (84%) diff --git a/src/db/schema.ts b/src/db/schema.ts index 99c4ff7..fbbe05e 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -5,6 +5,7 @@ import { timestamp, mysqlTable, unique, + varchar, } from "drizzle-orm/mysql-core"; export const createTable = mysqlTable; @@ -19,19 +20,20 @@ export const tasksTable = createTable( "tasks", { id: integer("id").notNull().primaryKey().autoincrement(), - queue: text("queue").notNull(), + queue: varchar("queue", { length: 255 }).notNull(), payload: text("payload").notNull(), createdAt: createdAtField(), - status: text("status", { + status: varchar("status", { + length: 50, enum: ["pending", "running", "pending_retry", "failed"], }) .notNull() .default("pending"), expireAt: timestamp("expireAt", { mode: "date" }), - allocationId: text("allocationId").notNull(), + allocationId: varchar("allocationId", { length: 50 }).notNull(), numRunsLeft: integer("numRunsLeft").notNull(), maxNumRuns: integer("maxNumRuns").notNull(), - idempotencyKey: text("idempotencyKey"), + idempotencyKey: varchar("idempotencyKey", { length: 255 }), }, (tasks) => ({ queueIdx: index("tasks_queue_idx").on(tasks.queue), diff --git a/src/drizzle/0000_stiff_martin_li.sql b/src/drizzle/0000_cynical_joystick.sql similarity index 84% rename from src/drizzle/0000_stiff_martin_li.sql rename to src/drizzle/0000_cynical_joystick.sql index da3ea94..cc8eaf6 100644 --- a/src/drizzle/0000_stiff_martin_li.sql +++ b/src/drizzle/0000_cynical_joystick.sql @@ -1,14 +1,14 @@ CREATE TABLE `tasks` ( `id` int AUTO_INCREMENT NOT NULL, - `queue` text NOT NULL, + `queue` varchar(255) NOT NULL, `payload` text NOT NULL, `createdAt` timestamp NOT NULL, - `status` text NOT NULL DEFAULT ('pending'), + `status` varchar(50) NOT NULL DEFAULT 'pending', `expireAt` timestamp, - `allocationId` text NOT NULL, + `allocationId` varchar(50) NOT NULL, `numRunsLeft` int NOT NULL, `maxNumRuns` int NOT NULL, - `idempotencyKey` text, + `idempotencyKey` varchar(255), CONSTRAINT `tasks_id` PRIMARY KEY(`id`), CONSTRAINT `tasks_queue_idempotencyKey_unique` UNIQUE(`queue`,`idempotencyKey`) ); diff --git a/src/drizzle/meta/0000_snapshot.json b/src/drizzle/meta/0000_snapshot.json index 27c71b5..a2a2109 100644 --- a/src/drizzle/meta/0000_snapshot.json +++ b/src/drizzle/meta/0000_snapshot.json @@ -1,7 +1,7 @@ { "version": "5", "dialect": "mysql", - "id": "d5ac27ee-fb1f-4d8b-977d-23ef372e91b0", + "id": "5e96ed89-0bdf-4c0d-b7d9-4ab6d0111973", "prevId": "00000000-0000-0000-0000-000000000000", "tables": { "tasks": { @@ -16,7 +16,7 @@ }, "queue": { "name": "queue", - "type": "text", + "type": "varchar(255)", "primaryKey": false, "notNull": true, "autoincrement": false @@ -37,11 +37,11 @@ }, "status": { "name": "status", - "type": "text", + "type": "varchar(50)", "primaryKey": false, "notNull": true, "autoincrement": false, - "default": "('pending')" + "default": "'pending'" }, "expireAt": { "name": "expireAt", @@ -52,7 +52,7 @@ }, "allocationId": { "name": "allocationId", - "type": "text", + "type": "varchar(50)", "primaryKey": false, "notNull": true, "autoincrement": false @@ -73,7 +73,7 @@ }, "idempotencyKey": { "name": "idempotencyKey", - "type": "text", + "type": "varchar(255)", "primaryKey": false, "notNull": false, "autoincrement": false diff --git a/src/drizzle/meta/_journal.json b/src/drizzle/meta/_journal.json index 1fddda4..3bc4b4c 100644 --- a/src/drizzle/meta/_journal.json +++ b/src/drizzle/meta/_journal.json @@ -5,8 +5,8 @@ { "idx": 0, "version": "5", - "when": 1734791710708, - "tag": "0000_stiff_martin_li", + "when": 1734842416630, + "tag": "0000_cynical_joystick", "breakpoints": true } ] From f6fa61514e6649f15e886320c8891fbca5044ca4 Mon Sep 17 00:00:00 2001 From: Junjun Deng Date: Sun, 22 Dec 2024 14:00:36 +0800 Subject: [PATCH 04/12] Dequeue updated job --- src/queue.ts | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/queue.ts b/src/queue.ts index 7014649..a4d04cb 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -117,14 +117,15 @@ export class LiteQueue { assert(jobs.length == 1); const job = jobs[0]; + const change = { + status: "running" as Job["status"], + numRunsLeft: job.numRunsLeft - 1, + allocationId: generateAllocationId(), + expireAt: new Date(new Date().getTime() + options.timeoutSecs * 1000), + }; const result = await txn .update(tasksTable) - .set({ - status: "running", - numRunsLeft: job.numRunsLeft - 1, - allocationId: generateAllocationId(), - expireAt: new Date(new Date().getTime() + options.timeoutSecs * 1000), - }) + .set(change) .where( and( eq(tasksTable.id, job.id), @@ -138,7 +139,7 @@ export class LiteQueue { return null; } assert(rows == 1); - return job; + return Object.assign(job, change); }); } From 1963889e336257e9dacaae88ecca55907a8c86bf Mon Sep 17 00:00:00 2001 From: Junjun Deng Date: Sun, 22 Dec 2024 14:36:30 +0800 Subject: [PATCH 05/12] Use a single database client --- src/db/index.ts | 18 ++++++++++++------ src/queue.ts | 6 +++--- src/runner.test.ts | 2 +- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/db/index.ts b/src/db/index.ts index f7dfae4..4cf6e58 100644 --- a/src/db/index.ts +++ b/src/db/index.ts @@ -5,21 +5,27 @@ import path from "node:path"; import { env } from 'node:process'; import * as schema from "./schema"; +export type Database = MySql2Database; + export const affectedRows = (rawResult: MySqlRawQueryResult) => { return rawResult[0].affectedRows }; -const defaultURL = env['DATABASE_URL'] ?? 'mysql://root:root@localhost:3306/queue' +const defaultUrl = env['DATABASE_URL'] ?? 'mysql://root:root@localhost:3306/queue' + +let db: Database; export async function prepareDB(url?: string) { - const db = connect(url); - await migrateDB(db); + if (!db) { + db = await connect(url ?? defaultUrl); + await migrateDB(db); + } return db; } -export function connect(url: string | undefined) { - const connection = mysql.createPool(url ?? defaultURL); - const db = drizzle(connection, { schema, mode: 'planetscale' }); +export async function connect(url: string) { + const connection = await mysql.createConnection(url); + const db = drizzle(connection, { schema, mode: 'default' }); return db; } diff --git a/src/queue.ts b/src/queue.ts index a4d04cb..024fd1e 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -1,7 +1,7 @@ import assert from "node:assert"; import { and, asc, count, eq, gt, lt, or, sql } from "drizzle-orm"; -import { affectedRows, connect } from "./db"; +import { affectedRows, type Database } from "./db"; import { EnqueueOptions, QueueOptions } from "./options"; import { Job, tasksTable } from "./db/schema"; @@ -12,12 +12,12 @@ function generateAllocationId() { export class LiteQueue { queueName: string; - db: ReturnType; + db: Database; options: QueueOptions; constructor( name: string, - db: ReturnType, + db: Database, options: QueueOptions, ) { this.queueName = name; diff --git a/src/runner.test.ts b/src/runner.test.ts index c2596a8..9d35e55 100644 --- a/src/runner.test.ts +++ b/src/runner.test.ts @@ -140,7 +140,7 @@ function buildRunner( return { runner, results }; } -describe("SqiteQueueRunner", () => { +describe("Queue Runner", () => { test("should run jobs with correct concurrency", async () => { const queue = new LiteQueue( "queue1", From 78c1f54b9f0c649a3d295d470fa4a388d2f9a37d Mon Sep 17 00:00:00 2001 From: Junjun Deng Date: Sun, 22 Dec 2024 18:34:02 +0800 Subject: [PATCH 06/12] Upgrade drizzle-orm and kit --- package.json | 4 ++-- pnpm-lock.yaml | 33 ++++++++++++++++++--------------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/package.json b/package.json index 59d8eb1..8d7ebb4 100644 --- a/package.json +++ b/package.json @@ -22,14 +22,14 @@ ], "dependencies": { "async-mutex": "^0.4.1", - "drizzle-orm": "^0.33.0", + "drizzle-orm": "^0.38.2", "eslint": "^9.17.0", "mysql2": "^3.11.5", "zod": "^3.22.4" }, "devDependencies": { "@tsconfig/node21": "^21.0.3", - "drizzle-kit": "^0.24.02", + "drizzle-kit": "^0.30.1", "typescript": "^5.6.3", "vitest": "^1.3.1" }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index cb07dad..1d35ece 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -12,8 +12,8 @@ importers: specifier: ^0.4.1 version: 0.4.1 drizzle-orm: - specifier: ^0.33.0 - version: 0.33.0(@types/better-sqlite3@7.6.11)(better-sqlite3@11.5.0)(mysql2@3.11.5) + specifier: ^0.38.2 + version: 0.38.2(@types/better-sqlite3@7.6.11)(better-sqlite3@11.5.0)(mysql2@3.11.5) eslint: specifier: ^9.17.0 version: 9.17.0 @@ -28,8 +28,8 @@ importers: specifier: ^21.0.3 version: 21.0.3 drizzle-kit: - specifier: ^0.24.02 - version: 0.24.2 + specifier: ^0.30.1 + version: 0.30.1 typescript: specifier: ^5.6.3 version: 5.6.3 @@ -783,18 +783,19 @@ packages: resolution: {integrity: sha512-EjePK1srD3P08o2j4f0ExnylqRs5B9tJjcp9t1krH2qRi8CCdsYfwe9JgSLurFBWwq4uOlipzfk5fHNvwFKr8Q==} engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0} - drizzle-kit@0.24.2: - resolution: {integrity: sha512-nXOaTSFiuIaTMhS8WJC2d4EBeIcN9OSt2A2cyFbQYBAZbi7lRsVGJNqDpEwPqYfJz38yxbY/UtbvBBahBfnExQ==} + drizzle-kit@0.30.1: + resolution: {integrity: sha512-HmA/NeewvHywhJ2ENXD3KvOuM/+K2dGLJfxVfIHsGwaqKICJnS+Ke2L6UcSrSrtMJLJaT0Im1Qv4TFXfaZShyw==} hasBin: true - drizzle-orm@0.33.0: - resolution: {integrity: sha512-SHy72R2Rdkz0LEq0PSG/IdvnT3nGiWuRk+2tXZQ90GVq/XQhpCzu/EFT3V2rox+w8MlkBQxifF8pCStNYnERfA==} + drizzle-orm@0.38.2: + resolution: {integrity: sha512-eCE3yPRAskLo1WpM9OHpFaM70tBEDsWhwR/0M3CKyztAXKR9Qs3asZlcJOEliIcUSg8GuwrlY0dmYDgmm6y5GQ==} peerDependencies: '@aws-sdk/client-rds-data': '>=3' - '@cloudflare/workers-types': '>=3' - '@electric-sql/pglite': '>=0.1.1' - '@libsql/client': '*' - '@neondatabase/serverless': '>=0.1' + '@cloudflare/workers-types': '>=4' + '@electric-sql/pglite': '>=0.2.0' + '@libsql/client': '>=0.10.0' + '@libsql/client-wasm': '>=0.10.0' + '@neondatabase/serverless': '>=0.10.0' '@op-engineering/op-sqlite': '>=2' '@opentelemetry/api': ^1.4.1 '@planetscale/database': '>=1' @@ -808,7 +809,7 @@ packages: '@xata.io/client': '*' better-sqlite3: '>=7' bun-types: '*' - expo-sqlite: '>=13.2.0' + expo-sqlite: '>=14.0.0' knex: '*' kysely: '*' mysql2: '>=2' @@ -827,6 +828,8 @@ packages: optional: true '@libsql/client': optional: true + '@libsql/client-wasm': + optional: true '@neondatabase/serverless': optional: true '@op-engineering/op-sqlite': @@ -2005,7 +2008,7 @@ snapshots: diff-sequences@29.6.3: {} - drizzle-kit@0.24.2: + drizzle-kit@0.30.1: dependencies: '@drizzle-team/brocli': 0.10.2 '@esbuild-kit/esm-loader': 2.6.5 @@ -2014,7 +2017,7 @@ snapshots: transitivePeerDependencies: - supports-color - drizzle-orm@0.33.0(@types/better-sqlite3@7.6.11)(better-sqlite3@11.5.0)(mysql2@3.11.5): + drizzle-orm@0.38.2(@types/better-sqlite3@7.6.11)(better-sqlite3@11.5.0)(mysql2@3.11.5): optionalDependencies: '@types/better-sqlite3': 7.6.11 better-sqlite3: 11.5.0 From 901df64f661a0412ddec9478c2443e3f584550f0 Mon Sep 17 00:00:00 2001 From: Junjun Deng Date: Sun, 22 Dec 2024 19:31:37 +0800 Subject: [PATCH 07/12] Initialize database as module singleton for test purpose --- src/db/index.ts | 13 ------------- src/index.ts | 2 +- src/queue.test.ts | 8 ++++---- src/runner.test.ts | 15 +++++++-------- src/test.ts | 13 +++++++++++++ 5 files changed, 25 insertions(+), 26 deletions(-) create mode 100644 src/test.ts diff --git a/src/db/index.ts b/src/db/index.ts index 4cf6e58..743920f 100644 --- a/src/db/index.ts +++ b/src/db/index.ts @@ -2,7 +2,6 @@ import { MySql2Database, type MySqlRawQueryResult, drizzle } from "drizzle-orm/m import { migrate } from "drizzle-orm/mysql2/migrator"; import mysql from "mysql2/promise"; import path from "node:path"; -import { env } from 'node:process'; import * as schema from "./schema"; export type Database = MySql2Database; @@ -11,18 +10,6 @@ export const affectedRows = (rawResult: MySqlRawQueryResult) => { return rawResult[0].affectedRows }; -const defaultUrl = env['DATABASE_URL'] ?? 'mysql://root:root@localhost:3306/queue' - -let db: Database; - -export async function prepareDB(url?: string) { - if (!db) { - db = await connect(url ?? defaultUrl); - await migrateDB(db); - } - return db; -} - export async function connect(url: string) { const connection = await mysql.createConnection(url); const db = drizzle(connection, { schema, mode: 'default' }); diff --git a/src/index.ts b/src/index.ts index e5503d7..587bbe0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,5 @@ export { LiteQueue } from "./queue"; -export { connect, prepareDB, migrateDB } from "./db"; +export { connect, migrateDB } from "./db"; export type { QueueOptions, RunnerOptions, RunnerFuncs } from "./options"; export { Runner } from "./runner"; diff --git a/src/queue.test.ts b/src/queue.test.ts index a6c9fad..f722d59 100644 --- a/src/queue.test.ts +++ b/src/queue.test.ts @@ -2,9 +2,9 @@ import { describe, expect, test } from "vitest"; import { - prepareDB, LiteQueue } from "./"; +import { db } from "./test"; interface Work { increment: number; @@ -16,7 +16,7 @@ describe("LiteQueue", () => { test("idempotency keys", async () => { const queue = new LiteQueue( "queue1", - await prepareDB(), + db, { defaultJobArgs: { numRetries: 0, @@ -43,7 +43,7 @@ describe("LiteQueue", () => { test("keep failed jobs", async () => { const queueKeep = new LiteQueue( "queue1", - await prepareDB(), + db, { defaultJobArgs: { numRetries: 0, @@ -54,7 +54,7 @@ describe("LiteQueue", () => { const queueDontKeep = new LiteQueue( "queue2", - await prepareDB(), + db, { defaultJobArgs: { numRetries: 0, diff --git a/src/runner.test.ts b/src/runner.test.ts index 9d35e55..1543512 100644 --- a/src/runner.test.ts +++ b/src/runner.test.ts @@ -5,7 +5,6 @@ import { describe, expect, test } from "vitest"; import { z } from "zod"; import { - prepareDB, DequeuedJob, DequeuedJobError, Runner, @@ -13,6 +12,7 @@ import { LiteQueue, } from "./"; import { tasksTable } from "./db/schema"; +import { db } from "./test"; class Baton { semaphore: Semaphore; @@ -140,11 +140,11 @@ function buildRunner( return { runner, results }; } -describe("Queue Runner", () => { +describe("Queue Runner", async () => { test("should run jobs with correct concurrency", async () => { const queue = new LiteQueue( "queue1", - await prepareDB(), + db, { defaultJobArgs: { numRetries: 0, @@ -206,7 +206,7 @@ describe("Queue Runner", () => { test("should retry errors", async () => { const queue = new LiteQueue( "queue1", - await prepareDB(), + db, { defaultJobArgs: { numRetries: 2, @@ -244,7 +244,7 @@ describe("Queue Runner", () => { test("timeouts are respected", async () => { const queue = new LiteQueue( "queue1", - await prepareDB(), + db, { defaultJobArgs: { numRetries: 1, @@ -280,7 +280,7 @@ describe("Queue Runner", () => { test("serialization errors", async () => { const queue = new LiteQueue( "queue1", - await prepareDB(), + db, { defaultJobArgs: { numRetries: 1, @@ -325,7 +325,7 @@ describe("Queue Runner", () => { test("concurrent runners", async () => { const queue = new LiteQueue( "queue1", - await prepareDB(), + db, { defaultJobArgs: { numRetries: 0, @@ -387,7 +387,6 @@ describe("Queue Runner", () => { }); test("large test", async () => { - const db = await prepareDB(); const queue1 = new LiteQueue("queue1", db, { defaultJobArgs: { numRetries: 0, diff --git a/src/test.ts b/src/test.ts new file mode 100644 index 0000000..5d5fb1f --- /dev/null +++ b/src/test.ts @@ -0,0 +1,13 @@ +import { connect, migrateDB } from "./db"; +import { env } from 'node:process'; + +const defaultUrl = env['DATABASE_URL'] ?? 'mysql://root:root@localhost:3306/queue' + + +async function prepareDB(url?: string) { + const db = await connect(url ?? defaultUrl); + await migrateDB(db); + return db; +} + +export const db = await prepareDB(); From 774cad94240f4899f237eb5f21e5168f0f6533f3 Mon Sep 17 00:00:00 2001 From: Junjun Deng Date: Sun, 22 Dec 2024 19:53:22 +0800 Subject: [PATCH 08/12] Regenerate migration files --- ...0_cynical_joystick.sql => 0000_tiresome_gamma_corps.sql} | 0 src/drizzle/meta/0000_snapshot.json | 6 ++++-- src/drizzle/meta/_journal.json | 4 ++-- 3 files changed, 6 insertions(+), 4 deletions(-) rename src/drizzle/{0000_cynical_joystick.sql => 0000_tiresome_gamma_corps.sql} (100%) diff --git a/src/drizzle/0000_cynical_joystick.sql b/src/drizzle/0000_tiresome_gamma_corps.sql similarity index 100% rename from src/drizzle/0000_cynical_joystick.sql rename to src/drizzle/0000_tiresome_gamma_corps.sql diff --git a/src/drizzle/meta/0000_snapshot.json b/src/drizzle/meta/0000_snapshot.json index a2a2109..87f1890 100644 --- a/src/drizzle/meta/0000_snapshot.json +++ b/src/drizzle/meta/0000_snapshot.json @@ -1,7 +1,7 @@ { "version": "5", "dialect": "mysql", - "id": "5e96ed89-0bdf-4c0d-b7d9-4ab6d0111973", + "id": "4b17721e-a90c-4cdb-beb5-2d26dce5c576", "prevId": "00000000-0000-0000-0000-000000000000", "tables": { "tasks": { @@ -140,9 +140,11 @@ "idempotencyKey" ] } - } + }, + "checkConstraint": {} } }, + "views": {}, "_meta": { "schemas": {}, "tables": {}, diff --git a/src/drizzle/meta/_journal.json b/src/drizzle/meta/_journal.json index 3bc4b4c..d5ebbc4 100644 --- a/src/drizzle/meta/_journal.json +++ b/src/drizzle/meta/_journal.json @@ -5,8 +5,8 @@ { "idx": 0, "version": "5", - "when": 1734842416630, - "tag": "0000_cynical_joystick", + "when": 1734868297153, + "tag": "0000_tiresome_gamma_corps", "breakpoints": true } ] From 1212cbd1ab3f25d65f226e837cfe827c96c382ce Mon Sep 17 00:00:00 2001 From: Junjun Deng Date: Sun, 22 Dec 2024 20:22:38 +0800 Subject: [PATCH 09/12] Run all tests inside a single thread --- vitest.config.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/vitest.config.ts b/vitest.config.ts index a206cfc..d242d2c 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -9,5 +9,10 @@ export default defineConfig({ alias: { "@/*": "./*", }, + poolOptions: { + threads: { + singleThread: true + } + } }, }); From 005aab647d0f067c3bfce4fb13f0899d23c7238d Mon Sep 17 00:00:00 2001 From: Junjun Deng Date: Sun, 22 Dec 2024 22:07:36 +0800 Subject: [PATCH 10/12] Use unique queue names to prevent interference between tests --- src/queue.test.ts | 11 ++++++----- src/runner.test.ts | 27 ++++++++++++++------------- vitest.config.ts | 1 + 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/queue.test.ts b/src/queue.test.ts index f722d59..b2fc1f8 100644 --- a/src/queue.test.ts +++ b/src/queue.test.ts @@ -13,9 +13,9 @@ interface Work { } describe("LiteQueue", () => { - test("idempotency keys", async () => { + test("idempotency keys", async (context) => { const queue = new LiteQueue( - "queue1", + context.task.id, db, { defaultJobArgs: { @@ -40,9 +40,10 @@ describe("LiteQueue", () => { }); - test("keep failed jobs", async () => { + test("keep failed jobs", async (context) => { + const id = context.task.id const queueKeep = new LiteQueue( - "queue1", + id + "queue1", db, { defaultJobArgs: { @@ -53,7 +54,7 @@ describe("LiteQueue", () => { ); const queueDontKeep = new LiteQueue( - "queue2", + id + "queue2", db, { defaultJobArgs: { diff --git a/src/runner.test.ts b/src/runner.test.ts index 1543512..d282ef1 100644 --- a/src/runner.test.ts +++ b/src/runner.test.ts @@ -141,9 +141,9 @@ function buildRunner( } describe("Queue Runner", async () => { - test("should run jobs with correct concurrency", async () => { + test("should run jobs with correct concurrency", async (context) => { const queue = new LiteQueue( - "queue1", + context.task.id, db, { defaultJobArgs: { @@ -203,9 +203,9 @@ describe("Queue Runner", async () => { expect(results.numFailed).toEqual(0); }); - test("should retry errors", async () => { + test("should retry errors", async (context) => { const queue = new LiteQueue( - "queue1", + context.task.id, db, { defaultJobArgs: { @@ -241,9 +241,9 @@ describe("Queue Runner", async () => { expect(results.numFailed).toEqual(1); }); - test("timeouts are respected", async () => { + test("timeouts are respected", async (context) => { const queue = new LiteQueue( - "queue1", + context.task.id, db, { defaultJobArgs: { @@ -277,9 +277,9 @@ describe("Queue Runner", async () => { expect(results.numFailed).toEqual(1); }); - test("serialization errors", async () => { + test("serialization errors", async (context) => { const queue = new LiteQueue( - "queue1", + context.task.id, db, { defaultJobArgs: { @@ -322,9 +322,9 @@ describe("Queue Runner", async () => { expect(results.numFailed).toEqual(1); }); - test("concurrent runners", async () => { + test("concurrent runners", async (context) => { const queue = new LiteQueue( - "queue1", + context.task.id, db, { defaultJobArgs: { @@ -386,14 +386,15 @@ describe("Queue Runner", async () => { expect(results.numFailed).toEqual(0); }); - test("large test", async () => { - const queue1 = new LiteQueue("queue1", db, { + test("large test", async (context) => { + const id = context.task.id + const queue1 = new LiteQueue(id + "queue1", db, { defaultJobArgs: { numRetries: 0, }, keepFailedJobs: true, }); - const queue2 = new LiteQueue("queue2", db, { + const queue2 = new LiteQueue(id + "queue2", db, { defaultJobArgs: { numRetries: 0, }, diff --git a/vitest.config.ts b/vitest.config.ts index d242d2c..cd5f31c 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -9,6 +9,7 @@ export default defineConfig({ alias: { "@/*": "./*", }, + testTimeout: 20000, poolOptions: { threads: { singleThread: true From ca4e61c9b8ec37752c684adbac70e13b35e9c8f7 Mon Sep 17 00:00:00 2001 From: Junjun Deng Date: Sun, 22 Dec 2024 22:23:51 +0800 Subject: [PATCH 11/12] Update example code --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 8130738..3491458 100644 --- a/README.md +++ b/README.md @@ -14,10 +14,10 @@ $ npm install mysql-queue ## Usage ```ts -import { buildDBClient, Runner, LiteQueue } from "mysql-queue"; +import { connect, Runner, LiteQueue } from "mysql-queue"; import { z } from "zod"; -const db = buildDBClient("mysql://root:root@localhost:3306/queue", true); +const db = connect("mysql://root:root@localhost:3306/queue"); const requestSchema = z.object({ message: z.string(), From 9c7c94df4959967e7da687237507631f9bfaa541 Mon Sep 17 00:00:00 2001 From: Junjun Deng Date: Sun, 22 Dec 2024 22:33:31 +0800 Subject: [PATCH 12/12] Release on push tags and manual trigger --- .github/workflows/release.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 55a3556..01a7f9f 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,5 +1,10 @@ name: Publish Package to npm on: + workflow_dispatch: + push: + # branches: [main] + tags: + - "*" release: types: [published] jobs: