From 542b14ff54156ada6b704813f4e1945436345192 Mon Sep 17 00:00:00 2001 From: kilianglas Date: Thu, 21 May 2026 20:55:49 +0200 Subject: [PATCH] feat: better rollback behavior --- services/indexer/src/error.rs | 2 + services/indexer/src/events_committer.rs | 15 ++++-- services/indexer/src/lib.rs | 60 +++++++++++++++--------- 3 files changed, 51 insertions(+), 26 deletions(-) diff --git a/services/indexer/src/error.rs b/services/indexer/src/error.rs index 656b8d611..7a70d12c2 100644 --- a/services/indexer/src/error.rs +++ b/services/indexer/src/error.rs @@ -58,6 +58,8 @@ pub enum IndexerError { RollbackTreeHistoryPruned { event_id: WorldIdRegistryEventId }, #[error("contract call failed: {0}")] ContractCall(String), + #[error("no progress made for {highest_block_number} blocks, restarting")] + NoProgressAfterReorg { highest_block_number: u64 }, } impl From for IndexerError { diff --git a/services/indexer/src/events_committer.rs b/services/indexer/src/events_committer.rs index 6851ec13f..ddf275394 100644 --- a/services/indexer/src/events_committer.rs +++ b/services/indexer/src/events_committer.rs @@ -111,12 +111,16 @@ impl<'a> EventsCommitter<'a> { let mut tx = self.db.transaction(IsolationLevel::Serializable).await?; for event in self.buffered_events.iter() { + // Look up event DB for already existing event with the same block number and log index. + // This can either be the same event or conflicting (e.g., if there is a reorg) let db_event = tx .world_id_registry_events() .await? .get_event((event.block_number, event.log_index)) .await?; + // If an event with the same block number and log index exists in the DB, + // we need to check if the block hash or tx hash is different. if let Some(db_event) = db_event { if db_event.block_hash != event.block_hash { return Err(IndexerError::ReorgDetected { @@ -140,13 +144,15 @@ impl<'a> EventsCommitter<'a> { ), }); } - + // If the event already exists in the DB, we can skip it. tracing::info!( block_number = event.block_number, log_index = event.log_index, "Event already processed, skipping" ); continue; + // If no event with the same block number and log index exists in the DB, + // we can insert the event into the DB. } else { tx.world_id_registry_events() .await? @@ -156,7 +162,9 @@ impl<'a> EventsCommitter<'a> { EventsProcessor::process_event(&mut tx, event).await?; } - + // Check for any block we have events for in the DB (there could be multiple events per block), + // wether there are conflicting block hashes. + // Note: The earlier check checks only for conflicting events. let batch_block_numbers: Vec = self .buffered_events .iter() @@ -168,7 +176,7 @@ impl<'a> EventsCommitter<'a> { .await? .get_blocks_with_conflicting_hashes(&batch_block_numbers) .await?; - + // If there is at least one conflicting block hash, we return a ReorgDetected error. if !blocks.is_empty() { return Err(IndexerError::ReorgDetected { block_number: blocks[0].block_number, @@ -179,6 +187,7 @@ impl<'a> EventsCommitter<'a> { }); } + // Get the on-chain Merkle root of the last event of the batch and compare it to the simulated root. if let Some(BlockchainEvent { block_number, details: diff --git a/services/indexer/src/lib.rs b/services/indexer/src/lib.rs index f65978eaa..90e31a454 100644 --- a/services/indexer/src/lib.rs +++ b/services/indexer/src/lib.rs @@ -1,5 +1,5 @@ use crate::{ - blockchain::{Blockchain, BlockchainEvent, RegistryEvent}, + blockchain::Blockchain, config::{AppState, HttpConfig, IndexerConfig, RunMode}, db::DB, events_committer::EventsCommitter, @@ -28,6 +28,7 @@ mod sanity_check; pub mod tree; static BLOCKCHAIN_RETRY_DELAY: Duration = Duration::from_secs(1); +static MAX_ROLLBACKS_WITHOUT_PROGRESS: u32 = 3; /// Initializes the in-memory tree from a cache file if it exists, otherwise builds from DB. /// @@ -338,14 +339,6 @@ async fn run_both( } } -pub async fn handle_registry_event<'a>( - events_committer: &mut EventsCommitter<'a>, - event: BlockchainEvent, -) -> IndexerResult<()> { - events_committer.handle_event(event).await?; - Ok(()) -} - /// Stream registry events from the blockchain and process them. /// Restart when websocket connection is dropped. #[instrument(level = "info", skip_all, fields(start_from))] @@ -357,9 +350,23 @@ pub async fn process_registry_events( db: &DB, tree_state: &tree::TreeState, ) -> IndexerResult<()> { - // We re-create the blockchain connection (including backfill and websocket) when the stream - // returns an error or the websocket connection is dropped. - loop { + // We re-start the rpc websocket connection when we then local tree root deviates from the on-chain tree root or + // the websocket connection is dropped or we get any other error. + + // We keep track of the highest committed block number across restarts to avoid looping forever in case no progress is made. + let mut watermark_block_number = db + .world_id_registry_events() + .get_latest_block() + .await? + .unwrap_or(indexer_cfg.start_block); + + let mut rollbacks_without_progress = 0; + + // In-memory tree is rolled back on reorgs + let versioned_tree = + tree::VersionedTreeState::new(tree_state.clone(), indexer_cfg.tree_max_block_age); + + while rollbacks_without_progress < MAX_ROLLBACKS_WITHOUT_PROGRESS { tracing::info!("starting blockchain connection"); let blockchain = @@ -379,23 +386,29 @@ pub async fn process_registry_events( let mut stream = blockchain.backfill_and_stream_events(from, indexer_cfg.batch_size); - let versioned_tree = - tree::VersionedTreeState::new(tree_state.clone(), indexer_cfg.tree_max_block_age); let mut events_committer = EventsCommitter::new(db, versioned_tree.clone()); while let Some(event) = stream.next().await { match event { Ok(event) => { let block_number = event.block_number; - match handle_registry_event(&mut events_committer, event).await { - Ok(()) => { + match events_committer.handle_event(event).await { + Ok(root_recorded) => { crate::metrics::set_chain_processed_block(block_number); + // Only update the watermark block if the committed block number is higher than the current watermark. + // It may be lower if there was a reorg and we rolled back to a previous valid root. + if root_recorded { + if block_number > watermark_block_number { + watermark_block_number = block_number; + rollbacks_without_progress = 0; + } + } } Err(IndexerError::ReorgDetected { block_number, reason, }) => { - tracing::warn!( + tracing::error!( block_number, reason, "Reorg detected during event commit, rolling back" @@ -410,11 +423,8 @@ pub async fn process_registry_events( { Ok(Some(target)) => { tracing::info!(?target, "rolled back successfully"); - return Err(IndexerError::ReorgDetected { - block_number: target.block_number, - reason: "rolled back to last valid root, restart required" - .to_string(), - }); + rollbacks_without_progress += 1; + break; } Ok(None) => { return Err(IndexerError::ReorgDetected { @@ -435,6 +445,10 @@ pub async fn process_registry_events( } } - tracing::warn!("restarting blockchain connection"); + tracing::error!("restarting blockchain connection"); } + + return Err(IndexerError::NoProgressAfterReorg { + highest_block_number: watermark_block_number, + }); }