Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
129dc14
feat: add package for fastify worker
BashisthaSudeep Feb 6, 2026
cb915ff
chore: add package depedencies
BashisthaSudeep Feb 6, 2026
aafddd9
feat: add cron scheduling support
BashisthaSudeep Feb 6, 2026
45520ae
feat: define queue config
BashisthaSudeep Feb 17, 2026
8acc9bc
feat: add bullmq client
BashisthaSudeep Feb 17, 2026
7f93f04
Merge remote-tracking branch 'origin/main' into feat/fastify-worker
BashisthaSudeep Feb 17, 2026
c32876f
feat: add support for triggering bullmq queue from app
BashisthaSudeep Feb 17, 2026
89e313d
feat: add sqs support for queue processing in worker (#1068)
BashisthaSudeep Feb 19, 2026
970a362
refactor: return queue client
BashisthaSudeep Feb 19, 2026
dcfb208
refactor: update structure for queues
BashisthaSudeep Feb 19, 2026
fd50d5c
chore: update pnpm-lock
BashisthaSudeep Feb 19, 2026
3eee54f
chore: add readme for worker package
BashisthaSudeep Feb 19, 2026
2f81b91
refactor: update queue error handling and config
BashisthaSudeep Feb 20, 2026
7df2d60
feat: add ability to initialize queue outside plugin
BashisthaSudeep Feb 20, 2026
b0df3ec
refactor: update queue client type
BashisthaSudeep Feb 20, 2026
fab8a39
chore: update readme
BashisthaSudeep Feb 20, 2026
eb4f912
fix: update bull mq job type
BashisthaSudeep Feb 20, 2026
bff48aa
chore: pin node-cron
BashisthaSudeep Feb 20, 2026
a9a9056
refactor: update queue implementation (#1069)
BashisthaSudeep Feb 23, 2026
4c331ca
chore: rename workers to JobOrchestrator
BashisthaSudeep Feb 23, 2026
ae07393
Merge branch 'main' of github.com:prefabs-tech/fastify into feat/fast…
uddhab Apr 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/worker/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
node_modules/
dist/
141 changes: 141 additions & 0 deletions packages/worker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# @prefabs.tech/fastify-worker

A [Fastify](https://github.com/fastify/fastify) plugin for managing queue processes and cron tasks. It provides a unified interface for working with queues (BullMQ, SQS) and scheduling recurring tasks.

## Features

- **Cron Jobs**: Schedule recurring tasks using standard cron expressions
- **Queue System**: Queue management with support for BullMQ and AWS SQS
- **BullMQ Integration**: Redis-based message queues for high-performance background processing
- **AWS SQS Integration**: Support for Amazon Simple Queue Service

## Requirements
- [@prefabs.tech/fastify-config](https://www.npmjs.com/package/@prefabs.tech/fastify-config)

## Usage

### Fastify Plugin

Register the worker plugin with your Fastify instance:

```typescript
import workerPlugin from "@prefabs.tech/fastify-worker";
import Fastify from "fastify";

import config from "./config";

const start = async () => {
const fastify = Fastify({
logger: config.logger,
});

await fastify.register(workerPlugin);

await fastify.listen({
port: config.port,
host: "0.0.0.0",
});
};

start();
```

### Pushing to the queue

The `AdapterRegistry` is a singleton. Once the plugin initializes the worker, any service can access the same registry directly — no instance passing required:

```typescript
await fastify.register(workerPlugin);
```

```typescript
import { JobOrchestrator } from "@prefabs.tech/fastify-worker";

const queue = JobOrchestrator.adapters.get("queue-name")

if (queue) {
queue.push({ message: 'Hello world!' })
}
```

The plugin creates the `JobOrchestrator` instance, which populates `JobOrchestrator.adapters` on `start()`. Services import `JobOrchestrator` and access the static registry directly. On fastify close, `jobOrchestrator.shutdown()` drains all adapters.

### Standalone

Use the `JobOrchestrator` class directly without Fastify:

```typescript
import { JobOrchestrator } from "@prefabs.tech/fastify-worker";

const jobOrchestrator = new JobOrchestrator({
cronJobs: [...],
queues: [...],
});

await jobOrchestrator.start();

// later...
await jobOrchestrator.shutdown();
```

## Configuration

Add worker configuration to your config:

```typescript
import { QueueProvider } from "@prefabs.tech/fastify-worker";
import type { ApiConfig } from "@prefabs.tech/fastify-config";

const config: ApiConfig = {
// ...other config
worker: {
cronJobs: [
{
expression: "0 0 * * *",
task: async () => {
console.log("Running daily cleanup...");
},
options: {
scheduled: true,
timezone: "UTC",
},
},
],
queues: [
{
name: "bull-queue",
provider: QueueProvider.BULLMQ,
bullmqConfig: {
handler: async (job) => {
//
},
queueOptions: {
connection: {
host: "localhost",
port: 6379,
},
},
},
},
{
name: "sqs-queue",
provider: QueueProvider.SQS,
sqsConfig: {
clientConfig: {
credentials: {
accessKeyId: "",
secretAccessKey: "",
},
endpoint: "",
region: "",
},
handler: async (message) => {
//
},
queueUrl: "",
},
},
],
},
};
```
3 changes: 3 additions & 0 deletions packages/worker/eslint.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import fastifyConfig from "@prefabs.tech/eslint-config/fastify.js";

export default fastifyConfig;
60 changes: 60 additions & 0 deletions packages/worker/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
{
"name": "@prefabs.tech/fastify-worker",
"version": "0.93.4",
"description": "Fastify worker plugin",
"homepage": "https://github.com/prefabs-tech/fastify/tree/main/packages/worker#readme",
"repository": {
"type": "git",
"url": "git+https://github.com/prefabs-tech/fastify.git",
"directory": "packages/worker"
},
"license": "MIT",
"type": "module",
"exports": {
".": {
"import": "./dist/prefabs-tech-fastify-worker.js",
"require": "./dist/prefabs-tech-fastify-worker.cjs"
}
},
"main": "./dist/prefabs-tech-fastify-worker.cjs",
"module": "./dist/prefabs-tech-fastify-worker.js",
"types": "./dist/types/index.d.ts",
"files": [
"dist"
],
"scripts": {
"build": "vite build && tsc --emitDeclarationOnly && mv dist/src dist/types",
"lint": "eslint .",
"lint:fix": "eslint . --fix",
"sort-package": "npx sort-package-json",
"typecheck": "tsc --noEmit -p tsconfig.json --composite false"
},
"dependencies": {
"@aws-sdk/client-sqs": "3.991.0",
"bullmq": "5.69.3",
"node-cron": "4.2.1",
"zod": "3.25.76"
},
"devDependencies": {
"@prefabs.tech/eslint-config": "0.5.0",
"@prefabs.tech/fastify-config": "0.93.5",
"@prefabs.tech/tsconfig": "0.5.0",
"eslint": "9.39.2",
"fastify": "5.7.4",
"fastify-plugin": "5.1.0",
"prettier": "3.8.1",
"supertokens-node": "14.1.4",
"typescript": "5.9.3",
"vite": "6.4.1",
"vitest": "3.2.4"
},
"peerDependencies": {
"@prefabs.tech/fastify-config": "0.93.5",
"fastify": ">=5.2.1",
"fastify-plugin": ">=5.0.1",
"supertokens-node": ">=14.1.3"
},
"engines": {
"node": ">=20"
}
}
1 change: 1 addition & 0 deletions packages/worker/src/cron/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { default as CronScheduler } from "./scheduler";
23 changes: 23 additions & 0 deletions packages/worker/src/cron/scheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import cron, { ScheduledTask } from "node-cron";

import { CronJob } from "../types";

class CronScheduler {
private tasks: ScheduledTask[] = [];

schedule(job: CronJob): void {
const task = cron.schedule(job.expression, job.task, job.options);

this.tasks.push(task);
}

stopAll(): void {
for (const task of this.tasks) {
task.stop();
}

this.tasks = [];
}
}

export default CronScheduler;
4 changes: 4 additions & 0 deletions packages/worker/src/enum/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export enum QueueProvider {
SQS = "sqs",
BULLMQ = "bullmq",
}
19 changes: 19 additions & 0 deletions packages/worker/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import "@prefabs.tech/fastify-config";

import { WorkerConfig } from "./types";

declare module "@prefabs.tech/fastify-config" {
interface ApiConfig {
worker: WorkerConfig;
}
}

export { SQSClient } from "@aws-sdk/client-sqs";
export { Job, Queue } from "bullmq";

export { default } from "./plugin";
export { default as JobOrchestrator } from "./jobOrchestrator";

export * from "./enum";
export * from "./queue";
export * from "./types";
38 changes: 38 additions & 0 deletions packages/worker/src/jobOrchestrator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { CronScheduler } from "./cron";
import { AdapterRegistry, createQueueAdapter } from "./queue";
import { WorkerConfig } from "./types";

class JobOrchestrator {
public static readonly adapters = new AdapterRegistry();
public readonly cron: CronScheduler;
private config: WorkerConfig;

constructor(config: WorkerConfig) {
this.config = config;
this.cron = new CronScheduler();
}

async start(): Promise<void> {
if (this.config.cronJobs) {
for (const job of this.config.cronJobs) {
this.cron.schedule(job);
}
}

if (this.config.queues) {
for (const queueConfig of this.config.queues) {
const adapter = createQueueAdapter(queueConfig);

await adapter.start();
JobOrchestrator.adapters.add(adapter);
}
}
}

async shutdown(): Promise<void> {
this.cron.stopAll();
await JobOrchestrator.adapters.shutdownAll();
}
}

export default JobOrchestrator;
29 changes: 29 additions & 0 deletions packages/worker/src/plugin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { FastifyInstance } from "fastify";
import FastifyPlugin from "fastify-plugin";

import JobOrchestrator from "./jobOrchestrator";

const plugin = async (fastify: FastifyInstance) => {
const { config, log } = fastify;

if (!config.worker) {
log.warn("Worker configuration is missing. Skipping plugin registration");

return;
}

log.info("Registering worker plugin");

const jobOrchestrator = new JobOrchestrator(config.worker);

await jobOrchestrator.start();

fastify.decorate("worker", jobOrchestrator);

fastify.addHook("onClose", async () => {
log.info("Shutting down worker");
await jobOrchestrator.shutdown();
});
};

export default FastifyPlugin(plugin);
35 changes: 35 additions & 0 deletions packages/worker/src/queue/adapterRegistry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import QueueAdapter from "./adapters/base";

class AdapterRegistry {
private adapters = new Map<string, QueueAdapter>();

add(adapter: QueueAdapter): void {
this.adapters.set(adapter.queueName, adapter);
}

get(name: string): QueueAdapter | undefined {
return this.adapters.get(name);
}

getAll(): QueueAdapter[] {
return [...this.adapters.values()];
}

has(name: string): boolean {
return this.adapters.has(name);
}

remove(name: string): void {
this.adapters.delete(name);
}

async shutdownAll(): Promise<void> {
for (const adapter of this.adapters.values()) {
await adapter.shutdown();
}

this.adapters.clear();
}
}

export default AdapterRegistry;
17 changes: 17 additions & 0 deletions packages/worker/src/queue/adapters/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
abstract class QueueAdapter<Payload = unknown> {
public queueName: string;

constructor(name: string) {
this.queueName = name;
}

abstract start(): Promise<void>;
abstract shutdown(): Promise<void>;
abstract getClient(): unknown;
abstract push(
data: Payload,
options?: Record<string, unknown>,
): Promise<string>;
}

export default QueueAdapter;
Loading
Loading