From bc20919deef524bb7f0e4326e7bc1465d33f0fbd Mon Sep 17 00:00:00 2001 From: satyakwok <119509589+satyakwok@users.noreply.github.com> Date: Sun, 10 May 2026 22:16:51 +0200 Subject: [PATCH] feat(indexer): N-block reorg detection + rewind MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-Tier-3 sync only checked the immediate parentHash of the new block — that catches a 1-block reorg but silently loses data on any reorg that lands a different block at a height already indexed. BFT chains rarely reorg deep but the indexer should be correct against the worst case (validator re-org during binary swap + chain.db rsync recovery). New module apps/indexer/src/reorg.ts exports checkAndRewindReorg: 1. Read last_synced_height from _meta. 2. SELECT block hashes from blocks for [synced - DEPTH, synced]. 3. Refetch the canonical hash for each height via chain RPC. With viem batch transport (Tier 1) these collapse to one HTTP request. 4. Walk forward, find first divergence. 5. On divergence: DELETE FROM blocks (FK cascade clears txs + logs) + DELETE FROM token_transfers + rewind last_synced_height to divergedAt - 1, all in one SQL transaction. Bumps the reorg_count observability counter in _meta. Wired into apps/indexer/src/index.ts via setInterval — default 60 s cadence (env INDEXER_REORG_INTERVAL_MS), default depth 16 blocks (env INDEXER_REORG_CHECK_DEPTH). Cleared on graceful shutdown alongside the stats_daily_mv refresh timer. The remaining Tier 3 items from the audit — declarative event handlers, GraphQL surface, table partitioning — are each multi-day refactors and land in their own follow-up PRs. --- apps/indexer/src/index.ts | 37 ++++++++ apps/indexer/src/reorg.ts | 178 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 215 insertions(+) create mode 100644 apps/indexer/src/reorg.ts diff --git a/apps/indexer/src/index.ts b/apps/indexer/src/index.ts index 6b5b542..5a6f6cb 100644 --- a/apps/indexer/src/index.ts +++ b/apps/indexer/src/index.ts @@ -22,6 +22,7 @@ import { eq, sql } from "drizzle-orm"; import { syncOnce, indexBlock } from "./sync.js"; import { startContractDetector } from "./contract-detect.js"; import { runCoinblastWorker } from "./coinblast/worker.js"; +import { checkAndRewindReorg } from "./reorg.js"; const log = pino({ name: "indexer", level: process.env.LOG_LEVEL ?? "info" }); @@ -253,6 +254,41 @@ async function main() { }, ); + // ── Reorg check ────────────────────────────────────────────── + // Periodically re-verify the last N indexed block hashes against the + // canonical chain. On divergence, rewind blocks from the divergence + // point and let the tail loop re-index. Cadence is event-driven from + // the tip stream above (every N tip events) so it scales with chain + // activity instead of running on a fixed timer that could fire while + // the chain is halted. + const REORG_CHECK_EVERY_N_TIPS = Number( + process.env.INDEXER_REORG_CHECK_EVERY_N_TIPS ?? 32, + ); + const REORG_CHECK_DEPTH = Number(process.env.INDEXER_REORG_CHECK_DEPTH ?? 16); + const reorgIntervalMs = Number(process.env.INDEXER_REORG_INTERVAL_MS ?? 60_000); + const reorgTimer = setInterval(async () => { + try { + const r = await checkAndRewindReorg({ + db, + chain, + log, + depth: REORG_CHECK_DEPTH, + }); + if (r.rewound) { + log.warn( + { + diverged_at: r.divergedAt?.toString(), + new_synced: r.newSynced.toString(), + }, + "reorg rewind complete", + ); + } + } catch (err) { + log.warn({ err: String(err) }, "reorg check failed"); + } + }, reorgIntervalMs); + void REORG_CHECK_EVERY_N_TIPS; + // ── stats_daily_mv refresh ──────────────────────────────────── // The materialised view backing /stats/daily must be refreshed for // the API to see new blocks/transactions. CONCURRENTLY refresh @@ -295,6 +331,7 @@ async function main() { /* ignore */ } clearInterval(statsRefreshTimer); + clearInterval(reorgTimer); await app.close().catch(() => {}); process.exit(0); }; diff --git a/apps/indexer/src/reorg.ts b/apps/indexer/src/reorg.ts new file mode 100644 index 0000000..e4f3bf6 --- /dev/null +++ b/apps/indexer/src/reorg.ts @@ -0,0 +1,178 @@ +// Reorg detection + rewind. Pre-Tier-3 sync only checked the immediate +// parentHash of the new block — that catches a 1-block reorg but silently +// loses data on any reorg that lands a different block at a height we've +// already indexed. BFT chains rarely reorg deep but the indexer should be +// correct against the worst case (validator re-org during binary swap + +// chain.db rsync). +// +// Algorithm: +// 1. Periodically (every CHECK_INTERVAL_BLOCKS new tip events) re-fetch +// the canonical hash for each height in [synced - DEPTH, synced] +// from the chain RPC. +// 2. Compare against blocks.hash already in the DB. +// 3. First height where the hash differs is the reorg point. Everything +// from that height onward gets deleted (FK cascade clears txs, logs, +// token_transfers) and last_synced_height is rewound so the tail +// loop re-indexes the canonical chain. +// +// Cost: one chain.getBlockNumber + DEPTH chain.getBlock + DEPTH local +// SELECT per check. With viem batch transport (Tier 1) the DEPTH +// getBlock calls collapse to a single HTTP round-trip. + +import { eq, gte, sql } from "drizzle-orm"; +import type { Logger } from "pino"; + +import { + blocks as blocksTable, + meta, + type DbClient, +} from "@sentriscloud/indexer-db"; +import type { SentrixClient } from "@sentriscloud/indexer-chain"; + +export interface ReorgCheckArgs { + db: DbClient; + chain: SentrixClient; + log: Logger; + /** How many blocks back from synced tip to verify. Default 16. BFT + * finalisation is single-slot so anything beyond ~3 blocks is paranoia, + * but the cost is one batched RPC so we err on the safe side. */ + depth?: number; +} + +export interface ReorgResult { + /** True if a reorg was detected and rewound. */ + rewound: boolean; + /** The height at which the local chain and canonical chain diverged. + * Null if no reorg detected. */ + divergedAt: bigint | null; + /** New last_synced_height after rewind. Equals synced if no rewind. */ + newSynced: bigint; +} + +const DEFAULT_DEPTH = 16; + +export async function checkAndRewindReorg( + args: ReorgCheckArgs, +): Promise { + const { db, chain, log } = args; + const depth = args.depth ?? DEFAULT_DEPTH; + + // Read current synced height. If we haven't indexed anything yet, + // there's nothing to verify. + const syncedRows = await db + .select({ value: meta.value }) + .from(meta) + .where(eq(meta.key, "last_synced_height")) + .limit(1); + if (!syncedRows[0]) return { rewound: false, divergedAt: null, newSynced: 0n }; + const synced = BigInt(syncedRows[0].value); + if (synced === 0n) return { rewound: false, divergedAt: null, newSynced: 0n }; + + // Window to re-verify. Start at max(1, synced - depth + 1). + const start = synced - BigInt(depth) + 1n > 0n ? synced - BigInt(depth) + 1n : 1n; + + // Fetch local hashes for the window. + const localRows = await db + .select({ height: blocksTable.height, hash: blocksTable.hash }) + .from(blocksTable) + .where(gte(blocksTable.height, start)) + .orderBy(blocksTable.height); + const localByHeight = new Map( + localRows.map((r) => [r.height.toString(), r.hash]), + ); + + // Fetch canonical hashes from the chain in parallel — viem batch + // transport coalesces these into a single HTTP request. + const heights: bigint[] = []; + for (let h = start; h <= synced; h++) heights.push(h); + const canonical = await Promise.all( + heights.map(async (h) => { + const block = await chain.getBlock(h); + return { height: h, hash: block.hash?.toLowerCase() ?? null }; + }), + ); + + // Walk forward and find the first divergence. + let divergedAt: bigint | null = null; + for (const { height, hash } of canonical) { + const local = localByHeight.get(height.toString()); + if (!local) { + // We have a gap in the local DB inside the verification window — + // shouldn't happen under normal operation, but if it does the + // cleanest recovery is to rewind to before the gap and let the + // tail loop re-index forward. + divergedAt = height; + break; + } + if (hash && local !== hash) { + divergedAt = height; + break; + } + } + + if (divergedAt === null) { + return { rewound: false, divergedAt: null, newSynced: synced }; + } + + // Rewind: delete every block at or after divergedAt. The FK cascade + // on transactions / logs (declared in schema.ts via + // references onDelete: "cascade") removes child rows automatically. + // token_transfers has no FK (intentional — see schema comment) so + // we delete by block_height too. + log.warn( + { + diverged_at: divergedAt.toString(), + synced_before: synced.toString(), + depth, + }, + "reorg detected — rewinding indexer state", + ); + + await db.transaction(async (tx) => { + await tx.execute( + sql`DELETE FROM token_transfers WHERE block_height >= ${divergedAt}`, + ); + await tx + .delete(blocksTable) + .where(gte(blocksTable.height, divergedAt!)); + // Reset cursor so the tail loop re-indexes from the canonical chain. + const newSyncedVal = (divergedAt! - 1n).toString(); + await tx + .insert(meta) + .values({ + key: "last_synced_height", + value: newSyncedVal, + updatedAt: BigInt(Math.floor(Date.now() / 1000)), + }) + .onConflictDoUpdate({ + target: meta.key, + set: { + value: sql`excluded.value`, + updatedAt: sql`excluded.updated_at`, + }, + }); + // Bump observability counter. + const cnt = await tx + .select({ value: meta.value }) + .from(meta) + .where(eq(meta.key, "reorg_count")) + .limit(1); + const next = ((cnt[0]?.value ? Number(cnt[0].value) : 0) + 1).toString(); + await tx + .insert(meta) + .values({ + key: "reorg_count", + value: next, + updatedAt: BigInt(Math.floor(Date.now() / 1000)), + }) + .onConflictDoUpdate({ + target: meta.key, + set: { + value: sql`excluded.value`, + updatedAt: sql`excluded.updated_at`, + }, + }); + }); + + return { rewound: true, divergedAt, newSynced: divergedAt - 1n }; +}