diff --git a/backend/migrations/20260529000002_sqlx_migration_lock_audits.sql b/backend/migrations/20260529000002_sqlx_migration_lock_audits.sql new file mode 100644 index 00000000..ef040d0e --- /dev/null +++ b/backend/migrations/20260529000002_sqlx_migration_lock_audits.sql @@ -0,0 +1,25 @@ +-- Stores bounded verification proofs for the SQLx PostgreSQL migration advisory lock. +-- Each row captures one API-side cluster synchronization probe so operators can +-- audit pool pressure and lock mutual exclusion across rolling deploys. +CREATE TABLE IF NOT EXISTS sqlx_migration_lock_audits ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + database_name TEXT NOT NULL, + lock_id BIGINT NOT NULL, + probe_concurrency INT NOT NULL CHECK (probe_concurrency > 0), + blocked_probe_count INT NOT NULL CHECK (blocked_probe_count >= 0), + available_after_release BOOLEAN NOT NULL, + pool_total_before INT NOT NULL CHECK (pool_total_before >= 0), + pool_waiting_before INT NOT NULL CHECK (pool_waiting_before >= 0), + pool_total_after INT NOT NULL CHECK (pool_total_after >= 0), + pool_waiting_after INT NOT NULL CHECK (pool_waiting_after >= 0), + duration_ms INT NOT NULL CHECK (duration_ms >= 0), + status TEXT NOT NULL CHECK (status IN ('synchronized', 'failed')), + details JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_sqlx_migration_lock_audits_created + ON sqlx_migration_lock_audits (created_at DESC, id DESC); + +CREATE INDEX IF NOT EXISTS idx_sqlx_migration_lock_audits_db_status_created + ON sqlx_migration_lock_audits (database_name, status, created_at DESC, id DESC); diff --git a/backend/prisma/schema.prisma b/backend/prisma/schema.prisma index bb5fe44f..ccc46995 100644 --- a/backend/prisma/schema.prisma +++ b/backend/prisma/schema.prisma @@ -321,3 +321,25 @@ model verdicts { created_at DateTime @default(now()) @db.Timestamptz(6) disputes disputes @relation(fields: [dispute_id], references: [id], onDelete: Cascade, onUpdate: NoAction) } + +/// Captures operational proofs that SQLx migration advisory locking is shared by all API replicas. +/// Check constraints are enforced in SQL migration 20260529000002_sqlx_migration_lock_audits.sql. +model sqlx_migration_lock_audits { + id String @id @default(dbgenerated("uuid_generate_v4()")) @db.Uuid + database_name String + lock_id BigInt + probe_concurrency Int + blocked_probe_count Int + available_after_release Boolean + pool_total_before Int + pool_waiting_before Int + pool_total_after Int + pool_waiting_after Int + duration_ms Int + status String + details Json @default("{}") + created_at DateTime @default(now()) @db.Timestamptz(6) + + @@index([created_at(sort: Desc), id(sort: Desc)], map: "idx_sqlx_migration_lock_audits_created") + @@index([database_name, status, created_at(sort: Desc), id(sort: Desc)], map: "idx_sqlx_migration_lock_audits_db_status_created") +} diff --git a/backend/src/config/db.ts b/backend/src/config/db.ts index 821f88a7..b8d003d9 100644 --- a/backend/src/config/db.ts +++ b/backend/src/config/db.ts @@ -34,6 +34,63 @@ const POOL_STATEMENT_TIMEOUT_MS = positiveIntEnv("POOL_STATEMENT_TIMEOUT_MS", 50 const POOL_LOCK_TIMEOUT_MS = positiveIntEnv("POOL_LOCK_TIMEOUT_MS", 1000); const POOL_IDLE_IN_TX_TIMEOUT_MS = positiveIntEnv("POOL_IDLE_IN_TX_TIMEOUT_MS", 5000); +const SQLX_MIGRATION_LOCK_PROBE_CONCURRENCY = positiveIntEnv("SQLX_MIGRATION_LOCK_PROBE_CONCURRENCY", 4); +const SQLX_MIGRATION_LOCK_TIMEOUT_MS = positiveIntEnv("SQLX_MIGRATION_LOCK_TIMEOUT_MS", 1500); +const SQLX_MIGRATION_STATEMENT_TIMEOUT_MS = positiveIntEnv("SQLX_MIGRATION_STATEMENT_TIMEOUT_MS", 3000); + +// SQLx derives its PostgreSQL migration advisory lock from the current database +// name as: 0x3d32ad9e * crc32(database_name). Keep this local implementation +// dependency-free so the Node API can audit the exact same cluster-wide mutex +// used by Rust/SQLx migrators before any backend instance attempts schema work. +const SQLX_LOCK_MULTIPLIER = 0x3d32ad9en; +const CRC32_TABLE = Array.from({ length: 256 }, (_, index) => { + let crc = index; + for (let bit = 0; bit < 8; bit++) { + crc = (crc & 1) !== 0 ? 0xedb88320 ^ (crc >>> 1) : crc >>> 1; + } + return crc >>> 0; +}); + +function crc32IsoHdlc(input: string): number { + const bytes = Buffer.from(input, "utf8"); + let crc = 0xffffffff; + for (const byte of bytes) { + crc = CRC32_TABLE[(crc ^ byte) & 0xff] ^ (crc >>> 8); + } + return (crc ^ 0xffffffff) >>> 0; +} + +export interface SqlxMigrationLockConfig { + databaseName: string; + lockId: string; + lockKeyClassId: string; + lockKeyObjId: string; + probeConcurrency: number; + lockTimeoutMs: number; + statementTimeoutMs: number; +} + +export function getSqlxMigrationLockId(databaseName: string): bigint { + return SQLX_LOCK_MULTIPLIER * BigInt(crc32IsoHdlc(databaseName)); +} + +export async function getSqlxMigrationLockConfig(): Promise { + const { rows } = await pool.query<{ current_database: string }>("SELECT current_database()"); + const databaseName = rows[0].current_database; + const lockId = getSqlxMigrationLockId(databaseName); + const unsignedLockId = BigInt.asUintN(64, lockId); + + return { + databaseName, + lockId: lockId.toString(), + lockKeyClassId: (unsignedLockId >> 32n).toString(), + lockKeyObjId: (unsignedLockId & 0xffffffffn).toString(), + probeConcurrency: SQLX_MIGRATION_LOCK_PROBE_CONCURRENCY, + lockTimeoutMs: SQLX_MIGRATION_LOCK_TIMEOUT_MS, + statementTimeoutMs: SQLX_MIGRATION_STATEMENT_TIMEOUT_MS, + }; +} + // --------------------------------------------------------------------------- // Build the pool with resilient options // --------------------------------------------------------------------------- diff --git a/backend/src/routes/state.ts b/backend/src/routes/state.ts index 3c40ed08..6655dc35 100644 --- a/backend/src/routes/state.ts +++ b/backend/src/routes/state.ts @@ -1,6 +1,7 @@ import { Router, Request, Response } from "express"; +import { PoolClient } from "pg"; import { z } from "zod"; -import { pool } from "../config/db"; +import { pool, getPoolHealthStats, getSqlxMigrationLockConfig } from "../config/db"; import { buildJobSearchQuery, summarizePlan } from "../utils/jobSearchPlan"; import { logger } from "../utils/tracing"; @@ -160,4 +161,349 @@ router.get("/job-search-plan", async (req: Request, res: Response) => { }); +const booleanParamSchema = z.preprocess((value) => { + if (typeof value === "string") { + if (value.toLowerCase() === "true") return true; + if (value.toLowerCase() === "false") return false; + } + return value; +}, z.boolean()); + +const migrationLockVerifySchema = z.object({ + concurrency: z.coerce.number().int().min(1).max(16).optional(), + record_audit: booleanParamSchema.default(true), +}); + +async function queryMigrationLockSnapshot() { + const lockConfig = await getSqlxMigrationLockConfig(); + + const [migrationResult, lockResult, auditResult] = await Promise.all([ + pool.query<{ + total_migrations: string; + failed_migrations: string; + latest_version: string | null; + latest_installed_on: Date | null; + }>(` + SELECT COUNT(*)::text AS total_migrations, + COUNT(*) FILTER (WHERE success = false)::text AS failed_migrations, + MAX(version)::text AS latest_version, + MAX(installed_on) AS latest_installed_on + FROM _sqlx_migrations + `), + pool.query<{ + pid: number; + application_name: string | null; + state: string | null; + granted: boolean; + wait_event_type: string | null; + wait_event: string | null; + query_age_seconds: string | null; + }>(` + SELECT a.pid, + a.application_name, + a.state, + l.granted, + a.wait_event_type, + a.wait_event, + EXTRACT(EPOCH FROM (NOW() - a.query_start))::text AS query_age_seconds + FROM pg_locks l + LEFT JOIN pg_stat_activity a ON a.pid = l.pid + WHERE l.locktype = 'advisory' + AND l.classid::bigint = $1::bigint + AND l.objid::bigint = $2::bigint + AND l.objsubid = 1 + ORDER BY l.granted DESC, a.query_start NULLS LAST, a.pid + `, [lockConfig.lockKeyClassId, lockConfig.lockKeyObjId]), + pool.query<{ + id: string; + status: string; + probe_concurrency: number; + blocked_probe_count: number; + available_after_release: boolean; + pool_waiting_after: number; + duration_ms: number; + created_at: Date; + }>(` + SELECT id, status, probe_concurrency, blocked_probe_count, + available_after_release, pool_waiting_after, duration_ms, created_at + FROM sqlx_migration_lock_audits + WHERE database_name = $1 + ORDER BY created_at DESC, id DESC + LIMIT 1 + `, [lockConfig.databaseName]), + ]); + + const migrationStats = migrationResult.rows[0]; + return { + lockConfig, + migrations: { + total: Number.parseInt(migrationStats.total_migrations, 10), + failed: Number.parseInt(migrationStats.failed_migrations, 10), + latestVersion: migrationStats.latest_version, + latestInstalledOn: migrationStats.latest_installed_on + ? migrationStats.latest_installed_on.toISOString() + : null, + }, + lockHolders: lockResult.rows.map((row) => ({ + pid: row.pid, + applicationName: row.application_name, + state: row.state, + granted: row.granted, + waitEventType: row.wait_event_type, + waitEvent: row.wait_event, + queryAgeSeconds: row.query_age_seconds ? Number.parseFloat(row.query_age_seconds) : null, + })), + latestAudit: auditResult.rows[0] + ? { + id: auditResult.rows[0].id, + status: auditResult.rows[0].status, + probeConcurrency: auditResult.rows[0].probe_concurrency, + blockedProbeCount: auditResult.rows[0].blocked_probe_count, + availableAfterRelease: auditResult.rows[0].available_after_release, + poolWaitingAfter: auditResult.rows[0].pool_waiting_after, + durationMs: auditResult.rows[0].duration_ms, + createdAt: auditResult.rows[0].created_at.toISOString(), + } + : null, + }; +} + +/** + * GET /api/v1/state/migration-lock + * + * Returns a read-only view of the SQLx migration ledger and the exact + * PostgreSQL advisory lock key SQLx derives from current_database(). Operators + * can call this on every API replica to confirm the cluster is using one shared + * migration mutex before a rolling deploy begins. + */ +router.get("/migration-lock", async (req: Request, res: Response) => { + const startedAt = Date.now(); + + try { + const snapshot = await queryMigrationLockSnapshot(); + const stats = getPoolHealthStats(); + + logger.info("SQLx migration lock status queried", { + databaseName: snapshot.lockConfig.databaseName, + lockId: snapshot.lockConfig.lockId, + lockHolders: snapshot.lockHolders.length, + failedMigrations: snapshot.migrations.failed, + poolTotal: stats.totalConnections, + poolIdle: stats.idleConnections, + poolWaiting: stats.waitingRequests, + durationMs: Date.now() - startedAt, + }); + + res.status(snapshot.migrations.failed === 0 ? 200 : 409).json({ + status: snapshot.migrations.failed === 0 ? "ready" : "dirty", + migrationLock: { + databaseName: snapshot.lockConfig.databaseName, + lockId: snapshot.lockConfig.lockId, + lockKeyClassId: snapshot.lockConfig.lockKeyClassId, + lockKeyObjId: snapshot.lockConfig.lockKeyObjId, + holders: snapshot.lockHolders, + }, + migrations: snapshot.migrations, + latestAudit: snapshot.latestAudit, + pool: { + totalConnections: stats.totalConnections, + idleConnections: stats.idleConnections, + activeConnections: stats.activeConnections, + waitingRequests: stats.waitingRequests, + maxConnections: stats.maxConnections, + }, + responseTimeMs: Date.now() - startedAt, + }); + } catch (error: any) { + logger.error("SQLx migration lock status query failed", { + error: error.message || String(error), + durationMs: Date.now() - startedAt, + }); + res.status(500).json({ error: "Failed to inspect SQLx migration lock state" }); + } +}); + +/** + * POST /api/v1/state/migration-lock/verify + * + * Proves cluster synchronization by taking the SQLx session-level advisory lock + * on one connection, fanning out bounded concurrent pg_try_advisory_lock probes + * through the pool, and then verifying the lock becomes available after the + * holder releases it. This catches per-replica lock-key drift and pool pressure + * before migrations run during deploys. + */ +router.post("/migration-lock/verify", async (req: Request, res: Response) => { + const startedAt = Date.now(); + let holderClient: PoolClient | null = null; + let holderLockAcquired = false; + + try { + const input = migrationLockVerifySchema.parse(req.body || {}); + const lockConfig = await getSqlxMigrationLockConfig(); + const statsBefore = getPoolHealthStats(); + const maxProbeConcurrency = Math.max(0, statsBefore.maxConnections - 1); + const concurrency = input.concurrency ?? Math.min( + lockConfig.probeConcurrency, + maxProbeConcurrency + ); + + if (maxProbeConcurrency < 1) { + return res.status(503).json({ + error: "At least two pool connections are required to verify migration lock synchronization", + }); + } + + if (concurrency > maxProbeConcurrency) { + return res.status(400).json({ + error: `concurrency must be <= ${maxProbeConcurrency} so one connection can hold the migration lock`, + }); + } + + holderClient = await pool.connect(); + const lockedClient = holderClient; + await lockedClient.query(`SET statement_timeout = ${lockConfig.statementTimeoutMs}`); + await lockedClient.query(`SET lock_timeout = ${lockConfig.lockTimeoutMs}`); + await lockedClient.query("SELECT pg_advisory_lock($1::bigint)", [lockConfig.lockId]); + holderLockAcquired = true; + + const probeResults = await Promise.all( + Array.from({ length: concurrency }, async (_, index) => { + const probeStartedAt = Date.now(); + const probeClient = await pool.connect(); + try { + await probeClient.query(`SET statement_timeout = ${lockConfig.statementTimeoutMs}`); + const result = await probeClient.query<{ acquired: boolean }>( + "SELECT pg_try_advisory_lock($1::bigint) AS acquired", + [lockConfig.lockId] + ); + const acquired = result.rows[0].acquired; + if (acquired) { + await probeClient.query("SELECT pg_advisory_unlock($1::bigint)", [lockConfig.lockId]); + } + + return { + probe: index + 1, + acquiredWhileHeld: acquired, + durationMs: Date.now() - probeStartedAt, + }; + } finally { + probeClient.release(); + } + }) + ); + + const blockedProbeCount = probeResults.filter((probe) => !probe.acquiredWhileHeld).length; + const synchronized = blockedProbeCount === concurrency; + + await lockedClient.query("SELECT pg_advisory_unlock($1::bigint)", [lockConfig.lockId]); + holderLockAcquired = false; + lockedClient.release(); + holderClient = null; + + const releaseProbeClient = await pool.connect(); + let availableAfterRelease = false; + try { + const releaseProbe = await releaseProbeClient.query<{ acquired: boolean }>( + "SELECT pg_try_advisory_lock($1::bigint) AS acquired", + [lockConfig.lockId] + ); + availableAfterRelease = releaseProbe.rows[0].acquired; + if (availableAfterRelease) { + await releaseProbeClient.query("SELECT pg_advisory_unlock($1::bigint)", [lockConfig.lockId]); + } + } finally { + releaseProbeClient.release(); + } + + const healthy = synchronized && availableAfterRelease; + const statsAfter = getPoolHealthStats(); + const durationMs = Date.now() - startedAt; + + if (input.record_audit) { + await pool.query( + ` + INSERT INTO sqlx_migration_lock_audits ( + database_name, lock_id, probe_concurrency, blocked_probe_count, + available_after_release, pool_total_before, pool_waiting_before, + pool_total_after, pool_waiting_after, duration_ms, status, details + ) VALUES ($1, $2::bigint, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12::jsonb) + `, + [ + lockConfig.databaseName, + lockConfig.lockId, + concurrency, + blockedProbeCount, + availableAfterRelease, + statsBefore.totalConnections, + statsBefore.waitingRequests, + statsAfter.totalConnections, + statsAfter.waitingRequests, + durationMs, + healthy ? "synchronized" : "failed", + JSON.stringify({ probeResults }), + ] + ); + } + + logger.info("SQLx migration lock synchronization verified", { + healthy, + databaseName: lockConfig.databaseName, + lockId: lockConfig.lockId, + concurrency, + blockedProbeCount, + availableAfterRelease, + durationMs, + poolTotalBefore: statsBefore.totalConnections, + poolWaitingBefore: statsBefore.waitingRequests, + poolTotalAfter: statsAfter.totalConnections, + poolWaitingAfter: statsAfter.waitingRequests, + }); + + res.status(healthy ? 200 : 409).json({ + status: healthy ? "synchronized" : "failed", + migrationLock: { + databaseName: lockConfig.databaseName, + lockId: lockConfig.lockId, + probeConcurrency: concurrency, + blockedProbeCount, + availableAfterRelease, + }, + probes: probeResults, + pool: { + before: { + totalConnections: statsBefore.totalConnections, + idleConnections: statsBefore.idleConnections, + waitingRequests: statsBefore.waitingRequests, + }, + after: { + totalConnections: statsAfter.totalConnections, + idleConnections: statsAfter.idleConnections, + waitingRequests: statsAfter.waitingRequests, + }, + }, + durationMs, + }); + } catch (error: any) { + if (holderClient && holderLockAcquired) { + await holderClient.query("SELECT pg_advisory_unlock_all()").catch(() => undefined); + } + if (holderClient) { + holderClient.release(); + } + + if (error instanceof z.ZodError) { + return res.status(400).json({ error: error.issues }); + } + + logger.error("SQLx migration lock synchronization verification failed", { + error: error.message || String(error), + durationMs: Date.now() - startedAt, + poolTotal: pool.totalCount, + poolIdle: pool.idleCount, + poolWaiting: pool.waitingCount, + }); + res.status(500).json({ error: "Failed to verify SQLx migration lock synchronization" }); + } +}); + export default router;