Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions indexer/src/backup/backup.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,23 +122,29 @@ export class BackupService {
};
}

/** Compliance: backup and recovery audit trail for a period */
/** Compliance: backup and recovery audit trail for a period.
*
* Backups and recovery records are fetched in parallel since
* both are independent read queries.
*/
async getBackupAuditTrail(since: string, limit = 200): Promise<{
backups: BackupManifestRecord[];
recoveries: RecoveryRecordEntity[];
}> {
const backups = await this.backupManifestRepo
.createQueryBuilder('b')
.where('b.createdAt >= :since', { since })
.orderBy('b.createdAt', 'DESC')
.take(limit)
.getMany();
const recoveries = await this.recoveryRecordRepo
.createQueryBuilder('r')
.where('r.executedAt >= :since', { since })
.orderBy('r.executedAt', 'DESC')
.take(limit)
.getMany();
const [backups, recoveries] = await Promise.all([
this.backupManifestRepo
.createQueryBuilder('b')
.where('b.createdAt >= :since', { since })
.orderBy('b.createdAt', 'DESC')
.take(limit)
.getMany(),
this.recoveryRecordRepo
.createQueryBuilder('r')
.where('r.executedAt >= :since', { since })
.orderBy('r.executedAt', 'DESC')
.take(limit)
.getMany(),
]);
return { backups, recoveries };
}

Expand Down
61 changes: 34 additions & 27 deletions indexer/src/events/event-processor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,18 @@ export class EventProcessorService {
},
});

// Mark rewards as claimed
for (const reward of rewards) {
// Mark rewards as claimed β€” batch update in parallel
const updatedRewards = rewards.map((reward) => {
reward.status = RewardStatus.CLAIMED;
reward.claimedAt = data.timestamp;
await this.rewardRepo.save(reward);
return reward;
});

if (updatedRewards.length > 0) {
await this.rewardRepo.save(updatedRewards);
}

this.logger.log(`Indexed RewardClaimedEvent for ${data.user}`);
this.logger.log(`Indexed RewardClaimedEvent for ${data.user} (${updatedRewards.length} rewards claimed)`);
}

private async handleRewardPoolFundedEvent(event: ProcessedEvent): Promise<void> {
Expand Down Expand Up @@ -455,18 +459,19 @@ export class EventProcessorService {
mintedTimestamp: event.timestamp,
});

await this.contentTokenRepo.save(token);

// Create provenance record
await this.createProvenanceRecord({
tokenId: data.token_id,
eventType: ProvenanceEventType.MINT,
fromAddress: null,
toAddress: data.creator,
timestamp: event.timestamp,
ledger: event.ledger,
txHash: event.txHash,
});
// Save token and create provenance record in parallel
await Promise.all([
this.contentTokenRepo.save(token),
this.createProvenanceRecord({
tokenId: data.token_id,
eventType: ProvenanceEventType.MINT,
fromAddress: null,
toAddress: data.creator,
timestamp: event.timestamp,
ledger: event.ledger,
txHash: event.txHash,
}),
]);

this.logger.log(`Indexed ContentMintedEvent for token ${data.token_id}`);
}
Expand All @@ -484,18 +489,20 @@ export class EventProcessorService {
token.lastTransferLedger = event.ledger;
token.lastTransferTxHash = event.txHash;
token.lastTransferTimestamp = data.timestamp;
await this.contentTokenRepo.save(token);

// Create provenance record
await this.createProvenanceRecord({
tokenId: data.token_id,
eventType: ProvenanceEventType.TRANSFER,
fromAddress: data.from,
toAddress: data.to,
timestamp: data.timestamp,
ledger: event.ledger,
txHash: event.txHash,
});
// Save token update and create provenance record in parallel
await Promise.all([
this.contentTokenRepo.save(token),
this.createProvenanceRecord({
tokenId: data.token_id,
eventType: ProvenanceEventType.TRANSFER,
fromAddress: data.from,
toAddress: data.to,
timestamp: data.timestamp,
ledger: event.ledger,
txHash: event.txHash,
}),
]);

this.logger.log(`Indexed OwnershipTransferredEvent for token ${data.token_id}`);
}
Expand Down
49 changes: 37 additions & 12 deletions indexer/src/horizon/horizon.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import * as StellarSdk from '@stellar/stellar-sdk';
import { Server, ServerApi } from '@stellar/stellar-sdk/lib/horizon';
import { parallelBatch } from '../utils/parallel';

export interface ProcessedEvent {
type: string;
Expand Down Expand Up @@ -90,18 +91,29 @@ export class HorizonService implements OnModuleInit {
}

/**
* Fetch operations for a specific ledger range
* Fetch operations for a specific ledger range.
*
* Uses parallel batched fetching (concurrency=10) to speed up
* historical backfill by fetching multiple ledgers simultaneously.
*/
async fetchOperationsInRange(
startLedger: number,
endLedger: number,
): Promise<ProcessedEvent[]> {
this.logger.log(`Fetching operations from ledger ${startLedger} to ${endLedger}`);

const allEvents: ProcessedEvent[] = [];

for (let ledger = startLedger; ledger <= endLedger; ledger++) {
try {
const totalLedgers = endLedger - startLedger + 1;
this.logger.log(
`Fetching operations from ledger ${startLedger} to ${endLedger} (${totalLedgers} ledgers, parallel)`,
);

const ledgerNumbers = Array.from(
{ length: totalLedgers },
(_, i) => startLedger + i,
);

const { results, errors, durationMs } = await parallelBatch(
ledgerNumbers,
async (ledger) => {
const events: ProcessedEvent[] = [];
const operations = await this.server
.operations()
.forLedger(ledger.toString())
Expand All @@ -113,16 +125,29 @@ export class HorizonService implements OnModuleInit {
const invokeOp = operation as ServerApi.InvokeHostFunctionOperationRecord;

if (this.isContractOperation(invokeOp)) {
const events = await this.extractEventsFromOperation(invokeOp);
allEvents.push(...events);
const extracted = await this.extractEventsFromOperation(invokeOp);
events.push(...extracted);
}
}
}
} catch (error) {
this.logger.warn(`Error fetching ledger ${ledger}: ${error.message}`);
}
return events;
},
{ concurrency: 10, continueOnError: true },
);

if (errors.length > 0) {
this.logger.warn(
`${errors.length}/${totalLedgers} ledger fetches failed during parallel fetch`,
);
}

// Flatten results (each element is an array of events for one ledger)
const allEvents = results.filter(Boolean).flat();

this.logger.log(
`Parallel fetch complete: ${allEvents.length} events from ${totalLedgers} ledgers in ${durationMs}ms`,
);

return allEvents;
}

Expand Down
54 changes: 48 additions & 6 deletions indexer/src/indexer/indexer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,21 +177,63 @@ export class IndexerService implements OnModuleInit, OnModuleDestroy {
}

/**
* Backfill historical data
* Backfill historical data with parallel event processing.
*
* Events are fetched in parallel (handled by HorizonService), then
* processed in parallel batches (concurrency=20) for throughput.
* State is updated after each batch to provide progress tracking.
*/
async backfillHistoricalData(startLedger: number, endLedger: number): Promise<void> {
this.logger.log(`Starting backfill from ledger ${startLedger} to ${endLedger}`);
this.logger.log(`Starting parallel backfill from ledger ${startLedger} to ${endLedger}`);
const backfillStart = Date.now();

try {
// Parallel ledger fetching (handled inside HorizonService)
const events = await this.horizonService.fetchOperationsInRange(startLedger, endLedger);

this.logger.log(`Found ${events.length} events to process`);
this.logger.log(`Found ${events.length} events to process in parallel`);

for (const event of events) {
await this.handleEvent(event);
const BATCH_SIZE = 20;
let processedCount = 0;
let errorCount = 0;

for (let i = 0; i < events.length; i += BATCH_SIZE) {
const batch = events.slice(i, i + BATCH_SIZE);

const results = await Promise.allSettled(
batch.map((event) => this.eventProcessor.processEvent(event)),
);

for (const result of results) {
if (result.status === 'fulfilled') {
processedCount++;
} else {
errorCount++;
this.logger.warn(`Backfill event error: ${result.reason?.message}`);
}
}

// Periodic state update after each batch
const lastEvent = batch[batch.length - 1];
const state = await this.indexerStateRepo.findOne({
where: { key: this.stateKey },
});

if (state && lastEvent) {
state.lastProcessedLedger = lastEvent.ledger;
state.totalEventsProcessed += batch.length;
state.totalErrors += results.filter((r) => r.status === 'rejected').length;
await this.indexerStateRepo.save(state);
}
}

this.logger.log('Backfill completed successfully');
const durationMs = Date.now() - backfillStart;
const eventsPerSec = processedCount > 0 ? Math.round((processedCount / durationMs) * 1000) : 0;

this.logger.log(
`Backfill completed: ${processedCount} events processed, ${errorCount} errors, ` +
`${durationMs}ms elapsed (${eventsPerSec} events/sec)`,
);
} catch (error) {
this.logger.error(`Backfill failed: ${error.message}`, error.stack);
throw error;
Expand Down
24 changes: 14 additions & 10 deletions indexer/src/performance/health.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,30 @@ export class HealthService {
ledgerLagSeconds: number;
staleAfterSeconds: number;
};
durationMs: number;
}> {
const startMs = Date.now();
const staleAfterSeconds = this.configService.get<number>('indexer.staleAfterSeconds') ?? 900;
const checks = {
database: await this.checkDatabase(),
horizon: await this.checkHorizon(),
indexerState: 'error' as ServiceStatus,
};

const state = await this.indexerStateRepo.findOne({
where: { key: this.stateKey },
});
// Run all independent checks in parallel
const [dbStatus, horizonStatus, state] = await Promise.all([
this.checkDatabase(),
this.checkHorizon(),
this.indexerStateRepo.findOne({ where: { key: this.stateKey } }),
]);

const lastProcessedTimestamp = Number(state?.lastProcessedTimestamp ?? '0');
const ledgerLagSeconds =
lastProcessedTimestamp > 0
? Math.max(0, Math.floor(Date.now() / 1000) - lastProcessedTimestamp)
: staleAfterSeconds + 1;

checks.indexerState = !state || ledgerLagSeconds > staleAfterSeconds ? 'degraded' : 'ok';
this.metricsService.updateDependencyHealth('indexer_state', checks.indexerState === 'ok');
const indexerState: ServiceStatus =
!state || ledgerLagSeconds > staleAfterSeconds ? 'degraded' : 'ok';

this.metricsService.updateDependencyHealth('indexer_state', indexerState === 'ok');

const checks = { database: dbStatus, horizon: horizonStatus, indexerState };
const statuses = Object.values(checks);
const overallStatus: ServiceStatus = statuses.includes('error')
? 'error'
Expand Down Expand Up @@ -83,6 +86,7 @@ export class HealthService {
ledgerLagSeconds,
staleAfterSeconds,
},
durationMs: Date.now() - startMs,
};
}

Expand Down
Loading
Loading