Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- BE-API-099: indexes for bounded job search/filter plans.
-- These indexes are intentionally created concurrently so production deploys do
-- not block writes while the planner learns cheaper paths for /api/v1/jobs.
CREATE EXTENSION IF NOT EXISTS pg_trgm;

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_jobs_created_id_desc
ON jobs (created_at DESC, id DESC);

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_jobs_status_created_id_desc
ON jobs (status, created_at DESC, id DESC);

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_jobs_budget_created_id_desc
ON jobs (budget_usdc DESC, created_at DESC, id DESC);

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_jobs_deadline_created_id_desc
ON jobs (deadline_at, created_at DESC, id DESC)
WHERE deadline_at IS NOT NULL;

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_jobs_skills_gin
ON jobs USING gin (skills);

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_jobs_search_tsv_gin
ON jobs USING gin (
to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(description, ''))
);

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_jobs_title_trgm
ON jobs USING gin (title gin_trgm_ops);

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_jobs_description_trgm
ON jobs USING gin (description gin_trgm_ops);
5 changes: 5 additions & 0 deletions backend/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ model jobs {
@@index([client_address], map: "idx_jobs_client_address")
@@index([freelancer_address], map: "idx_jobs_freelancer_address")
@@index([status], map: "idx_jobs_status")
@@index([created_at(sort: Desc), id(sort: Desc)], map: "idx_jobs_created_id_desc")
@@index([status, created_at(sort: Desc), id(sort: Desc)], map: "idx_jobs_status_created_id_desc")
@@index([budget_usdc(sort: Desc), created_at(sort: Desc), id(sort: Desc)], map: "idx_jobs_budget_created_id_desc")
@@index([deadline_at, created_at(sort: Desc), id(sort: Desc)], map: "idx_jobs_deadline_created_id_desc")
@@index([skills], map: "idx_jobs_skills_gin", type: Gin)
}

model milestone_events {
Expand Down
57 changes: 48 additions & 9 deletions backend/src/config/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,28 @@ const connectionString = process.env.DATABASE_URL;
// ---------------------------------------------------------------------------
// Pool configuration — tuneable via environment variables
// ---------------------------------------------------------------------------
const POOL_MAX = parseInt(process.env.POOL_MAX_CONNECTIONS || "20", 10);
const POOL_MIN = parseInt(process.env.POOL_MIN_CONNECTIONS || "2", 10);
const POOL_IDLE_TIMEOUT_MS = parseInt(process.env.POOL_IDLE_TIMEOUT_MS || "30000", 10);
const POOL_CONNECTION_TIMEOUT_MS = parseInt(process.env.POOL_CONNECTION_TIMEOUT_MS || "5000", 10);
const POOL_HEALTH_CHECK_INTERVAL_MS = parseInt(process.env.POOL_HEALTH_CHECK_INTERVAL_MS || "30000", 10);
const POOL_CONNECT_RETRY_LIMIT = parseInt(process.env.POOL_CONNECT_RETRY_LIMIT || "3", 10);
const POOL_CONNECT_RETRY_BASE_DELAY_MS = parseInt(process.env.POOL_CONNECT_RETRY_BASE_DELAY_MS || "500", 10);
function positiveIntEnv(name: string, fallback: number): number {
const raw = process.env[name];
const parsed = raw === undefined ? fallback : Number.parseInt(raw, 10);
if (!Number.isFinite(parsed) || parsed <= 0) {
console.warn(`[POOL] Invalid ${name}=${raw}; using ${fallback}`);
return fallback;
}
return parsed;
}

const POOL_MAX = positiveIntEnv("POOL_MAX_CONNECTIONS", 20);
const POOL_MIN = positiveIntEnv("POOL_MIN_CONNECTIONS", 2);
const POOL_IDLE_TIMEOUT_MS = positiveIntEnv("POOL_IDLE_TIMEOUT_MS", 30000);
const POOL_CONNECTION_TIMEOUT_MS = positiveIntEnv("POOL_CONNECTION_TIMEOUT_MS", 5000);
const POOL_HEALTH_CHECK_INTERVAL_MS = positiveIntEnv("POOL_HEALTH_CHECK_INTERVAL_MS", 30000);
const POOL_CONNECT_RETRY_LIMIT = positiveIntEnv("POOL_CONNECT_RETRY_LIMIT", 3);
const POOL_CONNECT_RETRY_BASE_DELAY_MS = positiveIntEnv("POOL_CONNECT_RETRY_BASE_DELAY_MS", 500);
const POOL_MAX_USES = positiveIntEnv("POOL_MAX_USES", 7500);
const POOL_MAX_LIFETIME_SECONDS = positiveIntEnv("POOL_MAX_LIFETIME_SECONDS", 1800);
const POOL_STATEMENT_TIMEOUT_MS = positiveIntEnv("POOL_STATEMENT_TIMEOUT_MS", 5000);
const POOL_LOCK_TIMEOUT_MS = positiveIntEnv("POOL_LOCK_TIMEOUT_MS", 1000);
const POOL_IDLE_IN_TX_TIMEOUT_MS = positiveIntEnv("POOL_IDLE_IN_TX_TIMEOUT_MS", 5000);

// ---------------------------------------------------------------------------
// Build the pool with resilient options
Expand All @@ -28,6 +43,15 @@ export const pool = new Pool({
min: POOL_MIN,
idleTimeoutMillis: POOL_IDLE_TIMEOUT_MS,
connectionTimeoutMillis: POOL_CONNECTION_TIMEOUT_MS,
maxUses: POOL_MAX_USES,
maxLifetimeSeconds: POOL_MAX_LIFETIME_SECONDS,
statement_timeout: POOL_STATEMENT_TIMEOUT_MS,
query_timeout: POOL_STATEMENT_TIMEOUT_MS + 500,
lock_timeout: POOL_LOCK_TIMEOUT_MS,
idle_in_transaction_session_timeout: POOL_IDLE_IN_TX_TIMEOUT_MS,
application_name: process.env.PGAPPNAME || "lance-backend-api",
keepAlive: true,
keepAliveInitialDelayMillis: 10_000,
allowExitOnIdle: false, // Keep the pool alive even when the event loop has no other work
});

Expand All @@ -36,9 +60,14 @@ export const pool = new Pool({
// ---------------------------------------------------------------------------
pool.on("connect", (client: PoolClient) => {
client
.query("SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED")
.query(`
SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET statement_timeout = ${POOL_STATEMENT_TIMEOUT_MS};
SET lock_timeout = ${POOL_LOCK_TIMEOUT_MS};
SET idle_in_transaction_session_timeout = ${POOL_IDLE_IN_TX_TIMEOUT_MS};
`)
.catch((err) => {
console.error("[POOL] Failed to configure transaction isolation:", err.message);
console.error("[POOL] Failed to configure session safety settings:", err.message);
});

if (process.env.NODE_ENV !== "production") {
Expand Down Expand Up @@ -80,6 +109,11 @@ export interface PoolHealthStats {
minConnections: number;
idleTimeoutMs: number;
connectionTimeoutMs: number;
statementTimeoutMs: number;
lockTimeoutMs: number;
idleInTransactionTimeoutMs: number;
maxUses: number;
maxLifetimeSeconds: number;
healthCheckIntervalMs: number;
lastHealthCheckAt: string | null;
lastHealthCheckOk: boolean;
Expand All @@ -104,6 +138,11 @@ export function getPoolHealthStats(): PoolHealthStats {
minConnections: POOL_MIN,
idleTimeoutMs: POOL_IDLE_TIMEOUT_MS,
connectionTimeoutMs: POOL_CONNECTION_TIMEOUT_MS,
statementTimeoutMs: POOL_STATEMENT_TIMEOUT_MS,
lockTimeoutMs: POOL_LOCK_TIMEOUT_MS,
idleInTransactionTimeoutMs: POOL_IDLE_IN_TX_TIMEOUT_MS,
maxUses: POOL_MAX_USES,
maxLifetimeSeconds: POOL_MAX_LIFETIME_SECONDS,
healthCheckIntervalMs: POOL_HEALTH_CHECK_INTERVAL_MS,
lastHealthCheckAt: lastHealthCheckAt ? lastHealthCheckAt.toISOString() : null,
lastHealthCheckOk,
Expand Down
1 change: 1 addition & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import bulkRoutes from "./routes/bulk";
import poolRoutes from "./routes/pool";
import stateRoutes from "./routes/state";
import { pool } from "./config/db";
import { startStorageCleanup, stopStorageCleanup } from "./utils/storage-cleanup";

dotenv.config();

Expand Down
141 changes: 59 additions & 82 deletions backend/src/routes/jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@ import milestonesRoutes from "./milestones";
import deliverablesRoutes from "./deliverables";
import jobDisputesRoutes from "./job-disputes";
import { logger } from "../utils/tracing";
import { buildJobSearchQuery, executeReadOnlyJobSearch } from "../utils/jobSearchPlan";

const router = Router();

function positiveTimeoutMs(name: string, fallback: number): number {
const parsed = Number.parseInt(process.env[name] || String(fallback), 10);
return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

// Validation schemas
const getJobsQuerySchema = z.object({
query: z.string().optional(),
status: z.string().optional(),
tag: z.string().optional(),
sort: z.string().optional(),
sort: z.enum(["created_at", "budget"]).default("created_at"),
limit: z.coerce.number().int().min(1).max(100).default(25),
cursor_created_at: z.coerce.date().optional(),
cursor_id: z.string().uuid().optional(),
Expand Down Expand Up @@ -48,6 +54,7 @@ function serializeJob(row: any) {

// GET /api/v1/jobs
router.get("/", async (req: Request, res: Response) => {
const startedAt = Date.now();
try {
const query = getJobsQuerySchema.parse(req.query);

Expand All @@ -65,91 +72,61 @@ router.get("/", async (req: Request, res: Response) => {
return res.status(400).json({ error: "min_budget cannot be greater than max_budget" });
}

const conditions: string[] = [];
const params: any[] = [];
const addParam = (value: any): string => {
params.push(value);
return `$${params.length}`;
};

if (query.query || (query.tag && query.tag !== "all")) {
const searchTerm = query.query || query.tag;
const placeholder = addParam(`%${searchTerm}%`);
conditions.push(`(title ILIKE ${placeholder} OR description ILIKE ${placeholder})`);
}
const builtQuery = buildJobSearchQuery(query);
const client = await pool.connect();

try {
// Read-only transaction settings are local to this request and protect
// the pool from expensive ad-hoc filters under concurrency.
await client.query("BEGIN READ ONLY ISOLATION LEVEL READ COMMITTED");
await client.query(`SET LOCAL statement_timeout = ${positiveTimeoutMs("JOB_SEARCH_STATEMENT_TIMEOUT_MS", 1500)}`);
const result = await executeReadOnlyJobSearch(client, builtQuery);
await client.query("COMMIT");

const rows = result.rows;
const hasNext = rows.length > query.limit;
const items = (hasNext ? rows.slice(0, query.limit) : rows).map(serializeJob);
const cursorSource = hasNext ? items[items.length - 1] : null;

logger.info("Paginated jobs queried", {
returned: items.length,
hasNext,
status: query.status || "any",
sort: query.sort,
planKey: builtQuery.planKey,
poolTotal: pool.totalCount,
poolIdle: pool.idleCount,
poolWaiting: pool.waitingCount,
durationMs: Date.now() - startedAt,
});

if (query.status) {
conditions.push(`status = ${addParam(query.status)}`);
}
if (query.min_budget !== undefined) {
conditions.push(`budget_usdc >= ${addParam(query.min_budget)}`);
}
if (query.max_budget !== undefined) {
conditions.push(`budget_usdc <= ${addParam(query.max_budget)}`);
}
if (query.skills) {
const skills = query.skills
.split(",")
.map((skill) => skill.trim())
.filter(Boolean);
if (skills.length > 0) {
conditions.push(`skills && ${addParam(skills)}::text[]`);
}
}
if (query.deadline_before) {
conditions.push(`deadline_at <= ${addParam(query.deadline_before)}`);
}
if (query.cursor_created_at && query.cursor_id) {
conditions.push(
`(created_at, id) < (${addParam(query.cursor_created_at)}, ${addParam(query.cursor_id)}::uuid)`
);
return res.status(200).json({
items,
next_cursor: cursorSource
? {
created_at: cursorSource.created_at,
id: cursorSource.id,
}
: null,
limit: query.limit,
});
} catch (error) {
await client.query("ROLLBACK").catch(() => undefined);
throw error;
} finally {
client.release();
}

const whereSql = conditions.length > 0 ? `WHERE ${conditions.join(" AND ")}` : "";
const orderSql =
query.sort === "budget"
? "ORDER BY budget_usdc DESC, created_at DESC, id DESC"
: "ORDER BY created_at DESC, id DESC";
const limitPlaceholder = addParam(query.limit + 1);

const result = await pool.query(
`SELECT id, title, description, budget_usdc, milestones, client_address,
freelancer_address, status, metadata_hash, on_chain_job_id, skills, deadline_at,
created_at, updated_at
FROM jobs
${whereSql}
${orderSql}
LIMIT ${limitPlaceholder}`,
params
);

const rows = result.rows;
const hasNext = rows.length > query.limit;
const items = (hasNext ? rows.slice(0, query.limit) : rows).map(serializeJob);
const cursorSource = hasNext ? items[items.length - 1] : null;

logger.info("Paginated jobs queried", {
returned: items.length,
hasNext,
status: query.status || "any",
sort: query.sort || "created_at",
});

res.json({
items,
next_cursor: cursorSource
? {
created_at: cursorSource.created_at,
id: cursorSource.id,
}
: null,
limit: query.limit,
});
} catch (error) {
} catch (error: any) {
if (error instanceof z.ZodError) {
return res.status(400).json({ error: error.issues });
}
console.error("GET /jobs error:", error);
logger.error("GET /jobs error", {
error: error.message || String(error),
durationMs: Date.now() - startedAt,
poolTotal: pool.totalCount,
poolIdle: pool.idleCount,
poolWaiting: pool.waitingCount,
});
res.status(500).json({ error: "Internal server error" });
}
});
Expand All @@ -159,7 +136,7 @@ router.post("/", async (req: Request, res: Response) => {
try {
const data = createJobSchema.parse(req.body);

const result = await prisma.$transaction(async (tx) => {
const result = await prisma.$transaction(async (tx: any) => {
const job = await tx.jobs.create({
data: {
title: data.title,
Expand Down
Loading
Loading