diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f93a651..a004a4c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,3 +34,40 @@ jobs: - name: Typecheck run: pnpm typecheck + + test: + runs-on: ubuntu-latest + + services: + mysql: + image: mysql:8.0 + env: + MYSQL_ROOT_PASSWORD: root + MYSQL_DATABASE: queue + ports: + - 3306:3306 + options: >- + --health-cmd="mysqladmin ping" + --health-interval=10s + --health-timeout=5s + --health-retries=3 + + steps: + - uses: actions/checkout@v4 + - uses: pnpm/action-setup@v2 + - uses: actions/setup-node@v4 + with: + node-version: 22 + cache: "pnpm" + registry-url: https://registry.npmjs.org + + - shell: bash + run: corepack enable + + - shell: bash + run: pnpm install + + - name: Run tests + run: pnpm test + env: + DATABASE_URL: mysql://root:root@localhost:3306/queue diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 55a3556..01a7f9f 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,5 +1,10 @@ name: Publish Package to npm on: + workflow_dispatch: + push: + # branches: [main] + tags: + - "*" release: types: [published] jobs: diff --git a/LICENCE b/LICENCE index f0f108c..3e4facd 100644 --- a/LICENCE +++ b/LICENCE @@ -1,3 +1,4 @@ +Copyright 2024 lilac Copyright 2024 Mohamed Bassem Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/README.md b/README.md index 40b582e..3491458 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -# Liteque +# MySQL Lite Queue -![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/hoarder-app/liteque/ci.yml) ![NPM Version](https://img.shields.io/npm/v/liteque) +![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/rabrain/mysql-queue/ci.yml) ![NPM Version](https://img.shields.io/npm/v/mysql-queue) A simple typesafe mysql-based job queue for Node.js. @@ -14,10 +14,10 @@ $ npm install mysql-queue ## Usage ```ts -import { buildDBClient, Runner, SqliteQueue } from "liteque"; +import { connect, Runner, LiteQueue } from "mysql-queue"; import { z } from "zod"; -const db = buildDBClient(":memory:", true); +const db = connect("mysql://root:root@localhost:3306/queue"); const requestSchema = z.object({ message: z.string(), @@ -25,7 +25,7 @@ const requestSchema = z.object({ const ZRequest = z.infer; // Init the queue -const queue = new SqliteQueue("requests", db, { +const queue = new LiteQueue("requests", db, { defaultJobArgs: { numRetries: 2, }, diff --git a/package.json b/package.json index 9bfe22f..8d7ebb4 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,7 @@ "name": "mysql-queue", "description": "A lite job queue for Node.js", "author": "lilac ", - "version": "0.1.0", + "version": "0.1.1", "type": "module", "main": "dist/index.js", "types": "dist/index.d.ts", @@ -15,21 +15,21 @@ "license": "MIT", "repository": { "type": "git", - "url": "git+https://github.com/hoarder-app/liteque.git" + "url": "git+https://github.com/rabrain/mysql-queue.git" }, "keywords": [ "queue" ], "dependencies": { "async-mutex": "^0.4.1", - "drizzle-orm": "^0.33.0", + "drizzle-orm": "^0.38.2", "eslint": "^9.17.0", "mysql2": "^3.11.5", "zod": "^3.22.4" }, "devDependencies": { "@tsconfig/node21": "^21.0.3", - "drizzle-kit": "^0.24.02", + "drizzle-kit": "^0.30.1", "typescript": "^5.6.3", "vitest": "^1.3.1" }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index cb07dad..1d35ece 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -12,8 +12,8 @@ importers: specifier: ^0.4.1 version: 0.4.1 drizzle-orm: - specifier: ^0.33.0 - version: 0.33.0(@types/better-sqlite3@7.6.11)(better-sqlite3@11.5.0)(mysql2@3.11.5) + specifier: ^0.38.2 + version: 0.38.2(@types/better-sqlite3@7.6.11)(better-sqlite3@11.5.0)(mysql2@3.11.5) eslint: specifier: ^9.17.0 version: 9.17.0 @@ -28,8 +28,8 @@ importers: specifier: ^21.0.3 version: 21.0.3 drizzle-kit: - specifier: ^0.24.02 - version: 0.24.2 + specifier: ^0.30.1 + version: 0.30.1 typescript: specifier: ^5.6.3 version: 5.6.3 @@ -783,18 +783,19 @@ packages: resolution: {integrity: sha512-EjePK1srD3P08o2j4f0ExnylqRs5B9tJjcp9t1krH2qRi8CCdsYfwe9JgSLurFBWwq4uOlipzfk5fHNvwFKr8Q==} engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0} - drizzle-kit@0.24.2: - resolution: {integrity: sha512-nXOaTSFiuIaTMhS8WJC2d4EBeIcN9OSt2A2cyFbQYBAZbi7lRsVGJNqDpEwPqYfJz38yxbY/UtbvBBahBfnExQ==} + drizzle-kit@0.30.1: + resolution: {integrity: sha512-HmA/NeewvHywhJ2ENXD3KvOuM/+K2dGLJfxVfIHsGwaqKICJnS+Ke2L6UcSrSrtMJLJaT0Im1Qv4TFXfaZShyw==} hasBin: true - drizzle-orm@0.33.0: - resolution: {integrity: sha512-SHy72R2Rdkz0LEq0PSG/IdvnT3nGiWuRk+2tXZQ90GVq/XQhpCzu/EFT3V2rox+w8MlkBQxifF8pCStNYnERfA==} + drizzle-orm@0.38.2: + resolution: {integrity: sha512-eCE3yPRAskLo1WpM9OHpFaM70tBEDsWhwR/0M3CKyztAXKR9Qs3asZlcJOEliIcUSg8GuwrlY0dmYDgmm6y5GQ==} peerDependencies: '@aws-sdk/client-rds-data': '>=3' - '@cloudflare/workers-types': '>=3' - '@electric-sql/pglite': '>=0.1.1' - '@libsql/client': '*' - '@neondatabase/serverless': '>=0.1' + '@cloudflare/workers-types': '>=4' + '@electric-sql/pglite': '>=0.2.0' + '@libsql/client': '>=0.10.0' + '@libsql/client-wasm': '>=0.10.0' + '@neondatabase/serverless': '>=0.10.0' '@op-engineering/op-sqlite': '>=2' '@opentelemetry/api': ^1.4.1 '@planetscale/database': '>=1' @@ -808,7 +809,7 @@ packages: '@xata.io/client': '*' better-sqlite3: '>=7' bun-types: '*' - expo-sqlite: '>=13.2.0' + expo-sqlite: '>=14.0.0' knex: '*' kysely: '*' mysql2: '>=2' @@ -827,6 +828,8 @@ packages: optional: true '@libsql/client': optional: true + '@libsql/client-wasm': + optional: true '@neondatabase/serverless': optional: true '@op-engineering/op-sqlite': @@ -2005,7 +2008,7 @@ snapshots: diff-sequences@29.6.3: {} - drizzle-kit@0.24.2: + drizzle-kit@0.30.1: dependencies: '@drizzle-team/brocli': 0.10.2 '@esbuild-kit/esm-loader': 2.6.5 @@ -2014,7 +2017,7 @@ snapshots: transitivePeerDependencies: - supports-color - drizzle-orm@0.33.0(@types/better-sqlite3@7.6.11)(better-sqlite3@11.5.0)(mysql2@3.11.5): + drizzle-orm@0.38.2(@types/better-sqlite3@7.6.11)(better-sqlite3@11.5.0)(mysql2@3.11.5): optionalDependencies: '@types/better-sqlite3': 7.6.11 better-sqlite3: 11.5.0 diff --git a/src/db/index.ts b/src/db/index.ts index 264875a..743920f 100644 --- a/src/db/index.ts +++ b/src/db/index.ts @@ -4,24 +4,20 @@ import mysql from "mysql2/promise"; import path from "node:path"; import * as schema from "./schema"; +export type Database = MySql2Database; + export const affectedRows = (rawResult: MySqlRawQueryResult) => { return rawResult[0].affectedRows }; -const defaultURL = 'mysql://root:root@localhost:3306/queue' - -export function buildDBClient(url?: string, runMigrations = false) { - const connection = mysql.createPool(url ?? defaultURL); - const db = drizzle(connection, { schema, mode: 'planetscale' }); - - if (runMigrations) { - migrateDB(db); - } +export async function connect(url: string) { + const connection = await mysql.createConnection(url); + const db = drizzle(connection, { schema, mode: 'default' }); return db; } export function migrateDB(db: MySql2Database) { - migrate(db, { + return migrate(db, { migrationsFolder: path.join(__dirname, '../drizzle') }); } diff --git a/src/db/schema.ts b/src/db/schema.ts index 99c4ff7..fbbe05e 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -5,6 +5,7 @@ import { timestamp, mysqlTable, unique, + varchar, } from "drizzle-orm/mysql-core"; export const createTable = mysqlTable; @@ -19,19 +20,20 @@ export const tasksTable = createTable( "tasks", { id: integer("id").notNull().primaryKey().autoincrement(), - queue: text("queue").notNull(), + queue: varchar("queue", { length: 255 }).notNull(), payload: text("payload").notNull(), createdAt: createdAtField(), - status: text("status", { + status: varchar("status", { + length: 50, enum: ["pending", "running", "pending_retry", "failed"], }) .notNull() .default("pending"), expireAt: timestamp("expireAt", { mode: "date" }), - allocationId: text("allocationId").notNull(), + allocationId: varchar("allocationId", { length: 50 }).notNull(), numRunsLeft: integer("numRunsLeft").notNull(), maxNumRuns: integer("maxNumRuns").notNull(), - idempotencyKey: text("idempotencyKey"), + idempotencyKey: varchar("idempotencyKey", { length: 255 }), }, (tasks) => ({ queueIdx: index("tasks_queue_idx").on(tasks.queue), diff --git a/src/drizzle/0000_stiff_martin_li.sql b/src/drizzle/0000_tiresome_gamma_corps.sql similarity index 84% rename from src/drizzle/0000_stiff_martin_li.sql rename to src/drizzle/0000_tiresome_gamma_corps.sql index da3ea94..cc8eaf6 100644 --- a/src/drizzle/0000_stiff_martin_li.sql +++ b/src/drizzle/0000_tiresome_gamma_corps.sql @@ -1,14 +1,14 @@ CREATE TABLE `tasks` ( `id` int AUTO_INCREMENT NOT NULL, - `queue` text NOT NULL, + `queue` varchar(255) NOT NULL, `payload` text NOT NULL, `createdAt` timestamp NOT NULL, - `status` text NOT NULL DEFAULT ('pending'), + `status` varchar(50) NOT NULL DEFAULT 'pending', `expireAt` timestamp, - `allocationId` text NOT NULL, + `allocationId` varchar(50) NOT NULL, `numRunsLeft` int NOT NULL, `maxNumRuns` int NOT NULL, - `idempotencyKey` text, + `idempotencyKey` varchar(255), CONSTRAINT `tasks_id` PRIMARY KEY(`id`), CONSTRAINT `tasks_queue_idempotencyKey_unique` UNIQUE(`queue`,`idempotencyKey`) ); diff --git a/src/drizzle/meta/0000_snapshot.json b/src/drizzle/meta/0000_snapshot.json index 27c71b5..87f1890 100644 --- a/src/drizzle/meta/0000_snapshot.json +++ b/src/drizzle/meta/0000_snapshot.json @@ -1,7 +1,7 @@ { "version": "5", "dialect": "mysql", - "id": "d5ac27ee-fb1f-4d8b-977d-23ef372e91b0", + "id": "4b17721e-a90c-4cdb-beb5-2d26dce5c576", "prevId": "00000000-0000-0000-0000-000000000000", "tables": { "tasks": { @@ -16,7 +16,7 @@ }, "queue": { "name": "queue", - "type": "text", + "type": "varchar(255)", "primaryKey": false, "notNull": true, "autoincrement": false @@ -37,11 +37,11 @@ }, "status": { "name": "status", - "type": "text", + "type": "varchar(50)", "primaryKey": false, "notNull": true, "autoincrement": false, - "default": "('pending')" + "default": "'pending'" }, "expireAt": { "name": "expireAt", @@ -52,7 +52,7 @@ }, "allocationId": { "name": "allocationId", - "type": "text", + "type": "varchar(50)", "primaryKey": false, "notNull": true, "autoincrement": false @@ -73,7 +73,7 @@ }, "idempotencyKey": { "name": "idempotencyKey", - "type": "text", + "type": "varchar(255)", "primaryKey": false, "notNull": false, "autoincrement": false @@ -140,9 +140,11 @@ "idempotencyKey" ] } - } + }, + "checkConstraint": {} } }, + "views": {}, "_meta": { "schemas": {}, "tables": {}, diff --git a/src/drizzle/meta/_journal.json b/src/drizzle/meta/_journal.json index 1fddda4..d5ebbc4 100644 --- a/src/drizzle/meta/_journal.json +++ b/src/drizzle/meta/_journal.json @@ -5,8 +5,8 @@ { "idx": 0, "version": "5", - "when": 1734791710708, - "tag": "0000_stiff_martin_li", + "when": 1734868297153, + "tag": "0000_tiresome_gamma_corps", "breakpoints": true } ] diff --git a/src/index.ts b/src/index.ts index f2c5b04..587bbe0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,5 @@ export { LiteQueue } from "./queue"; -export { buildDBClient, migrateDB } from "./db"; +export { connect, migrateDB } from "./db"; export type { QueueOptions, RunnerOptions, RunnerFuncs } from "./options"; export { Runner } from "./runner"; diff --git a/src/queue.test.ts b/src/queue.test.ts index e75e5ea..b2fc1f8 100644 --- a/src/queue.test.ts +++ b/src/queue.test.ts @@ -2,9 +2,9 @@ import { describe, expect, test } from "vitest"; import { - buildDBClient, LiteQueue } from "./"; +import { db } from "./test"; interface Work { increment: number; @@ -13,10 +13,10 @@ interface Work { } describe("LiteQueue", () => { - test("idempotency keys", async () => { + test("idempotency keys", async (context) => { const queue = new LiteQueue( - "queue1", - buildDBClient(undefined, true), + context.task.id, + db, { defaultJobArgs: { numRetries: 0, @@ -40,10 +40,11 @@ describe("LiteQueue", () => { }); - test("keep failed jobs", async () => { + test("keep failed jobs", async (context) => { + const id = context.task.id const queueKeep = new LiteQueue( - "queue1", - buildDBClient(undefined, true), + id + "queue1", + db, { defaultJobArgs: { numRetries: 0, @@ -53,8 +54,8 @@ describe("LiteQueue", () => { ); const queueDontKeep = new LiteQueue( - "queue2", - buildDBClient(undefined, true), + id + "queue2", + db, { defaultJobArgs: { numRetries: 0, diff --git a/src/queue.ts b/src/queue.ts index 0bba34e..024fd1e 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -1,7 +1,7 @@ import assert from "node:assert"; import { and, asc, count, eq, gt, lt, or, sql } from "drizzle-orm"; -import { affectedRows, buildDBClient } from "./db"; +import { affectedRows, type Database } from "./db"; import { EnqueueOptions, QueueOptions } from "./options"; import { Job, tasksTable } from "./db/schema"; @@ -12,12 +12,12 @@ function generateAllocationId() { export class LiteQueue { queueName: string; - db: ReturnType; + db: Database; options: QueueOptions; constructor( name: string, - db: ReturnType, + db: Database, options: QueueOptions, ) { this.queueName = name; @@ -117,14 +117,15 @@ export class LiteQueue { assert(jobs.length == 1); const job = jobs[0]; + const change = { + status: "running" as Job["status"], + numRunsLeft: job.numRunsLeft - 1, + allocationId: generateAllocationId(), + expireAt: new Date(new Date().getTime() + options.timeoutSecs * 1000), + }; const result = await txn .update(tasksTable) - .set({ - status: "running", - numRunsLeft: job.numRunsLeft - 1, - allocationId: generateAllocationId(), - expireAt: new Date(new Date().getTime() + options.timeoutSecs * 1000), - }) + .set(change) .where( and( eq(tasksTable.id, job.id), @@ -138,7 +139,7 @@ export class LiteQueue { return null; } assert(rows == 1); - return job; + return Object.assign(job, change); }); } diff --git a/src/runner.test.ts b/src/runner.test.ts index 5b0ef90..d282ef1 100644 --- a/src/runner.test.ts +++ b/src/runner.test.ts @@ -5,7 +5,6 @@ import { describe, expect, test } from "vitest"; import { z } from "zod"; import { - buildDBClient, DequeuedJob, DequeuedJobError, Runner, @@ -13,6 +12,7 @@ import { LiteQueue, } from "./"; import { tasksTable } from "./db/schema"; +import { db } from "./test"; class Baton { semaphore: Semaphore; @@ -140,11 +140,11 @@ function buildRunner( return { runner, results }; } -describe("SqiteQueueRunner", () => { - test("should run jobs with correct concurrency", async () => { +describe("Queue Runner", async () => { + test("should run jobs with correct concurrency", async (context) => { const queue = new LiteQueue( - "queue1", - buildDBClient(undefined, true), + context.task.id, + db, { defaultJobArgs: { numRetries: 0, @@ -203,10 +203,10 @@ describe("SqiteQueueRunner", () => { expect(results.numFailed).toEqual(0); }); - test("should retry errors", async () => { + test("should retry errors", async (context) => { const queue = new LiteQueue( - "queue1", - buildDBClient(undefined, true), + context.task.id, + db, { defaultJobArgs: { numRetries: 2, @@ -241,10 +241,10 @@ describe("SqiteQueueRunner", () => { expect(results.numFailed).toEqual(1); }); - test("timeouts are respected", async () => { + test("timeouts are respected", async (context) => { const queue = new LiteQueue( - "queue1", - buildDBClient(undefined, true), + context.task.id, + db, { defaultJobArgs: { numRetries: 1, @@ -277,10 +277,10 @@ describe("SqiteQueueRunner", () => { expect(results.numFailed).toEqual(1); }); - test("serialization errors", async () => { + test("serialization errors", async (context) => { const queue = new LiteQueue( - "queue1", - buildDBClient(undefined, true), + context.task.id, + db, { defaultJobArgs: { numRetries: 1, @@ -322,10 +322,10 @@ describe("SqiteQueueRunner", () => { expect(results.numFailed).toEqual(1); }); - test("concurrent runners", async () => { + test("concurrent runners", async (context) => { const queue = new LiteQueue( - "queue1", - buildDBClient(undefined, true), + context.task.id, + db, { defaultJobArgs: { numRetries: 0, @@ -386,15 +386,15 @@ describe("SqiteQueueRunner", () => { expect(results.numFailed).toEqual(0); }); - test("large test", async () => { - const db = buildDBClient(undefined, true); - const queue1 = new LiteQueue("queue1", db, { + test("large test", async (context) => { + const id = context.task.id + const queue1 = new LiteQueue(id + "queue1", db, { defaultJobArgs: { numRetries: 0, }, keepFailedJobs: true, }); - const queue2 = new LiteQueue("queue2", db, { + const queue2 = new LiteQueue(id + "queue2", db, { defaultJobArgs: { numRetries: 0, }, diff --git a/src/test.ts b/src/test.ts new file mode 100644 index 0000000..5d5fb1f --- /dev/null +++ b/src/test.ts @@ -0,0 +1,13 @@ +import { connect, migrateDB } from "./db"; +import { env } from 'node:process'; + +const defaultUrl = env['DATABASE_URL'] ?? 'mysql://root:root@localhost:3306/queue' + + +async function prepareDB(url?: string) { + const db = await connect(url ?? defaultUrl); + await migrateDB(db); + return db; +} + +export const db = await prepareDB(); diff --git a/vitest.config.ts b/vitest.config.ts index a206cfc..cd5f31c 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -9,5 +9,11 @@ export default defineConfig({ alias: { "@/*": "./*", }, + testTimeout: 20000, + poolOptions: { + threads: { + singleThread: true + } + } }, });