From 5a7e6a1e52e84fb0461f02e378d4f6487f89b503 Mon Sep 17 00:00:00 2001 From: ComputerOracle Date: Fri, 29 May 2026 12:36:49 +0000 Subject: [PATCH] feat: handle ledger gaps and reorgs in Horizon indexer --- lib/indexer.test.ts | 51 +++++++++++++++++++++++++++++ lib/indexer.ts | 80 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 lib/indexer.test.ts create mode 100644 lib/indexer.ts diff --git a/lib/indexer.test.ts b/lib/indexer.test.ts new file mode 100644 index 0000000..77c170c --- /dev/null +++ b/lib/indexer.test.ts @@ -0,0 +1,51 @@ +import { Indexer, IndexedEvent, EventFetcher } from "./indexer"; + +describe("Indexer", () => { + let indexer: Indexer; + let mockFetcher: EventFetcher; + let mockStorage: any; + + beforeEach(() => { + mockFetcher = { + fetchEvents: jest.fn().mockResolvedValue([]), + }; + mockStorage = { + saveEvent: jest.fn().mockResolvedValue(undefined), + deleteEventsFromLedger: jest.fn().mockResolvedValue(undefined), + }; + indexer = new Indexer({ overlapWindow: 2, hashWindowSize: 5 }, mockFetcher, mockStorage); + }); + + test("should process events and track cursor", async () => { + const event: IndexedEvent = { id: "1", type: "test", ledger: 1, hash: "h1", prevHash: "h0", data: {} }; + await indexer.processEvent(event); + expect(mockStorage.saveEvent).toHaveBeenCalledWith(event); + expect(indexer.metrics.eventsProcessed).toBe(1); + }); + + test("should detect gaps and backfill", async () => { + const event1: IndexedEvent = { id: "1", type: "test", ledger: 1, hash: "h1", prevHash: "h0", data: {} }; + const event2: IndexedEvent = { id: "5", type: "test", ledger: 5, hash: "h5", prevHash: "h4", data: {} }; + + await indexer.processEvent(event1); + await indexer.processEvent(event2); + + expect(mockFetcher.fetchEvents).toHaveBeenCalledWith(2, 4); + expect(indexer.metrics.gapsDetected).toBe(1); + }); + + test("should detect reorgs and rollback", async () => { + const event1: IndexedEvent = { id: "1", type: "test", ledger: 1, hash: "h1", prevHash: "h0", data: {} }; + const event2: IndexedEvent = { id: "2", type: "test", ledger: 2, hash: "h2", prevHash: "h1", data: {} }; + const reorgEvent2: IndexedEvent = { id: "2b", type: "test", ledger: 2, hash: "h2b", prevHash: "h1b", data: {} }; + + await indexer.processEvent(event1); + await indexer.processEvent(event2); + + // Process reorg event + await indexer.processEvent(reorgEvent2); + + expect(mockStorage.deleteEventsFromLedger).toHaveBeenCalledWith(1); + expect(indexer.metrics.reorgsDetected).toBe(1); + }); +}); diff --git a/lib/indexer.ts b/lib/indexer.ts new file mode 100644 index 0000000..093cf51 --- /dev/null +++ b/lib/indexer.ts @@ -0,0 +1,80 @@ +export interface IndexedEvent { + id: string; + type: string; + ledger: number; + hash: string; + prevHash: string; + data: any; +} + +export interface IndexerConfig { + overlapWindow: number; + hashWindowSize: number; +} + +export interface IndexerMetrics { + gapsDetected: number; + reorgsDetected: number; + eventsProcessed: number; +} + +export interface EventFetcher { + fetchEvents(startLedger: number, endLedger: number): Promise; +} + +export class Indexer { + private cursor: number = 0; + private hashWindow: { ledger: number; hash: string }[] = []; + public readonly metrics: IndexerMetrics = { + gapsDetected: 0, + reorgsDetected: 0, + eventsProcessed: 0, + }; + + constructor( + private config: IndexerConfig, + private fetcher: EventFetcher, + private storage: { saveEvent: (e: IndexedEvent) => Promise; deleteEventsFromLedger: (l: number) => Promise } + ) {} + + async processEvent(event: IndexedEvent): Promise { + if (this.cursor === 0) { + this.cursor = event.ledger - 1; + } + + // Gap detection + if (event.ledger > this.cursor + 1 + this.config.overlapWindow) { + console.log(JSON.stringify({ type: "gap_detected", ledger: event.ledger, cursor: this.cursor })); + this.metrics.gapsDetected++; + // Trigger backfill + const events = await this.fetcher.fetchEvents(this.cursor + 1, event.ledger - 1); + for (const e of events) { + await this.storage.saveEvent(e); + } + } + + // Reorg detection + if (this.hashWindow.length > 0) { + const last = this.hashWindow[this.hashWindow.length - 1]; + if (event.prevHash !== last.hash) { + console.log(JSON.stringify({ type: "reorg_detected", ledger: event.ledger, prevHash: event.prevHash, expectedPrevHash: last.hash })); + this.metrics.reorgsDetected++; + // Rollback + await this.storage.deleteEventsFromLedger(last.ledger); + this.cursor = last.ledger - 1; + this.hashWindow = this.hashWindow.filter(h => h.ledger <= this.cursor); + } + } + + await this.storage.saveEvent(event); + this.cursor = event.ledger; + + // Update hash window + this.hashWindow.push({ ledger: event.ledger, hash: event.hash }); + if (this.hashWindow.length > this.config.hashWindowSize) { + this.hashWindow.shift(); + } + + this.metrics.eventsProcessed++; + } +}