Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion backend/dist/config/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ exports.pool = new pg_1.Pool({
// ---------------------------------------------------------------------------
// Pool event listeners — structured logging for diagnostics
// ---------------------------------------------------------------------------
exports.pool.on("connect", () => {
exports.pool.on("connect", (client) => {
client
.query("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED")
.catch((err) => {
console.error("[POOL] Failed to configure transaction isolation:", err.message);
});
if (process.env.NODE_ENV !== "production") {
console.log(`[POOL] New client connected | total=${exports.pool.totalCount} idle=${exports.pool.idleCount} waiting=${exports.pool.waitingCount}`);
}
Expand Down
2 changes: 2 additions & 0 deletions backend/dist/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const activity_1 = __importDefault(require("./routes/activity"));
const uploads_1 = __importDefault(require("./routes/uploads"));
const bulk_1 = __importDefault(require("./routes/bulk"));
const pool_1 = __importDefault(require("./routes/pool"));
const state_1 = __importDefault(require("./routes/state"));
dotenv_1.default.config();
const app = (0, express_1.default)();
const port = process.env.PORT || 3001;
Expand All @@ -35,6 +36,7 @@ app.use("/api/v1/activity", activity_1.default);
app.use("/api/v1/uploads", uploads_1.default);
app.use("/api/v1/bulk", bulk_1.default);
app.use("/api/v1/pool", pool_1.default);
app.use("/api/v1/state", state_1.default);
// Basic healthcheck route
app.get("/health", async (req, res) => {
const startTime = Date.now();
Expand Down
51 changes: 51 additions & 0 deletions backend/dist/routes/state.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const express_1 = require("express");
const zod_1 = require("zod");
const db_1 = require("../config/db");
const tracing_1 = require("../utils/tracing");
const router = (0, express_1.Router)();
const recoveryQuerySchema = zod_1.z.object({
status: zod_1.z.enum(["pending", "committed", "failed", "abandoned"]).optional(),
limit: zod_1.z.coerce.number().int().min(1).max(200).default(50),
});
/**
* GET /api/v1/state/write-recovery
*
* Lists durable write-recovery rows for interrupted or retryable database
* mutations. The query is intentionally bounded and ordered by the indexed
* status/updated_at tuple from the migration to avoid table scans under load.
*/
router.get("/write-recovery", async (req, res) => {
try {
const query = recoveryQuerySchema.parse(req.query);
const params = [query.limit];
let sql = `
SELECT id, idempotency_key, operation, entity_type, entity_id, status,
attempts, last_error, recovery_payload, created_at, updated_at
FROM write_recovery_records
`;
if (query.status) {
params.unshift(query.status);
sql += " WHERE status = $1 ORDER BY updated_at DESC, id DESC LIMIT $2";
}
else {
sql += " ORDER BY updated_at DESC, id DESC LIMIT $1";
}
const result = await db_1.pool.query(sql, params);
tracing_1.logger.info("Write recovery state queried", {
status: query.status || "any",
limit: query.limit,
returned: result.rowCount,
});
res.status(200).json(result.rows);
}
catch (error) {
if (error instanceof zod_1.z.ZodError) {
return res.status(400).json({ error: error.issues });
}
tracing_1.logger.error("Write recovery state query failed", { error: error.message });
res.status(500).json({ error: "Failed to retrieve write recovery state" });
}
});
exports.default = router;
28 changes: 28 additions & 0 deletions backend/migrations/20260527000002_write_recovery_records.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- Durable ledger for database writes that may be interrupted after the request is accepted.
-- A pending/failed row gives operators and retry workers a stable idempotency key to inspect.
CREATE TABLE IF NOT EXISTS write_recovery_records (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
idempotency_key TEXT NOT NULL UNIQUE,
operation TEXT NOT NULL,
entity_type TEXT NOT NULL,
entity_id UUID,
status TEXT NOT NULL DEFAULT 'pending',
attempts INT NOT NULL DEFAULT 0,
last_error TEXT,
recovery_payload JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT write_recovery_status_check
CHECK (status IN ('pending', 'committed', 'failed', 'abandoned'))
);

CREATE INDEX IF NOT EXISTS idx_write_recovery_status_updated
ON write_recovery_records (status, updated_at DESC, id DESC);

CREATE INDEX IF NOT EXISTS idx_write_recovery_entity
ON write_recovery_records (entity_type, entity_id)
WHERE entity_id IS NOT NULL;

CREATE TRIGGER write_recovery_records_updated_at
BEFORE UPDATE ON write_recovery_records
FOR EACH ROW EXECUTE FUNCTION set_updated_at();
8 changes: 7 additions & 1 deletion backend/src/config/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ export const pool = new Pool({
// ---------------------------------------------------------------------------
// Pool event listeners — structured logging for diagnostics
// ---------------------------------------------------------------------------
pool.on("connect", () => {
pool.on("connect", (client: PoolClient) => {
client
.query("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED")
.catch((err) => {
console.error("[POOL] Failed to configure transaction isolation:", err.message);
});

if (process.env.NODE_ENV !== "production") {
console.log(
`[POOL] New client connected | total=${pool.totalCount} idle=${pool.idleCount} waiting=${pool.waitingCount}`
Expand Down
2 changes: 2 additions & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import activityRoutes from "./routes/activity";
import uploadsRoutes from "./routes/uploads";
import bulkRoutes from "./routes/bulk";
import poolRoutes from "./routes/pool";
import stateRoutes from "./routes/state";

dotenv.config();

Expand Down Expand Up @@ -56,6 +57,7 @@ app.use("/api/v1/activity", activityRoutes);
app.use("/api/v1/uploads", uploadsRoutes);
app.use("/api/v1/bulk", bulkRoutes);
app.use("/api/v1/pool", poolRoutes);
app.use("/api/v1/state", stateRoutes);

// Health check endpoint with database connectivity verification
app.get("/health", async (req: Request, res: Response) => {
Expand Down
57 changes: 57 additions & 0 deletions backend/src/routes/state.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { Router, Request, Response } from "express";
import { z } from "zod";
import { pool } from "../config/db";
import { logger } from "../utils/tracing";

const router = Router();

const recoveryQuerySchema = z.object({
status: z.enum(["pending", "committed", "failed", "abandoned"]).optional(),
limit: z.coerce.number().int().min(1).max(200).default(50),
});

/**
* GET /api/v1/state/write-recovery
*
* Lists durable write-recovery rows for interrupted or retryable database
* mutations. The query is intentionally bounded and ordered by the indexed
* status/updated_at tuple from the migration to avoid table scans under load.
*/
router.get("/write-recovery", async (req: Request, res: Response) => {
try {
const query = recoveryQuerySchema.parse(req.query);
const params: Array<string | number> = [query.limit];

let sql = `
SELECT id, idempotency_key, operation, entity_type, entity_id, status,
attempts, last_error, recovery_payload, created_at, updated_at
FROM write_recovery_records
`;

if (query.status) {
params.unshift(query.status);
sql += " WHERE status = $1 ORDER BY updated_at DESC, id DESC LIMIT $2";
} else {
sql += " ORDER BY updated_at DESC, id DESC LIMIT $1";
}

const result = await pool.query(sql, params);

logger.info("Write recovery state queried", {
status: query.status || "any",
limit: query.limit,
returned: result.rowCount,
});

res.status(200).json(result.rows);
} catch (error: any) {
if (error instanceof z.ZodError) {
return res.status(400).json({ error: error.issues });
}

logger.error("Write recovery state query failed", { error: error.message });
res.status(500).json({ error: "Failed to retrieve write recovery state" });
}
});

export default router;
Loading