Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions migrations/010_saved_search_cap.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
CREATE OR REPLACE FUNCTION enforce_saved_search_cap()
RETURNS TRIGGER AS $$
DECLARE
active_count INTEGER;
BEGIN
IF NEW.deleted_at IS NOT NULL THEN
RETURN NEW;
END IF;

PERFORM pg_advisory_xact_lock(hashtext(NEW.user_id::text));

SELECT COUNT(*)::int
INTO active_count
FROM saved_searches
WHERE user_id = NEW.user_id
AND deleted_at IS NULL
AND (TG_OP <> 'UPDATE' OR search_id <> NEW.search_id);

IF active_count >= 10 THEN
RAISE EXCEPTION 'You can save at most 10 searches'
USING ERRCODE = '23514',
CONSTRAINT = 'saved_searches_active_cap_per_user';
END IF;

RETURN NEW;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS trg_saved_searches_cap ON saved_searches;

CREATE TRIGGER trg_saved_searches_cap
BEFORE INSERT OR UPDATE OF user_id, deleted_at ON saved_searches
FOR EACH ROW EXECUTE FUNCTION enforce_saved_search_cap();
251 changes: 151 additions & 100 deletions src/cron/savedSearchAlert.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,126 +3,177 @@
import cron from "node-cron";
import { pool } from "../db/client.js";
import { logger } from "../logger/index.js";
import { enqueueNotification } from "../workers/notificationQueue.js";
import { enqueueNotificationsBulk } from "../workers/notificationQueue.js";

const SCHEDULE = process.env.CRON_SAVED_SEARCH_ALERT ?? "0 8 * * *";
const SEARCH_BATCH_SIZE = Number(process.env.SAVED_SEARCH_ALERT_BATCH_SIZE) || 500;
const NOTIFICATION_CHUNK_SIZE = Number(process.env.SAVED_SEARCH_ALERT_NOTIFICATION_CHUNK_SIZE) || 100;

const buildWhereClause = (filters, params) => {
const clauses = [`l.status = 'active'`, `l.deleted_at IS NULL`, `l.expires_at > NOW()`];
let p = params.length + 1;

if (filters.city) {
const escaped = filters.city.replace(/\\/g, "\\\\").replace(/%/g, "\\%").replace(/_/g, "\\_");
clauses.push(`LOWER(l.city) LIKE LOWER($${p}) ESCAPE '\\'`);
params.push(`${escaped}%`);
p++;
}
if (filters.minRent !== undefined) {
clauses.push(`l.rent_per_month >= $${p}`);
params.push(filters.minRent * 100);
p++;
}
if (filters.maxRent !== undefined) {
clauses.push(`l.rent_per_month <= $${p}`);
params.push(filters.maxRent * 100);
p++;
}
if (filters.roomType) {
clauses.push(`l.room_type = $${p}::room_type_enum`);
params.push(filters.roomType);
p++;
}
if (filters.bedType) {
clauses.push(`l.bed_type = $${p}::bed_type_enum`);
params.push(filters.bedType);
p++;
}
if (filters.preferredGender) {
clauses.push(`(l.preferred_gender = $${p}::gender_enum OR l.preferred_gender IS NULL)`);
params.push(filters.preferredGender);
p++;
}
if (filters.listingType) {
clauses.push(`l.listing_type = $${p}::listing_type_enum`);
params.push(filters.listingType);
p++;
const chunk = (items, size) => {
const chunks = [];
for (let i = 0; i < items.length; i += size) {
chunks.push(items.slice(i, i + size));
}
if (filters.availableFrom) {
clauses.push(`l.available_from <= $${p}`);
params.push(filters.availableFrom);
p++;
}
if (filters.amenityIds?.length) {
const ids = [...new Set(filters.amenityIds)];
clauses.push(
`EXISTS (
SELECT 1 FROM listing_amenities la
WHERE la.listing_id = l.listing_id
AND la.amenity_id = ANY($${p}::uuid[])
GROUP BY la.listing_id
HAVING COUNT(DISTINCT la.amenity_id) = $${p + 1}
)`,
);
params.push(ids, ids.length);
p += 2;
}

return clauses;
return chunks;
};

const runSavedSearchAlert = async () => {
const startedAt = Date.now();
logger.info("cron:savedSearchAlert — starting run");

const { rows: searches } = await pool.query(
`SELECT search_id, user_id, filters, last_alerted_at
FROM saved_searches
WHERE deleted_at IS NULL`,
const fetchMatchedSearches = async (cursorId) => {
const queryStartedAt = Date.now();
const { rows } = await pool.query(
`WITH search_batch AS (
SELECT search_id, user_id, filters, last_alerted_at
FROM saved_searches
WHERE deleted_at IS NULL
AND ($1::uuid IS NULL OR search_id > $1::uuid)
ORDER BY search_id ASC
LIMIT $2
),
candidate_listings AS (
SELECT l.*
FROM listings l
WHERE l.status = 'active'
AND l.deleted_at IS NULL
AND l.expires_at > NOW()
AND EXISTS (SELECT 1 FROM search_batch)
AND l.created_at > (
SELECT COALESCE(MIN(last_alerted_at), 'epoch'::timestamptz)
FROM search_batch
)
),
matched_searches AS (
SELECT DISTINCT s.search_id, s.user_id
FROM search_batch s
JOIN candidate_listings l
ON l.created_at > COALESCE(s.last_alerted_at, 'epoch'::timestamptz)
WHERE
(
NOT (s.filters ? 'city')
OR LOWER(l.city) LIKE LOWER(
REPLACE(REPLACE(REPLACE(s.filters->>'city', '\\', '\\\\'), '%', '\\%'), '_', '\\_') || '%'
) ESCAPE '\\'
)
AND (NOT (s.filters ? 'minRent') OR l.rent_per_month >= ((s.filters->>'minRent')::numeric * 100)::int)
AND (NOT (s.filters ? 'maxRent') OR l.rent_per_month <= ((s.filters->>'maxRent')::numeric * 100)::int)
AND (NOT (s.filters ? 'roomType') OR l.room_type::text = s.filters->>'roomType')
AND (NOT (s.filters ? 'bedType') OR l.bed_type::text = s.filters->>'bedType')
AND (
NOT (s.filters ? 'preferredGender')
OR l.preferred_gender::text = s.filters->>'preferredGender'
OR l.preferred_gender IS NULL
)
AND (NOT (s.filters ? 'listingType') OR l.listing_type::text = s.filters->>'listingType')
AND (NOT (s.filters ? 'availableFrom') OR l.available_from <= (s.filters->>'availableFrom')::date)
AND (
NOT (s.filters ? 'amenityIds')
OR jsonb_array_length(s.filters->'amenityIds') = 0
OR EXISTS (
SELECT 1
FROM listing_amenities la
WHERE la.listing_id = l.listing_id
AND la.amenity_id = ANY(
ARRAY(
SELECT jsonb_array_elements_text(s.filters->'amenityIds')::uuid
)
)
GROUP BY la.listing_id
HAVING COUNT(DISTINCT la.amenity_id) = jsonb_array_length(s.filters->'amenityIds')
)
)
),
batch_meta AS (
SELECT
(SELECT search_id FROM search_batch ORDER BY search_id DESC LIMIT 1) AS batch_cursor_id,
(SELECT COUNT(*)::int FROM search_batch) AS batch_count
)
SELECT
bm.batch_cursor_id,
bm.batch_count,
ms.search_id,
ms.user_id
FROM batch_meta bm
LEFT JOIN matched_searches ms ON TRUE
ORDER BY ms.search_id ASC`,
[cursorId, SEARCH_BATCH_SIZE],
);

if (!searches.length) {
logger.debug({ durationMs: Date.now() - startedAt }, "cron:savedSearchAlert — no saved searches");
return;
}

let alerted = 0;
return { rows, queryDurationMs: Date.now() - queryStartedAt };
};

for (const search of searches) {
try {
const filters = search.filters ?? {};
const params = [search.last_alerted_at ?? new Date(0)];
const clauses = buildWhereClause(filters, params);
clauses.push(`l.created_at > $1`);
const markSearchesAlerted = async (searchIds) => {
if (!searchIds.length) return 0;

const { rows: newListings } = await pool.query(
`SELECT l.listing_id FROM listings l
LEFT JOIN properties p ON p.property_id = l.property_id AND p.deleted_at IS NULL
WHERE ${clauses.join(" AND ")}
LIMIT 1`,
params,
);
const { rowCount } = await pool.query(
`UPDATE saved_searches
SET last_alerted_at = NOW()
WHERE search_id = ANY($1::uuid[])
AND deleted_at IS NULL`,
[searchIds],
);

if (!newListings.length) continue;
return rowCount;
};

await pool.query(`UPDATE saved_searches SET last_alerted_at = NOW() WHERE search_id = $1`, [
search.search_id,
]);
export const runSavedSearchAlert = async () => {
const startedAt = Date.now();
logger.info("cron:savedSearchAlert — starting run");

enqueueNotification({
recipientId: search.user_id,
let cursorId = null;
let scanned = 0;
let matched = 0;
let enqueued = 0;
let updated = 0;
let queryDurationMs = 0;

while (true) {
const { rows, queryDurationMs: batchQueryDurationMs } = await fetchMatchedSearches(cursorId);
queryDurationMs += batchQueryDurationMs;

const batchCursorId = rows[0]?.batch_cursor_id ?? null;
const batchCount = Number(rows[0]?.batch_count ?? 0);
scanned += batchCount;

const notifications = rows
.filter((row) => row.search_id)
.map((row) => ({
recipientId: row.user_id,
type: "saved_search_alert",
entityType: "saved_search",
entityId: search.search_id,
});
entityId: row.search_id,
}));

matched += notifications.length;

alerted++;
} catch (err) {
logger.error({ err, searchId: search.search_id }, "cron:savedSearchAlert — failed to process search");
for (const notificationChunk of chunk(notifications, NOTIFICATION_CHUNK_SIZE)) {
enqueued += await enqueueNotificationsBulk(notificationChunk);
}

const batchUpdated = await markSearchesAlerted(notifications.map((notification) => notification.entityId));
updated += batchUpdated;

logger.debug(
{
batchCount,
batchMatched: notifications.length,
batchUpdated,
batchQueryDurationMs,
cursorId: batchCursorId,
},
"cron:savedSearchAlert — batch processed",
);

if (batchCount < SEARCH_BATCH_SIZE || batchCursorId === null) break;
cursorId = batchCursorId;
}

logger.info(
{ alerted, total: searches.length, durationMs: Date.now() - startedAt },
{
scanned,
matched,
enqueued,
updated,
queryDurationMs,
durationMs: Date.now() - startedAt,
},
"cron:savedSearchAlert — run complete",
);
};
Expand Down
Loading
Loading