Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 25 additions & 28 deletions src/blockchain/blockchain-indexer.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Test, TestingModule } from '@nestjs/testing';
import { getRepositoryToken } from '@nestjs/typeorm';
import { Repository, DataSource } from 'typeorm';
import { Repository, DataSource, MoreThanOrEqual } from 'typeorm';
import { BlockchainIndexerService } from './blockchain-indexer.service';
import { ProcessedEvent } from './entities/processed-event.entity';
import { TokenBalance } from './entities/token-balance.entity';
Expand Down Expand Up @@ -176,37 +176,34 @@ describe('BlockchainIndexerService', () => {
});

describe('replayFromBlock', () => {
it('should delete events from startBlock onward and replay', async () => {
const createQueryBuilder = jest.fn().mockReturnValue({
delete: jest.fn().mockReturnValue({
where: jest.fn().mockReturnValue({
execute: jest.fn().mockResolvedValue({ affected: 3, raw: {} })
})
})
});
jest.spyOn(processedEventRepo, 'createQueryBuilder').mockImplementation(createQueryBuilder);
// Replay is now transactional and reverses orphaned state; the detailed
// coverage lives in blockchain-replay.spec.ts. This is a smoke check that
// the service drives the transaction and deletes everything >= startBlock.
it('should transactionally delete events from startBlock onward', async () => {
const manager = {
find: jest.fn().mockResolvedValue([]),
delete: jest.fn().mockResolvedValue({ affected: 3, raw: {} }),
save: jest.fn().mockResolvedValue(null),
increment: jest.fn().mockResolvedValue({ affected: 1 }),
decrement: jest.fn().mockResolvedValue({ affected: 1 }),
findOne: jest.fn().mockResolvedValue(null),
};
const queryRunner = {
connect: jest.fn(),
startTransaction: jest.fn(),
commitTransaction: jest.fn(),
rollbackTransaction: jest.fn(),
release: jest.fn(),
manager,
};
jest.spyOn(dataSource, 'createQueryRunner').mockReturnValue(queryRunner as any);

await service.replayFromBlock(100);

expect(processedEventRepo.createQueryBuilder).toHaveBeenCalled();
expect(createQueryBuilder().delete().where).toHaveBeenCalledWith('blockNumber >= :startBlock', { startBlock: 100 });
});

it('should prevent double processing by cleaning up all future events', async () => {
// Test that ensures events from blocks 100, 101, 102+ are all cleaned up
const createQueryBuilder = jest.fn().mockReturnValue({
delete: jest.fn().mockReturnValue({
where: jest.fn().mockReturnValue({
execute: jest.fn().mockResolvedValue({ affected: 5, raw: {} })
})
})
expect(queryRunner.commitTransaction).toHaveBeenCalled();
expect(manager.delete).toHaveBeenCalledWith(ProcessedEvent, {
blockNumber: MoreThanOrEqual(100),
});
jest.spyOn(processedEventRepo, 'createQueryBuilder').mockImplementation(createQueryBuilder);

await service.replayFromBlock(100);

// Verify the query uses >= to clean up all events from startBlock onward
expect(createQueryBuilder().delete().where).toHaveBeenCalledWith('blockNumber >= :startBlock', { startBlock: 100 });
});
});
});
148 changes: 101 additions & 47 deletions src/blockchain/blockchain-indexer.service.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, DataSource } from 'typeorm';
import { Repository, DataSource, EntityManager, MoreThanOrEqual } from 'typeorm';
import { ProcessedEvent } from './entities/processed-event.entity';
import { TokenBalance } from './entities/token-balance.entity';
import { IndexerCheckpoint } from './entities/indexer-checkpoint.entity';
import { BlockchainEvent, TransferEventData } from './interfaces/blockchain-event.interface';
import { SequentialQueue } from './utils/sequential-queue';

@Injectable()
export class BlockchainIndexerService {
private readonly logger = new Logger(BlockchainIndexerService.name);

/**
* Serialises every state mutation (event processing and reorg rollbacks) so
* they are applied strictly in order and never interleave. Without this, a
* reorg rollback could race against a newer block's transaction and leave
* balances or the checkpoint desynced.
*/
private readonly queue = new SequentialQueue();

constructor(
@InjectRepository(ProcessedEvent)
private processedEventRepo: Repository<ProcessedEvent>,
Expand All @@ -20,10 +29,20 @@ export class BlockchainIndexerService {
private dataSource: DataSource,
) {}

/**
* Process a single blockchain event. Enqueued so events are persisted one at
* a time, in submission order.
*/
async processEvent(event: BlockchainEvent): Promise<void> {
return this.queue.enqueue(() => this.processEventInternal(event));
}

private async processEventInternal(event: BlockchainEvent): Promise<void> {
const { txHash, logIndex, blockNumber, eventType, data } = event;

// Check if event already processed using the transaction/log identity.
// Idempotency: an event is uniquely identified by (txHash, logIndex).
// Skipping here short-circuits duplicates before we open a transaction; the
// unique index on those columns is the hard guarantee behind it.
const existing = await this.processedEventRepo.findOne({
where: { txHash, logIndex },
});
Expand All @@ -33,32 +52,34 @@ export class BlockchainIndexerService {
return;
}

// Start transaction
const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();

try {
// Insert event record (will fail if unique constraint violated)
// Persist the event record. The payload is retained so a later reorg can
// reverse exactly this mutation.
const processedEvent = this.processedEventRepo.create({
txHash,
logIndex,
blockNumber,
eventType,
payload: (data as Record<string, any>) ?? null,
});
await queryRunner.manager.save(ProcessedEvent, processedEvent);

// Process the event
// Apply the state mutation for this event type.
if (eventType === 'Transfer') {
await this.updateBalances(queryRunner.manager, data as TransferEventData);
await this.applyTransfer(queryRunner.manager, data as TransferEventData);
}

// Advance the checkpoint inside the SAME transaction so the event, the
// balance changes and the checkpoint all commit atomically. If anything
// fails we roll back as a unit and the checkpoint never moves.
await this.saveCheckpoint(queryRunner.manager, blockNumber);

await queryRunner.commitTransaction();
this.logger.log(`Processed event: ${eventType} at block ${blockNumber}`);

// Save checkpoint after successful commit to avoid checkpoint desyncs
// if the transaction is rolled back. Use repository (outside txn).
await this.saveCheckpointAfterCommit(blockNumber);
} catch (error) {
await queryRunner.rollbackTransaction();
this.logger.error(`Failed to process event: ${error.message}`, error.stack);
Expand All @@ -68,61 +89,94 @@ export class BlockchainIndexerService {
}
}

private async updateBalances(manager: any, data: TransferEventData): Promise<void> {
const { from, to, amount, token } = data;
/**
* Roll back every event from `startBlock` onward and reverse the state it
* applied. Used to recover from a chain reorganization: orphaned blocks are
* undone atomically and the checkpoint is rewound so the canonical chain can
* be re-indexed cleanly.
*/
async replayFromBlock(startBlock: number): Promise<void> {
return this.queue.enqueue(() => this.replayFromBlockInternal(startBlock));
}

// Decrease sender balance
await manager.decrement(TokenBalance, { address: from, tokenAddress: token }, 'balance', amount);
private async replayFromBlockInternal(startBlock: number): Promise<void> {
this.logger.log(`Rolling back state from block ${startBlock}`);

// Increase receiver balance
await manager.increment(TokenBalance, { address: to, tokenAddress: token }, 'balance', amount);
}
const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();

private async getLastBlock(manager: any): Promise<number | null> {
const checkpoint = await manager.findOne(IndexerCheckpoint, { where: { id: 1 } });
return checkpoint ? checkpoint.lastBlock : null;
try {
// Load orphaned events newest-first so reversals unwind in the opposite
// order they were applied.
const orphaned = await queryRunner.manager.find(ProcessedEvent, {
where: { blockNumber: MoreThanOrEqual(startBlock) },
order: { blockNumber: 'DESC', logIndex: 'DESC' },
});

for (const event of orphaned) {
if (event.eventType === 'Transfer' && event.payload) {
await this.reverseTransfer(
queryRunner.manager,
event.payload as TransferEventData,
);
}
}

// Remove the orphaned event records (>= startBlock) so the canonical
// chain can be re-indexed without tripping the idempotency check.
await queryRunner.manager.delete(ProcessedEvent, {
blockNumber: MoreThanOrEqual(startBlock),
});

// Rewind the checkpoint to just before the rolled-back range.
const rewoundTo = Math.max(0, startBlock - 1);
await queryRunner.manager.save(IndexerCheckpoint, {
id: 1,
lastBlock: rewoundTo,
updatedAt: new Date(),
});

await queryRunner.commitTransaction();
this.logger.log(
`Rolled back ${orphaned.length} event(s); checkpoint rewound to block ${rewoundTo}`,
);
} catch (error) {
await queryRunner.rollbackTransaction();
this.logger.error(`Failed to roll back from block ${startBlock}: ${error.message}`, error.stack);
throw error;
} finally {
await queryRunner.release();
}
}

private async saveCheckpoint(manager: any, blockNumber: number): Promise<void> {
const currentLastBlock = (await this.getLastBlock(manager)) || 0;
const nextLastBlock = Math.max(currentLastBlock, blockNumber);
private async applyTransfer(manager: EntityManager, data: TransferEventData): Promise<void> {
const { from, to, amount, token } = data;
await manager.decrement(TokenBalance, { address: from, tokenAddress: token }, 'balance', amount);
await manager.increment(TokenBalance, { address: to, tokenAddress: token }, 'balance', amount);
}

await manager.save(IndexerCheckpoint, {
id: 1,
lastBlock: nextLastBlock,
updatedAt: new Date(),
});
/** Inverse of {@link applyTransfer}, used when unwinding an orphaned block. */
private async reverseTransfer(manager: EntityManager, data: TransferEventData): Promise<void> {
const { from, to, amount, token } = data;
await manager.increment(TokenBalance, { address: from, tokenAddress: token }, 'balance', amount);
await manager.decrement(TokenBalance, { address: to, tokenAddress: token }, 'balance', amount);
}

private async saveCheckpointAfterCommit(blockNumber: number): Promise<void> {
const checkpoint = await this.checkpointRepo.findOne({ where: { id: 1 } });
private async saveCheckpoint(manager: EntityManager, blockNumber: number): Promise<void> {
const checkpoint = await manager.findOne(IndexerCheckpoint, { where: { id: 1 } });
const currentLastBlock = checkpoint ? checkpoint.lastBlock : 0;
const nextLastBlock = Math.max(currentLastBlock || 0, blockNumber);

await this.checkpointRepo.save({
await manager.save(IndexerCheckpoint, {
id: 1,
lastBlock: nextLastBlock,
updatedAt: new Date(),
});
}

async replayFromBlock(startBlock: number): Promise<void> {
this.logger.log(`Starting replay from block ${startBlock}`);

// Delete processed events from startBlock onwards (>= to prevent stale events)
await this.processedEventRepo.createQueryBuilder()
.delete()
.where('blockNumber >= :startBlock', { startBlock })
.execute();

// Note: In a real implementation, you'd fetch events from blockchain
// For now, assume events are provided externally or mocked
// This is a placeholder for replay logic
this.logger.log(`Replay completed from block ${startBlock}`);
}

async getLastProcessedBlock(): Promise<number | null> {
const checkpoint = await this.checkpointRepo.findOne({ where: { id: 1 } });
return checkpoint ? checkpoint.lastBlock : null;
}
}
}
16 changes: 10 additions & 6 deletions src/blockchain/blockchain-indexer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,21 @@ describe('BlockchainIndexerService - Checkpoint Commit Behavior', () => {
service = module.get<BlockchainIndexerService>(BlockchainIndexerService);
});

it('saves checkpoint after commit when processing succeeds', async () => {
it('saves the checkpoint inside the transaction when processing succeeds', async () => {
const event = { txHash: '0x1', logIndex: 0, blockNumber: 123, eventType: 'Transfer', data: { from: 'a', to: 'b', amount: 1, token: 'T' } } as any;

const manager = dataSource.createQueryRunner().manager;

await service.processEvent(event);

expect(dataSource.createQueryRunner).toHaveBeenCalled();
// Repository.save should be called after commit
expect(checkpointRepo.save).toHaveBeenCalledTimes(1);
const saved = checkpointRepo.save.mock.calls[0][0];
expect(saved.id).toBe(1);
expect(saved.lastBlock).toBe(123);
// Checkpoint must be persisted through the transaction's manager (atomic
// with the event + balance changes), not via the repository after commit.
expect(manager.save).toHaveBeenCalledWith(
IndexerCheckpoint,
expect.objectContaining({ id: 1, lastBlock: 123 }),
);
expect(checkpointRepo.save).not.toHaveBeenCalled();
});

it('does not save checkpoint when processing fails and rolls back', async () => {
Expand Down
Loading
Loading