diff --git a/packages/worker/.gitignore b/packages/worker/.gitignore index b94707787..1d15bf08a 100644 --- a/packages/worker/.gitignore +++ b/packages/worker/.gitignore @@ -1,2 +1,4 @@ +**/*.log* +/coverage node_modules/ dist/ diff --git a/packages/worker/package.json b/packages/worker/package.json index 44f3dba7b..9dd95bd71 100644 --- a/packages/worker/package.json +++ b/packages/worker/package.json @@ -27,6 +27,7 @@ "lint": "eslint .", "lint:fix": "eslint . --fix", "sort-package": "npx sort-package-json", + "test": "vitest run --coverage", "typecheck": "tsc --noEmit -p tsconfig.json --composite false" }, "dependencies": { @@ -39,6 +40,7 @@ "@prefabs.tech/eslint-config": "0.5.0", "@prefabs.tech/fastify-config": "0.93.5", "@prefabs.tech/tsconfig": "0.5.0", + "@vitest/coverage-istanbul": "3.2.4", "eslint": "9.39.2", "fastify": "5.7.4", "fastify-plugin": "5.1.0", diff --git a/packages/worker/src/__test__/cron/scheduler.test.ts b/packages/worker/src/__test__/cron/scheduler.test.ts new file mode 100644 index 000000000..53617eda0 --- /dev/null +++ b/packages/worker/src/__test__/cron/scheduler.test.ts @@ -0,0 +1,92 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import CronScheduler from "../../cron/scheduler"; + +const { mockStop, mockSchedule } = vi.hoisted(() => { + const mockStop = vi.fn(); + const mockSchedule = vi.fn().mockReturnValue({ stop: mockStop }); + + return { mockStop, mockSchedule }; +}); + +vi.mock("node-cron", () => ({ + default: { + schedule: mockSchedule, + }, +})); + +describe("CronScheduler", () => { + let scheduler: CronScheduler; + + beforeEach(() => { + vi.clearAllMocks(); + mockSchedule.mockReturnValue({ stop: mockStop }); + scheduler = new CronScheduler(); + }); + + describe("schedule", () => { + it("should schedule a cron job with the given expression and task", () => { + const task = vi.fn(); + const job = { expression: "* * * * *", task }; + + scheduler.schedule(job); + + expect(mockSchedule).toHaveBeenCalledWith("* * * * *", task, undefined); + }); + + it("should pass options to node-cron when provided", () => { + const task = vi.fn(); + const options = { scheduled: true, timezone: "UTC" }; + const job = { expression: "0 * * * *", task, options }; + + scheduler.schedule(job); + + expect(mockSchedule).toHaveBeenCalledWith("0 * * * *", task, options); + }); + + it("should track multiple scheduled tasks", () => { + scheduler.schedule({ expression: "* * * * *", task: vi.fn() }); + scheduler.schedule({ expression: "0 * * * *", task: vi.fn() }); + scheduler.schedule({ expression: "0 0 * * *", task: vi.fn() }); + + expect(mockSchedule).toHaveBeenCalledTimes(3); + }); + }); + + describe("stopAll", () => { + it("should stop all scheduled tasks", () => { + const mockStop1 = vi.fn(); + const mockStop2 = vi.fn(); + + mockSchedule + .mockReturnValueOnce({ stop: mockStop1 }) + .mockReturnValueOnce({ stop: mockStop2 }); + + scheduler.schedule({ expression: "* * * * *", task: vi.fn() }); + scheduler.schedule({ expression: "0 * * * *", task: vi.fn() }); + + scheduler.stopAll(); + + expect(mockStop1).toHaveBeenCalledOnce(); + expect(mockStop2).toHaveBeenCalledOnce(); + }); + + it("should clear the tasks list after stopping", () => { + mockSchedule.mockReturnValue({ stop: vi.fn() }); + + scheduler.schedule({ expression: "* * * * *", task: vi.fn() }); + scheduler.stopAll(); + + // Calling stopAll again should not call any stop methods + const newMockStop = vi.fn(); + mockSchedule.mockReturnValue({ stop: newMockStop }); + scheduler.stopAll(); + + expect(newMockStop).not.toHaveBeenCalled(); + }); + + it("should do nothing when no tasks are scheduled", () => { + expect(() => scheduler.stopAll()).not.toThrow(); + }); + }); +}); diff --git a/packages/worker/src/__test__/jobOrchestrator.test.ts b/packages/worker/src/__test__/jobOrchestrator.test.ts new file mode 100644 index 000000000..89d6d9c08 --- /dev/null +++ b/packages/worker/src/__test__/jobOrchestrator.test.ts @@ -0,0 +1,208 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { QueueProvider } from "../enum"; +import JobOrchestrator from "../jobOrchestrator"; + +const { mockSchedule, mockStopAll, mockAdapterStart, mockAdapterShutdown } = + vi.hoisted(() => ({ + mockSchedule: vi.fn(), + mockStopAll: vi.fn(), + // eslint-disable-next-line unicorn/no-useless-undefined + mockAdapterStart: vi.fn().mockResolvedValue(undefined), + // eslint-disable-next-line unicorn/no-useless-undefined + mockAdapterShutdown: vi.fn().mockResolvedValue(undefined), + })); + +vi.mock("../cron", () => ({ + CronScheduler: vi.fn().mockImplementation(() => ({ + schedule: mockSchedule, + stopAll: mockStopAll, + })), +})); + +vi.mock("../queue", async (importOriginal) => { + const original = await importOriginal(); + + return { + ...original, + createQueueAdapter: vi + .fn() + .mockImplementation((config: { name: string }) => ({ + queueName: config.name, + start: mockAdapterStart, + shutdown: mockAdapterShutdown, + getClient: vi.fn(), + push: vi.fn(), + })), + }; +}); + +describe("JobOrchestrator", () => { + let orchestrator: JobOrchestrator; + + beforeEach(() => { + vi.clearAllMocks(); + // eslint-disable-next-line unicorn/no-useless-undefined + mockAdapterStart.mockResolvedValue(undefined); + // eslint-disable-next-line unicorn/no-useless-undefined + mockAdapterShutdown.mockResolvedValue(undefined); + }); + + afterEach(async () => { + // Clear static registry between tests to prevent state leakage + await JobOrchestrator.adapters.shutdownAll(); + }); + + describe("constructor", () => { + it("should create a CronScheduler instance", async () => { + const { CronScheduler } = vi.mocked(await import("../cron")); + + orchestrator = new JobOrchestrator({ cronJobs: [], queues: [] }); + + expect(CronScheduler).toHaveBeenCalledOnce(); + expect(orchestrator.cron).toBeDefined(); + }); + }); + + describe("start", () => { + it("should schedule all cron jobs on start", async () => { + const task = vi.fn(); + orchestrator = new JobOrchestrator({ + cronJobs: [ + { expression: "* * * * *", task }, + { expression: "0 * * * *", task }, + ], + }); + + await orchestrator.start(); + + expect(mockSchedule).toHaveBeenCalledTimes(2); + expect(mockSchedule).toHaveBeenCalledWith({ + expression: "* * * * *", + task, + }); + expect(mockSchedule).toHaveBeenCalledWith({ + expression: "0 * * * *", + task, + }); + }); + + it("should create and start all queue adapters on start", async () => { + const { createQueueAdapter } = vi.mocked(await import("../queue")); + + orchestrator = new JobOrchestrator({ + queues: [ + { + bullmqConfig: { + handler: vi.fn(), + queueOptions: { connection: {} }, + }, + name: "queue-1", + provider: QueueProvider.BULLMQ, + }, + { + bullmqConfig: { + handler: vi.fn(), + queueOptions: { connection: {} }, + }, + name: "queue-2", + provider: QueueProvider.BULLMQ, + }, + ], + }); + + await orchestrator.start(); + + expect(createQueueAdapter).toHaveBeenCalledTimes(2); + expect(mockAdapterStart).toHaveBeenCalledTimes(2); + }); + + it("should register adapters in the static registry", async () => { + orchestrator = new JobOrchestrator({ + queues: [ + { + bullmqConfig: { + handler: vi.fn(), + queueOptions: { connection: {} }, + }, + name: "my-queue", + provider: QueueProvider.BULLMQ, + }, + ], + }); + + await orchestrator.start(); + + expect(JobOrchestrator.adapters.has("my-queue")).toBe(true); + }); + + it("should not schedule any cron jobs when cronJobs is undefined", async () => { + orchestrator = new JobOrchestrator({ queues: [] }); + + await orchestrator.start(); + + expect(mockSchedule).not.toHaveBeenCalled(); + }); + + it("should not create any adapters when queues is undefined", async () => { + const { createQueueAdapter } = vi.mocked(await import("../queue")); + orchestrator = new JobOrchestrator({ cronJobs: [] }); + + await orchestrator.start(); + + expect(createQueueAdapter).not.toHaveBeenCalled(); + expect(mockAdapterStart).not.toHaveBeenCalled(); + }); + }); + + describe("shutdown", () => { + it("should stop all cron jobs on shutdown", async () => { + orchestrator = new JobOrchestrator({ cronJobs: [], queues: [] }); + await orchestrator.start(); + + await orchestrator.shutdown(); + + expect(mockStopAll).toHaveBeenCalledOnce(); + }); + + it("should shut down all registered adapters on shutdown", async () => { + orchestrator = new JobOrchestrator({ + queues: [ + { + bullmqConfig: { + handler: vi.fn(), + queueOptions: { connection: {} }, + }, + name: "shutdown-queue", + provider: QueueProvider.BULLMQ, + }, + ], + }); + + await orchestrator.start(); + await orchestrator.shutdown(); + + expect(mockAdapterShutdown).toHaveBeenCalledOnce(); + }); + + it("should clear the adapter registry after shutdown", async () => { + orchestrator = new JobOrchestrator({ + queues: [ + { + bullmqConfig: { + handler: vi.fn(), + queueOptions: { connection: {} }, + }, + name: "clear-queue", + provider: QueueProvider.BULLMQ, + }, + ], + }); + + await orchestrator.start(); + await orchestrator.shutdown(); + + expect(JobOrchestrator.adapters.getAll()).toHaveLength(0); + }); + }); +}); diff --git a/packages/worker/src/__test__/plugin.test.ts b/packages/worker/src/__test__/plugin.test.ts new file mode 100644 index 000000000..952a94e77 --- /dev/null +++ b/packages/worker/src/__test__/plugin.test.ts @@ -0,0 +1,84 @@ +import fastify from "fastify"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import type { FastifyInstance } from "fastify"; + +const { mockStart, mockShutdown, MockJobOrchestrator } = vi.hoisted(() => { + // eslint-disable-next-line unicorn/no-useless-undefined + const mockStart = vi.fn().mockResolvedValue(undefined); + // eslint-disable-next-line unicorn/no-useless-undefined + const mockShutdown = vi.fn().mockResolvedValue(undefined); + const MockJobOrchestrator = vi.fn().mockImplementation(() => ({ + cron: {}, + shutdown: mockShutdown, + start: mockStart, + })); + + return { mockStart, mockShutdown, MockJobOrchestrator }; +}); + +vi.mock("../jobOrchestrator", () => ({ + default: MockJobOrchestrator, +})); + +describe("Worker plugin", async () => { + let api: FastifyInstance; + const { default: plugin } = await import("../plugin"); + + const workerConfig = { + cronJobs: [], + queues: [], + }; + + beforeEach(async () => { + vi.clearAllMocks(); + // eslint-disable-next-line unicorn/no-useless-undefined + mockStart.mockResolvedValue(undefined); + // eslint-disable-next-line unicorn/no-useless-undefined + mockShutdown.mockResolvedValue(undefined); + api = fastify(); + }); + + afterEach(async () => { + // Suppress error if api was already closed inside the test + await api.close().catch(() => {}); + }); + + it("should log a warning and skip registration when worker config is missing", async () => { + api.decorate("config", {} as never); + + await api.register(plugin); + await api.ready(); + + expect(MockJobOrchestrator).not.toHaveBeenCalled(); + }); + + it("should create a JobOrchestrator and call start when worker config is present", async () => { + api.decorate("config", { worker: workerConfig } as never); + + await api.register(plugin); + await api.ready(); + + expect(MockJobOrchestrator).toHaveBeenCalledWith(workerConfig); + expect(mockStart).toHaveBeenCalledOnce(); + }); + + it("should decorate the fastify instance with the worker orchestrator", async () => { + api.decorate("config", { worker: workerConfig } as never); + + await api.register(plugin); + await api.ready(); + + expect((api as FastifyInstance & { worker: unknown }).worker).toBeDefined(); + }); + + it("should call shutdown on the orchestrator when fastify closes", async () => { + api.decorate("config", { worker: workerConfig } as never); + + await api.register(plugin); + await api.ready(); + await api.close(); + + expect(mockShutdown).toHaveBeenCalledOnce(); + }); +}); diff --git a/packages/worker/src/__test__/queue/adapterRegistry.test.ts b/packages/worker/src/__test__/queue/adapterRegistry.test.ts new file mode 100644 index 000000000..5782dcf84 --- /dev/null +++ b/packages/worker/src/__test__/queue/adapterRegistry.test.ts @@ -0,0 +1,111 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import AdapterRegistry from "../../queue/adapterRegistry"; +import QueueAdapter from "../../queue/adapters/base"; + +class MockAdapter extends QueueAdapter { + // eslint-disable-next-line unicorn/no-useless-undefined + start = vi.fn().mockResolvedValue(undefined); + // eslint-disable-next-line unicorn/no-useless-undefined + shutdown = vi.fn().mockResolvedValue(undefined); + getClient = vi.fn().mockReturnValue({}); + push = vi.fn().mockResolvedValue("job-id"); +} + +describe("AdapterRegistry", () => { + let registry: AdapterRegistry; + let adapterA: MockAdapter; + let adapterB: MockAdapter; + + beforeEach(() => { + registry = new AdapterRegistry(); + adapterA = new MockAdapter("queue-a"); + adapterB = new MockAdapter("queue-b"); + }); + + describe("add / get", () => { + it("should add an adapter and retrieve it by name", () => { + registry.add(adapterA); + + expect(registry.get("queue-a")).toBe(adapterA); + }); + + it("should return undefined for an unregistered adapter name", () => { + expect(registry.get("non-existent")).toBeUndefined(); + }); + + it("should overwrite an existing adapter with the same name", () => { + const replacement = new MockAdapter("queue-a"); + registry.add(adapterA); + registry.add(replacement); + + expect(registry.get("queue-a")).toBe(replacement); + }); + }); + + describe("getAll", () => { + it("should return all registered adapters", () => { + registry.add(adapterA); + registry.add(adapterB); + + expect(registry.getAll()).toHaveLength(2); + expect(registry.getAll()).toContain(adapterA); + expect(registry.getAll()).toContain(adapterB); + }); + + it("should return an empty array when no adapters are registered", () => { + expect(registry.getAll()).toEqual([]); + }); + }); + + describe("has", () => { + it("should return true when adapter exists", () => { + registry.add(adapterA); + + expect(registry.has("queue-a")).toBe(true); + }); + + it("should return false when adapter does not exist", () => { + expect(registry.has("queue-a")).toBe(false); + }); + }); + + describe("remove", () => { + it("should remove an adapter by name", () => { + registry.add(adapterA); + registry.remove("queue-a"); + + expect(registry.has("queue-a")).toBe(false); + expect(registry.get("queue-a")).toBeUndefined(); + }); + + it("should not throw when removing a non-existent adapter", () => { + expect(() => registry.remove("non-existent")).not.toThrow(); + }); + }); + + describe("shutdownAll", () => { + it("should call shutdown on all adapters", async () => { + registry.add(adapterA); + registry.add(adapterB); + + await registry.shutdownAll(); + + expect(adapterA.shutdown).toHaveBeenCalledOnce(); + expect(adapterB.shutdown).toHaveBeenCalledOnce(); + }); + + it("should clear all adapters after shutdown", async () => { + registry.add(adapterA); + registry.add(adapterB); + + await registry.shutdownAll(); + + expect(registry.getAll()).toEqual([]); + }); + + it("should resolve without error when no adapters are registered", async () => { + await expect(registry.shutdownAll()).resolves.not.toThrow(); + }); + }); +}); diff --git a/packages/worker/src/__test__/queue/adapters/bullmq.test.ts b/packages/worker/src/__test__/queue/adapters/bullmq.test.ts new file mode 100644 index 000000000..804cbcdf5 --- /dev/null +++ b/packages/worker/src/__test__/queue/adapters/bullmq.test.ts @@ -0,0 +1,244 @@ +import { Job, JobsOptions } from "bullmq"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import BullMQAdapter from "../../../queue/adapters/bullmq"; + +const { + mockQueueAdd, + mockQueueClose, + mockWorkerClose, + mockWorkerOn, + capturedHandler, + eventListeners, + MockQueue, + MockWorker, +} = vi.hoisted(() => { + const mockQueueAdd = vi.fn().mockResolvedValue({ id: "job-123" }); + // eslint-disable-next-line unicorn/no-useless-undefined + const mockQueueClose = vi.fn().mockResolvedValue(undefined); + // eslint-disable-next-line unicorn/no-useless-undefined + const mockWorkerClose = vi.fn().mockResolvedValue(undefined); + + const eventListeners: Record void> = {}; + const mockWorkerOn = vi + .fn() + .mockImplementation( + (event: string, callback: (...arguments_: unknown[]) => void) => { + eventListeners[event] = callback; + }, + ); + + const capturedHandler = { + fn: undefined as ((job: unknown) => Promise) | undefined, + }; + + const MockQueue = vi.fn().mockImplementation(() => ({ + add: mockQueueAdd, + close: mockQueueClose, + })); + + const MockWorker = vi + .fn() + .mockImplementation( + (_name: string, handler: (job: unknown) => Promise) => { + capturedHandler.fn = handler; + return { on: mockWorkerOn, close: mockWorkerClose }; + }, + ); + + return { + MockQueue, + MockWorker, + capturedHandler, + eventListeners, + mockQueueAdd, + mockQueueClose, + mockWorkerClose, + mockWorkerOn, + }; +}); + +vi.mock("bullmq", () => ({ + Job: class {}, + Queue: MockQueue, + Worker: MockWorker, +})); + +const baseConfig = { + // eslint-disable-next-line unicorn/no-useless-undefined + handler: vi.fn().mockResolvedValue(undefined), + queueOptions: { + connection: { host: "localhost", port: 6379 }, + }, +}; + +describe("BullMQAdapter", () => { + let adapter: BullMQAdapter<{ key: string }>; + + beforeEach(() => { + vi.clearAllMocks(); + mockQueueAdd.mockResolvedValue({ id: "job-123" }); + adapter = new BullMQAdapter("test-queue", baseConfig); + }); + + describe("start", () => { + it("should create a BullMQ Queue with the given name and options", async () => { + await adapter.start(); + + expect(MockQueue).toHaveBeenCalledWith( + "test-queue", + baseConfig.queueOptions, + ); + }); + + it("should create a Worker with the queue name and connection", async () => { + await adapter.start(); + + expect(MockWorker).toHaveBeenCalledWith( + "test-queue", + expect.any(Function), + { connection: baseConfig.queueOptions.connection }, + ); + }); + + it("should merge workerOptions with connection from queueOptions", async () => { + const config = { + ...baseConfig, + workerOptions: { + concurrency: 5, + connection: baseConfig.queueOptions.connection, + }, + }; + const adapterWithWorkerOptions = new BullMQAdapter("test-queue", config); + + await adapterWithWorkerOptions.start(); + + expect(MockWorker).toHaveBeenCalledWith( + "test-queue", + expect.any(Function), + { connection: baseConfig.queueOptions.connection, concurrency: 5 }, + ); + }); + + it("should register error and failed event listeners on the worker", async () => { + await adapter.start(); + + expect(mockWorkerOn).toHaveBeenCalledWith("error", expect.any(Function)); + expect(mockWorkerOn).toHaveBeenCalledWith("failed", expect.any(Function)); + }); + + it("should invoke the job handler when the worker processes a job", async () => { + await adapter.start(); + + const mockJob = { data: { key: "value" } } as Job; + await capturedHandler.fn!(mockJob); + + expect(baseConfig.handler).toHaveBeenCalledWith(mockJob); + }); + }); + + describe("shutdown", () => { + it("should close the worker and queue", async () => { + await adapter.start(); + await adapter.shutdown(); + + expect(mockWorkerClose).toHaveBeenCalledOnce(); + expect(mockQueueClose).toHaveBeenCalledOnce(); + }); + + it("should not throw if called before start", async () => { + await expect(adapter.shutdown()).resolves.not.toThrow(); + }); + }); + + describe("getClient", () => { + it("should return the underlying BullMQ Queue instance", async () => { + await adapter.start(); + + expect(adapter.getClient()).toBeDefined(); + expect(adapter.getClient()).toHaveProperty("add"); + }); + }); + + describe("push", () => { + it("should add a job to the queue and return the job id", async () => { + await adapter.start(); + + const id = await adapter.push({ key: "value" }); + + expect(mockQueueAdd).toHaveBeenCalledWith( + "test-queue", + { key: "value" }, + undefined, + ); + expect(id).toBe("job-123"); + }); + + it("should pass job options to queue.add", async () => { + await adapter.start(); + const options: JobsOptions = { delay: 1000 }; + + await adapter.push({ key: "value" }, options); + + expect(mockQueueAdd).toHaveBeenCalledWith( + "test-queue", + { key: "value" }, + options, + ); + }); + + it("should throw a descriptive error when queue.add fails", async () => { + await adapter.start(); + mockQueueAdd.mockRejectedValueOnce(new Error("Redis connection refused")); + + await expect(adapter.push({ key: "value" })).rejects.toThrowError( + "Failed to push job to BullMQ queue: test-queue. Error: Redis connection refused", + ); + }); + }); + + describe("event handlers", () => { + it("should call onError when the worker emits an error", async () => { + const onError = vi.fn(); + const adapterWithError = new BullMQAdapter("test-queue", { + ...baseConfig, + onError, + }); + await adapterWithError.start(); + + const error = new Error("worker error"); + eventListeners["error"](error); + + expect(onError).toHaveBeenCalledWith(error); + }); + + it("should not throw when an error is emitted with no onError handler", async () => { + await adapter.start(); + + expect(() => eventListeners["error"](new Error("error"))).not.toThrow(); + }); + + it("should call onFailed when the worker emits a failed event", async () => { + const onFailed = vi.fn(); + const adapterWithFailed = new BullMQAdapter("test-queue", { + ...baseConfig, + onFailed, + }); + await adapterWithFailed.start(); + + const job = { id: "job-1" } as Job; + const error = new Error("job failed"); + eventListeners["failed"](job, error); + + expect(onFailed).toHaveBeenCalledWith(job, error); + }); + + it("should not throw when a failed event is emitted with no onFailed handler", async () => { + await adapter.start(); + + expect(() => + eventListeners["failed"]({ id: "job-1" }, new Error("error")), + ).not.toThrow(); + }); + }); +}); diff --git a/packages/worker/src/__test__/queue/adapters/sqs.test.ts b/packages/worker/src/__test__/queue/adapters/sqs.test.ts new file mode 100644 index 000000000..72538095b --- /dev/null +++ b/packages/worker/src/__test__/queue/adapters/sqs.test.ts @@ -0,0 +1,280 @@ +import { + DeleteMessageCommand, + ReceiveMessageCommand, + SendMessageCommand, +} from "@aws-sdk/client-sqs"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import SQSAdapter from "../../../queue/adapters/sqs"; + +const { mockClientSend, mockClientDestroy, MockSQSClient } = vi.hoisted(() => { + const mockClientSend = vi.fn(); + const mockClientDestroy = vi.fn(); + const MockSQSClient = vi.fn().mockImplementation(() => ({ + destroy: mockClientDestroy, + send: mockClientSend, + })); + + return { mockClientSend, mockClientDestroy, MockSQSClient }; +}); + +vi.mock("@aws-sdk/client-sqs", () => { + class ReceiveMessageCommand { + input: Record; + constructor(input: Record) { + this.input = input; + } + } + class DeleteMessageCommand { + input: Record; + constructor(input: Record) { + this.input = input; + } + } + class SendMessageCommand { + input: Record; + constructor(input: Record) { + this.input = input; + } + } + + return { + DeleteMessageCommand, + ReceiveMessageCommand, + SendMessageCommand, + SQSClient: MockSQSClient, + }; +}); + +const waitFor = (ms = 20) => new Promise((resolve) => setTimeout(resolve, ms)); +const neverResolve = () => new Promise(() => {}); + +const baseConfig = { + clientConfig: { region: "us-east-1" }, + // eslint-disable-next-line unicorn/no-useless-undefined + handler: vi.fn().mockResolvedValue(undefined), + queueUrl: "https://sqs.us-east-1.amazonaws.com/123456789/test-queue", +}; + +describe("SQSAdapter", () => { + let adapter: SQSAdapter<{ key: string }>; + + beforeEach(() => { + vi.clearAllMocks(); + mockClientSend.mockImplementation(neverResolve); + adapter = new SQSAdapter("sqs-queue", baseConfig); + }); + + describe("start", () => { + it("should create an SQSClient with the provided config", async () => { + await adapter.start(); + + expect(MockSQSClient).toHaveBeenCalledWith(baseConfig.clientConfig); + }); + + it("should set isPolling to true when start is called", async () => { + await adapter.start(); + + expect(adapter["isPolling"]).toBe(true); + }); + + it("should send a ReceiveMessageCommand once polling starts", async () => { + await adapter.start(); + + // poll() calls send() synchronously before its first await + expect(mockClientSend).toHaveBeenCalledWith( + expect.any(ReceiveMessageCommand), + ); + }); + + it("should include custom receiveMessageOptions in the ReceiveMessageCommand", async () => { + const configWithOptions = { + ...baseConfig, + receiveMessageOptions: { + MaxNumberOfMessages: 5, + QueueUrl: baseConfig.queueUrl, + }, + }; + const customAdapter = new SQSAdapter("sqs-queue", configWithOptions); + + await customAdapter.start(); + + const callArgument = mockClientSend.mock + .calls[0][0] as ReceiveMessageCommand; + expect(callArgument.input).toMatchObject({ + MaxNumberOfMessages: 5, + QueueUrl: baseConfig.queueUrl, + }); + }); + }); + + describe("shutdown", () => { + it("should set isPolling to false and destroy the client", async () => { + await adapter.start(); + await adapter.shutdown(); + + expect(adapter["isPolling"]).toBe(false); + expect(mockClientDestroy).toHaveBeenCalledOnce(); + }); + + it("should not throw if called before start", async () => { + await expect(adapter.shutdown()).resolves.not.toThrow(); + }); + }); + + describe("getClient", () => { + it("should return the underlying SQSClient instance", async () => { + await adapter.start(); + + expect(adapter.getClient()).toBeDefined(); + expect(adapter.getClient()).toHaveProperty("send"); + }); + }); + + describe("push", () => { + it("should send a SendMessageCommand and return the message id", async () => { + await adapter.start(); + // The poll loop is suspended on neverResolve — this once-value goes to push + mockClientSend.mockResolvedValueOnce({ MessageId: "msg-abc-123" }); + + const id = await adapter.push({ key: "value" }); + + const sendCall = mockClientSend.mock.calls.find( + (call) => call[0] instanceof SendMessageCommand, + ); + expect(sendCall).toBeDefined(); + expect((sendCall![0] as SendMessageCommand).input).toMatchObject({ + MessageBody: JSON.stringify({ key: "value" }), + QueueUrl: baseConfig.queueUrl, + }); + expect(id).toBe("msg-abc-123"); + }); + + it("should spread extra options into the SendMessageCommand", async () => { + await adapter.start(); + mockClientSend.mockResolvedValueOnce({ MessageId: "msg-xyz" }); + + await adapter.push( + { key: "value" }, + { MessageGroupId: "group-1", MessageDeduplicationId: "dedup-1" }, + ); + + const sendCall = mockClientSend.mock.calls.find( + (call) => call[0] instanceof SendMessageCommand, + ); + expect((sendCall![0] as SendMessageCommand).input).toMatchObject({ + MessageDeduplicationId: "dedup-1", + MessageGroupId: "group-1", + }); + }); + + it("should throw a descriptive error when send fails", async () => { + await adapter.start(); + mockClientSend.mockRejectedValueOnce(new Error("SQS unavailable")); + + await expect(adapter.push({ key: "value" })).rejects.toThrowError( + "Failed to push job to SQS queue: sqs-queue. Error: SQS unavailable", + ); + }); + }); + + describe("polling", () => { + it("should call the handler and delete the message when a message is received", async () => { + // Create the adapter first so we can reference it inside the mock + const pollingAdapter = new SQSAdapter("sqs-queue", baseConfig); + let sendCallCount = 0; + mockClientSend.mockImplementation(async () => { + sendCallCount++; + if (sendCallCount === 1) { + return { + Messages: [ + { Body: '{"key":"polled"}', ReceiptHandle: "receipt-handle-1" }, + ], + }; + } + // After first receive + delete, stop the loop + pollingAdapter["isPolling"] = false; + return {}; + }); + + await pollingAdapter.start(); + await waitFor(); + + expect(baseConfig.handler).toHaveBeenCalledWith({ key: "polled" }); + + const deleteCall = mockClientSend.mock.calls.find( + (call) => call[0] instanceof DeleteMessageCommand, + ); + expect(deleteCall).toBeDefined(); + expect((deleteCall![0] as DeleteMessageCommand).input).toMatchObject({ + QueueUrl: baseConfig.queueUrl, + ReceiptHandle: "receipt-handle-1", + }); + }); + + it("should call onError when the handler throws during message processing", async () => { + const onError = vi.fn(); + const errorAdapter = new SQSAdapter("sqs-queue", { + ...baseConfig, + handler: vi.fn().mockRejectedValueOnce(new Error("handler error")), + onError, + }); + let sendCallCount = 0; + + mockClientSend.mockImplementation(async () => { + sendCallCount++; + if (sendCallCount === 1) { + return { + Messages: [ + { Body: '{"key":"value"}', ReceiptHandle: "receipt-handle-1" }, + ], + }; + } + errorAdapter["isPolling"] = false; + return {}; + }); + + await errorAdapter.start(); + await waitFor(); + + expect(onError).toHaveBeenCalledWith( + expect.objectContaining({ message: "handler error" }), + expect.objectContaining({ ReceiptHandle: "receipt-handle-1" }), + ); + }); + + it("should call onError when ReceiveMessageCommand itself fails", async () => { + const onError = vi.fn(); + const errorAdapter = new SQSAdapter("sqs-queue", { + ...baseConfig, + onError, + }); + let sendCallCount = 0; + + mockClientSend.mockImplementation(async () => { + sendCallCount++; + if (sendCallCount === 1) { + throw new Error("SQS network error"); + } + errorAdapter["isPolling"] = false; + return { Messages: [] }; + }); + + await errorAdapter.start(); + await waitFor(); + + expect(onError).toHaveBeenCalledWith( + expect.objectContaining({ message: "SQS network error" }), + ); + }); + + it("should not start a second polling loop if already polling", async () => { + await adapter.start(); + // Calling startPolling again while isPolling=true should be a no-op + adapter["startPolling"](); + + // Only the initial ReceiveMessageCommand should have been dispatched + expect(mockClientSend).toHaveBeenCalledTimes(1); + }); + }); +}); diff --git a/packages/worker/src/__test__/queue/factory.test.ts b/packages/worker/src/__test__/queue/factory.test.ts new file mode 100644 index 000000000..e06056ed5 --- /dev/null +++ b/packages/worker/src/__test__/queue/factory.test.ts @@ -0,0 +1,101 @@ +import { describe, expect, it, vi } from "vitest"; + +import { QueueProvider } from "../../enum"; +import BullMQAdapter from "../../queue/adapters/bullmq"; +import SQSAdapter from "../../queue/adapters/sqs"; +import createQueueAdapter from "../../queue/factory"; + +vi.mock("../../queue/adapters/bullmq", () => ({ + default: vi.fn().mockImplementation((name: string) => ({ + queueName: name, + })), +})); + +vi.mock("../../queue/adapters/sqs", () => ({ + default: vi.fn().mockImplementation((name: string) => ({ + queueName: name, + })), +})); + +const mockBullMQConfig = { + handler: vi.fn(), + queueOptions: { + connection: { host: "localhost", port: 6379 }, + }, +}; + +const mockSQSConfig = { + clientConfig: { region: "us-east-1" }, + handler: vi.fn(), + queueUrl: "https://sqs.us-east-1.amazonaws.com/123/test-queue", +}; + +describe("createQueueAdapter", () => { + describe("BullMQ provider", () => { + it("should create a BullMQAdapter for BULLMQ provider", () => { + const config = { + bullmqConfig: mockBullMQConfig, + name: "test-queue", + provider: QueueProvider.BULLMQ, + }; + + const adapter = createQueueAdapter(config); + + expect(BullMQAdapter).toHaveBeenCalledWith( + "test-queue", + mockBullMQConfig, + ); + expect(adapter).toBeDefined(); + }); + + it("should throw when BullMQ config is missing", () => { + const config = { + name: "test-queue", + provider: QueueProvider.BULLMQ, + }; + + expect(() => createQueueAdapter(config)).toThrowError( + "BullMQ configuration is required for queue: test-queue", + ); + }); + }); + + describe("SQS provider", () => { + it("should create an SQSAdapter for SQS provider", () => { + const config = { + name: "sqs-queue", + provider: QueueProvider.SQS, + sqsConfig: mockSQSConfig, + }; + + const adapter = createQueueAdapter(config); + + expect(SQSAdapter).toHaveBeenCalledWith("sqs-queue", mockSQSConfig); + expect(adapter).toBeDefined(); + }); + + it("should throw when SQS config is missing", () => { + const config = { + name: "sqs-queue", + provider: QueueProvider.SQS, + }; + + expect(() => createQueueAdapter(config)).toThrowError( + "SQS configuration is required for queue: sqs-queue", + ); + }); + }); + + describe("unsupported provider", () => { + it("should throw for an unsupported provider value", () => { + const config = { + name: "unknown-queue", + provider: "kafka" as QueueProvider, + }; + + expect(() => createQueueAdapter(config)).toThrowError( + "Unsupported queue provider: kafka", + ); + }); + }); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a128911c3..eb5d4e434 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -620,6 +620,9 @@ importers: '@prefabs.tech/tsconfig': specifier: 0.5.0 version: 0.5.0(@types/node@24.10.13) + '@vitest/coverage-istanbul': + specifier: 3.2.4 + version: 3.2.4(vitest@3.2.4(@types/node@24.10.13)(jiti@2.6.1)(yaml@2.8.1)) eslint: specifier: 9.39.2 version: 9.39.2(jiti@2.6.1)