From 206e8a7a75668694adf577d50a64967b5de53168 Mon Sep 17 00:00:00 2001 From: m0nggh Date: Thu, 14 May 2026 17:10:44 +0800 Subject: [PATCH 1/4] add queue for ses events --- package-lock.json | 94 ++++++++-- packages/backend/package.json | 1 + packages/backend/src/config/app.ts | 2 + .../src/helpers/create-bull-board-handler.ts | 2 + .../backend/src/helpers/process-ses-event.ts | 53 ++++++ packages/backend/src/queues/ses-events.ts | 166 ++++++++++++++++++ packages/backend/src/worker.ts | 1 + 7 files changed, 303 insertions(+), 16 deletions(-) create mode 100644 packages/backend/src/queues/ses-events.ts diff --git a/package-lock.json b/package-lock.json index f30c66b669..3460194b12 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1008,6 +1008,58 @@ "node": ">=20.0.0" } }, + "node_modules/@aws-sdk/client-sqs": { + "version": "3.1045.0", + "resolved": "https://registry.npmjs.org/@aws-sdk/client-sqs/-/client-sqs-3.1045.0.tgz", + "integrity": "sha512-reWeEE53mgCv8uUFSicvVf0A6BoWGImdC4y5x5clkeEmfIikahIBtons6u0d32kwI4NczpcretpUkOCE/nvWAA==", + "license": "Apache-2.0", + "dependencies": { + "@aws-crypto/sha256-browser": "5.2.0", + "@aws-crypto/sha256-js": "5.2.0", + "@aws-sdk/core": "^3.974.8", + "@aws-sdk/credential-provider-node": "^3.972.39", + "@aws-sdk/middleware-host-header": "^3.972.10", + "@aws-sdk/middleware-logger": "^3.972.10", + "@aws-sdk/middleware-recursion-detection": "^3.972.11", + "@aws-sdk/middleware-sdk-sqs": "^3.972.22", + "@aws-sdk/middleware-user-agent": "^3.972.38", + "@aws-sdk/region-config-resolver": "^3.972.13", + "@aws-sdk/types": "^3.973.8", + "@aws-sdk/util-endpoints": "^3.996.8", + "@aws-sdk/util-user-agent-browser": "^3.972.10", + "@aws-sdk/util-user-agent-node": "^3.973.24", + "@smithy/config-resolver": "^4.4.17", + "@smithy/core": "^3.23.17", + "@smithy/fetch-http-handler": "^5.3.17", + "@smithy/hash-node": "^4.2.14", + "@smithy/invalid-dependency": "^4.2.14", + "@smithy/md5-js": "^4.2.14", + "@smithy/middleware-content-length": 
"^4.2.14", + "@smithy/middleware-endpoint": "^4.4.32", + "@smithy/middleware-retry": "^4.5.7", + "@smithy/middleware-serde": "^4.2.20", + "@smithy/middleware-stack": "^4.2.14", + "@smithy/node-config-provider": "^4.3.14", + "@smithy/node-http-handler": "^4.6.1", + "@smithy/protocol-http": "^5.3.14", + "@smithy/smithy-client": "^4.12.13", + "@smithy/types": "^4.14.1", + "@smithy/url-parser": "^4.2.14", + "@smithy/util-base64": "^4.3.2", + "@smithy/util-body-length-browser": "^4.2.2", + "@smithy/util-body-length-node": "^4.2.3", + "@smithy/util-defaults-mode-browser": "^4.3.49", + "@smithy/util-defaults-mode-node": "^4.2.54", + "@smithy/util-endpoints": "^3.4.2", + "@smithy/util-middleware": "^4.2.14", + "@smithy/util-retry": "^4.3.6", + "@smithy/util-utf8": "^4.2.2", + "tslib": "^2.6.2" + }, + "engines": { + "node": ">=20.0.0" + } + }, "node_modules/@aws-sdk/core": { "version": "3.974.8", "resolved": "https://registry.npmjs.org/@aws-sdk/core/-/core-3.974.8.tgz", @@ -1460,6 +1512,23 @@ "node": ">=20.0.0" } }, + "node_modules/@aws-sdk/middleware-sdk-sqs": { + "version": "3.972.22", + "resolved": "https://registry.npmjs.org/@aws-sdk/middleware-sdk-sqs/-/middleware-sdk-sqs-3.972.22.tgz", + "integrity": "sha512-DtR3mEiOUJcnEX/QuXmvbJto6xvQzp2ftnHb29c0aQYdmmzbKf0gsu9ovx1i/yy4ZR6m0rttTucS0iiP32dlGA==", + "license": "Apache-2.0", + "dependencies": { + "@aws-sdk/types": "^3.973.8", + "@smithy/smithy-client": "^4.12.13", + "@smithy/types": "^4.14.1", + "@smithy/util-hex-encoding": "^4.2.2", + "@smithy/util-utf8": "^4.2.2", + "tslib": "^2.6.2" + }, + "engines": { + "node": ">=20.0.0" + } + }, "node_modules/@aws-sdk/middleware-ssec": { "version": "3.972.9", "resolved": "https://registry.npmjs.org/@aws-sdk/middleware-ssec/-/middleware-ssec-3.972.9.tgz", @@ -7680,20 +7749,13 @@ } }, "node_modules/@smithy/core": { - "version": "3.23.17", - "resolved": "https://registry.npmjs.org/@smithy/core/-/core-3.23.17.tgz", - "integrity": 
"sha512-x7BlLbUFL8NWCGjMF9C+1N5cVCxcPa7g6Tv9B4A2luWx3be3oU8hQ96wIwxe/s7OhIzvoJH73HAUSg5JXVlEtQ==", + "version": "3.24.1", + "resolved": "https://registry.npmjs.org/@smithy/core/-/core-3.24.1.tgz", + "integrity": "sha512-3mT7o4qQyUWttYnVK3A0Z/u3Xha3E81tXn32Tz6vjZiUXhBrkEivpw1hBYfh84iFF9CSzkBU9Y1DJ3Q6RQ231g==", "license": "Apache-2.0", "dependencies": { - "@smithy/protocol-http": "^5.3.14", + "@aws-crypto/crc32": "5.2.0", "@smithy/types": "^4.14.1", - "@smithy/url-parser": "^4.2.14", - "@smithy/util-base64": "^4.3.2", - "@smithy/util-body-length-browser": "^4.2.2", - "@smithy/util-middleware": "^4.2.14", - "@smithy/util-stream": "^4.5.25", - "@smithy/util-utf8": "^4.2.2", - "@smithy/uuid": "^1.1.2", "tslib": "^2.6.2" }, "engines": { @@ -7872,13 +7934,12 @@ } }, "node_modules/@smithy/md5-js": { - "version": "4.2.13", - "resolved": "https://registry.npmjs.org/@smithy/md5-js/-/md5-js-4.2.13.tgz", - "integrity": "sha512-cNm7I9NXolFxtS20ojROddOEpSAeI1Obq6pd1Kj5HtHws3s9Fkk8DdHDfQSs5KuxCewZuVK6UqrJnfJmiMzDuQ==", + "version": "4.3.1", + "resolved": "https://registry.npmjs.org/@smithy/md5-js/-/md5-js-4.3.1.tgz", + "integrity": "sha512-98NalujRdzv6ggVQNYPWpL2K57UKeUB8roIr61u6+JiHd7KUlMQ+sn/vk6IG4XxEjw2vlC7eu/xjYXshUE4XXg==", "license": "Apache-2.0", "dependencies": { - "@smithy/types": "^4.14.0", - "@smithy/util-utf8": "^4.2.2", + "@smithy/core": "^3.24.1", "tslib": "^2.6.2" }, "engines": { @@ -33986,6 +34047,7 @@ "@aws-sdk/client-dynamodb": "3.1027.0", "@aws-sdk/client-s3": "3.1027.0", "@aws-sdk/client-sesv2": "^3.1045.0", + "@aws-sdk/client-sqs": "^3.1045.0", "@aws-sdk/credential-providers": "^3.1045.0", "@aws-sdk/s3-presigned-post": "3.1027.0", "@bull-board/express": "5.17.0", diff --git a/packages/backend/package.json b/packages/backend/package.json index 9a3da7d300..bedb11e021 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -35,6 +35,7 @@ "@aws-sdk/client-dynamodb": "3.1027.0", "@aws-sdk/client-s3": "3.1027.0", "@aws-sdk/client-sesv2": 
"^3.1045.0", + "@aws-sdk/client-sqs": "^3.1045.0", "@aws-sdk/credential-providers": "^3.1045.0", "@aws-sdk/s3-presigned-post": "3.1027.0", "@bull-board/express": "5.17.0", diff --git a/packages/backend/src/config/app.ts b/packages/backend/src/config/app.ts index 6dbcdb42b2..a715cf6315 100644 --- a/packages/backend/src/config/app.ts +++ b/packages/backend/src/config/app.ts @@ -84,6 +84,7 @@ type AppConfig = { region: string roleArn: string configurationSet?: string + sqsQueueUrl?: string } } @@ -187,6 +188,7 @@ const appConfig: AppConfig = { ...(process.env.SES_CONFIGURATION_SET && { configurationSet: process.env.SES_CONFIGURATION_SET, }), + sqsQueueUrl: process.env.SQS_QUEUE_URL || undefined, }, } diff --git a/packages/backend/src/helpers/create-bull-board-handler.ts b/packages/backend/src/helpers/create-bull-board-handler.ts index 3164f28a33..3f805d68be 100644 --- a/packages/backend/src/helpers/create-bull-board-handler.ts +++ b/packages/backend/src/helpers/create-bull-board-handler.ts @@ -5,6 +5,7 @@ import { ExpressAdapter } from '@bull-board/express' import appConfig from '@/config/app' import { appActionQueues, mainActionQueue } from '@/queues/action' import flowQueue from '@/queues/flow' +import sesEventsQueue from '@/queues/ses-events' import triggerQueue from '@/queues/trigger' const serverAdapter = new ExpressAdapter() @@ -19,6 +20,7 @@ const createBullBoardHandler = async (serverAdapter: ExpressAdapter) => { new BullMQAdapter(flowQueue), new BullMQAdapter(triggerQueue), new BullMQAdapter(mainActionQueue), + new BullMQAdapter(sesEventsQueue), ...Object.values(appActionQueues).map( (queue) => new BullMQAdapter(queue), ), diff --git a/packages/backend/src/helpers/process-ses-event.ts b/packages/backend/src/helpers/process-ses-event.ts index 9e9a2dcece..e57b9d0656 100644 --- a/packages/backend/src/helpers/process-ses-event.ts +++ b/packages/backend/src/helpers/process-ses-event.ts @@ -1,6 +1,10 @@ +import { WorkerPro } from '@taskforcesh/bullmq-pro' + +import 
{ createRedisClient } from '@/config/redis' import logger from '@/helpers/logger' import { SesEvent, SesEventType } from '@/helpers/ses-event-parser' import EmailSuppressionEntry from '@/models/email-suppression-entry' +import { startSqsPoller } from '@/queues/ses-events' export interface SesEventInput { sesEvent: SesEvent @@ -97,3 +101,52 @@ export async function processSesEvent(data: SesEventInput): Promise { return } } + +export const worker = new WorkerPro( + 'ses-events', + async (job) => { + await processSesEvent(job.data) + }, + { + prefix: '{sesEventsQ}', + connection: createRedisClient(), + concurrency: 5, + }, +) + +worker.on('completed', (job) => { + logger.info(`SES event job completed: ${job.id}`) +}) + +worker.on('failed', (job, err) => { + logger.error(`SES event job failed: ${job.id} — ${err.message}`, { + event: 'ses-event-job-failed', + jobId: job.id, + error: err.stack, + }) +}) + +worker.on('ready', () => { + logger.info('SES events worker is ready!') +}) + +worker.on('closed', () => { + logger.info('SES events worker is closed!') +}) + +worker.on('error', (err) => { + if (!err) { + logger.error('SES events worker undefined error') + return + } + logger.error(`SES events worker errored with ${err.message}`, { + err: err.stack, + }) +}) + +process.on('SIGTERM', async () => { + await worker.close() +}) + +// Start the SQS poller after worker is registered +startSqsPoller() diff --git a/packages/backend/src/queues/ses-events.ts b/packages/backend/src/queues/ses-events.ts new file mode 100644 index 0000000000..b04c655904 --- /dev/null +++ b/packages/backend/src/queues/ses-events.ts @@ -0,0 +1,166 @@ +import { + DeleteMessageCommand, + ReceiveMessageCommand, + SQSClient, +} from '@aws-sdk/client-sqs' +import { QueuePro } from '@taskforcesh/bullmq-pro' + +import appConfig from '@/config/app' +import { createRedisClient } from '@/config/redis' +import logger from '@/helpers/logger' +import { parseSqsMessage } from '@/helpers/ses-event-parser' + +const 
CONNECTION_REFUSED = 'ECONNREFUSED' + +const sesEventsQueue = new QueuePro('ses-events', { + prefix: '{sesEventsQ}', + connection: createRedisClient(), +}) + +sesEventsQueue.on('error', (err) => { + if ((err as NodeJS.ErrnoException).code === CONNECTION_REFUSED) { + logger.error('Make sure you have installed Redis and it is running.', err) + process.exit() + } +}) + +let isPolling = false + +process.on('SIGTERM', async () => { + // Stop the poll loop first so no new messages are received while we + // close the queue. In-flight enqueues finish before sesEventsQueue.close + // resolves; the long-poll receive call may take up to WaitTimeSeconds + // to wind down, which is acceptable for graceful shutdown. + stopSqsPoller() + await sesEventsQueue.close() +}) + +export async function startSqsPoller(): Promise { + const sqsQueueUrl = appConfig.ses.sqsQueueUrl + if (!sqsQueueUrl) { + logger.info('SQS_QUEUE_URL not set — SQS poller will not start.') + return + } + + if (isPolling) { + return + } + isPolling = true + + const sqsClient = new SQSClient({ + region: 'ap-southeast-1', + }) + + logger.info('SQS poller started', { queueUrl: sqsQueueUrl }) + + while (isPolling) { + try { + const response = await sqsClient.send( + new ReceiveMessageCommand({ + QueueUrl: sqsQueueUrl, + WaitTimeSeconds: 20, + MaxNumberOfMessages: 10, + }), + ) + + if (!response.Messages?.length) { + continue + } + + for (const message of response.Messages) { + // Parse-stage failures are unrecoverable: malformed JSON or missing + // body will keep failing on every retry. Delete poison messages so + // they don't cycle in the queue forever. The enqueue-stage failure + // path below is different — that one leaves the message visible for + // SQS to redeliver, since it's likely a transient Redis issue. 
+ if (!message.Body) { + logger.error('SQS message missing Body — deleting as poison', { + event: 'sqs-poison-message', + messageId: message.MessageId, + }) + await sqsClient.send( + new DeleteMessageCommand({ + QueueUrl: sqsQueueUrl, + ReceiptHandle: message.ReceiptHandle, + }), + ) + continue + } + + let sesEvent + try { + sesEvent = parseSqsMessage(message.Body) + } catch (parseError) { + logger.error('Failed to parse SQS message — deleting as poison', { + event: 'sqs-poison-message', + messageId: message.MessageId, + bodyPreview: message.Body.slice(0, 200), + error: + parseError instanceof Error + ? parseError.message + : String(parseError), + }) + await sqsClient.send( + new DeleteMessageCommand({ + QueueUrl: sqsQueueUrl, + ReceiptHandle: message.ReceiptHandle, + }), + ) + continue + } + + try { + await sesEventsQueue.add( + `ses-${sesEvent.eventType}-${message.MessageId}`, + { + sesEvent, + sqsMessageId: message.MessageId, + }, + { + removeOnComplete: { age: 7 * 24 * 3600, count: 200 }, + removeOnFail: false, + attempts: 5, + backoff: { + type: 'exponential', + delay: 5000, + }, + }, + ) + + // Only delete after successful BullMQ enqueue + await sqsClient.send( + new DeleteMessageCommand({ + QueueUrl: sqsQueueUrl, + ReceiptHandle: message.ReceiptHandle, + }), + ) + } catch (enqueueError) { + // Transient enqueue failure (e.g. Redis down) — leave in SQS for + // redelivery after visibility timeout expires. + logger.error('Failed to enqueue SQS message — leaving in queue', { + event: 'sqs-enqueue-error', + messageId: message.MessageId, + error: + enqueueError instanceof Error + ? enqueueError.message + : String(enqueueError), + }) + } + } + } catch (pollError) { + // Polling-level failure (e.g. network issue) — back off and retry + logger.error('SQS polling error', { + event: 'sqs-poll-error', + error: + pollError instanceof Error ? 
pollError.message : String(pollError), + }) + await new Promise((resolve) => setTimeout(resolve, 5000)) + } + } +} + +export function stopSqsPoller(): void { + isPolling = false +} + +export default sesEventsQueue diff --git a/packages/backend/src/worker.ts b/packages/backend/src/worker.ts index bed7cbca48..db4996dc80 100644 --- a/packages/backend/src/worker.ts +++ b/packages/backend/src/worker.ts @@ -5,6 +5,7 @@ import '@/helpers/check-worker-readiness' import '@/workers/flow' import '@/workers/trigger' import '@/workers/action' +import '@/workers/ses-events' import logger from '@/helpers/logger' From fae68228e6ad7c61bbe3075177b29d4cbb4402ed Mon Sep 17 00:00:00 2001 From: m0nggh Date: Fri, 22 May 2026 19:02:28 +0800 Subject: [PATCH 2/4] move from using bullmq to sqs consumer --- package-lock.json | 34 +++ packages/backend/package.json | 1 + .../helpers/__tests__/ses-consumer.test.ts | 227 ++++++++++++++++++ .../src/helpers/create-bull-board-handler.ts | 2 - .../backend/src/helpers/process-ses-event.ts | 53 ---- packages/backend/src/helpers/ses-consumer.ts | 120 +++++++++ packages/backend/src/queues/ses-events.ts | 166 ------------- packages/backend/src/worker.ts | 4 +- 8 files changed, 385 insertions(+), 222 deletions(-) create mode 100644 packages/backend/src/helpers/__tests__/ses-consumer.test.ts create mode 100644 packages/backend/src/helpers/ses-consumer.ts delete mode 100644 packages/backend/src/queues/ses-events.ts diff --git a/package-lock.json b/package-lock.json index 3460194b12..8a29349abe 100644 --- a/package-lock.json +++ b/package-lock.json @@ -29898,6 +29898,39 @@ "dev": true, "license": "BSD-3-Clause" }, + "node_modules/sqs-consumer": { + "version": "15.0.1", + "resolved": "https://registry.npmjs.org/sqs-consumer/-/sqs-consumer-15.0.1.tgz", + "integrity": "sha512-cDcSkekXxhLMn3u+FSnAG3Ryh6jwHMPna1/oqs2dUL0gvUXOLLMa2Yzu0K4QqwVyaH0MDZw1IU7+W+W618/xcw==", + "license": "Apache-2.0", + "dependencies": { + "@aws-sdk/client-sqs": "^3.1034.0", + "debug": 
"^4.4.3" + }, + "engines": { + "node": ">=22.0.0" + }, + "peerDependencies": { + "@aws-sdk/client-sqs": "^3.1034.0" + } + }, + "node_modules/sqs-consumer/node_modules/debug": { + "version": "4.4.3", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz", + "integrity": "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==", + "license": "MIT", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, "node_modules/ssh-remote-port-forward": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/ssh-remote-port-forward/-/ssh-remote-port-forward-1.0.4.tgz", @@ -34104,6 +34137,7 @@ "rate-limiter-flexible": "2.4.1", "remark-breaks": "3.0.3", "remark-gfm": "3.0.1", + "sqs-consumer": "^15.0.1", "typescript": "^5.5.4", "ulid": "3.0.0", "urlcat": "3.1.0", diff --git a/packages/backend/package.json b/packages/backend/package.json index bedb11e021..c09191aabf 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -92,6 +92,7 @@ "rate-limiter-flexible": "2.4.1", "remark-breaks": "3.0.3", "remark-gfm": "3.0.1", + "sqs-consumer": "^15.0.1", "typescript": "^5.5.4", "ulid": "3.0.0", "urlcat": "3.1.0", diff --git a/packages/backend/src/helpers/__tests__/ses-consumer.test.ts b/packages/backend/src/helpers/__tests__/ses-consumer.test.ts new file mode 100644 index 0000000000..b387d3fb17 --- /dev/null +++ b/packages/backend/src/helpers/__tests__/ses-consumer.test.ts @@ -0,0 +1,227 @@ +import { readFileSync } from 'fs' +import { resolve } from 'path' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +const mocks = vi.hoisted(() => ({ + consumerStart: vi.fn(), + consumerStop: vi.fn(), + consumerOn: vi.fn(), + consumerOnce: vi.fn(), + consumerCreate: vi.fn(), + processSesEvent: vi.fn(), + loggerInfo: vi.fn(), + loggerError: vi.fn(), + sqsQueueUrl: 
'https://sqs.ap-southeast-1.amazonaws.com/123/ses-events', +})) + +vi.mock('sqs-consumer', () => ({ + Consumer: { + create: mocks.consumerCreate.mockImplementation(() => ({ + start: mocks.consumerStart, + stop: mocks.consumerStop, + on: mocks.consumerOn, + once: mocks.consumerOnce, + })), + }, +})) + +vi.mock('@/helpers/process-ses-event', () => ({ + processSesEvent: mocks.processSesEvent, +})) + +vi.mock('@/helpers/logger', () => ({ + default: { + info: mocks.loggerInfo, + error: mocks.loggerError, + }, +})) + +vi.mock('@/config/app', () => ({ + default: { + ses: { + get sqsQueueUrl() { + return mocks.sqsQueueUrl + }, + }, + }, +})) + +function loadFixture(name: string): string { + return readFileSync( + resolve(__dirname, '../../../ses-test-events', name), + 'utf-8', + ) +} + +async function importFresh() { + vi.resetModules() + return import('../ses-consumer') +} + +describe('SES consumer wiring', () => { + beforeEach(() => { + mocks.consumerCreate.mockClear() + mocks.consumerStart.mockClear() + mocks.consumerStop.mockClear() + mocks.consumerOn.mockClear() + mocks.consumerOnce.mockClear() + mocks.processSesEvent.mockReset() + mocks.loggerInfo.mockClear() + mocks.loggerError.mockClear() + mocks.sqsQueueUrl = + 'https://sqs.ap-southeast-1.amazonaws.com/123/ses-events' + }) + + afterEach(() => { + // Clear SIGTERM listeners registered by tests so they don't accumulate + process.removeAllListeners('SIGTERM') + }) + + it('does not start the consumer when SQS_QUEUE_URL is unset', async () => { + mocks.sqsQueueUrl = '' + + const { startSesConsumer } = await importFresh() + startSesConsumer() + + expect(mocks.consumerCreate).not.toHaveBeenCalled() + expect(mocks.consumerStart).not.toHaveBeenCalled() + expect(mocks.loggerInfo).toHaveBeenCalledWith( + expect.stringContaining('SQS_QUEUE_URL not set'), + ) + }) + + it('starts the consumer when SQS_QUEUE_URL is set', async () => { + const { startSesConsumer } = await importFresh() + startSesConsumer() + + 
expect(mocks.consumerCreate).toHaveBeenCalledTimes(1) + expect(mocks.consumerStart).toHaveBeenCalledTimes(1) + + const createOptions = mocks.consumerCreate.mock.calls[0][0] + expect(createOptions.queueUrl).toBe(mocks.sqsQueueUrl) + expect(createOptions.batchSize).toBe(10) + expect(createOptions.waitTimeSeconds).toBe(20) + expect(typeof createOptions.handleMessage).toBe('function') + }) + + it('is idempotent — second call does not re-create the consumer', async () => { + const { startSesConsumer } = await importFresh() + startSesConsumer() + startSesConsumer() + + expect(mocks.consumerCreate).toHaveBeenCalledTimes(1) + expect(mocks.consumerStart).toHaveBeenCalledTimes(1) + }) + + it('handleMessage parses the body and dispatches to processSesEvent', async () => { + const { startSesConsumer } = await importFresh() + startSesConsumer() + + const { handleMessage } = mocks.consumerCreate.mock.calls[0][0] + const message = { + MessageId: 'sqs-msg-1', + Body: loadFixture('ses-bounce-permanent.json'), + } + + const result = await handleMessage(message) + + expect(mocks.processSesEvent).toHaveBeenCalledTimes(1) + const call = mocks.processSesEvent.mock.calls[0][0] + expect(call.sqsMessageId).toBe('sqs-msg-1') + expect(call.sesEvent.eventType).toBe('Bounce') + // Returning the message tells sqs-consumer to delete (ack) it + expect(result).toBe(message) + }) + + it('handleMessage acks poison messages (unparseable body) and logs', async () => { + const { startSesConsumer } = await importFresh() + startSesConsumer() + + const { handleMessage } = mocks.consumerCreate.mock.calls[0][0] + const message = { + MessageId: 'sqs-msg-poison', + Body: 'not valid json {{{', + } + + const result = await handleMessage(message) + + expect(mocks.processSesEvent).not.toHaveBeenCalled() + expect(result).toBe(message) + expect(mocks.loggerError).toHaveBeenCalledWith( + expect.stringContaining('Failed to parse SQS message'), + expect.objectContaining({ event: 'ses-poison-message' }), + ) + }) + + 
it('handleMessage propagates processSesEvent errors so SQS redelivers', async () => { + mocks.processSesEvent.mockRejectedValueOnce(new Error('db down')) + + const { startSesConsumer } = await importFresh() + startSesConsumer() + + const { handleMessage } = mocks.consumerCreate.mock.calls[0][0] + const message = { + MessageId: 'sqs-msg-retry', + Body: loadFixture('ses-bounce-permanent.json'), + } + + await expect(handleMessage(message)).rejects.toThrow('db down') + }) + + it('stopSesConsumer stops the consumer if it was started', async () => { + const { startSesConsumer, stopSesConsumer } = await importFresh() + startSesConsumer() + stopSesConsumer() + + expect(mocks.consumerStop).toHaveBeenCalledTimes(1) + }) + + it('stopSesConsumer is a no-op if the consumer was never started', async () => { + mocks.sqsQueueUrl = '' + + const { startSesConsumer, stopSesConsumer } = await importFresh() + startSesConsumer() + stopSesConsumer() + + expect(mocks.consumerStop).not.toHaveBeenCalled() + }) + + it('SIGTERM stops the consumer, awaits "stopped", then logs shutdown complete', async () => { + const { startSesConsumer } = await importFresh() + startSesConsumer() + + expect(process.listenerCount('SIGTERM')).toBe(1) + + // Fire SIGTERM. shutdownOnSigterm should: + // 1. register a one-time 'stopped' listener on the consumer + // 2. call consumer.stop() + // 3. await the 'stopped' event + // 4. 
log 'shutdown complete' + process.emit('SIGTERM' as never) + + // Let the async handler register its 'stopped' listener and call stop() + await new Promise((resolve) => setImmediate(resolve)) + + expect(mocks.consumerStop).toHaveBeenCalledTimes(1) + const stoppedCall = mocks.consumerOnce.mock.calls.find( + ([eventName]) => eventName === 'stopped', + ) + expect(stoppedCall).toBeDefined() + + // Shutdown is still pending — log should NOT have fired yet + expect(mocks.loggerInfo).not.toHaveBeenCalledWith( + expect.stringContaining('shutdown complete'), + ) + + // Resolve the await by invoking the registered listener + const stoppedListener = stoppedCall![1] as () => void + stoppedListener() + + // Now the handler can complete and log + await new Promise((resolve) => setImmediate(resolve)) + + expect(mocks.loggerInfo).toHaveBeenCalledWith( + expect.stringContaining('shutdown complete'), + ) + }) +}) diff --git a/packages/backend/src/helpers/create-bull-board-handler.ts b/packages/backend/src/helpers/create-bull-board-handler.ts index 3f805d68be..3164f28a33 100644 --- a/packages/backend/src/helpers/create-bull-board-handler.ts +++ b/packages/backend/src/helpers/create-bull-board-handler.ts @@ -5,7 +5,6 @@ import { ExpressAdapter } from '@bull-board/express' import appConfig from '@/config/app' import { appActionQueues, mainActionQueue } from '@/queues/action' import flowQueue from '@/queues/flow' -import sesEventsQueue from '@/queues/ses-events' import triggerQueue from '@/queues/trigger' const serverAdapter = new ExpressAdapter() @@ -20,7 +19,6 @@ const createBullBoardHandler = async (serverAdapter: ExpressAdapter) => { new BullMQAdapter(flowQueue), new BullMQAdapter(triggerQueue), new BullMQAdapter(mainActionQueue), - new BullMQAdapter(sesEventsQueue), ...Object.values(appActionQueues).map( (queue) => new BullMQAdapter(queue), ), diff --git a/packages/backend/src/helpers/process-ses-event.ts b/packages/backend/src/helpers/process-ses-event.ts index 
e57b9d0656..9e9a2dcece 100644 --- a/packages/backend/src/helpers/process-ses-event.ts +++ b/packages/backend/src/helpers/process-ses-event.ts @@ -1,10 +1,6 @@ -import { WorkerPro } from '@taskforcesh/bullmq-pro' - -import { createRedisClient } from '@/config/redis' import logger from '@/helpers/logger' import { SesEvent, SesEventType } from '@/helpers/ses-event-parser' import EmailSuppressionEntry from '@/models/email-suppression-entry' -import { startSqsPoller } from '@/queues/ses-events' export interface SesEventInput { sesEvent: SesEvent @@ -101,52 +97,3 @@ export async function processSesEvent(data: SesEventInput): Promise { return } } - -export const worker = new WorkerPro( - 'ses-events', - async (job) => { - await processSesEvent(job.data) - }, - { - prefix: '{sesEventsQ}', - connection: createRedisClient(), - concurrency: 5, - }, -) - -worker.on('completed', (job) => { - logger.info(`SES event job completed: ${job.id}`) -}) - -worker.on('failed', (job, err) => { - logger.error(`SES event job failed: ${job.id} — ${err.message}`, { - event: 'ses-event-job-failed', - jobId: job.id, - error: err.stack, - }) -}) - -worker.on('ready', () => { - logger.info('SES events worker is ready!') -}) - -worker.on('closed', () => { - logger.info('SES events worker is closed!') -}) - -worker.on('error', (err) => { - if (!err) { - logger.error('SES events worker undefined error') - return - } - logger.error(`SES events worker errored with ${err.message}`, { - err: err.stack, - }) -}) - -process.on('SIGTERM', async () => { - await worker.close() -}) - -// Start the SQS poller after worker is registered -startSqsPoller() diff --git a/packages/backend/src/helpers/ses-consumer.ts b/packages/backend/src/helpers/ses-consumer.ts new file mode 100644 index 0000000000..6fb025afaa --- /dev/null +++ b/packages/backend/src/helpers/ses-consumer.ts @@ -0,0 +1,120 @@ +import { Message, SQSClient } from '@aws-sdk/client-sqs' +import { Consumer } from 'sqs-consumer' + +import appConfig from 
'@/config/app' +import logger from '@/helpers/logger' +import { processSesEvent } from '@/helpers/process-ses-event' +import { parseSqsMessage } from '@/helpers/ses-event-parser' + +let sesConsumer: Consumer | undefined + +function extractMessageIds( + message: Message | Message[] | undefined, +): string[] | undefined { + if (!message) { + return undefined + } + const ids = (Array.isArray(message) ? message : [message]) + .map((m) => m.MessageId) + .filter((id): id is string => Boolean(id)) + return ids.length > 0 ? ids : undefined +} + +export function startSesConsumer(): void { + const queueUrl = appConfig.ses.sqsQueueUrl + if (!queueUrl) { + logger.info('SQS_QUEUE_URL not set — SES consumer will not start.') + return + } + if (sesConsumer) { + return + } + + sesConsumer = Consumer.create({ + queueUrl, + sqs: new SQSClient({ region: 'ap-southeast-1' }), + batchSize: 10, + waitTimeSeconds: 20, + // visibilityTimeout intentionally unset — owned by SQS queue config in + // infra. heartbeatInterval would force us to also set visibilityTimeout + // here (the library validates heartbeat < visibility), and our handler + // is a single DB upsert that completes well under any sensible default. + pollingCompleteWaitTimeMs: 10_000, + // Contract with sqs-consumer: + // return message -> ack & delete + // throw -> nack; SQS redelivers per the queue's MaxReceiveCount + // Do NOT add a blanket try/catch here — the poison-message branch is + // the only intentional swallow. + handleMessage: async (message) => { + const messageId = message.MessageId ?? 'unknown' + + // Poison-message handling: until the DLQ ships (see ses-migration-plan + // "Alternatives Considered — DLQ"), catch parse failures and ack the + // message so it does not redeliver for the full SQS retention window. + // Switch this to a re-throw once the DLQ is in place. + let sesEvent + try { + sesEvent = parseSqsMessage(message.Body ?? 
'') + } catch (parseError) { + logger.error('Failed to parse SQS message — deleting as poison', { + event: 'ses-poison-message', + sqsMessageId: messageId, + bodyPreview: message.Body?.slice(0, 200), + err: + parseError instanceof Error ? parseError.stack : String(parseError), + }) + return message + } + + await processSesEvent({ sesEvent, sqsMessageId: messageId }) + return message + }, + }) + + sesConsumer.on('error', (err, message) => { + logger.error('SES consumer error', { + event: 'ses-consumer-error', + sqsMessageIds: extractMessageIds(message), + err: err.stack, + }) + }) + + sesConsumer.on('processing_error', (err, message) => { + logger.error( + 'SES consumer processing error — message will be redelivered', + { + event: 'ses-consumer-processing-error', + sqsMessageIds: extractMessageIds(message), + err: err.stack, + }, + ) + }) + + sesConsumer.on('stopped', () => { + logger.info('SES consumer stopped') + }) + + sesConsumer.start() + logger.info('SES consumer started', { queueUrl }) + + process.once('SIGTERM', shutdownOnSigterm) +} + +export function stopSesConsumer(): void { + sesConsumer?.stop() +} + +async function shutdownOnSigterm(): Promise { + if (!sesConsumer) { + return + } + const consumer = sesConsumer + const stopped = new Promise((resolve) => { + consumer.once('stopped', () => resolve()) + }) + consumer.stop() + // pollingCompleteWaitTimeMs (set at Consumer.create) caps this wait + // internally, so we do not need an external timeout race here. 
+ await stopped + logger.info('SES consumer shutdown complete (SIGTERM)') +} diff --git a/packages/backend/src/queues/ses-events.ts b/packages/backend/src/queues/ses-events.ts deleted file mode 100644 index b04c655904..0000000000 --- a/packages/backend/src/queues/ses-events.ts +++ /dev/null @@ -1,166 +0,0 @@ -import { - DeleteMessageCommand, - ReceiveMessageCommand, - SQSClient, -} from '@aws-sdk/client-sqs' -import { QueuePro } from '@taskforcesh/bullmq-pro' - -import appConfig from '@/config/app' -import { createRedisClient } from '@/config/redis' -import logger from '@/helpers/logger' -import { parseSqsMessage } from '@/helpers/ses-event-parser' - -const CONNECTION_REFUSED = 'ECONNREFUSED' - -const sesEventsQueue = new QueuePro('ses-events', { - prefix: '{sesEventsQ}', - connection: createRedisClient(), -}) - -sesEventsQueue.on('error', (err) => { - if ((err as NodeJS.ErrnoException).code === CONNECTION_REFUSED) { - logger.error('Make sure you have installed Redis and it is running.', err) - process.exit() - } -}) - -let isPolling = false - -process.on('SIGTERM', async () => { - // Stop the poll loop first so no new messages are received while we - // close the queue. In-flight enqueues finish before sesEventsQueue.close - // resolves; the long-poll receive call may take up to WaitTimeSeconds - // to wind down, which is acceptable for graceful shutdown. 
- stopSqsPoller() - await sesEventsQueue.close() -}) - -export async function startSqsPoller(): Promise { - const sqsQueueUrl = appConfig.ses.sqsQueueUrl - if (!sqsQueueUrl) { - logger.info('SQS_QUEUE_URL not set — SQS poller will not start.') - return - } - - if (isPolling) { - return - } - isPolling = true - - const sqsClient = new SQSClient({ - region: 'ap-southeast-1', - }) - - logger.info('SQS poller started', { queueUrl: sqsQueueUrl }) - - while (isPolling) { - try { - const response = await sqsClient.send( - new ReceiveMessageCommand({ - QueueUrl: sqsQueueUrl, - WaitTimeSeconds: 20, - MaxNumberOfMessages: 10, - }), - ) - - if (!response.Messages?.length) { - continue - } - - for (const message of response.Messages) { - // Parse-stage failures are unrecoverable: malformed JSON or missing - // body will keep failing on every retry. Delete poison messages so - // they don't cycle in the queue forever. The enqueue-stage failure - // path below is different — that one leaves the message visible for - // SQS to redeliver, since it's likely a transient Redis issue. - if (!message.Body) { - logger.error('SQS message missing Body — deleting as poison', { - event: 'sqs-poison-message', - messageId: message.MessageId, - }) - await sqsClient.send( - new DeleteMessageCommand({ - QueueUrl: sqsQueueUrl, - ReceiptHandle: message.ReceiptHandle, - }), - ) - continue - } - - let sesEvent - try { - sesEvent = parseSqsMessage(message.Body) - } catch (parseError) { - logger.error('Failed to parse SQS message — deleting as poison', { - event: 'sqs-poison-message', - messageId: message.MessageId, - bodyPreview: message.Body.slice(0, 200), - error: - parseError instanceof Error - ? 
parseError.message - : String(parseError), - }) - await sqsClient.send( - new DeleteMessageCommand({ - QueueUrl: sqsQueueUrl, - ReceiptHandle: message.ReceiptHandle, - }), - ) - continue - } - - try { - await sesEventsQueue.add( - `ses-${sesEvent.eventType}-${message.MessageId}`, - { - sesEvent, - sqsMessageId: message.MessageId, - }, - { - removeOnComplete: { age: 7 * 24 * 3600, count: 200 }, - removeOnFail: false, - attempts: 5, - backoff: { - type: 'exponential', - delay: 5000, - }, - }, - ) - - // Only delete after successful BullMQ enqueue - await sqsClient.send( - new DeleteMessageCommand({ - QueueUrl: sqsQueueUrl, - ReceiptHandle: message.ReceiptHandle, - }), - ) - } catch (enqueueError) { - // Transient enqueue failure (e.g. Redis down) — leave in SQS for - // redelivery after visibility timeout expires. - logger.error('Failed to enqueue SQS message — leaving in queue', { - event: 'sqs-enqueue-error', - messageId: message.MessageId, - error: - enqueueError instanceof Error - ? enqueueError.message - : String(enqueueError), - }) - } - } - } catch (pollError) { - // Polling-level failure (e.g. network issue) — back off and retry - logger.error('SQS polling error', { - event: 'sqs-poll-error', - error: - pollError instanceof Error ? 
pollError.message : String(pollError), - }) - await new Promise((resolve) => setTimeout(resolve, 5000)) - } - } -} - -export function stopSqsPoller(): void { - isPolling = false -} - -export default sesEventsQueue diff --git a/packages/backend/src/worker.ts b/packages/backend/src/worker.ts index db4996dc80..9608d9d327 100644 --- a/packages/backend/src/worker.ts +++ b/packages/backend/src/worker.ts @@ -5,9 +5,11 @@ import '@/helpers/check-worker-readiness' import '@/workers/flow' import '@/workers/trigger' import '@/workers/action' -import '@/workers/ses-events' import logger from '@/helpers/logger' +import { startSesConsumer } from '@/helpers/ses-consumer' + +startSesConsumer() process.on('uncaughtException', (err) => { try { From 3ac1d563ec187c0a11ba81758c3692aeb6ca6075 Mon Sep 17 00:00:00 2001 From: m0nggh Date: Tue, 9 Jun 2026 00:21:53 +0800 Subject: [PATCH 3/4] add nits --- packages/backend/src/helpers/ses-consumer.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/backend/src/helpers/ses-consumer.ts b/packages/backend/src/helpers/ses-consumer.ts index 6fb025afaa..a1a9083462 100644 --- a/packages/backend/src/helpers/ses-consumer.ts +++ b/packages/backend/src/helpers/ses-consumer.ts @@ -22,14 +22,17 @@ function extractMessageIds( export function startSesConsumer(): void { const queueUrl = appConfig.ses.sqsQueueUrl + // Local dev does not need the SES consumer unless email suppressions are being tested, and we want to avoid noisy logs about a missing SQS_QUEUE_URL, so short-circuit when it is not set. if (!queueUrl) { logger.info('SQS_QUEUE_URL not set — SES consumer will not start.') return } + if (sesConsumer) { return } + // Note: with N ECS tasks, N consumers poll the same SQS queue. A received message is hidden from the other consumers for the queue's visibility timeout, so normally only one consumer processes it; a standard SQS queue is still at-least-once, so a rare duplicate delivery is possible. 
sesConsumer = Consumer.create({ queueUrl, sqs: new SQSClient({ region: 'ap-southeast-1' }), @@ -59,7 +62,7 @@ export function startSesConsumer(): void { logger.error('Failed to parse SQS message — deleting as poison', { event: 'ses-poison-message', sqsMessageId: messageId, - bodyPreview: message.Body?.slice(0, 200), + bodyLength: message.Body?.length, err: parseError instanceof Error ? parseError.stack : String(parseError), }) From 7522cc0c60cfbe80c90d895abfc542931a266b39 Mon Sep 17 00:00:00 2001 From: m0nggh Date: Tue, 9 Jun 2026 00:48:21 +0800 Subject: [PATCH 4/4] add small nits --- .../src/helpers/__tests__/ses-event-parser.test.ts | 2 +- packages/backend/src/helpers/ses-event-parser.ts | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/packages/backend/src/helpers/__tests__/ses-event-parser.test.ts b/packages/backend/src/helpers/__tests__/ses-event-parser.test.ts index 89a3bcfb85..267b878d8d 100644 --- a/packages/backend/src/helpers/__tests__/ses-event-parser.test.ts +++ b/packages/backend/src/helpers/__tests__/ses-event-parser.test.ts @@ -66,7 +66,7 @@ describe('ses-event-parser', () => { expect(() => parseSqsMessage(snsEnvelope)).toThrow(/eventType/) }) - it('should throw on an unhandled event type', () => { + it('should throw on an unhandled event type (e.g. Delivery)', () => { const snsEnvelope = JSON.stringify({ Type: 'Notification', Message: JSON.stringify({ eventType: 'Delivery', mail: {} }), diff --git a/packages/backend/src/helpers/ses-event-parser.ts b/packages/backend/src/helpers/ses-event-parser.ts index 53ba0ad2e1..f6edf55d17 100644 --- a/packages/backend/src/helpers/ses-event-parser.ts +++ b/packages/backend/src/helpers/ses-event-parser.ts @@ -55,10 +55,10 @@ const complaintEventSchema = z.object({ mail: sesMailSchema, }) -// The SQS queue's SNS subscription is filtered to Bounce + Complaint only, so -// the tagged union is exhaustive. 
Any other eventType is unexpected and fails -// validation here (surfaced by the consumer's error handling) rather than -// being silently accepted. +// Bounce + Complaint are the only events we handle. Any other eventType (the +// SES config set may also publish Delivery/Open/Send/... to the same topic) +// fails this strict union and is treated as a poison message by the consumer — +// the intended signal to tighten the SNS subscription filter upstream. const sesEventSchema = z.discriminatedUnion('eventType', [ bounceEventSchema, complaintEventSchema, @@ -78,7 +78,8 @@ export type SesEvent = z.infer<typeof sesEventSchema> * -> SES event (JSON string inside `Message`). * * Throws if the body isn't valid JSON (SyntaxError) or the payload doesn't - * match the expected shape (ZodError) — callers handle/log the failure. + * match a Bounce/Complaint event (ZodError) — the consumer treats either as a + * poison message. */ export function parseSqsMessage(sqsBody: string): SesEvent { const { Message } = snsEnvelopeSchema.parse(JSON.parse(sqsBody))