diff --git a/package-lock.json b/package-lock.json index f30c66b66..8a29349ab 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1008,6 +1008,58 @@ "node": ">=20.0.0" } }, + "node_modules/@aws-sdk/client-sqs": { + "version": "3.1045.0", + "resolved": "https://registry.npmjs.org/@aws-sdk/client-sqs/-/client-sqs-3.1045.0.tgz", + "integrity": "sha512-reWeEE53mgCv8uUFSicvVf0A6BoWGImdC4y5x5clkeEmfIikahIBtons6u0d32kwI4NczpcretpUkOCE/nvWAA==", + "license": "Apache-2.0", + "dependencies": { + "@aws-crypto/sha256-browser": "5.2.0", + "@aws-crypto/sha256-js": "5.2.0", + "@aws-sdk/core": "^3.974.8", + "@aws-sdk/credential-provider-node": "^3.972.39", + "@aws-sdk/middleware-host-header": "^3.972.10", + "@aws-sdk/middleware-logger": "^3.972.10", + "@aws-sdk/middleware-recursion-detection": "^3.972.11", + "@aws-sdk/middleware-sdk-sqs": "^3.972.22", + "@aws-sdk/middleware-user-agent": "^3.972.38", + "@aws-sdk/region-config-resolver": "^3.972.13", + "@aws-sdk/types": "^3.973.8", + "@aws-sdk/util-endpoints": "^3.996.8", + "@aws-sdk/util-user-agent-browser": "^3.972.10", + "@aws-sdk/util-user-agent-node": "^3.973.24", + "@smithy/config-resolver": "^4.4.17", + "@smithy/core": "^3.23.17", + "@smithy/fetch-http-handler": "^5.3.17", + "@smithy/hash-node": "^4.2.14", + "@smithy/invalid-dependency": "^4.2.14", + "@smithy/md5-js": "^4.2.14", + "@smithy/middleware-content-length": "^4.2.14", + "@smithy/middleware-endpoint": "^4.4.32", + "@smithy/middleware-retry": "^4.5.7", + "@smithy/middleware-serde": "^4.2.20", + "@smithy/middleware-stack": "^4.2.14", + "@smithy/node-config-provider": "^4.3.14", + "@smithy/node-http-handler": "^4.6.1", + "@smithy/protocol-http": "^5.3.14", + "@smithy/smithy-client": "^4.12.13", + "@smithy/types": "^4.14.1", + "@smithy/url-parser": "^4.2.14", + "@smithy/util-base64": "^4.3.2", + "@smithy/util-body-length-browser": "^4.2.2", + "@smithy/util-body-length-node": "^4.2.3", + "@smithy/util-defaults-mode-browser": "^4.3.49", + "@smithy/util-defaults-mode-node": "^4.2.54", + "@smithy/util-endpoints": "^3.4.2", + "@smithy/util-middleware": "^4.2.14", + "@smithy/util-retry": "^4.3.6", + "@smithy/util-utf8": "^4.2.2", + "tslib": "^2.6.2" + }, + "engines": { + "node": ">=20.0.0" + } + }, "node_modules/@aws-sdk/core": { "version": "3.974.8", "resolved": "https://registry.npmjs.org/@aws-sdk/core/-/core-3.974.8.tgz", @@ -1460,6 +1512,23 @@ "node": ">=20.0.0" } }, + "node_modules/@aws-sdk/middleware-sdk-sqs": { + "version": "3.972.22", + "resolved": "https://registry.npmjs.org/@aws-sdk/middleware-sdk-sqs/-/middleware-sdk-sqs-3.972.22.tgz", + "integrity": "sha512-DtR3mEiOUJcnEX/QuXmvbJto6xvQzp2ftnHb29c0aQYdmmzbKf0gsu9ovx1i/yy4ZR6m0rttTucS0iiP32dlGA==", + "license": "Apache-2.0", + "dependencies": { + "@aws-sdk/types": "^3.973.8", + "@smithy/smithy-client": "^4.12.13", + "@smithy/types": "^4.14.1", + "@smithy/util-hex-encoding": "^4.2.2", + "@smithy/util-utf8": "^4.2.2", + "tslib": "^2.6.2" + }, + "engines": { + "node": ">=20.0.0" + } + }, "node_modules/@aws-sdk/middleware-ssec": { "version": "3.972.9", "resolved": "https://registry.npmjs.org/@aws-sdk/middleware-ssec/-/middleware-ssec-3.972.9.tgz", @@ -7680,20 +7749,13 @@ } }, "node_modules/@smithy/core": { - "version": "3.23.17", - "resolved": "https://registry.npmjs.org/@smithy/core/-/core-3.23.17.tgz", - "integrity": "sha512-x7BlLbUFL8NWCGjMF9C+1N5cVCxcPa7g6Tv9B4A2luWx3be3oU8hQ96wIwxe/s7OhIzvoJH73HAUSg5JXVlEtQ==", + "version": "3.24.1", + "resolved": "https://registry.npmjs.org/@smithy/core/-/core-3.24.1.tgz", + "integrity": "sha512-3mT7o4qQyUWttYnVK3A0Z/u3Xha3E81tXn32Tz6vjZiUXhBrkEivpw1hBYfh84iFF9CSzkBU9Y1DJ3Q6RQ231g==", "license": "Apache-2.0", "dependencies": { - "@smithy/protocol-http": "^5.3.14", + "@aws-crypto/crc32": "5.2.0", "@smithy/types": "^4.14.1", - "@smithy/url-parser": "^4.2.14", - "@smithy/util-base64": "^4.3.2", - "@smithy/util-body-length-browser": "^4.2.2", - "@smithy/util-middleware": "^4.2.14", - "@smithy/util-stream": "^4.5.25", - "@smithy/util-utf8": "^4.2.2", - "@smithy/uuid": "^1.1.2", "tslib": "^2.6.2" }, "engines": { @@ -7872,13 +7934,12 @@ } }, "node_modules/@smithy/md5-js": { - "version": "4.2.13", - "resolved": "https://registry.npmjs.org/@smithy/md5-js/-/md5-js-4.2.13.tgz", - "integrity": "sha512-cNm7I9NXolFxtS20ojROddOEpSAeI1Obq6pd1Kj5HtHws3s9Fkk8DdHDfQSs5KuxCewZuVK6UqrJnfJmiMzDuQ==", + "version": "4.3.1", + "resolved": "https://registry.npmjs.org/@smithy/md5-js/-/md5-js-4.3.1.tgz", + "integrity": "sha512-98NalujRdzv6ggVQNYPWpL2K57UKeUB8roIr61u6+JiHd7KUlMQ+sn/vk6IG4XxEjw2vlC7eu/xjYXshUE4XXg==", "license": "Apache-2.0", "dependencies": { - "@smithy/types": "^4.14.0", - "@smithy/util-utf8": "^4.2.2", + "@smithy/core": "^3.24.1", "tslib": "^2.6.2" }, "engines": { @@ -29837,6 +29898,39 @@ "dev": true, "license": "BSD-3-Clause" }, + "node_modules/sqs-consumer": { + "version": "15.0.1", + "resolved": "https://registry.npmjs.org/sqs-consumer/-/sqs-consumer-15.0.1.tgz", + "integrity": "sha512-cDcSkekXxhLMn3u+FSnAG3Ryh6jwHMPna1/oqs2dUL0gvUXOLLMa2Yzu0K4QqwVyaH0MDZw1IU7+W+W618/xcw==", + "license": "Apache-2.0", + "dependencies": { + "@aws-sdk/client-sqs": "^3.1034.0", + "debug": "^4.4.3" + }, + "engines": { + "node": ">=22.0.0" + }, + "peerDependencies": { + "@aws-sdk/client-sqs": "^3.1034.0" + } + }, + "node_modules/sqs-consumer/node_modules/debug": { + "version": "4.4.3", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz", + "integrity": "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA==", + "license": "MIT", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, "node_modules/ssh-remote-port-forward": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/ssh-remote-port-forward/-/ssh-remote-port-forward-1.0.4.tgz", @@ -33986,6 +34080,7 @@ "@aws-sdk/client-dynamodb": "3.1027.0", "@aws-sdk/client-s3": "3.1027.0", "@aws-sdk/client-sesv2": "^3.1045.0", + "@aws-sdk/client-sqs": "^3.1045.0", "@aws-sdk/credential-providers": "^3.1045.0", "@aws-sdk/s3-presigned-post": "3.1027.0", "@bull-board/express": "5.17.0", @@ -34042,6 +34137,7 @@ "rate-limiter-flexible": "2.4.1", "remark-breaks": "3.0.3", "remark-gfm": "3.0.1", + "sqs-consumer": "^15.0.1", "typescript": "^5.5.4", "ulid": "3.0.0", "urlcat": "3.1.0", diff --git a/packages/backend/package.json b/packages/backend/package.json index 9a3da7d30..c09191aab 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -35,6 +35,7 @@ "@aws-sdk/client-dynamodb": "3.1027.0", "@aws-sdk/client-s3": "3.1027.0", "@aws-sdk/client-sesv2": "^3.1045.0", + "@aws-sdk/client-sqs": "^3.1045.0", "@aws-sdk/credential-providers": "^3.1045.0", "@aws-sdk/s3-presigned-post": "3.1027.0", "@bull-board/express": "5.17.0", @@ -91,6 +92,7 @@ "rate-limiter-flexible": "2.4.1", "remark-breaks": "3.0.3", "remark-gfm": "3.0.1", + "sqs-consumer": "^15.0.1", "typescript": "^5.5.4", "ulid": "3.0.0", "urlcat": "3.1.0", diff --git a/packages/backend/src/config/app.ts b/packages/backend/src/config/app.ts index 6dbcdb42b..a715cf631 100644 --- a/packages/backend/src/config/app.ts +++ b/packages/backend/src/config/app.ts @@ -84,6 +84,7 @@ type AppConfig = { region: string roleArn: string configurationSet?: string + sqsQueueUrl?: string } } @@ -187,6 +188,7 @@ const appConfig: AppConfig = { ...(process.env.SES_CONFIGURATION_SET && { configurationSet: process.env.SES_CONFIGURATION_SET, }), + sqsQueueUrl: process.env.SQS_QUEUE_URL || undefined, }, } diff --git a/packages/backend/src/helpers/__tests__/ses-consumer.test.ts b/packages/backend/src/helpers/__tests__/ses-consumer.test.ts new file mode 100644 index 000000000..b387d3fb1 --- /dev/null +++ b/packages/backend/src/helpers/__tests__/ses-consumer.test.ts @@ -0,0 +1,227 @@ +import { readFileSync } from 'fs' +import { resolve } from 'path' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +const mocks = vi.hoisted(() => ({ + consumerStart: vi.fn(), + consumerStop: vi.fn(), + consumerOn: vi.fn(), + consumerOnce: vi.fn(), + consumerCreate: vi.fn(), + processSesEvent: vi.fn(), + loggerInfo: vi.fn(), + loggerError: vi.fn(), + sqsQueueUrl: 'https://sqs.ap-southeast-1.amazonaws.com/123/ses-events', +})) + +vi.mock('sqs-consumer', () => ({ + Consumer: { + create: mocks.consumerCreate.mockImplementation(() => ({ + start: mocks.consumerStart, + stop: mocks.consumerStop, + on: mocks.consumerOn, + once: mocks.consumerOnce, + })), + }, +})) + +vi.mock('@/helpers/process-ses-event', () => ({ + processSesEvent: mocks.processSesEvent, +})) + +vi.mock('@/helpers/logger', () => ({ + default: { + info: mocks.loggerInfo, + error: mocks.loggerError, + }, +})) + +vi.mock('@/config/app', () => ({ + default: { + ses: { + get sqsQueueUrl() { + return mocks.sqsQueueUrl + }, + }, + }, +})) + +function loadFixture(name: string): string { + return readFileSync( + resolve(__dirname, '../../../ses-test-events', name), + 'utf-8', + ) +} + +async function importFresh() { + vi.resetModules() + return import('../ses-consumer') +} + +describe('SES consumer wiring', () => { + beforeEach(() => { + mocks.consumerCreate.mockClear() + mocks.consumerStart.mockClear() + mocks.consumerStop.mockClear() + mocks.consumerOn.mockClear() + mocks.consumerOnce.mockClear() + mocks.processSesEvent.mockReset() + mocks.loggerInfo.mockClear() + mocks.loggerError.mockClear() + mocks.sqsQueueUrl = + 'https://sqs.ap-southeast-1.amazonaws.com/123/ses-events' + }) + + afterEach(() => { + // Clear SIGTERM listeners registered by tests so they don't accumulate + process.removeAllListeners('SIGTERM') + }) + + it('does not start the consumer when SQS_QUEUE_URL is unset', async () => { + mocks.sqsQueueUrl = '' + + const { startSesConsumer } = await importFresh() + startSesConsumer() + + expect(mocks.consumerCreate).not.toHaveBeenCalled() + expect(mocks.consumerStart).not.toHaveBeenCalled() + expect(mocks.loggerInfo).toHaveBeenCalledWith( + expect.stringContaining('SQS_QUEUE_URL not set'), + ) + }) + + it('starts the consumer when SQS_QUEUE_URL is set', async () => { + const { startSesConsumer } = await importFresh() + startSesConsumer() + + expect(mocks.consumerCreate).toHaveBeenCalledTimes(1) + expect(mocks.consumerStart).toHaveBeenCalledTimes(1) + + const createOptions = mocks.consumerCreate.mock.calls[0][0] + expect(createOptions.queueUrl).toBe(mocks.sqsQueueUrl) + expect(createOptions.batchSize).toBe(10) + expect(createOptions.waitTimeSeconds).toBe(20) + expect(typeof createOptions.handleMessage).toBe('function') + }) + + it('is idempotent — second call does not re-create the consumer', async () => { + const { startSesConsumer } = await importFresh() + startSesConsumer() + startSesConsumer() + + expect(mocks.consumerCreate).toHaveBeenCalledTimes(1) + expect(mocks.consumerStart).toHaveBeenCalledTimes(1) + }) + + it('handleMessage parses the body and dispatches to processSesEvent', async () => { + const { startSesConsumer } = await importFresh() + startSesConsumer() + + const { handleMessage } = mocks.consumerCreate.mock.calls[0][0] + const message = { + MessageId: 'sqs-msg-1', + Body: loadFixture('ses-bounce-permanent.json'), + } + + const result = await handleMessage(message) + + expect(mocks.processSesEvent).toHaveBeenCalledTimes(1) + const call = mocks.processSesEvent.mock.calls[0][0] + expect(call.sqsMessageId).toBe('sqs-msg-1') + expect(call.sesEvent.eventType).toBe('Bounce') + // Returning the message tells sqs-consumer to delete (ack) it + expect(result).toBe(message) + }) + + it('handleMessage acks poison messages (unparseable body) and logs', async () => { + const { startSesConsumer } = await importFresh() + startSesConsumer() + + const { handleMessage } = mocks.consumerCreate.mock.calls[0][0] + const message = { + MessageId: 'sqs-msg-poison', + Body: 'not valid json {{{', + } + + const result = await handleMessage(message) + + expect(mocks.processSesEvent).not.toHaveBeenCalled() + expect(result).toBe(message) + expect(mocks.loggerError).toHaveBeenCalledWith( + expect.stringContaining('Failed to parse SQS message'), + expect.objectContaining({ event: 'ses-poison-message' }), + ) + }) + + it('handleMessage propagates processSesEvent errors so SQS redelivers', async () => { + mocks.processSesEvent.mockRejectedValueOnce(new Error('db down')) + + const { startSesConsumer } = await importFresh() + startSesConsumer() + + const { handleMessage } = mocks.consumerCreate.mock.calls[0][0] + const message = { + MessageId: 'sqs-msg-retry', + Body: loadFixture('ses-bounce-permanent.json'), + } + + await expect(handleMessage(message)).rejects.toThrow('db down') + }) + + it('stopSesConsumer stops the consumer if it was started', async () => { + const { startSesConsumer, stopSesConsumer } = await importFresh() + startSesConsumer() + stopSesConsumer() + + expect(mocks.consumerStop).toHaveBeenCalledTimes(1) + }) + + it('stopSesConsumer is a no-op if the consumer was never started', async () => { + mocks.sqsQueueUrl = '' + + const { startSesConsumer, stopSesConsumer } = await importFresh() + startSesConsumer() + stopSesConsumer() + + expect(mocks.consumerStop).not.toHaveBeenCalled() + }) + + it('SIGTERM stops the consumer, awaits "stopped", then logs shutdown complete', async () => { + const { startSesConsumer } = await importFresh() + startSesConsumer() + + expect(process.listenerCount('SIGTERM')).toBe(1) + + // Fire SIGTERM. shutdownOnSigterm should: + // 1. register a one-time 'stopped' listener on the consumer + // 2. call consumer.stop() + // 3. await the 'stopped' event + // 4. log 'shutdown complete' + process.emit('SIGTERM' as never) + + // Let the async handler register its 'stopped' listener and call stop() + await new Promise((resolve) => setImmediate(resolve)) + + expect(mocks.consumerStop).toHaveBeenCalledTimes(1) + const stoppedCall = mocks.consumerOnce.mock.calls.find( + ([eventName]) => eventName === 'stopped', + ) + expect(stoppedCall).toBeDefined() + + // Shutdown is still pending — log should NOT have fired yet + expect(mocks.loggerInfo).not.toHaveBeenCalledWith( + expect.stringContaining('shutdown complete'), + ) + + // Resolve the await by invoking the registered listener + const stoppedListener = stoppedCall![1] as () => void + stoppedListener() + + // Now the handler can complete and log + await new Promise((resolve) => setImmediate(resolve)) + + expect(mocks.loggerInfo).toHaveBeenCalledWith( + expect.stringContaining('shutdown complete'), + ) + }) +}) diff --git a/packages/backend/src/helpers/__tests__/ses-event-parser.test.ts b/packages/backend/src/helpers/__tests__/ses-event-parser.test.ts index 89a3bcfb8..267b878d8 100644 --- a/packages/backend/src/helpers/__tests__/ses-event-parser.test.ts +++ b/packages/backend/src/helpers/__tests__/ses-event-parser.test.ts @@ -66,7 +66,7 @@ describe('ses-event-parser', () => { expect(() => parseSqsMessage(snsEnvelope)).toThrow(/eventType/) }) - it('should throw on an unhandled event type', () => { + it('should throw on an unhandled event type (e.g. Delivery)', () => { const snsEnvelope = JSON.stringify({ Type: 'Notification', Message: JSON.stringify({ eventType: 'Delivery', mail: {} }), diff --git a/packages/backend/src/helpers/ses-consumer.ts b/packages/backend/src/helpers/ses-consumer.ts new file mode 100644 index 000000000..a1a908346 --- /dev/null +++ b/packages/backend/src/helpers/ses-consumer.ts @@ -0,0 +1,123 @@ +import { Message, SQSClient } from '@aws-sdk/client-sqs' +import { Consumer } from 'sqs-consumer' + +import appConfig from '@/config/app' +import logger from '@/helpers/logger' +import { processSesEvent } from '@/helpers/process-ses-event' +import { parseSqsMessage } from '@/helpers/ses-event-parser' + +let sesConsumer: Consumer | undefined + +function extractMessageIds( + message: Message | Message[] | undefined, +): string[] | undefined { + if (!message) { + return undefined + } + const ids = (Array.isArray(message) ? message : [message]) + .map((m) => m.MessageId) + .filter((id): id is string => Boolean(id)) + return ids.length > 0 ? ids : undefined +} + +export function startSesConsumer(): void { + const queueUrl = appConfig.ses.sqsQueueUrl + // Local dev does not need SES consumer if not testing for email suppressions, and we want to avoid noisy logs about missing SQS_QUEUE_URL, so short-circuit if not set. + if (!queueUrl) { + logger.info('SQS_QUEUE_URL not set — SES consumer will not start.') + return + } + + if (sesConsumer) { + return + } + + // Note that N consumers will be polling the same SQS queue with N ECS tasks, but each message goes to exactly one consumer, and the queue's visibility timeout prevents two from processing the same message. + sesConsumer = Consumer.create({ + queueUrl, + sqs: new SQSClient({ region: 'ap-southeast-1' }), + batchSize: 10, + waitTimeSeconds: 20, + // visibilityTimeout intentionally unset — owned by SQS queue config in + // infra. heartbeatInterval would force us to also set visibilityTimeout + // here (the library validates heartbeat < visibility), and our handler + // is a single DB upsert that completes well under any sensible default. + pollingCompleteWaitTimeMs: 10_000, + // Contract with sqs-consumer: + // return message -> ack & delete + // throw -> nack; SQS redelivers per the queue's MaxReceiveCount + // Do NOT add a blanket try/catch here — the poison-message branch is + // the only intentional swallow. + handleMessage: async (message) => { + const messageId = message.MessageId ?? 'unknown' + + // Poison-message handling: until the DLQ ships (see ses-migration-plan + // "Alternatives Considered — DLQ"), catch parse failures and ack the + // message so it does not redeliver for the full SQS retention window. + // Switch this to a re-throw once the DLQ is in place. + let sesEvent + try { + sesEvent = parseSqsMessage(message.Body ?? '') + } catch (parseError) { + logger.error('Failed to parse SQS message — deleting as poison', { + event: 'ses-poison-message', + sqsMessageId: messageId, + bodyLength: message.Body?.length, + err: + parseError instanceof Error ? parseError.stack : String(parseError), + }) + return message + } + + await processSesEvent({ sesEvent, sqsMessageId: messageId }) + return message + }, + }) + + sesConsumer.on('error', (err, message) => { + logger.error('SES consumer error', { + event: 'ses-consumer-error', + sqsMessageIds: extractMessageIds(message), + err: err.stack, + }) + }) + + sesConsumer.on('processing_error', (err, message) => { + logger.error( + 'SES consumer processing error — message will be redelivered', + { + event: 'ses-consumer-processing-error', + sqsMessageIds: extractMessageIds(message), + err: err.stack, + }, + ) + }) + + sesConsumer.on('stopped', () => { + logger.info('SES consumer stopped') + }) + + sesConsumer.start() + logger.info('SES consumer started', { queueUrl }) + + process.once('SIGTERM', shutdownOnSigterm) +} + +export function stopSesConsumer(): void { + sesConsumer?.stop() +} + +async function shutdownOnSigterm(): Promise { + if (!sesConsumer) { + return + } + const consumer = sesConsumer + const stopped = new Promise((resolve) => { + consumer.once('stopped', () => resolve()) + }) + consumer.stop() + // pollingCompleteWaitTimeMs (set at Consumer.create) caps this wait + // internally, so we do not need an external timeout race here. + await stopped + logger.info('SES consumer shutdown complete (SIGTERM)') +} diff --git a/packages/backend/src/helpers/ses-event-parser.ts b/packages/backend/src/helpers/ses-event-parser.ts index 53ba0ad2e..f6edf55d1 100644 --- a/packages/backend/src/helpers/ses-event-parser.ts +++ b/packages/backend/src/helpers/ses-event-parser.ts @@ -55,10 +55,10 @@ const complaintEventSchema = z.object({ mail: sesMailSchema, }) -// The SQS queue's SNS subscription is filtered to Bounce + Complaint only, so -// the tagged union is exhaustive. Any other eventType is unexpected and fails -// validation here (surfaced by the consumer's error handling) rather than -// being silently accepted. +// Bounce + Complaint are the only events we handle. Any other eventType (the +// SES config set may also publish Delivery/Open/Send/... to the same topic) +// fails this strict union and is treated as a poison message by the consumer — +// the intended signal to tighten the SNS subscription filter upstream. const sesEventSchema = z.discriminatedUnion('eventType', [ bounceEventSchema, complaintEventSchema, @@ -78,7 +78,8 @@ export type SesEvent = z.infer * -> SES event (JSON string inside `Message`). * * Throws if the body isn't valid JSON (SyntaxError) or the payload doesn't - * match the expected shape (ZodError) — callers handle/log the failure. + * match a Bounce/Complaint event (ZodError) — the consumer treats either as a + * poison message. */ export function parseSqsMessage(sqsBody: string): SesEvent { const { Message } = snsEnvelopeSchema.parse(JSON.parse(sqsBody)) diff --git a/packages/backend/src/worker.ts b/packages/backend/src/worker.ts index bed7cbca4..9608d9d32 100644 --- a/packages/backend/src/worker.ts +++ b/packages/backend/src/worker.ts @@ -7,6 +7,9 @@ import '@/workers/trigger' import '@/workers/action' import logger from '@/helpers/logger' +import { startSesConsumer } from '@/helpers/ses-consumer' + +startSesConsumer() process.on('uncaughtException', (err) => { try {