Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions lib/indexer.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,52 @@
import { Indexer, IndexedEvent, EventFetcher } from "./indexer";

describe("Indexer", () => {
let indexer: Indexer;
let mockFetcher: EventFetcher;
let mockStorage: any;

beforeEach(() => {
mockFetcher = {
fetchEvents: jest.fn().mockResolvedValue([]),
};
mockStorage = {
saveEvent: jest.fn().mockResolvedValue(undefined),
deleteEventsFromLedger: jest.fn().mockResolvedValue(undefined),
};
indexer = new Indexer({ overlapWindow: 2, hashWindowSize: 5 }, mockFetcher, mockStorage);
});

test("should process events and track cursor", async () => {
const event: IndexedEvent = { id: "1", type: "test", ledger: 1, hash: "h1", prevHash: "h0", data: {} };
await indexer.processEvent(event);
expect(mockStorage.saveEvent).toHaveBeenCalledWith(event);
expect(indexer.metrics.eventsProcessed).toBe(1);
});

test("should detect gaps and backfill", async () => {
const event1: IndexedEvent = { id: "1", type: "test", ledger: 1, hash: "h1", prevHash: "h0", data: {} };
const event2: IndexedEvent = { id: "5", type: "test", ledger: 5, hash: "h5", prevHash: "h4", data: {} };

await indexer.processEvent(event1);
await indexer.processEvent(event2);

expect(mockFetcher.fetchEvents).toHaveBeenCalledWith(2, 4);
expect(indexer.metrics.gapsDetected).toBe(1);
});

test("should detect reorgs and rollback", async () => {
const event1: IndexedEvent = { id: "1", type: "test", ledger: 1, hash: "h1", prevHash: "h0", data: {} };
const event2: IndexedEvent = { id: "2", type: "test", ledger: 2, hash: "h2", prevHash: "h1", data: {} };
const reorgEvent2: IndexedEvent = { id: "2b", type: "test", ledger: 2, hash: "h2b", prevHash: "h1b", data: {} };

await indexer.processEvent(event1);
await indexer.processEvent(event2);

// Process reorg event
await indexer.processEvent(reorgEvent2);

expect(mockStorage.deleteEventsFromLedger).toHaveBeenCalledWith(1);
expect(indexer.metrics.reorgsDetected).toBe(1);
import { HorizonIndexer, HorizonEvent, cursorsDb, processedEventsDb } from './indexer';

describe('HorizonIndexer', () => {
Expand Down
78 changes: 78 additions & 0 deletions lib/indexer.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,81 @@
export interface IndexedEvent {
id: string;
type: string;
ledger: number;
hash: string;
prevHash: string;
data: any;
}

export interface IndexerConfig {
overlapWindow: number;
hashWindowSize: number;
}

export interface IndexerMetrics {
gapsDetected: number;
reorgsDetected: number;
eventsProcessed: number;
}

export interface EventFetcher {
fetchEvents(startLedger: number, endLedger: number): Promise<IndexedEvent[]>;
}

export class Indexer {
private cursor: number = 0;
private hashWindow: { ledger: number; hash: string }[] = [];
public readonly metrics: IndexerMetrics = {
gapsDetected: 0,
reorgsDetected: 0,
eventsProcessed: 0,
};

constructor(
private config: IndexerConfig,
private fetcher: EventFetcher,
private storage: { saveEvent: (e: IndexedEvent) => Promise<void>; deleteEventsFromLedger: (l: number) => Promise<void> }
) {}

async processEvent(event: IndexedEvent): Promise<void> {
if (this.cursor === 0) {
this.cursor = event.ledger - 1;
}

// Gap detection
if (event.ledger > this.cursor + 1 + this.config.overlapWindow) {
console.log(JSON.stringify({ type: "gap_detected", ledger: event.ledger, cursor: this.cursor }));
this.metrics.gapsDetected++;
// Trigger backfill
const events = await this.fetcher.fetchEvents(this.cursor + 1, event.ledger - 1);
for (const e of events) {
await this.storage.saveEvent(e);
}
}

// Reorg detection
if (this.hashWindow.length > 0) {
const last = this.hashWindow[this.hashWindow.length - 1];
if (event.prevHash !== last.hash) {
console.log(JSON.stringify({ type: "reorg_detected", ledger: event.ledger, prevHash: event.prevHash, expectedPrevHash: last.hash }));
this.metrics.reorgsDetected++;
// Rollback
await this.storage.deleteEventsFromLedger(last.ledger);
this.cursor = last.ledger - 1;
this.hashWindow = this.hashWindow.filter(h => h.ledger <= this.cursor);
}
}

await this.storage.saveEvent(event);
this.cursor = event.ledger;

// Update hash window
this.hashWindow.push({ ledger: event.ledger, hash: event.hash });
if (this.hashWindow.length > this.config.hashWindowSize) {
this.hashWindow.shift();
}

this.metrics.eventsProcessed++;
import { randomUUID } from 'crypto';

export interface IndexerConfig {
Expand Down
Loading