diff --git a/indexer/src/backup/backup.service.ts b/indexer/src/backup/backup.service.ts index 285f2456..f38cc728 100644 --- a/indexer/src/backup/backup.service.ts +++ b/indexer/src/backup/backup.service.ts @@ -122,23 +122,29 @@ export class BackupService { }; } - /** Compliance: backup and recovery audit trail for a period */ + /** Compliance: backup and recovery audit trail for a period. + * + * Backups and recovery records are fetched in parallel since + * both are independent read queries. + */ async getBackupAuditTrail(since: string, limit = 200): Promise<{ backups: BackupManifestRecord[]; recoveries: RecoveryRecordEntity[]; }> { - const backups = await this.backupManifestRepo - .createQueryBuilder('b') - .where('b.createdAt >= :since', { since }) - .orderBy('b.createdAt', 'DESC') - .take(limit) - .getMany(); - const recoveries = await this.recoveryRecordRepo - .createQueryBuilder('r') - .where('r.executedAt >= :since', { since }) - .orderBy('r.executedAt', 'DESC') - .take(limit) - .getMany(); + const [backups, recoveries] = await Promise.all([ + this.backupManifestRepo + .createQueryBuilder('b') + .where('b.createdAt >= :since', { since }) + .orderBy('b.createdAt', 'DESC') + .take(limit) + .getMany(), + this.recoveryRecordRepo + .createQueryBuilder('r') + .where('r.executedAt >= :since', { since }) + .orderBy('r.executedAt', 'DESC') + .take(limit) + .getMany(), + ]); return { backups, recoveries }; } diff --git a/indexer/src/events/event-processor.service.ts b/indexer/src/events/event-processor.service.ts index a60ee660..51bc5cbd 100644 --- a/indexer/src/events/event-processor.service.ts +++ b/indexer/src/events/event-processor.service.ts @@ -287,14 +287,18 @@ export class EventProcessorService { }, }); - // Mark rewards as claimed - for (const reward of rewards) { + // Mark rewards as claimed — batch update in parallel + const updatedRewards = rewards.map((reward) => { reward.status = RewardStatus.CLAIMED; reward.claimedAt = data.timestamp; - await this.rewardRepo.save(reward); + return reward; + }); + + if (updatedRewards.length > 0) { + await this.rewardRepo.save(updatedRewards); } - this.logger.log(`Indexed RewardClaimedEvent for ${data.user}`); + this.logger.log(`Indexed RewardClaimedEvent for ${data.user} (${updatedRewards.length} rewards claimed)`); } private async handleRewardPoolFundedEvent(event: ProcessedEvent): Promise { @@ -455,18 +459,19 @@ export class EventProcessorService { mintedTimestamp: event.timestamp, }); - await this.contentTokenRepo.save(token); - - // Create provenance record - await this.createProvenanceRecord({ - tokenId: data.token_id, - eventType: ProvenanceEventType.MINT, - fromAddress: null, - toAddress: data.creator, - timestamp: event.timestamp, - ledger: event.ledger, - txHash: event.txHash, - }); + // Save token and create provenance record in parallel + await Promise.all([ + this.contentTokenRepo.save(token), + this.createProvenanceRecord({ + tokenId: data.token_id, + eventType: ProvenanceEventType.MINT, + fromAddress: null, + toAddress: data.creator, + timestamp: event.timestamp, + ledger: event.ledger, + txHash: event.txHash, + }), + ]); this.logger.log(`Indexed ContentMintedEvent for token ${data.token_id}`); } @@ -484,18 +489,20 @@ export class EventProcessorService { token.lastTransferLedger = event.ledger; token.lastTransferTxHash = event.txHash; token.lastTransferTimestamp = data.timestamp; - await this.contentTokenRepo.save(token); - // Create provenance record - await this.createProvenanceRecord({ - tokenId: data.token_id, - eventType: ProvenanceEventType.TRANSFER, - fromAddress: data.from, - toAddress: data.to, - timestamp: data.timestamp, - ledger: event.ledger, - txHash: event.txHash, - }); + // Save token update and create provenance record in parallel + await Promise.all([ + this.contentTokenRepo.save(token), + this.createProvenanceRecord({ + tokenId: data.token_id, + eventType: ProvenanceEventType.TRANSFER, + fromAddress: data.from, + toAddress: data.to, + timestamp: data.timestamp, + ledger: event.ledger, + txHash: event.txHash, + }), + ]); this.logger.log(`Indexed OwnershipTransferredEvent for token ${data.token_id}`); } diff --git a/indexer/src/horizon/horizon.service.ts b/indexer/src/horizon/horizon.service.ts index 1562649e..47b56d54 100644 --- a/indexer/src/horizon/horizon.service.ts +++ b/indexer/src/horizon/horizon.service.ts @@ -2,6 +2,7 @@ import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import * as StellarSdk from '@stellar/stellar-sdk'; import { Server, ServerApi } from '@stellar/stellar-sdk/lib/horizon'; +import { parallelBatch } from '../utils/parallel'; export interface ProcessedEvent { type: string; @@ -90,18 +91,29 @@ export class HorizonService implements OnModuleInit { } /** - * Fetch operations for a specific ledger range + * Fetch operations for a specific ledger range. + * + * Uses parallel batched fetching (concurrency=10) to speed up + * historical backfill by fetching multiple ledgers simultaneously. */ async fetchOperationsInRange( startLedger: number, endLedger: number, ): Promise { - this.logger.log(`Fetching operations from ledger ${startLedger} to ${endLedger}`); - - const allEvents: ProcessedEvent[] = []; - - for (let ledger = startLedger; ledger <= endLedger; ledger++) { - try { + const totalLedgers = endLedger - startLedger + 1; + this.logger.log( + `Fetching operations from ledger ${startLedger} to ${endLedger} (${totalLedgers} ledgers, parallel)`, + ); + + const ledgerNumbers = Array.from( + { length: totalLedgers }, + (_, i) => startLedger + i, + ); + + const { results, errors, durationMs } = await parallelBatch( + ledgerNumbers, + async (ledger) => { + const events: ProcessedEvent[] = []; const operations = await this.server .operations() .forLedger(ledger.toString()) @@ -113,16 +125,29 @@ export class HorizonService implements OnModuleInit { const invokeOp = operation as ServerApi.InvokeHostFunctionOperationRecord; if (this.isContractOperation(invokeOp)) { - const events = await this.extractEventsFromOperation(invokeOp); - allEvents.push(...events); + const extracted = await this.extractEventsFromOperation(invokeOp); + events.push(...extracted); } } } - } catch (error) { - this.logger.warn(`Error fetching ledger ${ledger}: ${error.message}`); - } + return events; + }, + { concurrency: 10, continueOnError: true }, + ); + + if (errors.length > 0) { + this.logger.warn( + `${errors.length}/${totalLedgers} ledger fetches failed during parallel fetch`, + ); } + // Flatten results (each element is an array of events for one ledger) + const allEvents = results.filter(Boolean).flat(); + + this.logger.log( + `Parallel fetch complete: ${allEvents.length} events from ${totalLedgers} ledgers in ${durationMs}ms`, + ); + return allEvents; } diff --git a/indexer/src/indexer/indexer.service.ts b/indexer/src/indexer/indexer.service.ts index a1197277..1018837f 100644 --- a/indexer/src/indexer/indexer.service.ts +++ b/indexer/src/indexer/indexer.service.ts @@ -177,21 +177,63 @@ export class IndexerService implements OnModuleInit, OnModuleDestroy { } /** - * Backfill historical data + * Backfill historical data with parallel event processing. + * + * Events are fetched in parallel (handled by HorizonService), then + * processed in parallel batches (concurrency=20) for throughput. + * State is updated after each batch to provide progress tracking. */ async backfillHistoricalData(startLedger: number, endLedger: number): Promise { - this.logger.log(`Starting backfill from ledger ${startLedger} to ${endLedger}`); + this.logger.log(`Starting parallel backfill from ledger ${startLedger} to ${endLedger}`); + const backfillStart = Date.now(); try { + // Parallel ledger fetching (handled inside HorizonService) const events = await this.horizonService.fetchOperationsInRange(startLedger, endLedger); - this.logger.log(`Found ${events.length} events to process`); + this.logger.log(`Found ${events.length} events to process in parallel`); - for (const event of events) { - await this.handleEvent(event); + const BATCH_SIZE = 20; + let processedCount = 0; + let errorCount = 0; + + for (let i = 0; i < events.length; i += BATCH_SIZE) { + const batch = events.slice(i, i + BATCH_SIZE); + + const results = await Promise.allSettled( + batch.map((event) => this.eventProcessor.processEvent(event)), + ); + + for (const result of results) { + if (result.status === 'fulfilled') { + processedCount++; + } else { + errorCount++; + this.logger.warn(`Backfill event error: ${result.reason?.message}`); + } + } + + // Periodic state update after each batch + const lastEvent = batch[batch.length - 1]; + const state = await this.indexerStateRepo.findOne({ + where: { key: this.stateKey }, + }); + + if (state && lastEvent) { + state.lastProcessedLedger = lastEvent.ledger; + state.totalEventsProcessed += batch.length; + state.totalErrors += results.filter((r) => r.status === 'rejected').length; + await this.indexerStateRepo.save(state); + } } - this.logger.log('Backfill completed successfully'); + const durationMs = Date.now() - backfillStart; + const eventsPerSec = processedCount > 0 ? Math.round((processedCount / durationMs) * 1000) : 0; + + this.logger.log( + `Backfill completed: ${processedCount} events processed, ${errorCount} errors, ` + + `${durationMs}ms elapsed (${eventsPerSec} events/sec)`, + ); } catch (error) { this.logger.error(`Backfill failed: ${error.message}`, error.stack); throw error; diff --git a/indexer/src/performance/health.service.ts b/indexer/src/performance/health.service.ts index 3d178aca..084dd936 100644 --- a/indexer/src/performance/health.service.ts +++ b/indexer/src/performance/health.service.ts @@ -35,17 +35,17 @@ export class HealthService { ledgerLagSeconds: number; staleAfterSeconds: number; }; + durationMs: number; }> { + const startMs = Date.now(); const staleAfterSeconds = this.configService.get('indexer.staleAfterSeconds') ?? 900; - const checks = { - database: await this.checkDatabase(), - horizon: await this.checkHorizon(), - indexerState: 'error' as ServiceStatus, - }; - const state = await this.indexerStateRepo.findOne({ - where: { key: this.stateKey }, - }); + // Run all independent checks in parallel + const [dbStatus, horizonStatus, state] = await Promise.all([ + this.checkDatabase(), + this.checkHorizon(), + this.indexerStateRepo.findOne({ where: { key: this.stateKey } }), + ]); const lastProcessedTimestamp = Number(state?.lastProcessedTimestamp ?? '0'); const ledgerLagSeconds = @@ -53,9 +53,12 @@ export class HealthService { ? Math.max(0, Math.floor(Date.now() / 1000) - lastProcessedTimestamp) : staleAfterSeconds + 1; - checks.indexerState = !state || ledgerLagSeconds > staleAfterSeconds ? 'degraded' : 'ok'; - this.metricsService.updateDependencyHealth('indexer_state', checks.indexerState === 'ok'); + const indexerState: ServiceStatus = + !state || ledgerLagSeconds > staleAfterSeconds ? 'degraded' : 'ok'; + + this.metricsService.updateDependencyHealth('indexer_state', indexerState === 'ok'); + const checks = { database: dbStatus, horizon: horizonStatus, indexerState }; const statuses = Object.values(checks); const overallStatus: ServiceStatus = statuses.includes('error') ? 'error' @@ -83,6 +86,7 @@ export class HealthService { ledgerLagSeconds, staleAfterSeconds, }, + durationMs: Date.now() - startMs, }; } diff --git a/indexer/src/utils/parallel.spec.ts b/indexer/src/utils/parallel.spec.ts new file mode 100644 index 00000000..de38f9fd --- /dev/null +++ b/indexer/src/utils/parallel.spec.ts @@ -0,0 +1,131 @@ +import { parallelBatch, parallelAll } from './parallel'; + +describe('parallelBatch', () => { + it('should process all items and return results in order', async () => { + const items = [1, 2, 3, 4, 5]; + const result = await parallelBatch( + items, + async (n) => n * 2, + { concurrency: 2 }, + ); + + expect(result.results).toEqual([2, 4, 6, 8, 10]); + expect(result.errors).toEqual([]); + expect(result.durationMs).toBeGreaterThanOrEqual(0); + }); + + it('should respect concurrency limit', async () => { + let maxConcurrent = 0; + let currentConcurrent = 0; + + const items = Array.from({ length: 10 }, (_, i) => i); + await parallelBatch( + items, + async () => { + currentConcurrent++; + maxConcurrent = Math.max(maxConcurrent, currentConcurrent); + await new Promise((r) => setTimeout(r, 50)); + currentConcurrent--; + }, + { concurrency: 3 }, + ); + + expect(maxConcurrent).toBeLessThanOrEqual(3); + }); + + it('should stop on error when continueOnError is false', async () => { + const processed: number[] = []; + const items = [1, 2, 3, 4, 5]; + + await expect( + parallelBatch( + items, + async (n) => { + if (n === 3) throw new Error('fail'); + processed.push(n); + return n; + }, + { concurrency: 1, continueOnError: false }, + ), + ).rejects.toThrow('fail'); + }); + + it('should continue on error when continueOnError is true', async () => { + const items = [1, 2, 3, 4, 5]; + const result = await parallelBatch( + items, + async (n) => { + if (n === 3) throw new Error('fail'); + return n * 2; + }, + { concurrency: 2, continueOnError: true }, + ); + + expect(result.errors).toHaveLength(1); + expect(result.errors[0].index).toBe(2); + expect(result.results[0]).toBe(2); + expect(result.results[1]).toBe(4); + expect(result.results[3]).toBe(8); + expect(result.results[4]).toBe(10); + }); + + it('should handle empty input', async () => { + const result = await parallelBatch([], async () => 1, { concurrency: 5 }); + expect(result.results).toEqual([]); + expect(result.errors).toEqual([]); + }); + + it('should achieve speedup over sequential execution', async () => { + const items = Array.from({ length: 6 }, (_, i) => i); + + // Sequential timing estimate: 6 * 50ms = 300ms + // Parallel (concurrency 3): 2 batches * 50ms = ~100ms + const result = await parallelBatch( + items, + async () => { + await new Promise((r) => setTimeout(r, 50)); + return true; + }, + { concurrency: 3 }, + ); + + // Should be significantly faster than sequential (~300ms) + expect(result.durationMs).toBeLessThan(250); + expect(result.results).toHaveLength(6); + }); +}); + +describe('parallelAll', () => { + it('should run independent operations in parallel', async () => { + const { results, durationMs } = await parallelAll( + () => Promise.resolve('a'), + () => Promise.resolve(42), + () => Promise.resolve(true), + ); + + expect(results).toEqual(['a', 42, true]); + expect(durationMs).toBeGreaterThanOrEqual(0); + }); + + it('should achieve speedup over sequential execution', async () => { + const delay = (ms: number) => new Promise((r) => setTimeout(r, ms)); + + const { durationMs } = await parallelAll( + () => delay(50).then(() => 'a'), + () => delay(50).then(() => 'b'), + () => delay(50).then(() => 'c'), + ); + + // Sequential would be ~150ms, parallel should be ~50ms + expect(durationMs).toBeLessThan(120); + }); + + it('should propagate errors', async () => { + await expect( + parallelAll( + () => Promise.resolve(1), + () => Promise.reject(new Error('fail')), + ), + ).rejects.toThrow('fail'); + }); +}); diff --git a/indexer/src/utils/parallel.ts b/indexer/src/utils/parallel.ts new file mode 100644 index 00000000..05c10efb --- /dev/null +++ b/indexer/src/utils/parallel.ts @@ -0,0 +1,110 @@ +/** + * Parallel processing utilities with concurrency control. + * + * Provides batched parallel execution to avoid overwhelming external services + * or database connections while still achieving significant speedup over + * sequential processing. + */ + +export interface ParallelBatchOptions { + /** Maximum number of concurrent tasks (default: 5) */ + concurrency?: number; + /** Whether to continue processing remaining items if one fails (default: false) */ + continueOnError?: boolean; +} + +export interface ParallelBatchResult { + results: T[]; + errors: Array<{ index: number; error: Error }>; + /** Total wall-clock duration in milliseconds */ + durationMs: number; +} + +/** + * Execute an array of tasks in parallel batches with concurrency control. + * + * Unlike `Promise.all` which runs everything at once, this limits the number + * of concurrent inflight operations to avoid resource exhaustion. + * + * @param items - Items to process + * @param fn - Async function to apply to each item + * @param options - Concurrency and error-handling options + * @returns Results in the same order as the input items + * + * @example + * ```ts + * const results = await parallelBatch( + * ledgerNumbers, + * (ledger) => fetchLedger(ledger), + * { concurrency: 10 }, + * ); + * ``` + */ +export async function parallelBatch( + items: T[], + fn: (item: T, index: number) => Promise, + options: ParallelBatchOptions = {}, +): Promise> { + const { concurrency = 5, continueOnError = false } = options; + const start = Date.now(); + const results: R[] = new Array(items.length); + const errors: Array<{ index: number; error: Error }> = []; + + // Process in batches of `concurrency` size + for (let i = 0; i < items.length; i += concurrency) { + const batch = items.slice(i, i + concurrency); + const batchPromises = batch.map(async (item, batchIdx) => { + const globalIdx = i + batchIdx; + try { + results[globalIdx] = await fn(item, globalIdx); + } catch (err) { + const error = err instanceof Error ? err : new Error(String(err)); + errors.push({ index: globalIdx, error }); + if (!continueOnError) { + throw error; + } + } + }); + + if (continueOnError) { + await Promise.allSettled(batchPromises); + } else { + await Promise.all(batchPromises); + } + } + + return { + results, + errors, + durationMs: Date.now() - start, + }; +} + +/** + * Run multiple independent async operations in parallel and return all results. + * + * This is a typed convenience wrapper around `Promise.all` that also captures + * timing information for performance measurement. + * + * @example + * ```ts + * const { results, durationMs } = await parallelAll( + * () => checkDatabase(), + * () => checkHorizon(), + * () => checkIndexerState(), + * ); + * const [dbStatus, horizonStatus, indexerStatus] = results; + * ``` + */ +export async function parallelAll Promise)[]>( + ...fns: T +): Promise<{ + results: { [K in keyof T]: Awaited> }; + durationMs: number; +}> { + const start = Date.now(); + const results = (await Promise.all(fns.map((fn) => fn()))) as { + [K in keyof T]: Awaited>; + }; + return { results, durationMs: Date.now() - start }; +}