diff --git a/migrations/010_saved_search_cap.sql b/migrations/010_saved_search_cap.sql new file mode 100644 index 0000000..97ae3e6 --- /dev/null +++ b/migrations/010_saved_search_cap.sql @@ -0,0 +1,33 @@ +CREATE OR REPLACE FUNCTION enforce_saved_search_cap() +RETURNS TRIGGER AS $$ +DECLARE + active_count INTEGER; +BEGIN + IF NEW.deleted_at IS NOT NULL THEN + RETURN NEW; + END IF; + + PERFORM pg_advisory_xact_lock(hashtext(NEW.user_id::text)); + + SELECT COUNT(*)::int + INTO active_count + FROM saved_searches + WHERE user_id = NEW.user_id + AND deleted_at IS NULL + AND (TG_OP <> 'UPDATE' OR search_id <> NEW.search_id); + + IF active_count >= 10 THEN + RAISE EXCEPTION 'You can save at most 10 searches' + USING ERRCODE = '23514', + CONSTRAINT = 'saved_searches_active_cap_per_user'; + END IF; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS trg_saved_searches_cap ON saved_searches; + +CREATE TRIGGER trg_saved_searches_cap + BEFORE INSERT OR UPDATE OF user_id, deleted_at ON saved_searches + FOR EACH ROW EXECUTE FUNCTION enforce_saved_search_cap(); diff --git a/src/cron/savedSearchAlert.js b/src/cron/savedSearchAlert.js index b11ef1f..8db8bbe 100644 --- a/src/cron/savedSearchAlert.js +++ b/src/cron/savedSearchAlert.js @@ -3,126 +3,177 @@ import cron from "node-cron"; import { pool } from "../db/client.js"; import { logger } from "../logger/index.js"; -import { enqueueNotification } from "../workers/notificationQueue.js"; +import { enqueueNotificationsBulk } from "../workers/notificationQueue.js"; const SCHEDULE = process.env.CRON_SAVED_SEARCH_ALERT ?? "0 8 * * *"; +const SEARCH_BATCH_SIZE = Number(process.env.SAVED_SEARCH_ALERT_BATCH_SIZE) || 500; +const NOTIFICATION_CHUNK_SIZE = Number(process.env.SAVED_SEARCH_ALERT_NOTIFICATION_CHUNK_SIZE) || 100; -const buildWhereClause = (filters, params) => { - const clauses = [`l.status = 'active'`, `l.deleted_at IS NULL`, `l.expires_at > NOW()`]; - let p = params.length + 1; - - if (filters.city) { - const escaped = filters.city.replace(/\\/g, "\\\\").replace(/%/g, "\\%").replace(/_/g, "\\_"); - clauses.push(`LOWER(l.city) LIKE LOWER($${p}) ESCAPE '\\'`); - params.push(`${escaped}%`); - p++; - } - if (filters.minRent !== undefined) { - clauses.push(`l.rent_per_month >= $${p}`); - params.push(filters.minRent * 100); - p++; - } - if (filters.maxRent !== undefined) { - clauses.push(`l.rent_per_month <= $${p}`); - params.push(filters.maxRent * 100); - p++; - } - if (filters.roomType) { - clauses.push(`l.room_type = $${p}::room_type_enum`); - params.push(filters.roomType); - p++; - } - if (filters.bedType) { - clauses.push(`l.bed_type = $${p}::bed_type_enum`); - params.push(filters.bedType); - p++; - } - if (filters.preferredGender) { - clauses.push(`(l.preferred_gender = $${p}::gender_enum OR l.preferred_gender IS NULL)`); - params.push(filters.preferredGender); - p++; - } - if (filters.listingType) { - clauses.push(`l.listing_type = $${p}::listing_type_enum`); - params.push(filters.listingType); - p++; +const chunk = (items, size) => { + const chunks = []; + for (let i = 0; i < items.length; i += size) { + chunks.push(items.slice(i, i + size)); } - if (filters.availableFrom) { - clauses.push(`l.available_from <= $${p}`); - params.push(filters.availableFrom); - p++; - } - if (filters.amenityIds?.length) { - const ids = [...new Set(filters.amenityIds)]; - clauses.push( - `EXISTS ( - SELECT 1 FROM listing_amenities la - WHERE la.listing_id = l.listing_id - AND la.amenity_id = ANY($${p}::uuid[]) - GROUP BY la.listing_id - HAVING COUNT(DISTINCT la.amenity_id) = $${p + 1} - )`, - ); - params.push(ids, ids.length); - p += 2; - } - - return clauses; + return chunks; }; -const runSavedSearchAlert = async () => { - const startedAt = Date.now(); - logger.info("cron:savedSearchAlert — starting run"); - - const { rows: searches } = await pool.query( - `SELECT search_id, user_id, filters, last_alerted_at - FROM saved_searches - WHERE deleted_at IS NULL`, +const fetchMatchedSearches = async (cursorId) => { + const queryStartedAt = Date.now(); + const { rows } = await pool.query( + `WITH search_batch AS ( + SELECT search_id, user_id, filters, last_alerted_at + FROM saved_searches + WHERE deleted_at IS NULL + AND ($1::uuid IS NULL OR search_id > $1::uuid) + ORDER BY search_id ASC + LIMIT $2 + ), + candidate_listings AS ( + SELECT l.* + FROM listings l + WHERE l.status = 'active' + AND l.deleted_at IS NULL + AND l.expires_at > NOW() + AND EXISTS (SELECT 1 FROM search_batch) + AND l.created_at > ( + SELECT COALESCE(MIN(last_alerted_at), 'epoch'::timestamptz) + FROM search_batch + ) + ), + matched_searches AS ( + SELECT DISTINCT s.search_id, s.user_id + FROM search_batch s + JOIN candidate_listings l + ON l.created_at > COALESCE(s.last_alerted_at, 'epoch'::timestamptz) + WHERE + ( + NOT (s.filters ? 'city') + OR LOWER(l.city) LIKE LOWER( + REPLACE(REPLACE(REPLACE(s.filters->>'city', '\\', '\\\\'), '%', '\\%'), '_', '\\_') || '%' + ) ESCAPE '\\' + ) + AND (NOT (s.filters ? 'minRent') OR l.rent_per_month >= ((s.filters->>'minRent')::numeric * 100)::int) + AND (NOT (s.filters ? 'maxRent') OR l.rent_per_month <= ((s.filters->>'maxRent')::numeric * 100)::int) + AND (NOT (s.filters ? 'roomType') OR l.room_type::text = s.filters->>'roomType') + AND (NOT (s.filters ? 'bedType') OR l.bed_type::text = s.filters->>'bedType') + AND ( + NOT (s.filters ? 'preferredGender') + OR l.preferred_gender::text = s.filters->>'preferredGender' + OR l.preferred_gender IS NULL + ) + AND (NOT (s.filters ? 'listingType') OR l.listing_type::text = s.filters->>'listingType') + AND (NOT (s.filters ? 'availableFrom') OR l.available_from <= (s.filters->>'availableFrom')::date) + AND ( + NOT (s.filters ? 'amenityIds') + OR jsonb_array_length(s.filters->'amenityIds') = 0 + OR EXISTS ( + SELECT 1 + FROM listing_amenities la + WHERE la.listing_id = l.listing_id + AND la.amenity_id = ANY( + ARRAY( + SELECT jsonb_array_elements_text(s.filters->'amenityIds')::uuid + ) + ) + GROUP BY la.listing_id + HAVING COUNT(DISTINCT la.amenity_id) = jsonb_array_length(s.filters->'amenityIds') + ) + ) + ), + batch_meta AS ( + SELECT + (SELECT search_id FROM search_batch ORDER BY search_id DESC LIMIT 1) AS batch_cursor_id, + (SELECT COUNT(*)::int FROM search_batch) AS batch_count + ) + SELECT + bm.batch_cursor_id, + bm.batch_count, + ms.search_id, + ms.user_id + FROM batch_meta bm + LEFT JOIN matched_searches ms ON TRUE + ORDER BY ms.search_id ASC`, + [cursorId, SEARCH_BATCH_SIZE], ); - if (!searches.length) { - logger.debug({ durationMs: Date.now() - startedAt }, "cron:savedSearchAlert — no saved searches"); - return; - } - - let alerted = 0; + return { rows, queryDurationMs: Date.now() - queryStartedAt }; +}; - for (const search of searches) { - try { - const filters = search.filters ?? {}; - const params = [search.last_alerted_at ?? new Date(0)]; - const clauses = buildWhereClause(filters, params); - clauses.push(`l.created_at > $1`); +const markSearchesAlerted = async (searchIds) => { + if (!searchIds.length) return 0; - const { rows: newListings } = await pool.query( - `SELECT l.listing_id FROM listings l - LEFT JOIN properties p ON p.property_id = l.property_id AND p.deleted_at IS NULL - WHERE ${clauses.join(" AND ")} - LIMIT 1`, - params, - ); + const { rowCount } = await pool.query( + `UPDATE saved_searches + SET last_alerted_at = NOW() + WHERE search_id = ANY($1::uuid[]) + AND deleted_at IS NULL`, + [searchIds], + ); - if (!newListings.length) continue; + return rowCount; +}; - await pool.query(`UPDATE saved_searches SET last_alerted_at = NOW() WHERE search_id = $1`, [ - search.search_id, - ]); +export const runSavedSearchAlert = async () => { + const startedAt = Date.now(); + logger.info("cron:savedSearchAlert — starting run"); - enqueueNotification({ - recipientId: search.user_id, + let cursorId = null; + let scanned = 0; + let matched = 0; + let enqueued = 0; + let updated = 0; + let queryDurationMs = 0; + + while (true) { + const { rows, queryDurationMs: batchQueryDurationMs } = await fetchMatchedSearches(cursorId); + queryDurationMs += batchQueryDurationMs; + + const batchCursorId = rows[0]?.batch_cursor_id ?? null; + const batchCount = Number(rows[0]?.batch_count ?? 0); + scanned += batchCount; + + const notifications = rows + .filter((row) => row.search_id) + .map((row) => ({ + recipientId: row.user_id, type: "saved_search_alert", entityType: "saved_search", - entityId: search.search_id, - }); + entityId: row.search_id, + })); + + matched += notifications.length; - alerted++; - } catch (err) { - logger.error({ err, searchId: search.search_id }, "cron:savedSearchAlert — failed to process search"); + for (const notificationChunk of chunk(notifications, NOTIFICATION_CHUNK_SIZE)) { + enqueued += await enqueueNotificationsBulk(notificationChunk); } + + const batchUpdated = await markSearchesAlerted(notifications.map((notification) => notification.entityId)); + updated += batchUpdated; + + logger.debug( + { + batchCount, + batchMatched: notifications.length, + batchUpdated, + batchQueryDurationMs, + cursorId: batchCursorId, + }, + "cron:savedSearchAlert — batch processed", + ); + + if (batchCount < SEARCH_BATCH_SIZE || batchCursorId === null) break; + cursorId = batchCursorId; } logger.info( - { alerted, total: searches.length, durationMs: Date.now() - startedAt }, + { + scanned, + matched, + enqueued, + updated, + queryDurationMs, + durationMs: Date.now() - startedAt, + }, "cron:savedSearchAlert — run complete", ); }; diff --git a/src/services/listing.service.js b/src/services/listing.service.js index 0338ca0..40144c4 100644 --- a/src/services/listing.service.js +++ b/src/services/listing.service.js @@ -4,7 +4,6 @@ import { pool } from "../db/client.js"; import { logger } from "../logger/index.js"; import { AppError } from "../middleware/errorHandler.js"; import { assertPgOwnerVerified } from "../db/utils/pgOwner.js"; -import { scoreListingsForUser } from "../db/utils/compatibility.js"; import { expirePendingRequestsForListing } from "./interest.service.js"; import { EXPIRED_LISTING_MESSAGE, UNAVAILABLE_LISTING_MESSAGE } from "./listingLifecycle.js"; import { dedupePreferencesByKey } from "../config/preferences.js"; @@ -369,6 +368,7 @@ export const searchListings = async (userId, filters) => { radius, amenityIds = [], cursorTime, + cursorScore, cursorId, limit = 20, } = filters; @@ -460,8 +460,25 @@ export const searchListings = async (userId, filters) => { p += 2; } + if (sortBy === "compatibility" && cursorScore !== undefined && cursorId !== undefined) { + clauses.push(`( + compatibility_score.score < $${p} + OR (compatibility_score.score = $${p} AND l.listing_id > $${p + 1}::uuid) + )`); + params.push(cursorScore, cursorId); + p += 2; + } + + const userIdParam = p; + params.push(userId); + p++; + params.push(limit + 1); const limitParam = p; + const orderBy = + sortBy === "compatibility" ? + `compatibility_score.score DESC, l.listing_id ASC` + : `l.created_at DESC, l.listing_id ASC`; const { rows } = await pool.query( `SELECT @@ -497,10 +514,32 @@ export const searchListings = async (userId, filters) => { WHEN ri_loc.rent_index_id IS NOT NULL THEN 'locality' WHEN ri_city.rent_index_id IS NOT NULL THEN 'city' ELSE NULL - END AS ri_resolution + END AS ri_resolution, + compatibility_score.score AS compatibility_score, + listing_preference_count.preference_count AS listing_preference_count, + EXISTS ( + SELECT 1 FROM user_preferences up_exists WHERE up_exists.user_id = $${userIdParam}::uuid + ) AS user_has_preferences FROM listings l JOIN users u ON u.user_id = l.posted_by LEFT JOIN properties p ON p.property_id = l.property_id AND p.deleted_at IS NULL + CROSS JOIN LATERAL ( + SELECT CASE + WHEN $${userIdParam}::uuid IS NULL THEN 0 + ELSE COUNT(*)::int + END AS score + FROM listing_preferences lp_score + JOIN user_preferences up_score + ON up_score.preference_key = lp_score.preference_key + AND up_score.preference_value = lp_score.preference_value + AND up_score.user_id = $${userIdParam}::uuid + WHERE lp_score.listing_id = l.listing_id + ) compatibility_score + CROSS JOIN LATERAL ( + SELECT COUNT(*)::int AS preference_count + FROM listing_preferences lp_count + WHERE lp_count.listing_id = l.listing_id + ) listing_preference_count LEFT JOIN rent_index ri_loc ON ri_loc.city = l.city AND ri_loc.locality = NULLIF(LOWER(TRIM(COALESCE(l.locality, ''))), '') @@ -510,7 +549,7 @@ export const searchListings = async (userId, filters) => { AND ri_city.locality IS NULL AND ri_city.room_type = l.room_type WHERE ${clauses.join(" AND ")} - ORDER BY l.created_at DESC, l.listing_id ASC + ORDER BY ${orderBy} LIMIT $${limitParam}`, params, ); @@ -518,53 +557,33 @@ export const searchListings = async (userId, filters) => { const hasNextPage = rows.length > limit; const items = hasNextPage ? rows.slice(0, limit) : rows; - let scoreMap = {}; - let userHasPreferences = false; - let listingPreferenceCounts = new Map(); - - if (userId !== null) { - const listingIds = items.map((r) => r.listing_id); - scoreMap = await scoreListingsForUser(userId, listingIds); - - const { rows: preferenceRows } = await pool.query( - `SELECT EXISTS ( - SELECT 1 FROM user_preferences WHERE user_id = $1 - ) AS has_preferences`, - [userId], - ); - userHasPreferences = preferenceRows[0]?.has_preferences === true; - - if (listingIds.length) { - const { rows: listingPreferenceRows } = await pool.query( - `SELECT listing_id, COUNT(*)::int AS preference_count - FROM listing_preferences - WHERE listing_id = ANY($1::uuid[]) - GROUP BY listing_id`, - [listingIds], - ); - listingPreferenceCounts = new Map( - listingPreferenceRows.map((row) => [row.listing_id, Number(row.preference_count)]), - ); - } - } - const enrichedItems = items.map((row) => ({ ...row, rentPerMonth: row.rent_per_month / 100, depositAmount: row.deposit_amount / 100, rent_per_month: undefined, deposit_amount: undefined, - compatibilityScore: userId !== null ? (scoreMap[row.listing_id] ?? 0) : 0, + compatibilityScore: userId !== null ? Number(row.compatibility_score ?? 0) : 0, compatibilityAvailable: - userId !== null && userHasPreferences && (listingPreferenceCounts.get(row.listing_id) ?? 0) > 0, + userId !== null && row.user_has_preferences === true && Number(row.listing_preference_count ?? 0) > 0, rentDeviation: rentDeviationPct(row.rent_per_month, row.ri_p50), + compatibility_score: undefined, + listing_preference_count: undefined, + user_has_preferences: undefined, ri_p50: undefined, ri_resolution: undefined, })); if (sortBy === "compatibility") { - enrichedItems.sort((a, b) => b.compatibilityScore - a.compatibilityScore); - return { items: enrichedItems, nextCursor: null }; + const nextCursor = + hasNextPage ? + { + cursorScore: enrichedItems[enrichedItems.length - 1].compatibilityScore, + cursorId: enrichedItems[enrichedItems.length - 1].listing_id, + } + : null; + + return { items: enrichedItems, nextCursor }; } const nextCursor = diff --git a/src/services/savedSearch.service.js b/src/services/savedSearch.service.js index 4851397..851c9ef 100644 --- a/src/services/savedSearch.service.js +++ b/src/services/savedSearch.service.js @@ -4,27 +4,54 @@ import { pool } from "../db/client.js"; import { logger } from "../logger/index.js"; import { AppError } from "../middleware/errorHandler.js"; +// Keep this in sync with the saved_searches_active_cap_per_user DB trigger. const MAX_SAVED_SEARCHES_PER_USER = 10; +const SAVED_SEARCH_CAP_CONSTRAINT = "saved_searches_active_cap_per_user"; +const savedSearchLimitMessage = `You can save at most ${MAX_SAVED_SEARCHES_PER_USER} searches`; export const createSavedSearch = async (userId, { name, filters }) => { - const { rows: countRows } = await pool.query( - `SELECT COUNT(*)::int AS count FROM saved_searches WHERE user_id = $1 AND deleted_at IS NULL`, - [userId], - ); - - if (countRows[0].count >= MAX_SAVED_SEARCHES_PER_USER) { - throw new AppError(`You can save at most ${MAX_SAVED_SEARCHES_PER_USER} searches`, 422); + const client = await pool.connect(); + + try { + await client.query("BEGIN"); + + await client.query(`SELECT pg_advisory_xact_lock(hashtext($1::text))`, [userId]); + + const { rows: countRows } = await client.query( + `SELECT COUNT(*)::int AS count FROM saved_searches WHERE user_id = $1 AND deleted_at IS NULL`, + [userId], + ); + + if (countRows[0].count >= MAX_SAVED_SEARCHES_PER_USER) { + throw new AppError(savedSearchLimitMessage, 422); + } + + const { rows } = await client.query( + `INSERT INTO saved_searches (user_id, name, filters) + VALUES ($1, $2, $3) + RETURNING search_id, name, filters, last_alerted_at, created_at`, + [userId, name, JSON.stringify(filters)], + ); + + await client.query("COMMIT"); + + logger.info({ userId, searchId: rows[0].search_id }, "Saved search created"); + return rows[0]; + } catch (err) { + try { + await client.query("ROLLBACK"); + } catch (rollbackErr) { + logger.error({ rollbackErr }, "createSavedSearch: rollback failed"); + } + + if (err instanceof AppError) throw err; + if (err.code === "23514" && err.constraint === SAVED_SEARCH_CAP_CONSTRAINT) { + throw new AppError(savedSearchLimitMessage, 422); + } + throw err; + } finally { + client.release(); } - - const { rows } = await pool.query( - `INSERT INTO saved_searches (user_id, name, filters) - VALUES ($1, $2, $3) - RETURNING search_id, name, filters, last_alerted_at, created_at`, - [userId, name, JSON.stringify(filters)], - ); - - logger.info({ userId, searchId: rows[0].search_id }, "Saved search created"); - return rows[0]; }; export const listSavedSearches = async (userId) => { diff --git a/src/validators/listing.validators.js b/src/validators/listing.validators.js index 8156808..d07c4ca 100644 --- a/src/validators/listing.validators.js +++ b/src/validators/listing.validators.js @@ -22,38 +22,41 @@ export const listingParamsSchema = z.object({ }); export const searchListingsSchema = z.object({ - query: buildKeysetPaginationQuerySchema({ - sortBy: z.enum(["recent", "compatibility"]).default("recent"), + query: buildKeysetPaginationQuerySchema( + { + sortBy: z.enum(["recent", "compatibility"]).default("recent"), - city: z.string().min(1).max(100).optional(), + city: z.string().min(1).max(100).optional(), - minRent: z.coerce.number().int().min(0).optional(), - maxRent: z.coerce.number().int().min(0).optional(), + minRent: z.coerce.number().int().min(0).optional(), + maxRent: z.coerce.number().int().min(0).optional(), - roomType: z.enum(["single", "double", "triple", "entire_flat"]).optional(), - bedType: z.enum(["single_bed", "double_bed", "bunk_bed"]).optional(), + roomType: z.enum(["single", "double", "triple", "entire_flat"]).optional(), + bedType: z.enum(["single_bed", "double_bed", "bunk_bed"]).optional(), - preferredGender: z.enum(["male", "female", "other", "prefer_not_to_say"]).optional(), + preferredGender: z.enum(["male", "female", "other", "prefer_not_to_say"]).optional(), - listingType: z.enum(["student_room", "pg_room", "hostel_bed"]).optional(), + listingType: z.enum(["student_room", "pg_room", "hostel_bed"]).optional(), - availableFrom: z.string().date({ error: "availableFrom must be a valid date (YYYY-MM-DD)" }).optional(), + availableFrom: z.string().date({ error: "availableFrom must be a valid date (YYYY-MM-DD)" }).optional(), - lat: z.coerce.number().min(-90).max(90).optional(), - lng: z.coerce.number().min(-180).max(180).optional(), - radius: z.coerce.number().int().min(100).max(50_000).default(5_000), + lat: z.coerce.number().min(-90).max(90).optional(), + lng: z.coerce.number().min(-180).max(180).optional(), + radius: z.coerce.number().int().min(100).max(50_000).default(5_000), - amenityIds: z.preprocess( - (val) => { - if (typeof val !== "string") return val; - return val - .split(",") - .map((s) => s.trim()) - .filter(Boolean); - }, - z.array(z.uuid({ error: "Each amenity ID must be a valid UUID" })).default([]), - ), - }) + amenityIds: z.preprocess( + (val) => { + if (typeof val !== "string") return val; + return val + .split(",") + .map((s) => s.trim()) + .filter(Boolean); + }, + z.array(z.uuid({ error: "Each amenity ID must be a valid UUID" })).default([]), + ), + }, + { allowCursorScore: true }, + ) .refine( (data) => { if (data.minRent !== undefined && data.maxRent !== undefined) { @@ -73,6 +76,16 @@ export const searchListingsSchema = z.object({ error: "lat and lng must be provided together for proximity search", path: ["lng"], }, + ) + .refine( + (data) => { + if (data.sortBy === "compatibility") return data.cursorTime === undefined; + return data.cursorScore === undefined; + }, + { + error: "compatibility sorting uses cursorScore; recent sorting uses cursorTime", + path: ["cursorScore"], + }, ), }); diff --git a/src/validators/pagination.validators.js b/src/validators/pagination.validators.js index b78ef03..424e922 100644 --- a/src/validators/pagination.validators.js +++ b/src/validators/pagination.validators.js @@ -2,26 +2,28 @@ import { z } from "zod"; export const strictCursorTimeSchema = z.iso.datetime({ offset: true }).optional(); -const keysetPaginationFields = { +const buildKeysetPaginationFields = (allowCursorScore) => ({ cursorTime: strictCursorTimeSchema, + ...(allowCursorScore ? { cursorScore: z.coerce.number().int().min(0).optional() } : {}), cursorId: z.uuid({ error: "cursorId must be a valid UUID" }).optional(), limit: z.coerce.number().int().min(1).max(100).default(20), -}; +}); -export const buildKeysetPaginationQuerySchema = (extraFields = {}) => +export const buildKeysetPaginationQuerySchema = (extraFields = {}, { allowCursorScore = false } = {}) => z .object({ ...extraFields, - ...keysetPaginationFields, + ...buildKeysetPaginationFields(allowCursorScore), }) .refine( (data) => { const hasTime = data.cursorTime !== undefined; + const hasScore = allowCursorScore && data.cursorScore !== undefined; const hasId = data.cursorId !== undefined; - return hasTime === hasId; + return hasId === (hasTime || hasScore) && !(hasTime && hasScore); }, { - error: "cursorTime and cursorId must be provided together", + error: "cursorId must be provided with exactly one cursor value", path: ["cursorTime"], }, ); diff --git a/src/workers/notificationQueue.js b/src/workers/notificationQueue.js index 0df7f58..845d4c6 100644 --- a/src/workers/notificationQueue.js +++ b/src/workers/notificationQueue.js @@ -26,3 +26,21 @@ export const enqueueNotification = (payload) => { ); } }; + +export const enqueueNotificationsBulk = async (payloads) => { + if (!payloads.length) return 0; + + try { + await getQueue(NOTIFICATION_QUEUE_NAME).addBulk( + payloads.map((payload) => ({ + name: "send-notification", + data: payload, + opts: JOB_OPTIONS, + })), + ); + return payloads.length; + } catch (err) { + logger.error({ err, count: payloads.length }, "Failed to bulk enqueue notifications"); + throw err; + } +};