Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- Stores bounded verification proofs for the SQLx PostgreSQL migration advisory lock.
-- Each row captures one API-side cluster synchronization probe so operators can
-- audit pool pressure and lock mutual exclusion across rolling deploys.
CREATE TABLE IF NOT EXISTS sqlx_migration_lock_audits (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
database_name TEXT NOT NULL,
lock_id BIGINT NOT NULL,
probe_concurrency INT NOT NULL CHECK (probe_concurrency > 0),
blocked_probe_count INT NOT NULL CHECK (blocked_probe_count >= 0),
available_after_release BOOLEAN NOT NULL,
pool_total_before INT NOT NULL CHECK (pool_total_before >= 0),
pool_waiting_before INT NOT NULL CHECK (pool_waiting_before >= 0),
pool_total_after INT NOT NULL CHECK (pool_total_after >= 0),
pool_waiting_after INT NOT NULL CHECK (pool_waiting_after >= 0),
duration_ms INT NOT NULL CHECK (duration_ms >= 0),
status TEXT NOT NULL CHECK (status IN ('synchronized', 'failed')),
details JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_sqlx_migration_lock_audits_created
ON sqlx_migration_lock_audits (created_at DESC, id DESC);

CREATE INDEX IF NOT EXISTS idx_sqlx_migration_lock_audits_db_status_created
ON sqlx_migration_lock_audits (database_name, status, created_at DESC, id DESC);
22 changes: 22 additions & 0 deletions backend/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,25 @@ model verdicts {
created_at DateTime @default(now()) @db.Timestamptz(6)
disputes disputes @relation(fields: [dispute_id], references: [id], onDelete: Cascade, onUpdate: NoAction)
}

/// Captures operational proofs that SQLx migration advisory locking is shared by all API replicas.
/// Check constraints are enforced in SQL migration 20260529000002_sqlx_migration_lock_audits.sql.
model sqlx_migration_lock_audits {
id String @id @default(dbgenerated("uuid_generate_v4()")) @db.Uuid
database_name String
lock_id BigInt
probe_concurrency Int
blocked_probe_count Int
available_after_release Boolean
pool_total_before Int
pool_waiting_before Int
pool_total_after Int
pool_waiting_after Int
duration_ms Int
status String
details Json @default("{}")
created_at DateTime @default(now()) @db.Timestamptz(6)

@@index([created_at(sort: Desc), id(sort: Desc)], map: "idx_sqlx_migration_lock_audits_created")
@@index([database_name, status, created_at(sort: Desc), id(sort: Desc)], map: "idx_sqlx_migration_lock_audits_db_status_created")
}
57 changes: 57 additions & 0 deletions backend/src/config/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,63 @@ const POOL_STATEMENT_TIMEOUT_MS = positiveIntEnv("POOL_STATEMENT_TIMEOUT_MS", 50
const POOL_LOCK_TIMEOUT_MS = positiveIntEnv("POOL_LOCK_TIMEOUT_MS", 1000);
const POOL_IDLE_IN_TX_TIMEOUT_MS = positiveIntEnv("POOL_IDLE_IN_TX_TIMEOUT_MS", 5000);

const SQLX_MIGRATION_LOCK_PROBE_CONCURRENCY = positiveIntEnv("SQLX_MIGRATION_LOCK_PROBE_CONCURRENCY", 4);
const SQLX_MIGRATION_LOCK_TIMEOUT_MS = positiveIntEnv("SQLX_MIGRATION_LOCK_TIMEOUT_MS", 1500);
const SQLX_MIGRATION_STATEMENT_TIMEOUT_MS = positiveIntEnv("SQLX_MIGRATION_STATEMENT_TIMEOUT_MS", 3000);

// SQLx derives its PostgreSQL migration advisory lock from the current database
// name as: 0x3d32ad9e * crc32(database_name). Keep this local implementation
// dependency-free so the Node API can audit the exact same cluster-wide mutex
// used by Rust/SQLx migrators before any backend instance attempts schema work.
const SQLX_LOCK_MULTIPLIER = 0x3d32ad9en;
const CRC32_TABLE = Array.from({ length: 256 }, (_, index) => {
let crc = index;
for (let bit = 0; bit < 8; bit++) {
crc = (crc & 1) !== 0 ? 0xedb88320 ^ (crc >>> 1) : crc >>> 1;
}
return crc >>> 0;
});

function crc32IsoHdlc(input: string): number {
const bytes = Buffer.from(input, "utf8");
let crc = 0xffffffff;
for (const byte of bytes) {
crc = CRC32_TABLE[(crc ^ byte) & 0xff] ^ (crc >>> 8);
}
return (crc ^ 0xffffffff) >>> 0;
}

export interface SqlxMigrationLockConfig {
databaseName: string;
lockId: string;
lockKeyClassId: string;
lockKeyObjId: string;
probeConcurrency: number;
lockTimeoutMs: number;
statementTimeoutMs: number;
}

export function getSqlxMigrationLockId(databaseName: string): bigint {
return SQLX_LOCK_MULTIPLIER * BigInt(crc32IsoHdlc(databaseName));
}

export async function getSqlxMigrationLockConfig(): Promise<SqlxMigrationLockConfig> {
const { rows } = await pool.query<{ current_database: string }>("SELECT current_database()");
const databaseName = rows[0].current_database;
const lockId = getSqlxMigrationLockId(databaseName);
const unsignedLockId = BigInt.asUintN(64, lockId);

return {
databaseName,
lockId: lockId.toString(),
lockKeyClassId: (unsignedLockId >> 32n).toString(),
lockKeyObjId: (unsignedLockId & 0xffffffffn).toString(),
probeConcurrency: SQLX_MIGRATION_LOCK_PROBE_CONCURRENCY,
lockTimeoutMs: SQLX_MIGRATION_LOCK_TIMEOUT_MS,
statementTimeoutMs: SQLX_MIGRATION_STATEMENT_TIMEOUT_MS,
};
}

// ---------------------------------------------------------------------------
// Build the pool with resilient options
// ---------------------------------------------------------------------------
Expand Down
Loading
Loading