Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/worker/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
**/*.log*
/coverage
node_modules/
dist/
2 changes: 2 additions & 0 deletions packages/worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"lint": "eslint .",
"lint:fix": "eslint . --fix",
"sort-package": "npx sort-package-json",
"test": "vitest run --coverage",
"typecheck": "tsc --noEmit -p tsconfig.json --composite false"
},
"dependencies": {
Expand All @@ -39,6 +40,7 @@
"@prefabs.tech/eslint-config": "0.5.0",
"@prefabs.tech/fastify-config": "0.93.5",
"@prefabs.tech/tsconfig": "0.5.0",
"@vitest/coverage-istanbul": "3.2.4",
"eslint": "9.39.2",
"fastify": "5.7.4",
"fastify-plugin": "5.1.0",
Expand Down
92 changes: 92 additions & 0 deletions packages/worker/src/__test__/cron/scheduler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { beforeEach, describe, expect, it, vi } from "vitest";

import CronScheduler from "../../cron/scheduler";

const { mockStop, mockSchedule } = vi.hoisted(() => {
const mockStop = vi.fn();
const mockSchedule = vi.fn().mockReturnValue({ stop: mockStop });

return { mockStop, mockSchedule };
});

vi.mock("node-cron", () => ({
default: {
schedule: mockSchedule,
},
}));

describe("CronScheduler", () => {
let scheduler: CronScheduler;

beforeEach(() => {
vi.clearAllMocks();
mockSchedule.mockReturnValue({ stop: mockStop });
scheduler = new CronScheduler();
});

describe("schedule", () => {
it("should schedule a cron job with the given expression and task", () => {
const task = vi.fn();
const job = { expression: "* * * * *", task };

scheduler.schedule(job);

expect(mockSchedule).toHaveBeenCalledWith("* * * * *", task, undefined);
});

it("should pass options to node-cron when provided", () => {
const task = vi.fn();
const options = { scheduled: true, timezone: "UTC" };
const job = { expression: "0 * * * *", task, options };

scheduler.schedule(job);

expect(mockSchedule).toHaveBeenCalledWith("0 * * * *", task, options);
});

it("should track multiple scheduled tasks", () => {
scheduler.schedule({ expression: "* * * * *", task: vi.fn() });
scheduler.schedule({ expression: "0 * * * *", task: vi.fn() });
scheduler.schedule({ expression: "0 0 * * *", task: vi.fn() });

expect(mockSchedule).toHaveBeenCalledTimes(3);
});
});

describe("stopAll", () => {
it("should stop all scheduled tasks", () => {
const mockStop1 = vi.fn();
const mockStop2 = vi.fn();

mockSchedule
.mockReturnValueOnce({ stop: mockStop1 })
.mockReturnValueOnce({ stop: mockStop2 });

scheduler.schedule({ expression: "* * * * *", task: vi.fn() });
scheduler.schedule({ expression: "0 * * * *", task: vi.fn() });

scheduler.stopAll();

expect(mockStop1).toHaveBeenCalledOnce();
expect(mockStop2).toHaveBeenCalledOnce();
});

it("should clear the tasks list after stopping", () => {
mockSchedule.mockReturnValue({ stop: vi.fn() });

scheduler.schedule({ expression: "* * * * *", task: vi.fn() });
scheduler.stopAll();

// Calling stopAll again should not call any stop methods
const newMockStop = vi.fn();
mockSchedule.mockReturnValue({ stop: newMockStop });
scheduler.stopAll();

expect(newMockStop).not.toHaveBeenCalled();
});

it("should do nothing when no tasks are scheduled", () => {
expect(() => scheduler.stopAll()).not.toThrow();
});
});
});
208 changes: 208 additions & 0 deletions packages/worker/src/__test__/jobOrchestrator.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

import { QueueProvider } from "../enum";
import JobOrchestrator from "../jobOrchestrator";

const { mockSchedule, mockStopAll, mockAdapterStart, mockAdapterShutdown } =
vi.hoisted(() => ({
mockSchedule: vi.fn(),
mockStopAll: vi.fn(),
// eslint-disable-next-line unicorn/no-useless-undefined
mockAdapterStart: vi.fn().mockResolvedValue(undefined),
// eslint-disable-next-line unicorn/no-useless-undefined
mockAdapterShutdown: vi.fn().mockResolvedValue(undefined),
}));

vi.mock("../cron", () => ({
CronScheduler: vi.fn().mockImplementation(() => ({
schedule: mockSchedule,
stopAll: mockStopAll,
})),
}));

vi.mock("../queue", async (importOriginal) => {
const original = await importOriginal<typeof import("../queue")>();

return {
...original,
createQueueAdapter: vi
.fn()
.mockImplementation((config: { name: string }) => ({
queueName: config.name,
start: mockAdapterStart,
shutdown: mockAdapterShutdown,
getClient: vi.fn(),
push: vi.fn(),
})),
};
});

describe("JobOrchestrator", () => {
let orchestrator: JobOrchestrator;

beforeEach(() => {
vi.clearAllMocks();
// eslint-disable-next-line unicorn/no-useless-undefined
mockAdapterStart.mockResolvedValue(undefined);
// eslint-disable-next-line unicorn/no-useless-undefined
mockAdapterShutdown.mockResolvedValue(undefined);
});

afterEach(async () => {
// Clear static registry between tests to prevent state leakage
await JobOrchestrator.adapters.shutdownAll();
});

describe("constructor", () => {
it("should create a CronScheduler instance", async () => {
const { CronScheduler } = vi.mocked(await import("../cron"));

orchestrator = new JobOrchestrator({ cronJobs: [], queues: [] });

expect(CronScheduler).toHaveBeenCalledOnce();
expect(orchestrator.cron).toBeDefined();
});
});

describe("start", () => {
it("should schedule all cron jobs on start", async () => {
const task = vi.fn();
orchestrator = new JobOrchestrator({
cronJobs: [
{ expression: "* * * * *", task },
{ expression: "0 * * * *", task },
],
});

await orchestrator.start();

expect(mockSchedule).toHaveBeenCalledTimes(2);
expect(mockSchedule).toHaveBeenCalledWith({
expression: "* * * * *",
task,
});
expect(mockSchedule).toHaveBeenCalledWith({
expression: "0 * * * *",
task,
});
});

it("should create and start all queue adapters on start", async () => {
const { createQueueAdapter } = vi.mocked(await import("../queue"));

orchestrator = new JobOrchestrator({
queues: [
{
bullmqConfig: {
handler: vi.fn(),
queueOptions: { connection: {} },
},
name: "queue-1",
provider: QueueProvider.BULLMQ,
},
{
bullmqConfig: {
handler: vi.fn(),
queueOptions: { connection: {} },
},
name: "queue-2",
provider: QueueProvider.BULLMQ,
},
],
});

await orchestrator.start();

expect(createQueueAdapter).toHaveBeenCalledTimes(2);
expect(mockAdapterStart).toHaveBeenCalledTimes(2);
});

it("should register adapters in the static registry", async () => {
orchestrator = new JobOrchestrator({
queues: [
{
bullmqConfig: {
handler: vi.fn(),
queueOptions: { connection: {} },
},
name: "my-queue",
provider: QueueProvider.BULLMQ,
},
],
});

await orchestrator.start();

expect(JobOrchestrator.adapters.has("my-queue")).toBe(true);
});

it("should not schedule any cron jobs when cronJobs is undefined", async () => {
orchestrator = new JobOrchestrator({ queues: [] });

await orchestrator.start();

expect(mockSchedule).not.toHaveBeenCalled();
});

it("should not create any adapters when queues is undefined", async () => {
const { createQueueAdapter } = vi.mocked(await import("../queue"));
orchestrator = new JobOrchestrator({ cronJobs: [] });

await orchestrator.start();

expect(createQueueAdapter).not.toHaveBeenCalled();
expect(mockAdapterStart).not.toHaveBeenCalled();
});
});

describe("shutdown", () => {
it("should stop all cron jobs on shutdown", async () => {
orchestrator = new JobOrchestrator({ cronJobs: [], queues: [] });
await orchestrator.start();

await orchestrator.shutdown();

expect(mockStopAll).toHaveBeenCalledOnce();
});

it("should shut down all registered adapters on shutdown", async () => {
orchestrator = new JobOrchestrator({
queues: [
{
bullmqConfig: {
handler: vi.fn(),
queueOptions: { connection: {} },
},
name: "shutdown-queue",
provider: QueueProvider.BULLMQ,
},
],
});

await orchestrator.start();
await orchestrator.shutdown();

expect(mockAdapterShutdown).toHaveBeenCalledOnce();
});

it("should clear the adapter registry after shutdown", async () => {
orchestrator = new JobOrchestrator({
queues: [
{
bullmqConfig: {
handler: vi.fn(),
queueOptions: { connection: {} },
},
name: "clear-queue",
provider: QueueProvider.BULLMQ,
},
],
});

await orchestrator.start();
await orchestrator.shutdown();

expect(JobOrchestrator.adapters.getAll()).toHaveLength(0);
});
});
});
Loading