Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions services/indexer/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pub enum IndexerError {
RollbackTreeHistoryPruned { event_id: WorldIdRegistryEventId },
#[error("contract call failed: {0}")]
ContractCall(String),
#[error("no progress made for {highest_block_number} blocks, restarting")]
NoProgressAfterReorg { highest_block_number: u64 },
}

impl From<BlockchainError> for IndexerError {
Expand Down
15 changes: 12 additions & 3 deletions services/indexer/src/events_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,16 @@ impl<'a> EventsCommitter<'a> {
let mut tx = self.db.transaction(IsolationLevel::Serializable).await?;

for event in self.buffered_events.iter() {
// Look up event DB for already existing event with the same block number and log index.
// This can either be the same event or conflicting (e.g., if there is a reorg)
let db_event = tx
.world_id_registry_events()
.await?
.get_event((event.block_number, event.log_index))
.await?;

// If an event with the same block number and log index exists in the DB,
// we need to check if the block hash or tx hash is different.
if let Some(db_event) = db_event {
if db_event.block_hash != event.block_hash {
return Err(IndexerError::ReorgDetected {
Expand All @@ -140,13 +144,15 @@ impl<'a> EventsCommitter<'a> {
),
});
}

// If the event already exists in the DB, we can skip it.
tracing::info!(
block_number = event.block_number,
log_index = event.log_index,
"Event already processed, skipping"
);
continue;
// If no event with the same block number and log index exists in the DB,
// we can insert the event into the DB.
} else {
tx.world_id_registry_events()
.await?
Expand All @@ -156,7 +162,9 @@ impl<'a> EventsCommitter<'a> {

EventsProcessor::process_event(&mut tx, event).await?;
}

// Check for any block we have events for in the DB (there could be multiple events per block),
// wether there are conflicting block hashes.
// Note: The earlier check checks only for conflicting events.
let batch_block_numbers: Vec<i64> = self
.buffered_events
.iter()
Expand All @@ -168,7 +176,7 @@ impl<'a> EventsCommitter<'a> {
.await?
.get_blocks_with_conflicting_hashes(&batch_block_numbers)
.await?;

// If there is at least one conflicting block hash, we return a ReorgDetected error.
if !blocks.is_empty() {
return Err(IndexerError::ReorgDetected {
block_number: blocks[0].block_number,
Expand All @@ -179,6 +187,7 @@ impl<'a> EventsCommitter<'a> {
});
}

// Get the on-chain Merkle root of the last event of the batch and compare it to the simulated root.
if let Some(BlockchainEvent {
block_number,
details:
Expand Down
60 changes: 37 additions & 23 deletions services/indexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
blockchain::{Blockchain, BlockchainEvent, RegistryEvent},
blockchain::Blockchain,
config::{AppState, HttpConfig, IndexerConfig, RunMode},
db::DB,
events_committer::EventsCommitter,
Expand Down Expand Up @@ -28,6 +28,7 @@ mod sanity_check;
pub mod tree;

static BLOCKCHAIN_RETRY_DELAY: Duration = Duration::from_secs(1);
static MAX_ROLLBACKS_WITHOUT_PROGRESS: u32 = 3;

/// Initializes the in-memory tree from a cache file if it exists, otherwise builds from DB.
///
Expand Down Expand Up @@ -338,14 +339,6 @@ async fn run_both(
}
}

pub async fn handle_registry_event<'a>(
events_committer: &mut EventsCommitter<'a>,
event: BlockchainEvent<RegistryEvent>,
) -> IndexerResult<()> {
events_committer.handle_event(event).await?;
Ok(())
}

/// Stream registry events from the blockchain and process them.
/// Restart when websocket connection is dropped.
#[instrument(level = "info", skip_all, fields(start_from))]
Expand All @@ -357,9 +350,23 @@ pub async fn process_registry_events(
db: &DB,
tree_state: &tree::TreeState,
) -> IndexerResult<()> {
// We re-create the blockchain connection (including backfill and websocket) when the stream
// returns an error or the websocket connection is dropped.
loop {
// We re-start the rpc websocket connection when we then local tree root deviates from the on-chain tree root or
// the websocket connection is dropped or we get any other error.

// We keep track of the highest committed block number across restarts to avoid looping forever in case no progress is made.
let mut watermark_block_number = db
.world_id_registry_events()
.get_latest_block()
.await?
.unwrap_or(indexer_cfg.start_block);

let mut rollbacks_without_progress = 0;

// In-memory tree is rolled back on reorgs
let versioned_tree =
tree::VersionedTreeState::new(tree_state.clone(), indexer_cfg.tree_max_block_age);

while rollbacks_without_progress < MAX_ROLLBACKS_WITHOUT_PROGRESS {
tracing::info!("starting blockchain connection");

let blockchain =
Expand All @@ -379,23 +386,29 @@ pub async fn process_registry_events(

let mut stream = blockchain.backfill_and_stream_events(from, indexer_cfg.batch_size);

let versioned_tree =
tree::VersionedTreeState::new(tree_state.clone(), indexer_cfg.tree_max_block_age);
let mut events_committer = EventsCommitter::new(db, versioned_tree.clone());

while let Some(event) = stream.next().await {
match event {
Ok(event) => {
let block_number = event.block_number;
match handle_registry_event(&mut events_committer, event).await {
Ok(()) => {
match events_committer.handle_event(event).await {
Ok(root_recorded) => {
crate::metrics::set_chain_processed_block(block_number);
// Only update the watermark block if the committed block number is higher than the current watermark.
// It may be lower if there was a reorg and we rolled back to a previous valid root.
if root_recorded {
if block_number > watermark_block_number {
watermark_block_number = block_number;
rollbacks_without_progress = 0;
}
}
}
Err(IndexerError::ReorgDetected {
block_number,
reason,
}) => {
tracing::warn!(
tracing::error!(
block_number,
reason,
"Reorg detected during event commit, rolling back"
Expand All @@ -410,11 +423,8 @@ pub async fn process_registry_events(
{
Ok(Some(target)) => {
tracing::info!(?target, "rolled back successfully");
return Err(IndexerError::ReorgDetected {
block_number: target.block_number,
reason: "rolled back to last valid root, restart required"
.to_string(),
});
rollbacks_without_progress += 1;
break;
}
Ok(None) => {
return Err(IndexerError::ReorgDetected {
Expand All @@ -435,6 +445,10 @@ pub async fn process_registry_events(
}
}

tracing::warn!("restarting blockchain connection");
tracing::error!("restarting blockchain connection");
}

return Err(IndexerError::NoProgressAfterReorg {
highest_block_number: watermark_block_number,
});
}
Loading