Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ AWS_REGION=us-east-1
# VAULT_ADDR=https://vault.example.com:8200
# VAULT_TOKEN=s.xxxxxxxxxxxxxxxx
# VAULT_TRANSIT_PATH=transit/keys/stellar-keys
# Local AES-256-GCM fallback key (64-char hex, 32 bytes) used when Vault is unreachable
# VAULT_FALLBACK_KEY=

# Signing Key (Public key for SEP-1 info endpoint)
# This is the public key used for signing, NOT a private key
Expand Down Expand Up @@ -116,6 +118,9 @@ REDIS_URL=redis://localhost:6379
# FEATURE_FLAG_MULTISIG=true
# FEATURE_FLAG_BATCH_PAYMENTS=true

# Email (SendGrid with Nodemailer fallback)
# SENDGRID_API_KEY=SG.xxxxxxxxxxxxxxxx

# Note: NEVER commit plaintext private keys to this file.
# Use KEY_MANAGEMENT_BACKEND to encrypt keys at rest.
# For local development, use test keys only.
14 changes: 9 additions & 5 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,25 @@
},
"dependencies": {
"@aws-sdk/client-kms": "^3.500.0",
"@bull-board/api": "^5.21.0",
"@bull-board/express": "^5.21.0",
"@prisma/client": "^6.19.2",
"@sendgrid/mail": "^8.1.3",
"@stellar/stellar-sdk": "^14.6.1",
"cron-parser": "^4.9.0",
"@types/node-cron": "^3.0.11",
"@types/pdfkit": "^0.17.6",
"bullmq": "^5.7.0",
"cors": "^2.8.5",
"cron-parser": "^4.9.0",
"dotenv": "^16.4.5",
"express": "^4.19.2",
"express-rate-limit": "^7.1.0",
"ioredis": "^5.3.0",
"jsonwebtoken": "^9.0.3",
"multer": "^1.4.5-lts.2",
"node-cron": "^3.0.3",
"nodemailer": "^6.10.1",
"node-cron": "^4.2.1",
"node-vault": "^0.10.2",
"nodemailer": "^6.10.1",
"pdfkit": "^0.18.0",
"prom-client": "^15.1.0",
"rate-limit-redis": "^4.0.0",
Expand All @@ -60,12 +64,12 @@
"devDependencies": {
"@types/cors": "^2.8.19",
"@types/express": "^4.17.25",
"@types/node-cron": "^3.0.11",
"@types/jest": "^29.5.12",
"@types/jsonwebtoken": "^9.0.10",
"@types/multer": "^1.4.12",
"@types/nodemailer": "^6.4.17",
"@types/node": "^20.11.30",
"@types/node-cron": "^3.0.11",
"@types/nodemailer": "^6.4.17",
"@types/supertest": "^6.0.2",
"@types/swagger-jsdoc": "^6.0.4",
"@types/swagger-ui-express": "^4.1.8",
Expand Down
20 changes: 20 additions & 0 deletions backend/src/api/routes/queue-dashboard.route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Router } from 'express';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { Queue } from 'bullmq';
import { queueConnection, QUEUE_NAMES } from '../../config/queue';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/api/queue-dashboard');

const queues = Object.values(QUEUE_NAMES).map(
(name) => new BullMQAdapter(new Queue(name, { connection: queueConnection }))
);

createBullBoard({ queues, serverAdapter });

const router = Router();
router.use('/', serverAdapter.getRouter());

export default router;
33 changes: 33 additions & 0 deletions backend/src/config/queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { queueConnection } from './queue';

describe('BullMQ queue connection resiliency (#361)', () => {
it('has maxRetriesPerRequest set to null for BullMQ compatibility', () => {
expect(queueConnection.maxRetriesPerRequest).toBeNull();
});

it('has enableReadyCheck disabled', () => {
expect(queueConnection.enableReadyCheck).toBe(false);
});

it('retryStrategy returns capped delay', () => {
const strategy = queueConnection.retryStrategy as (times: number) => number;
expect(strategy(1)).toBe(100);
expect(strategy(10)).toBe(1000);
expect(strategy(100)).toBe(5000);
});

it('reconnectOnError returns true for READONLY errors', () => {
const fn = queueConnection.reconnectOnError as (err: Error) => boolean;
expect(fn(new Error('READONLY command not allowed'))).toBe(true);
});

it('reconnectOnError returns true for ECONNRESET errors', () => {
const fn = queueConnection.reconnectOnError as (err: Error) => boolean;
expect(fn(new Error('ECONNRESET'))).toBe(true);
});

it('reconnectOnError returns false for unrelated errors', () => {
const fn = queueConnection.reconnectOnError as (err: Error) => boolean;
expect(fn(new Error('WRONGTYPE'))).toBe(false);
});
});
14 changes: 8 additions & 6 deletions backend/src/config/queue.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import { QueueOptions, WorkerOptions, JobsOptions } from 'bullmq';
import { redis } from '../lib/redis';

/**
* BullMQ Queue Configuration
*/

// Redis connection for BullMQ
// Redis connection for BullMQ with resiliency settings (#361)
export const queueConnection = {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379', 10),
password: process.env.REDIS_PASSWORD,
db: parseInt(process.env.REDIS_DB || '0', 10),
maxRetriesPerRequest: null,
enableReadyCheck: false,
retryStrategy: (times: number) => Math.min(times * 100, 5000),
reconnectOnError: (err: Error) => {
const targetErrors = ['READONLY', 'ECONNRESET', 'ETIMEDOUT'];
return targetErrors.some((e) => err.message.includes(e));
},
};

// Priority mapping — must be declared before jobTypeConfigs to allow direct references
Expand Down
4 changes: 4 additions & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { notificationService } from './services/notification.service';
import { createEmailProvider, ConsoleSmsProvider, ConsolePushProvider } from './lib/notifications/providers';
import { NotificationType } from '@prisma/client';
import { validateKmsConfigOnStartup } from './lib/key-management.service';
import queueDashboardRouter from './api/routes/queue-dashboard.route';

// Initialize Notification Engine
notificationService.registerProvider(NotificationType.EMAIL, createEmailProvider());
Expand Down Expand Up @@ -144,6 +145,9 @@ app.use('/metrics', publicLimiter, metricsRouter);

app.use('/api/recurring-payments', recurringPaymentsRouter);

// BullMQ queue monitoring dashboard (#362) — admin-only in production
app.use('/api/queue-dashboard', queueDashboardRouter);

// Global error handling middleware (must be last)
app.use(errorHandler);

Expand Down
67 changes: 67 additions & 0 deletions backend/src/lib/email.service.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { sendEmail, EmailPayload } from './email.service';

const payload: EmailPayload = {
to: 'user@example.com',
from: 'noreply@anchorpoint.app',
subject: 'Test',
text: 'Hello',
};

jest.mock('@sendgrid/mail', () => ({
setApiKey: jest.fn(),
send: jest.fn(),
}));

jest.mock('nodemailer', () => ({
createTransport: jest.fn().mockReturnValue({
sendMail: jest.fn().mockResolvedValue({}),
}),
}));

import sgMail from '@sendgrid/mail';
import nodemailer from 'nodemailer';

describe('sendEmail', () => {
beforeEach(() => {
jest.clearAllMocks();
delete process.env.SENDGRID_API_KEY;
delete process.env.SMTP_HOST;
});

it('sends via SendGrid when SENDGRID_API_KEY is set', async () => {
process.env.SENDGRID_API_KEY = 'SG.test';
(sgMail.send as jest.Mock).mockResolvedValue([{ statusCode: 202 }]);

await sendEmail(payload);

expect(sgMail.setApiKey).toHaveBeenCalledWith('SG.test');
expect(sgMail.send).toHaveBeenCalledTimes(1);
});

it('falls back to Nodemailer when SendGrid fails', async () => {
process.env.SENDGRID_API_KEY = 'SG.test';
process.env.SMTP_HOST = 'smtp.example.com';
(sgMail.send as jest.Mock).mockRejectedValue(new Error('SendGrid error'));
const mockSendMail = jest.fn().mockResolvedValue({});
(nodemailer.createTransport as jest.Mock).mockReturnValue({ sendMail: mockSendMail });

await sendEmail(payload);

expect(mockSendMail).toHaveBeenCalledTimes(1);
});

it('sends via Nodemailer when only SMTP_HOST is set', async () => {
process.env.SMTP_HOST = 'smtp.example.com';
const mockSendMail = jest.fn().mockResolvedValue({});
(nodemailer.createTransport as jest.Mock).mockReturnValue({ sendMail: mockSendMail });

await sendEmail(payload);

expect(mockSendMail).toHaveBeenCalledTimes(1);
expect(sgMail.send).not.toHaveBeenCalled();
});

it('does not throw when no transport is configured', async () => {
await expect(sendEmail(payload)).resolves.not.toThrow();
});
});
65 changes: 65 additions & 0 deletions backend/src/lib/email.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import sgMail from '@sendgrid/mail';
import nodemailer from 'nodemailer';
import logger from '../utils/logger';

export interface EmailPayload {
to: string;
from: string;
subject: string;
text: string;
html?: string;
}

async function sendViaSendGrid(payload: EmailPayload): Promise<void> {
sgMail.setApiKey(process.env.SENDGRID_API_KEY!);
await sgMail.send({
to: payload.to,
from: payload.from,
subject: payload.subject,
text: payload.text,
html: payload.html,
});
}

async function sendViaNodemailer(payload: EmailPayload): Promise<void> {
const transporter = nodemailer.createTransport({
host: process.env.SMTP_HOST,
port: parseInt(process.env.SMTP_PORT || '587', 10),
secure: parseInt(process.env.SMTP_PORT || '587', 10) === 465,
auth:
process.env.SMTP_USER && process.env.SMTP_PASS
? { user: process.env.SMTP_USER, pass: process.env.SMTP_PASS }
: undefined,
});

await transporter.sendMail({
from: payload.from,
to: payload.to,
subject: payload.subject,
text: payload.text,
html: payload.html,
});
}

export async function sendEmail(payload: EmailPayload): Promise<void> {
if (process.env.SENDGRID_API_KEY) {
try {
await sendViaSendGrid(payload);
logger.debug('Email sent via SendGrid', { to: payload.to });
return;
} catch (err: unknown) {
logger.warn('SendGrid delivery failed, falling back to Nodemailer', {
to: payload.to,
error: err instanceof Error ? err.message : String(err),
});
}
}

if (process.env.SMTP_HOST) {
await sendViaNodemailer(payload);
logger.debug('Email sent via Nodemailer', { to: payload.to });
return;
}

logger.info('No email transport configured; email not sent', { to: payload.to, subject: payload.subject });
}
Loading