Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/src/indexer/indexer.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ describe('IndexerService.poll', () => {
const calls = (eventProcessor.process as jest.Mock).mock.calls;
expect(calls[0][0].ledger).toBe(11);
expect(calls[1][0].ledger).toBe(12);
expect(calls[0][1]).toEqual(expect.objectContaining({ correlationId: expect.any(String) }));
expect(calls[1][1]).toEqual(calls[0][1]);
});

it('skips invalid events (missing contractId)', async () => {
Expand Down
46 changes: 34 additions & 12 deletions backend/src/indexer/indexer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@ import { EventProcessorService } from './processors/event-processor.service';
import { EventNormalizerService } from './processors/event-normalizer.service';
import { CursorService } from './projections/cursor.service';
import { compareEventsByCursor } from './processors/event-ordering.util';
import { randomUUID } from 'crypto';
import { INDEXER_STREAM_CORE_GAME } from './indexer.constants';

export interface IndexerLogContext {
correlationId: string;
}

/** Observability counters for indexer health metrics. */
export interface IndexerMetrics {
ingestedTotal: number;
Expand Down Expand Up @@ -35,10 +40,10 @@ export class IndexerService {
private readonly configService: ConfigService,
) {}

async ingest(event: IngestedEventDto) {
async ingest(event: IngestedEventDto, context?: IndexerLogContext) {
const t0 = Date.now();
try {
await this.eventProcessor.process(event);
await this.eventProcessor.process(event, context);
await this.cursorService.checkpoint(
event.network,
INDEXER_STREAM_CORE_GAME,
Expand All @@ -50,6 +55,7 @@ export class IndexerService {
this.metrics.lastCursorLedger = event.ledger;
this.logger.log({
msg: 'indexer.ingest.ok',
correlationId: context?.correlationId,
topic: event.topic,
ledger: event.ledger,
txHash: event.txHash,
Expand All @@ -60,6 +66,7 @@ export class IndexerService {
this.metrics.projectionErrors++;
this.logger.error({
msg: 'indexer.ingest.error',
correlationId: context?.correlationId,
topic: event.topic,
ledger: event.ledger,
error: err instanceof Error ? err.message : String(err),
Expand All @@ -68,15 +75,21 @@ export class IndexerService {
}
}

async poll(): Promise<number> {
async poll(context?: IndexerLogContext): Promise<number> {
const network =
(this.configService.get<string>('SOROBAN_NETWORK') as 'testnet' | 'mainnet') ||
'testnet';
const rpcUrl = this.configService.get<string>('SOROBAN_RPC_URL');
const contractId = this.configService.get<string>('SOROBAN_CORE_GAME_CONTRACT_ID');

const cycleContext: IndexerLogContext = context ?? { correlationId: randomUUID() };

if (!rpcUrl || !contractId) {
this.logger.warn('SOROBAN_RPC_URL or SOROBAN_CORE_GAME_CONTRACT_ID not set — poll skipped');
this.logger.warn({
msg: 'indexer.poll.skipped',
correlationId: cycleContext.correlationId,
reason: 'missing_config',
});
return 0;
}

Expand All @@ -86,17 +99,21 @@ export class IndexerService {

this.logger.log({
msg: 'indexer.poll.tick',
correlationId: cycleContext.correlationId,
network,
rpc: rpcUrl ?? 'unset',
contract: contractId ?? 'unset',
cursorLedger: cursor.lastLedger,
cursorTxHash: cursor.lastTxHash,
cursorEventIndex: cursor.lastEventIndex,
metrics: { ...this.metrics },
});
this.logger.debug(
`poll network=${network} rpc=${rpcUrl} contract=${contractId} cursor=${cursor.lastLedger}:${cursor.lastTxHash}:${cursor.lastEventIndex}`,
);
this.logger.debug({
msg: 'indexer.poll.debug',
correlationId: cycleContext.correlationId,
network,
cursorLedger: cursor.lastLedger,
cursorTxHash: cursor.lastTxHash,
cursorEventIndex: cursor.lastEventIndex,
});

const rawEvents = await this.fetchEvents(rpcUrl, contractId, cursor.lastLedger);

Expand All @@ -107,11 +124,15 @@ export class IndexerService {

let ingested = 0;
for (const event of normalized) {
await this.ingest(event);
await this.ingest(event, cycleContext);
ingested++;
}

this.logger.log(`poll complete ingested=${ingested}`);
this.logger.log({
msg: 'indexer.poll.complete',
correlationId: cycleContext.correlationId,
ingested,
});
return ingested;
}

Expand Down Expand Up @@ -140,10 +161,11 @@ export class IndexerService {
return response.data?.result?.events ?? [];
}

recordReplaySkip(ledger: number, txHash: string, eventIndex: number) {
recordReplaySkip(ledger: number, txHash: string, eventIndex: number, context?: IndexerLogContext) {
this.metrics.replaySkips++;
this.logger.warn({
msg: 'indexer.replay.skip',
correlationId: context?.correlationId,
ledger,
txHash,
eventIndex,
Expand Down
15 changes: 10 additions & 5 deletions backend/src/indexer/processors/event-processor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Repository } from 'typeorm';
import { IngestedEventDto } from '../dto/ingested-event.dto';
import { IngestedEventEntity } from '../entities/ingested-event.entity';
import { ProjectionService } from '../projections/projection.service';
import { IndexerLogContext } from '../indexer.service';

@Injectable()
export class EventProcessorService {
Expand All @@ -16,7 +17,7 @@ export class EventProcessorService {
private readonly projectionService: ProjectionService,
) {}

async process(event: IngestedEventDto): Promise<boolean> {
async process(event: IngestedEventDto, context?: IndexerLogContext): Promise<boolean> {
const exists = await this.eventsRepo.findOne({
where: {
network: event.network,
Expand All @@ -27,14 +28,18 @@ export class EventProcessorService {

if (exists) {
this.replaySkipCount++;
this.logger.debug(
`Duplicate event skipped txHash=${event.txHash} eventIndex=${event.eventIndex} totalSkipped=${this.replaySkipCount}`,
);
this.logger.debug({
msg: 'indexer.event.duplicate',
correlationId: context?.correlationId,
txHash: event.txHash,
eventIndex: event.eventIndex,
totalSkipped: this.replaySkipCount,
});
return false;
}

await this.eventsRepo.save(this.eventsRepo.create(event));
await this.projectionService.apply(event);
await this.projectionService.apply(event, context);
return true;
}

Expand Down
10 changes: 8 additions & 2 deletions backend/src/indexer/projections/projection.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { SessionProjectionEntity } from '../entities/session-projection.entity';
import { IngestedEventDto } from '../dto/ingested-event.dto';
import { IndexerLogContext } from '../indexer.service';

@Injectable()
export class ProjectionService {
Expand All @@ -17,14 +18,19 @@ export class ProjectionService {
* Applies an event to the projection. Idempotent: replaying the same
* session_finalized event produces the same projection state (upsert by sessionId).
*/
async apply(event: IngestedEventDto): Promise<boolean> {
async apply(event: IngestedEventDto, context?: IndexerLogContext): Promise<boolean> {
if (event.topic !== 'session_finalized') {
return false;
}

const sessionId = String(event.payload.sessionId ?? '');
if (!sessionId) {
this.logger.warn(`session_finalized event missing sessionId txHash=${event.txHash}`);
this.logger.warn({
msg: 'indexer.projection.skipped',
correlationId: context?.correlationId,
reason: 'missing_session_id',
txHash: event.txHash,
});
return false;
}

Expand Down
5 changes: 4 additions & 1 deletion backend/src/indexer/queue/indexer-worker.service.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { randomUUID } from 'crypto';
import { Injectable, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Cron, CronExpression } from '@nestjs/schedule';
Expand All @@ -17,18 +18,20 @@ export class IndexerWorkerService {

@Cron(CronExpression.EVERY_10_SECONDS)
async tick() {
const correlationId = randomUUID();
const network = this.configService.get<string>('SOROBAN_NETWORK') || INDEXER_NETWORK_TESTNET;
const cursor = await this.cursorService.getOrCreate(network, INDEXER_STREAM_CORE_GAME);

this.logger.log({
msg: 'indexer.worker.tick',
correlationId,
network,
cursorLedger: cursor.lastLedger,
cursorTxHash: cursor.lastTxHash,
cursorEventIndex: cursor.lastEventIndex,
metrics: { ...this.indexerService.metrics },
});

await this.indexerService.poll();
await this.indexerService.poll({ correlationId });
}
}
Loading