diff --git a/.gitignore b/.gitignore index 8196c671..f2ba007b 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ test-ledger/ minio test.db docker-compose.yml +*.txt diff --git a/Cargo.lock b/Cargo.lock index ae825d0c..7edd9674 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -105,8 +105,7 @@ checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" [[package]] name = "aligned-sized" version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48a526ec4434d531d488af59fe866f36b310fe8906691c75dffa664450a3800a" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "proc-macro2", "quote", @@ -3681,8 +3680,7 @@ dependencies = [ [[package]] name = "light-account-checks" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3fd000a2b8e0cc9d0b7b7712964870df51f2114f1693b9d8f0414f6f3ec16bd" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "solana-account-info", "solana-program-error", @@ -3694,15 +3692,14 @@ dependencies = [ [[package]] name = "light-batched-merkle-tree" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81c7e179246468b09bf5c6882ef33043e178ff90eb6eab0c1c4c3623ef84b154" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "aligned-sized", "borsh 0.10.4", "light-account-checks", "light-bloom-filter", "light-compressed-account", - "light-hasher", + "light-hasher 3.1.0 (git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19)", "light-macros", "light-merkle-tree-metadata", "light-verifier", @@ -3719,8 +3716,7 @@ dependencies = [ 
[[package]] name = "light-bloom-filter" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44abcb5554e1c15cefa9ac17e4ceda6f5afb039db25ab1fd777f012356d0f964" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "bitvec", "num-bigint 0.4.6", @@ -3744,13 +3740,12 @@ dependencies = [ [[package]] name = "light-compressed-account" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f15113babaca9efb592631ec1e7e78c1c83413818a6e1e4248b7df53d88fe65" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "anchor-lang 0.31.1", "borsh 0.10.4", "bytemuck", - "light-hasher", + "light-hasher 3.1.0 (git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19)", "light-macros", "light-zero-copy", "solana-program-error", @@ -3767,7 +3762,7 @@ checksum = "9b4f878301620df78ba7e7758c5fd720f28040f5c157375f88d310f15ddb1746" dependencies = [ "borsh 0.10.4", "light-bounded-vec", - "light-hasher", + "light-hasher 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "memoffset 0.9.1", "thiserror 2.0.12", ] @@ -3777,6 +3772,23 @@ name = "light-hasher" version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6445937ea244bebae0558e2aaec375791895d08c785b87cc45b62cd80d69139" +dependencies = [ + "ark-bn254 0.5.0", + "ark-ff 0.5.0", + "arrayvec", + "borsh 0.10.4", + "light-poseidon 0.3.0", + "num-bigint 0.4.6", + "sha2 0.10.9", + "sha3 0.10.8", + "solana-nostd-keccak", + "thiserror 2.0.12", +] + +[[package]] +name = "light-hasher" +version = "3.1.0" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" 
dependencies = [ "ark-bn254 0.5.0", "ark-ff 0.5.0", @@ -3798,7 +3810,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc786d8df68ef64493fea04914a7a7745f8122f2efbae043cd4ba4eaffa9e6db" dependencies = [ - "light-hasher", + "light-hasher 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "num-bigint 0.4.6", "num-traits", "thiserror 2.0.12", @@ -3807,8 +3819,7 @@ dependencies = [ [[package]] name = "light-macros" version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "861c0817697c1201c2235cd831fcbaa2564a5f778e5229e9f5cc21035e97c273" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "bs58 0.5.1", "proc-macro2", @@ -3819,8 +3830,7 @@ dependencies = [ [[package]] name = "light-merkle-tree-metadata" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "544048fa95ea95fc1e952a2b9b1d6f09340c8decaffd1ad239fe1f6eb905ae76" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "borsh 0.10.4", "bytemuck", @@ -3838,7 +3848,7 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1650701feac958261b2c3ab4da361ad8548985ee3ee496a17e76db44d2d3c9e3" dependencies = [ - "light-hasher", + "light-hasher 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "light-indexed-array", "num-bigint 0.4.6", "num-traits", @@ -3872,8 +3882,7 @@ dependencies = [ [[package]] name = "light-verifier" version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85fdf317ec3cfcd3a8e6556a5b5e7fbcc207a40264700f9a5271876838f26f58" +source = 
"git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "groth16-solana", "light-compressed-account", @@ -3883,8 +3892,7 @@ dependencies = [ [[package]] name = "light-zero-copy" version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a34d759f65547a6540db7047f38f4cb2c3f01658deca95a1dd06f26b578de947" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "solana-program-error", "thiserror 2.0.12", @@ -4564,7 +4572,7 @@ dependencies = [ "light-batched-merkle-tree", "light-compressed-account", "light-concurrent-merkle-tree", - "light-hasher", + "light-hasher 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "light-merkle-tree-metadata", "light-merkle-tree-reference", "light-poseidon 0.3.0", diff --git a/Cargo.toml b/Cargo.toml index b0e81c70..6c51226c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,10 @@ path = "src/snapshot/loader/main.rs" name = "photon-tree-validator" path = "src/tools/tree_validator/main.rs" +[[bin]] +name = "photon-analyze-snapshot" +path = "src/tools/analyze_snapshot.rs" + [dependencies] ark-serialize = "0.5" ark-bn254 = "0.5" @@ -82,9 +86,11 @@ solana-pubkey = "2.3.0" solana-transaction-status = "1.18.0" light-concurrent-merkle-tree = "2.1.0" -light-batched-merkle-tree = "0.3.0" -light-merkle-tree-metadata = "0.3.0" -light-compressed-account = { version = "0.3.0", features = ["anchor"] } +light-batched-merkle-tree = { version = "0.3.0", git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19" } +light-merkle-tree-metadata = { version = "0.3.0", git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19" } +light-compressed-account = { version = "0.3.0", features = [ + "anchor", +], 
git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19" } light-hasher = { version = "3.1.0" } light-poseidon = "0.3.0" diff --git a/src/api/method/get_multiple_compressed_accounts.rs b/src/api/method/get_multiple_compressed_accounts.rs index 9d925765..3b418563 100644 --- a/src/api/method/get_multiple_compressed_accounts.rs +++ b/src/api/method/get_multiple_compressed_accounts.rs @@ -148,7 +148,11 @@ pub async fn get_multiple_compressed_accounts( } fetch_account_from_addresses(conn, addresses).await? } - _ => panic!("Either hashes or addresses must be provided"), + _ => { + return Err(PhotonApiError::ValidationError( + "Either hashes or addresses must be provided".to_string(), + )); + } }; Ok(GetMultipleCompressedAccountsResponse { diff --git a/src/api/method/get_queue_elements.rs b/src/api/method/get_queue_elements.rs index 580a0786..6b7a02db 100644 --- a/src/api/method/get_queue_elements.rs +++ b/src/api/method/get_queue_elements.rs @@ -77,7 +77,8 @@ pub async fn get_queue_elements( query_condition.add(accounts::Column::NullifierQueueIndex.is_not_null()); if let Some(start_queue_index) = request.start_queue_index { query_condition = query_condition - .add(accounts::Column::NullifierQueueIndex.gte(start_queue_index as i64)); + .add(accounts::Column::NullifierQueueIndex.gte(start_queue_index as i64)) + .add(accounts::Column::NullifiedInTree.eq(false)); } } QueueType::OutputStateV2 => { diff --git a/src/api/method/get_transaction_with_compression_info.rs b/src/api/method/get_transaction_with_compression_info.rs index 6c2c4fb6..3a935ee2 100644 --- a/src/api/method/get_transaction_with_compression_info.rs +++ b/src/api/method/get_transaction_with_compression_info.rs @@ -210,6 +210,7 @@ pub async fn get_transaction_helper( PhotonApiError::UnexpectedError(format!("Failed to parse transaction {}", signature.0)) })?, slot, + None, ) .map_err(|_e| { PhotonApiError::UnexpectedError(format!("Failed to parse transaction {}", 
signature.0)) @@ -360,6 +361,7 @@ pub async fn get_transaction_helper_v2( PhotonApiError::UnexpectedError(format!("Failed to parse transaction {}", signature.0)) })?, slot, + None, ) .map_err(|_e| { PhotonApiError::UnexpectedError(format!("Failed to parse transaction {}", signature.0)) diff --git a/src/ingester/error.rs b/src/ingester/error.rs index 12b87ef1..4f74fb32 100644 --- a/src/ingester/error.rs +++ b/src/ingester/error.rs @@ -14,6 +14,10 @@ pub enum IngesterError { EmptyBatchEvent, #[error("Invalid event.")] InvalidEvent, + #[error("Custom error: {0}")] + CustomError(String), + #[error("Gap detected, triggering rewind")] + GapDetectedRewind, } impl From for IngesterError { diff --git a/src/ingester/fetchers/grpc.rs b/src/ingester/fetchers/grpc.rs index 5a4a6764..9ef79c81 100644 --- a/src/ingester/fetchers/grpc.rs +++ b/src/ingester/fetchers/grpc.rs @@ -15,6 +15,7 @@ use solana_client::nonblocking::rpc_client::RpcClient; use solana_pubkey::Pubkey; use solana_sdk::pubkey::Pubkey as SdkPubkey; use solana_sdk::signature::Signature; +use tokio::sync::mpsc; use tokio::time::sleep; use tracing::error; use yellowstone_grpc_client::{GeyserGrpcBuilderResult, GeyserGrpcClient, Interceptor}; @@ -30,6 +31,7 @@ use yellowstone_grpc_proto::solana::storage::confirmed_block::InnerInstructions; use crate::api::method::get_indexer_health::HEALTH_CHECK_SLOT_DISTANCE; use crate::common::typedefs::hash::Hash; use crate::ingester::fetchers::poller::get_block_poller_stream; +use crate::ingester::gap::RewindCommand; use crate::ingester::typedefs::block_info::{ BlockInfo, BlockMetadata, Instruction, InstructionGroup, TransactionInfo, }; @@ -43,6 +45,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client: Arc, mut last_indexed_slot: u64, max_concurrent_block_fetches: usize, + rewind_receiver: Option>, ) -> impl Stream> { stream! 
{ start_latest_slot_updater(rpc_client.clone()).await; @@ -53,6 +56,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + rewind_receiver, )) ); @@ -115,6 +119,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + None, // No rewind receiver for timeout fallback ))); continue; } @@ -132,6 +137,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + None, // No rewind receiver for out-of-order fallback ))); continue; } @@ -144,6 +150,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + None, // No rewind receiver for unhealthy fallback ))); } } diff --git a/src/ingester/fetchers/mod.rs b/src/ingester/fetchers/mod.rs index cc3235da..34f29c71 100644 --- a/src/ingester/fetchers/mod.rs +++ b/src/ingester/fetchers/mod.rs @@ -3,12 +3,14 @@ use std::sync::Arc; use async_stream::stream; use futures::{pin_mut, Stream, StreamExt}; use solana_client::nonblocking::rpc_client::RpcClient; +use tokio::sync::mpsc; use super::typedefs::block_info::BlockInfo; pub mod grpc; pub mod poller; +use crate::ingester::gap::RewindCommand; use grpc::get_grpc_stream_with_rpc_fallback; use poller::get_block_poller_stream; @@ -17,10 +19,11 @@ pub struct BlockStreamConfig { pub geyser_url: Option, pub max_concurrent_block_fetches: usize, pub last_indexed_slot: u64, + pub rewind_receiver: Option>, } impl BlockStreamConfig { - pub fn load_block_stream(&self) -> impl Stream> { + pub fn load_block_stream(mut self) -> impl Stream> { let grpc_stream = self.geyser_url.as_ref().map(|geyser_url| { let auth_header = std::env::var("GRPC_X_TOKEN").unwrap(); get_grpc_stream_with_rpc_fallback( @@ -29,6 +32,7 @@ impl BlockStreamConfig { self.rpc_client.clone(), self.last_indexed_slot, self.max_concurrent_block_fetches, + self.rewind_receiver.take(), ) }); @@ 
-37,6 +41,7 @@ impl BlockStreamConfig { self.rpc_client.clone(), self.last_indexed_slot, self.max_concurrent_block_fetches, + self.rewind_receiver.take(), )) } else { None diff --git a/src/ingester/fetchers/poller.rs b/src/ingester/fetchers/poller.rs index 729d20e5..b7e469c3 100644 --- a/src/ingester/fetchers/poller.rs +++ b/src/ingester/fetchers/poller.rs @@ -9,10 +9,12 @@ use futures::{pin_mut, Stream, StreamExt}; use solana_client::{ nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig, rpc_request::RpcError, }; +use tokio::sync::mpsc; use solana_sdk::commitment_config::CommitmentConfig; use solana_transaction_status::{TransactionDetails, UiTransactionEncoding}; +use crate::ingester::gap::RewindCommand; use crate::{ ingester::typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo}, metric, @@ -40,33 +42,66 @@ pub fn get_block_poller_stream( rpc_client: Arc, mut last_indexed_slot: u64, max_concurrent_block_fetches: usize, + mut rewind_receiver: Option>, ) -> impl Stream> { stream! { - let start_slot = match last_indexed_slot { + let mut current_start_slot = match last_indexed_slot { 0 => 0, last_indexed_slot => last_indexed_slot + 1 }; - let slot_stream = get_slot_stream(rpc_client.clone(), start_slot); - pin_mut!(slot_stream); - let block_stream = slot_stream - .map(|slot| { - let rpc_client = rpc_client.clone(); - async move { fetch_block_with_infinite_retries(rpc_client.clone(), slot).await } - }) - .buffer_unordered(max_concurrent_block_fetches); - pin_mut!(block_stream); - let mut block_cache: BTreeMap = BTreeMap::new(); - while let Some(block) = block_stream.next().await { - if let Some(block) = block { - block_cache.insert(block.metadata.slot, block); - } - let (blocks_to_index, last_indexed_slot_from_cache) = pop_cached_blocks_to_index(&mut block_cache, last_indexed_slot); - last_indexed_slot = last_indexed_slot_from_cache; - metric! 
{ - statsd_count!("rpc_block_emitted", blocks_to_index.len() as i64); + + loop { + let slot_stream = get_slot_stream(rpc_client.clone(), current_start_slot); + pin_mut!(slot_stream); + let block_stream = slot_stream + .map(|slot| { + let rpc_client = rpc_client.clone(); + async move { fetch_block_with_infinite_retries(rpc_client.clone(), slot).await } + }) + .buffer_unordered(max_concurrent_block_fetches); + pin_mut!(block_stream); + let mut block_cache: BTreeMap = BTreeMap::new(); + let mut rewind_occurred = false; + + while let Some(block) = block_stream.next().await { + // Check for rewind commands before processing blocks + if let Some(ref mut receiver) = rewind_receiver { + while let Ok(command) = receiver.try_recv() { + match command { + RewindCommand::Rewind { to_slot, reason } => { + log::error!("Rewinding block stream to {}: {}", to_slot, reason); + // Clear cached blocks + block_cache.clear(); + // Reset positions + last_indexed_slot = to_slot - 1; + current_start_slot = to_slot; + rewind_occurred = true; + log::info!("Cleared cache, restarting from slot {}", current_start_slot); + break; + } + } + } + } + + if rewind_occurred { + break; // Exit inner loop to restart streams + } + + if let Some(block) = block { + block_cache.insert(block.metadata.slot, block); + } + let (blocks_to_index, last_indexed_slot_from_cache) = pop_cached_blocks_to_index(&mut block_cache, last_indexed_slot); + last_indexed_slot = last_indexed_slot_from_cache; + metric! 
{ + statsd_count!("rpc_block_emitted", blocks_to_index.len() as i64); + } + if !blocks_to_index.is_empty() { + yield blocks_to_index; + } } - if !blocks_to_index.is_empty() { - yield blocks_to_index; + + if !rewind_occurred { + break; // Normal termination } } } diff --git a/src/ingester/gap/mod.rs b/src/ingester/gap/mod.rs new file mode 100644 index 00000000..58563d50 --- /dev/null +++ b/src/ingester/gap/mod.rs @@ -0,0 +1,110 @@ +use lazy_static::lazy_static; +use solana_pubkey::Pubkey; +use std::collections::HashMap; +use std::sync::RwLock; +use tracing::{debug, info}; + +mod rewind; +mod sequences; +mod treetype_seq; + +use crate::ingester::gap::treetype_seq::TreeTypeSeq; + +pub use rewind::{RewindCommand, RewindController}; +pub use sequences::StateUpdateSequences; + +// Global sequence state tracker to maintain latest observed sequences +lazy_static! { + pub static ref SEQUENCE_STATE: RwLock> = + RwLock::new(HashMap::new()); +} + +#[derive(Debug, Clone)] +pub struct SequenceGap { + // Boundary information for gap filling + pub before_slot: u64, + pub after_slot: u64, + pub before_signature: String, + pub after_signature: String, + + pub tree_pubkey: Option, + pub field_type: StateUpdateFieldType, +} + +#[derive(Debug, Default, Clone)] +pub struct SequenceEntry { + pub sequence: u64, + pub slot: u64, + pub signature: String, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum StateUpdateFieldType { + IndexedTreeUpdate, + LeafNullification, + BatchNullifyContext, + BatchNewAddress, + BatchMerkleTreeEventAppend, + BatchMerkleTreeEventNullify, + BatchMerkleTreeEventAddressAppend, + OutAccount, +} + +/// Clears the global sequence state - used after rewind to re-learn sequences +pub fn clear_sequence_state() { + match SEQUENCE_STATE.write() { + Ok(mut state) => { + state.clear(); + info!("Cleared sequence state after rewind"); + } + Err(e) => { + debug!( + "Failed to acquire write lock to clear sequence state: {}", + e + ); + } + } +} + +/// Gets the 
current sequence state from the global state tracker +pub fn get_current_sequence_state( + tree_pubkey: Option, + queue_pubkey: Option, + field_type: &StateUpdateFieldType, +) -> TreeTypeSeq { + let state = match SEQUENCE_STATE.read() { + Ok(state) => state, + Err(e) => { + debug!("Failed to acquire sequence state read lock: {}", e); + return TreeTypeSeq::default(); + } + }; + + if let Some(tree) = tree_pubkey { + let tree_str = tree.to_string(); + if let Some(current_seq) = state.get(&tree_str) { + debug!( + "Using current sequence state for tree {}: {:?}", + tree_str, current_seq + ); + current_seq.clone() + } else { + debug!("No current sequence state found for tree {}", tree_str); + TreeTypeSeq::default() + } + } else if let Some(queue_pubkey) = queue_pubkey { + let queue_str = queue_pubkey.to_string(); + if let Some(current_seq) = state.get(&queue_str) { + current_seq.clone() + } else { + debug!("No current sequence state found for queue {}", queue_str); + TreeTypeSeq::default() + } + } else { + debug!( + "No tree/queue pubkey provided for field_type: {:?}", + field_type + ); + TreeTypeSeq::default() + } +} diff --git a/src/ingester/gap/rewind.rs b/src/ingester/gap/rewind.rs new file mode 100644 index 00000000..0593e6b7 --- /dev/null +++ b/src/ingester/gap/rewind.rs @@ -0,0 +1,127 @@ +use crate::ingester::gap::SequenceGap; +use thiserror::Error; +use tokio::sync::mpsc; + +#[derive(Debug, Clone)] +pub enum RewindCommand { + Rewind { to_slot: u64, reason: String }, +} + +#[derive(Debug, Error)] +pub enum RewindError { + #[error("Failed to send rewind command: {0}")] + SendError(String), + #[error("Invalid rewind slot: {0}")] + InvalidSlot(u64), +} + +#[derive(Debug, Clone)] +pub struct RewindController { + sender: mpsc::UnboundedSender, +} + +impl RewindController { + pub fn new() -> (Self, mpsc::UnboundedReceiver) { + let (sender, receiver) = mpsc::unbounded_channel(); + (Self { sender }, receiver) + } + + pub fn request_rewind(&self, to_slot: u64, reason: 
String) -> Result<(), RewindError> { + let command = RewindCommand::Rewind { to_slot, reason }; + self.sender + .send(command) + .map_err(|e| RewindError::SendError(e.to_string()))?; + Ok(()) + } + + pub fn request_rewind_for_gaps(&self, gaps: &[SequenceGap]) -> Result<(), RewindError> { + if gaps.is_empty() { + return Ok(()); + } + + let rewind_slot = determine_rewind_slot_from_gaps(gaps); + let gap_count = gaps.len(); + let reason = format!("Sequence gaps detected: {} gaps found", gap_count); + + tracing::warn!( + "Requesting rewind to slot {} due to {} sequence gaps", + rewind_slot, + gap_count + ); + self.request_rewind(rewind_slot, reason) + } +} + +/// Determines the appropriate rewind slot based on detected gaps +/// Uses the earliest before_slot from all gaps to ensure we capture all missing data +fn determine_rewind_slot_from_gaps(gaps: &[SequenceGap]) -> u64 { + gaps.iter() + .map(|gap| gap.before_slot) + .filter(|&slot| slot > 0) // Filter out zero slots from initialization + .min() + .unwrap_or(0) // Fallback to slot 0 if no valid slots found +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ingester::gap::{SequenceGap, StateUpdateFieldType}; + use solana_pubkey::Pubkey; + + #[test] + fn test_rewind_controller_creation() { + let (controller, _receiver) = RewindController::new(); + let result = controller.request_rewind(100, "test rewind".to_string()); + assert!(result.is_ok()); + } + + #[test] + fn test_determine_rewind_slot_from_gaps() { + let gaps = vec![ + SequenceGap { + before_slot: 1000, + after_slot: 1002, + before_signature: "sig1".to_string(), + after_signature: "sig2".to_string(), + tree_pubkey: Some(Pubkey::new_unique()), + field_type: StateUpdateFieldType::IndexedTreeUpdate, + }, + SequenceGap { + before_slot: 995, + after_slot: 997, + before_signature: "sig3".to_string(), + after_signature: "sig4".to_string(), + tree_pubkey: Some(Pubkey::new_unique()), + field_type: StateUpdateFieldType::IndexedTreeUpdate, + }, + ]; + + let 
rewind_slot = determine_rewind_slot_from_gaps(&gaps); + assert_eq!(rewind_slot, 995); // Should pick the earliest before_slot + } + + #[test] + fn test_determine_rewind_slot_filters_zero_slots() { + let gaps = vec![ + SequenceGap { + before_slot: 0, // Should be filtered out + after_slot: 1002, + before_signature: "".to_string(), + after_signature: "sig2".to_string(), + tree_pubkey: Some(Pubkey::new_unique()), + field_type: StateUpdateFieldType::IndexedTreeUpdate, + }, + SequenceGap { + before_slot: 995, + after_slot: 997, + before_signature: "sig3".to_string(), + after_signature: "sig4".to_string(), + tree_pubkey: Some(Pubkey::new_unique()), + field_type: StateUpdateFieldType::IndexedTreeUpdate, + }, + ]; + + let rewind_slot = determine_rewind_slot_from_gaps(&gaps); + assert_eq!(rewind_slot, 995); // Should ignore slot 0 and pick 995 + } +} diff --git a/src/ingester/gap/sequences.rs b/src/ingester/gap/sequences.rs new file mode 100644 index 00000000..9593d059 --- /dev/null +++ b/src/ingester/gap/sequences.rs @@ -0,0 +1,418 @@ +use crate::ingester::gap::treetype_seq::TreeTypeSeq; +use crate::ingester::gap::{ + get_current_sequence_state, SequenceEntry, SequenceGap, StateUpdateFieldType, SEQUENCE_STATE, +}; +use crate::ingester::parser::indexer_events::MerkleTreeEvent; +use crate::ingester::parser::state_update::StateUpdate; +use crate::ingester::parser::tree_info::QUEUE_TREE_MAPPING; +use solana_pubkey::Pubkey; +use std::collections::HashMap; +use tracing::debug; + +#[derive(Debug, Default, Clone)] +pub struct StateUpdateSequences { + // Sequences with slot and signature information for gap analysis + indexed_tree_seqs: HashMap<(Pubkey, u64), Vec>, // (tree, tree_type_id) -> entries + nullification_seqs: HashMap>, // tree -> entries + batch_nullify_queue_indexes: HashMap>, // tree -> queue_index entries + batch_address_queue_indexes: HashMap>, // tree -> queue_index entries + batch_merkle_event_seqs: HashMap<(Pubkey, u8), Vec>, // (tree_pubkey, event_type) -> 
entries + out_account_leaf_indexes: HashMap>, // tree -> leaf_index entries +} + +impl StateUpdateSequences { + /// Extracts sequences from a StateUpdate with slot and signature context + pub fn extract_state_update_sequences( + &mut self, + state_update: &StateUpdate, + slot: u64, + signature: &str, + ) { + // Extract indexed tree sequences + for ((tree_pubkey, _), leaf_update) in &state_update.indexed_merkle_tree_updates { + self.indexed_tree_seqs + .entry((*tree_pubkey, leaf_update.tree_type as u64)) + .or_insert_with(Vec::new) + .push(SequenceEntry { + sequence: leaf_update.seq, + slot, + signature: signature.to_string(), + }); + } + + // Extract leaf nullification sequences + for nullification in &state_update.leaf_nullifications { + self.nullification_seqs + .entry(nullification.tree) + .or_insert_with(Vec::new) + .push(SequenceEntry { + sequence: nullification.seq, + slot, + signature: signature.to_string(), + }); + } + + // Extract batch nullify context queue indexes + for context in &state_update.batch_nullify_context { + let tree = Pubkey::new_from_array(context.tree_pubkey.to_bytes()); + self.batch_nullify_queue_indexes + .entry(tree) + .or_insert_with(Vec::new) + .push(SequenceEntry { + sequence: context.nullifier_queue_index, + slot, + signature: signature.to_string(), + }); + } + + // Extract batch new address queue indexes + for address in &state_update.batch_new_addresses { + let tree_str = address.tree.0.to_string(); + debug!( + "Extracting batch_new_address for tree: {}, queue_index: {}", + tree_str, address.queue_index + ); + + // Check if this tree should not be in batch operations + if let Some(info) = QUEUE_TREE_MAPPING.get(&tree_str) { + // batch_new_addresses should only contain AddressV2 trees + if info.tree_type != light_compressed_account::TreeType::AddressV2 { + tracing::error!( + "{:?} wrong tree {tree_str} found in batch_new_addresses \ + Only AddressV2 trees should be in batch new address operations. 
\ + queue_index: {}, slot: {}, signature: {}", + info.tree_type, + address.queue_index, + slot, + signature + ); + // Skip this invalid data + continue; + } + } + + self.batch_address_queue_indexes + .entry(address.tree.0) + .or_insert_with(Vec::new) + .push(SequenceEntry { + sequence: address.queue_index, + slot, + signature: signature.to_string(), + }); + } + + // Extract batch merkle tree event sequences + for (tree_hash, events) in &state_update.batch_merkle_tree_events { + let tree_pubkey = Pubkey::from(*tree_hash); + for (seq, merkle_event) in events { + let event_type = merkle_event_to_type_id(merkle_event); + if event_type > 0 { + self.batch_merkle_event_seqs + .entry((tree_pubkey, event_type)) + .or_insert_with(Vec::new) + .push(SequenceEntry { + sequence: *seq, + slot, + signature: signature.to_string(), + }); + } + } + } + + // Extract out_account sequences + for account_with_context in &state_update.out_accounts { + let tree_pubkey = account_with_context.account.tree.0; + if let Some(seq_value) = account_with_context.account.seq { + self.out_account_leaf_indexes + .entry(tree_pubkey) + .or_insert_with(Vec::new) + .push(SequenceEntry { + sequence: seq_value.0, + slot, + signature: signature.to_string(), + }); + } + } + } + + /// Updates the global sequence state with the latest observed sequences + pub fn update_sequence_state(&self) { + let current_state = match SEQUENCE_STATE.read() { + Ok(state) => state, + Err(e) => { + debug!("Failed to acquire read lock for sequence state: {}", e); + return; + } + }; + + let mut updates: HashMap = HashMap::new(); + + // Process indexed tree sequences + for ((tree_pubkey, _tree_type_id), entries) in &self.indexed_tree_seqs { + if let Some(max_entry) = entries.iter().max_by_key(|e| e.sequence) { + let tree_str = tree_pubkey.to_string(); + if let Some(info) = QUEUE_TREE_MAPPING.get(&tree_str) { + match info.tree_type { + light_compressed_account::TreeType::AddressV1 => { + updates.insert(tree_str, 
TreeTypeSeq::AddressV1(max_entry.clone())); + } + tree_type => { + tracing::error!( + "Unhandled tree type {:?} for tree {} in indexed_tree_seqs", + tree_type, + tree_str + ); + } + } + } + } + } + + // Process nullification sequences + for (tree_pubkey, entries) in &self.nullification_seqs { + if let Some(max_entry) = entries.iter().max_by_key(|e| e.sequence) { + let tree_str = tree_pubkey.to_string(); + updates.insert(tree_str, TreeTypeSeq::StateV1(max_entry.clone())); + } + } + + // Process batch address queue indexes (AddressV2) + for (tree_pubkey, entries) in &self.batch_address_queue_indexes { + if let Some(max_entry) = entries.iter().max_by_key(|e| e.sequence) { + let tree_str = tree_pubkey.to_string(); + debug!( + "Updating batch_address_queue_indexes for tree: {}, sequence: {}", + tree_str, max_entry.sequence + ); + + updates.insert( + tree_str.clone(), + TreeTypeSeq::new_address_v2_with_output( + current_state.get(&tree_str), + max_entry.clone(), + ), + ); + } + } + + // Process out account leaf indexes + for (tree_pubkey, entries) in &self.out_account_leaf_indexes { + if let Some(max_entry) = entries.iter().max_by_key(|e| e.sequence) { + let tree_str = tree_pubkey.to_string(); + if let Some(info) = QUEUE_TREE_MAPPING.get(&tree_str) { + match info.tree_type { + light_compressed_account::TreeType::StateV2 => { + updates.insert( + tree_str.clone(), + TreeTypeSeq::new_state_v2_with_output( + current_state.get(&tree_str), + max_entry.clone(), + ), + ); + } + light_compressed_account::TreeType::StateV1 => { + updates.insert(tree_str, TreeTypeSeq::StateV1(max_entry.clone())); + } + tree_type => { + tracing::error!( + "Unhandled tree type {:?} for tree {} in out_account_leaf_indexes", + tree_type, + tree_str + ); + } + } + } + } + } + + // Drop read lock before acquiring write lock + drop(current_state); + + // Apply all updates atomically + if !updates.is_empty() { + match SEQUENCE_STATE.write() { + Ok(mut state) => { + for (key, value) in updates { + 
state.insert(key, value); + } + } + Err(e) => { + debug!("Failed to acquire write lock for sequence state: {}", e); + } + } + } + } + + /// Comprehensive gap detection function that takes a vector of StateUpdateSequences and returns ALL gaps found + /// Aggregates sequences from multiple StateUpdates and detects gaps across all transactions + pub fn detect_all_sequence_gaps(&self) -> Vec { + let mut all_gaps = Vec::new(); + + // Check indexed tree updates + for ((tree_pubkey, tree_type_id), seqs) in &self.indexed_tree_seqs { + debug!( + "Processing indexed_tree_seqs - tree: {}, tree_type_id: {}", + tree_pubkey, tree_type_id + ); + let gaps = StateUpdateSequences::detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + None, + StateUpdateFieldType::IndexedTreeUpdate, + ); + all_gaps.extend(gaps); + } + + // Check leaf nullifications + for (tree_pubkey, seqs) in &self.nullification_seqs { + let gaps = StateUpdateSequences::detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + None, + StateUpdateFieldType::LeafNullification, + ); + all_gaps.extend(gaps); + } + + // Check batch nullify context + for (tree_pubkey, entries) in &self.batch_nullify_queue_indexes { + if !entries.is_empty() { + let gaps = StateUpdateSequences::detect_sequence_gaps_with_metadata( + entries, + Some(*tree_pubkey), + None, + StateUpdateFieldType::BatchNullifyContext, + ); + all_gaps.extend(gaps); + } + } + + // Check batch new addresses + for (tree_pubkey, seqs) in &self.batch_address_queue_indexes { + let gaps = StateUpdateSequences::detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + None, + StateUpdateFieldType::BatchNewAddress, + ); + all_gaps.extend(gaps); + } + + // Check batch merkle tree events + for ((tree_pubkey, event_type), seqs) in &self.batch_merkle_event_seqs { + let field_type = match event_type { + 1 => StateUpdateFieldType::BatchMerkleTreeEventAppend, + 2 => StateUpdateFieldType::BatchMerkleTreeEventNullify, + 3 => 
StateUpdateFieldType::BatchMerkleTreeEventAddressAppend, + _ => continue, + }; + + let gaps = StateUpdateSequences::detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + None, + field_type, + ); + all_gaps.extend(gaps); + } + + // Check out_account leaf indexes + for (tree_pubkey, seqs) in &self.out_account_leaf_indexes { + let gaps = StateUpdateSequences::detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + None, + StateUpdateFieldType::OutAccount, + ); + all_gaps.extend(gaps); + } + + all_gaps + } + + /// Detects gaps in a sequence with full metadata for gap filling + fn detect_sequence_gaps_with_metadata( + sequences: &[SequenceEntry], + tree_pubkey: Option, + queue_pubkey: Option, + field_type: StateUpdateFieldType, + ) -> Vec { + if sequences.len() < 2 { + return Vec::new(); + } + + let mut sorted_sequences = sequences.to_vec(); + sorted_sequences.sort_by_key(|entry| entry.sequence); + let mut gaps = Vec::new(); + + let start_seq = get_current_sequence_state(tree_pubkey, queue_pubkey, &field_type); + let (unpacked_start_seq, start_entry) = start_seq.extract_sequence_info(&field_type); + + // Skip gap detection for tree initialization (when unpacked_start_seq == 0) + // because there's no previous sequence to compare against + // Also skip if unpacked_start_seq is u64::MAX (no state found) + if unpacked_start_seq > 0 && unpacked_start_seq != u64::MAX { + // Check for any missing sequences between global state and the minimum sequence in this block + let min_seq_in_block = sorted_sequences[0].sequence; + + // Check if there's a gap between the global state and the sequences in this block + // A gap exists if the minimum sequence in the block is more than 1 away from global state + // AND the missing sequences are not present anywhere in this block + if min_seq_in_block > unpacked_start_seq.saturating_add(1) { + // Check if ALL missing sequences are present in this block + let mut has_real_gap = false; + for missing_seq in 
(unpacked_start_seq + 1)..min_seq_in_block { + let found = sorted_sequences.iter().any(|e| e.sequence == missing_seq); + if !found { + has_real_gap = true; + break; + } + } + + if has_real_gap { + let (before_slot, before_signature) = if let Some(entry) = start_entry { + (entry.slot, entry.signature) + } else { + (0, String::new()) + }; + + gaps.push(SequenceGap { + before_slot, + after_slot: sorted_sequences[0].slot, + before_signature, + after_signature: sorted_sequences[0].signature.clone(), + tree_pubkey, + field_type: field_type.clone(), + }); + } + } + } + + for i in 1..sorted_sequences.len() { + let prev_entry = &sorted_sequences[i - 1]; + let curr_entry = &sorted_sequences[i]; + + if curr_entry.sequence - prev_entry.sequence > 1 { + gaps.push(SequenceGap { + before_slot: prev_entry.slot, + after_slot: curr_entry.slot, + before_signature: prev_entry.signature.clone(), + after_signature: curr_entry.signature.clone(), + tree_pubkey, + field_type: field_type.clone(), + }); + } + } + + gaps + } +} + +fn merkle_event_to_type_id(event: &MerkleTreeEvent) -> u8 { + match event { + MerkleTreeEvent::BatchAppend(_) => 1, + MerkleTreeEvent::BatchNullify(_) => 2, + MerkleTreeEvent::BatchAddressAppend(_) => 3, + _ => 0, // Other event types we don't care about + } +} diff --git a/src/ingester/gap/treetype_seq.rs b/src/ingester/gap/treetype_seq.rs new file mode 100644 index 00000000..a08fe69f --- /dev/null +++ b/src/ingester/gap/treetype_seq.rs @@ -0,0 +1,155 @@ +use crate::ingester::gap::{SequenceEntry, StateUpdateFieldType}; +use tracing::debug; + +#[derive(Debug, Clone)] +pub enum TreeTypeSeq { + StateV1(SequenceEntry), + // Output queue (leaf index), Input queue index, Batch event seq with context + StateV2(StateV2SeqWithContext), + // event seq with complete context + AddressV1(SequenceEntry), + // Input queue index, Batch event seq with context + AddressV2(SequenceEntry, SequenceEntry), // (input_queue_entry, batch_event_entry) +} + +impl Default for TreeTypeSeq { + 
fn default() -> Self { + TreeTypeSeq::StateV1(SequenceEntry::default()) + } +} + +#[derive(Debug, Clone, Default)] +pub struct StateV2SeqWithContext { + pub input_queue_entry: Option, + pub batch_event_entry: Option, + pub output_queue_entry: Option, +} + +/// Helper functions for elegant state updates +impl TreeTypeSeq { + /// Gets existing StateV2 context or creates a default one + fn get_or_default_state_v2(current: Option<&TreeTypeSeq>) -> StateV2SeqWithContext { + current + .and_then(|seq| match seq { + TreeTypeSeq::StateV2(ctx) => Some(ctx.clone()), + _ => None, + }) + .unwrap_or_default() + } + + /// Gets existing AddressV2 input queue entry or creates a default one + fn get_or_default_address_v2_input(current: Option<&TreeTypeSeq>) -> SequenceEntry { + current + .and_then(|seq| match seq { + TreeTypeSeq::AddressV2(input, _) => Some(input.clone()), + _ => None, + }) + .unwrap_or_default() + } + + /// Creates a new StateV2 with updated output queue entry + pub(crate) fn new_state_v2_with_output( + current: Option<&TreeTypeSeq>, + output_entry: SequenceEntry, + ) -> TreeTypeSeq { + let mut ctx = Self::get_or_default_state_v2(current); + ctx.output_queue_entry = Some(output_entry); + TreeTypeSeq::StateV2(ctx) + } + + /// Creates a new AddressV2 preserving input queue entry + pub(crate) fn new_address_v2_with_output( + current: Option<&TreeTypeSeq>, + output_entry: SequenceEntry, + ) -> TreeTypeSeq { + let input_entry = Self::get_or_default_address_v2_input(current); + TreeTypeSeq::AddressV2(input_entry, output_entry) + } + + /// Extracts sequence information based on field type and tree type + /// + /// Returns `(sequence_number, optional_entry)` where: + /// - `u64::MAX` indicates invalid state - tree type mismatch or unexpected configuration. + /// Gap detection will be skipped entirely for these cases. + /// - `0` indicates valid initial state - the expected tree type exists but the specific + /// sequence entry hasn't been initialized yet. 
Gap detection remains active. + /// - Any other value represents an actual sequence number from existing state. + /// + /// This distinction is important because: + /// - Invalid configurations (u64::MAX) should not trigger false-positive gap alerts + /// - Valid but uninitialized sequences (0) should still detect gaps if the first + /// observed sequence is > 1 + pub fn extract_sequence_info( + &self, + field_type: &StateUpdateFieldType, + ) -> (u64, Option) { + match field_type { + StateUpdateFieldType::IndexedTreeUpdate => match self { + TreeTypeSeq::AddressV1(entry) => { + debug!("IndexedTreeUpdate with AddressV1, seq: {}", entry.sequence); + (entry.sequence, Some(entry.clone())) + } + _ => { + debug!("IndexedTreeUpdate with unsupported tree type: {:?}", self); + (u64::MAX, None) + } + }, + StateUpdateFieldType::BatchMerkleTreeEventAddressAppend + | StateUpdateFieldType::BatchNewAddress => { + if let TreeTypeSeq::AddressV2(_, entry) = self { + (entry.sequence, Some(entry.clone())) + } else { + debug!("Expected AddressV2 for {:?}, got {:?}", field_type, self); + (u64::MAX, None) + } + } + StateUpdateFieldType::BatchMerkleTreeEventAppend + | StateUpdateFieldType::BatchMerkleTreeEventNullify => { + if let TreeTypeSeq::StateV2(seq_context) = self { + if let Some(entry) = &seq_context.batch_event_entry { + (entry.sequence, Some(entry.clone())) + } else { + (0, None) + } + } else { + debug!("Expected StateV2 for {:?}, got {:?}", field_type, self); + (u64::MAX, None) + } + } + StateUpdateFieldType::LeafNullification => { + if let TreeTypeSeq::StateV1(entry) = self { + (entry.sequence, Some(entry.clone())) + } else { + debug!("Expected StateV1 for LeafNullification, got {:?}", self); + (u64::MAX, None) + } + } + StateUpdateFieldType::OutAccount => match self { + TreeTypeSeq::StateV1(entry) => (entry.sequence, Some(entry.clone())), + TreeTypeSeq::StateV2(seq_context) => { + if let Some(entry) = &seq_context.output_queue_entry { + (entry.sequence, Some(entry.clone())) + 
} else { + (0, None) + } + } + _ => { + debug!("Expected StateV1/V2 for OutAccount, got {:?}", self); + (u64::MAX, None) + } + }, + StateUpdateFieldType::BatchNullifyContext => { + if let TreeTypeSeq::StateV2(seq_context) = self { + if let Some(entry) = &seq_context.input_queue_entry { + (entry.sequence, Some(entry.clone())) + } else { + (0, None) + } + } else { + debug!("Expected StateV2 for BatchNullifyContext, got {:?}", self); + (u64::MAX, None) + } + } + } + } +} diff --git a/src/ingester/indexer/mod.rs b/src/ingester/indexer/mod.rs index fa696d56..edc37907 100644 --- a/src/ingester/indexer/mod.rs +++ b/src/ingester/indexer/mod.rs @@ -6,12 +6,12 @@ use log::info; use sea_orm::{sea_query::Expr, DatabaseConnection, EntityTrait, FromQueryResult, QuerySelect}; use solana_client::nonblocking::rpc_client::RpcClient; +use super::typedefs::block_info::BlockInfo; +use crate::ingester::gap::RewindController; use crate::{ common::fetch_current_slot_with_infinite_retry, dao::generated::blocks, ingester::index_block_batch_with_infinite_retries, }; - -use super::typedefs::block_info::BlockInfo; const POST_BACKFILL_FREQUENCY: u64 = 10; const PRE_BACKFILL_FREQUENCY: u64 = 10; @@ -40,7 +40,7 @@ pub async fn fetch_last_indexed_slot_with_infinite_retry( } Err(e) => { log::error!("Failed to fetch current slot from database: {}", e); - sleep(Duration::from_secs(5)); + tokio::time::sleep(Duration::from_secs(5)).await; } } } @@ -52,6 +52,8 @@ pub async fn index_block_stream( rpc_client: Arc, last_indexed_slot_at_start: u64, end_slot: Option, + rewind_controller: Option<&RewindController>, + tree_filter: Option, ) { pin_mut!(block_stream); let current_slot = @@ -71,28 +73,54 @@ pub async fn index_block_stream( while let Some(blocks) = block_stream.next().await { let last_slot_in_block = blocks.last().unwrap().metadata.slot; - index_block_batch_with_infinite_retries(db.as_ref(), blocks).await; - - for slot in (last_indexed_slot + 1)..(last_slot_in_block + 1) { - let blocks_indexed = 
slot - last_indexed_slot_at_start; - if blocks_indexed < number_of_blocks_to_backfill { - if blocks_indexed % PRE_BACKFILL_FREQUENCY == 0 { - info!( - "Backfilled {} / {} blocks", - blocks_indexed, number_of_blocks_to_backfill - ); + match index_block_batch_with_infinite_retries( + db.as_ref(), + blocks, + rewind_controller, + tree_filter, + ) + .await + { + Ok(()) => { + for slot in (last_indexed_slot + 1)..(last_slot_in_block + 1) { + let blocks_indexed = slot - last_indexed_slot_at_start; + if blocks_indexed < number_of_blocks_to_backfill { + if blocks_indexed % PRE_BACKFILL_FREQUENCY == 0 { + if tree_filter.is_some() { + info!( + "Backfilled {} / {} blocks (filtering for tree: {:?})", + blocks_indexed, number_of_blocks_to_backfill, tree_filter + ); + } else { + info!( + "Backfilled {} / {} blocks", + blocks_indexed, number_of_blocks_to_backfill + ); + } + } + } else { + if finished_backfill_slot.is_none() { + info!("Finished backfilling historical blocks!"); + info!("Starting to index new blocks..."); + finished_backfill_slot = Some(slot); + } + if slot % POST_BACKFILL_FREQUENCY == 0 { + info!("Indexed slot {}", slot); + } + } + last_indexed_slot = slot; } - } else { - if finished_backfill_slot.is_none() { - info!("Finished backfilling historical blocks!"); - info!("Starting to index new blocks..."); - finished_backfill_slot = Some(slot); - } - if slot % POST_BACKFILL_FREQUENCY == 0 { - info!("Indexed slot {}", slot); + } + Err(e) => { + if matches!(e, crate::ingester::error::IngesterError::GapDetectedRewind) { + // Gap detected, rewind triggered - the slot stream should handle repositioning + log::info!("Gap detection triggered rewind"); + continue; + } else { + log::error!("Unexpected error in block processing: {}", e); + tokio::time::sleep(Duration::from_secs(1)).await; } } - last_indexed_slot = slot; } } } diff --git a/src/ingester/mod.rs b/src/ingester/mod.rs index f2934d47..35889e7f 100644 --- a/src/ingester/mod.rs +++ b/src/ingester/mod.rs @@ -6,14 +6,9 @@ use 
error::IngesterError; use parser::parse_transaction; use sea_orm::sea_query::OnConflict; -use sea_orm::ConnectionTrait; use sea_orm::DatabaseConnection; use sea_orm::DatabaseTransaction; - -use sea_orm::EntityTrait; -use sea_orm::QueryTrait; -use sea_orm::Set; -use sea_orm::TransactionTrait; +use sea_orm::{ConnectionTrait, QueryTrait}; use self::parser::state_update::StateUpdate; use self::persist::persist_state_update; @@ -21,26 +16,77 @@ use self::persist::MAX_SQL_INSERTS; use self::typedefs::block_info::BlockInfo; use self::typedefs::block_info::BlockMetadata; use crate::dao::generated::blocks; +use crate::ingester::gap::{RewindController, StateUpdateSequences}; use crate::metric; +use sea_orm::EntityTrait; +use sea_orm::Set; +use sea_orm::TransactionTrait; pub mod error; pub mod fetchers; +pub mod gap; pub mod indexer; pub mod parser; pub mod persist; pub mod typedefs; -fn derive_block_state_update(block: &BlockInfo) -> Result { +fn derive_block_state_update( + block: &BlockInfo, + rewind_controller: Option<&RewindController>, + tree_filter: Option, +) -> Result { let mut state_updates: Vec = Vec::new(); + let mut sequences = StateUpdateSequences::default(); + + // Parse each transaction and extract sequences with proper context for transaction in &block.transactions { - state_updates.push(parse_transaction(transaction, block.metadata.slot)?); + let state_update = parse_transaction(transaction, block.metadata.slot, tree_filter)?; + + // Extract sequences with proper slot and signature context + sequences.extract_state_update_sequences( + &state_update, + block.metadata.slot, + &transaction.signature.to_string(), + ); + + state_updates.push(state_update); } + + // Check for gaps with proper context + let gaps = sequences.detect_all_sequence_gaps(); + if !gaps.is_empty() { + tracing::warn!( + "Gaps detected in block {} sequences: {gaps:?}", + block.metadata.slot + ); + + // Request rewind if controller is available + if let Some(controller) = rewind_controller { 
+ if let Err(e) = controller.request_rewind_for_gaps(&gaps) { + tracing::error!( + "Failed to request rewind for gaps in block {}: {}", + block.metadata.slot, + e + ); + return Err(IngesterError::CustomError( + "Gap detection triggered rewind failure".to_string(), + )); + } + // Return early after requesting rewind - don't continue processing + return Err(IngesterError::GapDetectedRewind); + } + } + + // Update sequence state with latest observed sequences + sequences.update_sequence_state(); + Ok(StateUpdate::merge_updates(state_updates)) } pub async fn index_block(db: &DatabaseConnection, block: &BlockInfo) -> Result<(), IngesterError> { let txn = db.begin().await?; index_block_metadatas(&txn, vec![&block.metadata]).await?; - persist_state_update(&txn, derive_block_state_update(block)?).await?; + let state_update = derive_block_state_update(block, None, None)?; + persist_state_update(&txn, state_update).await?; txn.commit().await?; Ok(()) } @@ -78,22 +124,68 @@ async fn index_block_metadatas( Ok(()) } +fn block_contains_tree(block: &BlockInfo, tree_filter: &solana_pubkey::Pubkey) -> bool { + for tx in &block.transactions { + for instruction_group in &tx.instruction_groups { + if instruction_group + .outer_instruction + .accounts + .contains(tree_filter) + { + return true; + } + } + } + false +} + pub async fn index_block_batch( db: &DatabaseConnection, block_batch: &Vec, + rewind_controller: Option<&RewindController>, + tree_filter: Option, ) -> Result<(), IngesterError> { - let blocks_len = block_batch.len(); + // Pre-filter blocks if tree filter is specified + let filtered_blocks: Vec<&BlockInfo> = if let Some(ref tree) = tree_filter { + block_batch + .iter() + .filter(|block| block_contains_tree(block, tree)) + .collect() + } else { + block_batch.iter().collect() + }; + + if filtered_blocks.is_empty() { + // Skip empty batches + metric! 
{ + statsd_count!("blocks_skipped", block_batch.len() as i64); + } + return Ok(()); + } + + let blocks_len = filtered_blocks.len(); let tx = db.begin().await?; - let block_metadatas: Vec<&BlockMetadata> = block_batch.iter().map(|b| &b.metadata).collect(); + let block_metadatas: Vec<&BlockMetadata> = + filtered_blocks.iter().map(|b| &b.metadata).collect(); index_block_metadatas(&tx, block_metadatas).await?; let mut state_updates = Vec::new(); - for block in block_batch { - state_updates.push(derive_block_state_update(block)?); + for block in filtered_blocks { + state_updates.push(derive_block_state_update( + block, + rewind_controller, + tree_filter, + )?); } persist::persist_state_update(&tx, StateUpdate::merge_updates(state_updates)).await?; metric! { statsd_count!("blocks_indexed", blocks_len as i64); + statsd_count!("blocks_skipped", (block_batch.len() - blocks_len) as i64); } + log::info!( + "Indexed {} blocks, skipped {} blocks", + blocks_len, + block_batch.len() - blocks_len + ); tx.commit().await?; Ok(()) } @@ -101,11 +193,19 @@ pub async fn index_block_batch( pub async fn index_block_batch_with_infinite_retries( db: &DatabaseConnection, block_batch: Vec, -) { + rewind_controller: Option<&RewindController>, + tree_filter: Option, +) -> Result<(), IngesterError> { loop { - match index_block_batch(db, &block_batch).await { - Ok(()) => return, + match index_block_batch(db, &block_batch, rewind_controller, tree_filter).await { + Ok(()) => return Ok(()), Err(e) => { + // Check if this is a gap-triggered rewind error + if matches!(e, IngesterError::GapDetectedRewind) { + // Don't retry, propagate the rewind error up + return Err(e); + } + let start_block = block_batch.first().unwrap().metadata.slot; let end_block = block_batch.last().unwrap().metadata.slot; log::error!( diff --git a/src/ingester/parser/merkle_tree_events_parser.rs b/src/ingester/parser/merkle_tree_events_parser.rs index 244a8749..db9a8161 100644 --- 
a/src/ingester/parser/merkle_tree_events_parser.rs +++ b/src/ingester/parser/merkle_tree_events_parser.rs @@ -35,7 +35,7 @@ pub fn parse_merkle_tree_event( parse_nullifier_event_v1(tx.signature, nullifier_event) } MerkleTreeEvent::V3(indexed_merkle_tree_event) => { - parse_indexed_merkle_tree_update(indexed_merkle_tree_event) + parse_indexed_merkle_tree_update(tx.signature, indexed_merkle_tree_event) } MerkleTreeEvent::BatchAppend(batch_event) => { state_update @@ -109,6 +109,7 @@ fn parse_nullifier_event_v1(tx: Signature, nullifier_event: NullifierEvent) -> S } fn parse_indexed_merkle_tree_update( + signature: Signature, indexed_merkle_tree_event: IndexedMerkleTreeEvent, ) -> StateUpdate { let IndexedMerkleTreeEvent { @@ -134,6 +135,7 @@ fn parse_indexed_merkle_tree_update( hash: *hash, leaf: *leaf, seq, + signature, }; seq += 1; state_update.indexed_merkle_tree_updates.insert( diff --git a/src/ingester/parser/mod.rs b/src/ingester/parser/mod.rs index aedff22f..da775836 100644 --- a/src/ingester/parser/mod.rs +++ b/src/ingester/parser/mod.rs @@ -39,7 +39,36 @@ const VOTE_PROGRAM_ID: Pubkey = pubkey!("Vote11111111111111111111111111111111111 const SKIP_UNKNOWN_TREES: bool = true; -pub fn parse_transaction(tx: &TransactionInfo, slot: u64) -> Result { +pub fn parse_transaction( + tx: &TransactionInfo, + slot: u64, + tree_filter: Option, +) -> Result { + // Early check: if tree filter is set and transaction doesn't involve the tree, return empty state update + if let Some(ref tree) = tree_filter { + let mut involves_tree = false; + for instruction_group in &tx.instruction_groups { + if instruction_group.outer_instruction.accounts.contains(tree) { + involves_tree = true; + break; + } + for inner_instruction in &instruction_group.inner_instructions { + if inner_instruction.accounts.contains(tree) { + involves_tree = true; + break; + } + } + if involves_tree { + break; + } + } + + if !involves_tree { + // Return empty state update for transactions that don't involve the 
target tree + return Ok(StateUpdate::new()); + } + } + let mut state_updates = Vec::new(); let mut is_compression_transaction = false; @@ -137,6 +166,11 @@ pub fn parse_transaction(tx: &TransactionInfo, slot: u64) -> Result bool { .iter() .any(|group| group.outer_instruction.program_id == VOTE_PROGRAM_ID) } + +fn filter_state_update_by_tree(mut state_update: StateUpdate, tree_pubkey: Pubkey) -> StateUpdate { + // Filter out accounts that don't belong to the specified tree + state_update + .out_accounts + .retain(|account| account.account.tree.0 == tree_pubkey); + + // Filter indexed merkle tree updates + state_update + .indexed_merkle_tree_updates + .retain(|(tree, _), _| *tree == tree_pubkey); + + // Filter batch merkle tree events + state_update + .batch_merkle_tree_events + .retain(|tree, _| *tree == tree_pubkey.to_bytes()); + + // Filter batch new addresses + state_update + .batch_new_addresses + .retain(|address_update| address_update.tree.0 == tree_pubkey); + + // Filter leaf nullifications + state_update + .leaf_nullifications + .retain(|nullification| nullification.tree == tree_pubkey); + + // Only keep transactions if there's still relevant data after filtering + if state_update.out_accounts.is_empty() + && state_update.indexed_merkle_tree_updates.is_empty() + && state_update.batch_merkle_tree_events.is_empty() + && state_update.batch_new_addresses.is_empty() + && state_update.leaf_nullifications.is_empty() + { + state_update.transactions.clear(); + state_update.account_transactions.clear(); + } + + state_update +} diff --git a/src/ingester/parser/state_update.rs b/src/ingester/parser/state_update.rs index 549a2284..29a1a929 100644 --- a/src/ingester/parser/state_update.rs +++ b/src/ingester/parser/state_update.rs @@ -63,6 +63,7 @@ pub struct IndexedTreeLeafUpdate { pub leaf: RawIndexedElement, pub hash: [u8; 32], pub seq: u64, + pub signature: Signature, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, ToSchema, Default)] @@ -76,7 +77,7 @@ pub struct 
AddressQueueUpdate { impl From for AddressQueueUpdate { fn from(new_address: NewAddress) -> Self { AddressQueueUpdate { - tree: SerializablePubkey::from(new_address.mt_pubkey), + tree: SerializablePubkey::from(new_address.tree_pubkey), address: new_address.address, queue_index: new_address.queue_index, } diff --git a/src/ingester/parser/tx_event_parser_v2.rs b/src/ingester/parser/tx_event_parser_v2.rs index 389dfd74..66acaabf 100644 --- a/src/ingester/parser/tx_event_parser_v2.rs +++ b/src/ingester/parser/tx_event_parser_v2.rs @@ -5,16 +5,16 @@ use crate::ingester::parser::indexer_events::{ MerkleTreeSequenceNumberV1, MerkleTreeSequenceNumberV2, OutputCompressedAccountWithPackedContext, PublicTransactionEvent, }; -use crate::ingester::parser::state_update::StateUpdate; +use crate::ingester::parser::state_update::{AccountTransaction, StateUpdate}; use crate::ingester::parser::tx_event_parser::create_state_update_v1; +use super::state_update::AddressQueueUpdate; +use crate::common::typedefs::hash::Hash; use light_compressed_account::indexer_event::parse::event_from_light_transaction; use light_compressed_account::Pubkey as LightPubkey; use solana_pubkey::Pubkey; use solana_sdk::signature::Signature; -use super::state_update::AddressQueueUpdate; - pub fn parse_public_transaction_event_v2( program_ids: &[Pubkey], instructions: &[Vec], @@ -131,19 +131,39 @@ pub fn create_state_update_v2( .batch_nullify_context .extend(event.batch_input_accounts.clone()); - state_update_event - .batch_new_addresses - .extend( - event - .new_addresses - .clone() - .iter() - .map(|x| AddressQueueUpdate { - tree: SerializablePubkey::from(x.mt_pubkey), - address: x.address, - queue_index: x.queue_index, - }), - ); + // Create account_transactions for v2 batch input accounts + // but only for accounts that are not being created in this same transaction + let output_account_hashes: std::collections::HashSet<_> = state_update_event + .out_accounts + .iter() + .map(|acc| 
acc.account.hash.clone()) + .collect(); + + state_update_event.account_transactions.extend( + event + .batch_input_accounts + .iter() + .filter(|batch_account| { + !output_account_hashes.contains(&Hash::from(batch_account.account_hash)) + }) + .map(|batch_account| AccountTransaction { + hash: batch_account.account_hash.into(), + signature: tx, + }), + ); + + state_update_event.batch_new_addresses.extend( + event + .new_addresses + .clone() + .iter() + .filter(|x| x.queue_index != u64::MAX) // Exclude AddressV1 trees + .map(|x| AddressQueueUpdate { + tree: SerializablePubkey::from(x.tree_pubkey), + address: x.address, + queue_index: x.queue_index, + }), + ); state_updates.push(state_update_event); } diff --git a/src/ingester/persist/persisted_batch_event.rs b/src/ingester/persist/persisted_batch_event.rs index d338bf24..4c48db75 100644 --- a/src/ingester/persist/persisted_batch_event.rs +++ b/src/ingester/persist/persisted_batch_event.rs @@ -1,6 +1,6 @@ use crate::common::typedefs::hash::Hash; use crate::common::typedefs::serializable_pubkey::SerializablePubkey; -use crate::dao::generated::{accounts, address_queues}; +use crate::dao::generated::{accounts, address_queues, indexed_trees}; use crate::ingester::error::IngesterError; use crate::ingester::parser::indexer_events::BatchEvent; use crate::ingester::parser::{ @@ -12,12 +12,38 @@ use crate::ingester::persist::MAX_SQL_INSERTS; use crate::migration::Expr; use light_batched_merkle_tree::constants::DEFAULT_BATCH_ADDRESS_TREE_HEIGHT; use sea_orm::{ - ColumnTrait, ConnectionTrait, DatabaseTransaction, EntityTrait, QueryFilter, QueryOrder, - QueryTrait, + ColumnTrait, ConnectionTrait, DatabaseTransaction, EntityTrait, PaginatorTrait, QueryFilter, + QueryOrder, QueryTrait, }; const ZKP_BATCH_SIZE: usize = 500; +/// Validates that the old_next_index in a batch event matches the current state. 
+/// Returns Ok(true) if processing should continue, Ok(false) if already processed (re-indexing), +/// or Err if validation fails. +fn validate_batch_index( + old_next_index: u64, + current_index: u64, + event_type: &str, +) -> Result { + if old_next_index > current_index { + return Err(IngesterError::ParserError(format!( + "Batch {} old_next_index {} is greater than current index {}", + event_type, old_next_index, current_index + ))); + } else if old_next_index < current_index { + // Re-indexing scenario - events already processed + tracing::debug!( + "Batch {} re-indexing detected: old_next_index {} < current_index {}", + event_type, + old_next_index, + current_index + ); + return Ok(false); + } + Ok(true) +} + /// We need to find the events of the same tree: /// - order them by sequence number and execute them in order /// HashMap> @@ -68,39 +94,107 @@ async fn persist_batch_append_event( batch_append_event: &BatchEvent, leaf_nodes: &mut Vec, ) -> Result<(), IngesterError> { - // 1. Create leaf nodes with the account hash as leaf. - // Leaf indices are used as output queue indices. - // The leaf index range of the batch append event is - // [old_next_index, new_next_index). 
- let accounts = accounts::Entity::find() - .filter( - accounts::Column::LeafIndex - .gte(batch_append_event.old_next_index as i64) - .and(accounts::Column::LeafIndex.lt(batch_append_event.new_next_index as i64)) - .and(accounts::Column::NullifiedInTree.eq(false)) - .and(accounts::Column::Tree.eq(batch_append_event.merkle_tree_pubkey.to_vec())), - ) - .order_by_asc(accounts::Column::LeafIndex) - .all(txn) - .await?; - accounts - .iter() - .try_for_each(|account| -> Result<(), IngesterError> { - leaf_nodes.push(LeafNode { - tree: SerializablePubkey::try_from(account.tree.clone()).map_err(|_| { - IngesterError::ParserError( - "Failed to convert tree to SerializablePubkey".to_string(), - ) - })?, - seq: Some(batch_append_event.sequence_number as u32), - leaf_index: account.leaf_index as u32, - hash: Hash::new(account.hash.as_slice()).map_err(|_| { - IngesterError::ParserError("Failed to convert nullifier to Hash".to_string()) - })?, - }); - - Ok(()) - })?; + // let expected_count = + // (batch_append_event.new_next_index - batch_append_event.old_next_index) as usize; + // // Validate old_next_index matches the current state of the tree + // let current_next_index = accounts::Entity::find() + // .filter( + // accounts::Column::Tree + // .eq(batch_append_event.merkle_tree_pubkey.to_vec()) + // .and(accounts::Column::InOutputQueue.eq(false)), + // ) + // .order_by_desc(accounts::Column::LeafIndex) + // .one(txn) + // .await? + // .map(|acc| (acc.leaf_index + 1) as u64) + // .unwrap_or(0); + // if !validate_batch_index( + // batch_append_event.old_next_index, + // current_next_index, + // "append", + // )? 
{ + // return Ok(()); + // } + + // let accounts = accounts::Entity::find() + // .filter( + // accounts::Column::LeafIndex + // .gte(batch_append_event.old_next_index as i64) + // .and(accounts::Column::LeafIndex.lt(batch_append_event.new_next_index as i64)) + // .and(accounts::Column::Tree.eq(batch_append_event.merkle_tree_pubkey.to_vec())) + // .and(accounts::Column::InOutputQueue.eq(true)), + // ) + // .order_by_asc(accounts::Column::LeafIndex) + // .all(txn) + // .await?; + + // // If we got the expected count, proceed + // if accounts.len() == expected_count { + // // Validate sequential indices and process accounts + // let mut expected_leaf_index = batch_append_event.old_next_index; + + // for account in &accounts { + // if account.leaf_index != expected_leaf_index as i64 { + // return Err(IngesterError::ParserError(format!( + // "Gap in leaf indices: expected {}, got {}", + // expected_leaf_index, account.leaf_index + // ))); + // } + // expected_leaf_index += 1; + + // if account.hash.is_empty() { + // return Err(IngesterError::ParserError( + // "Account hash is missing".to_string(), + // )); + // } + + // leaf_nodes.push(LeafNode { + // tree: SerializablePubkey::try_from(account.tree.clone()).map_err(|_| { + // IngesterError::ParserError( + // "Failed to convert tree to SerializablePubkey".to_string(), + // ) + // })?, + // seq: Some(batch_append_event.sequence_number as u32), + // leaf_index: account.leaf_index as u32, + // hash: Hash::new(account.hash.as_slice()).map_err(|_| { + // IngesterError::ParserError("Failed to convert account hash to Hash".to_string()) + // })?, + // }); + // } + // } else if accounts.is_empty() { + // // Check if already processed (re-indexing scenario) + // let already_processed = accounts::Entity::find() + // .filter( + // accounts::Column::LeafIndex + // .gte(batch_append_event.old_next_index as i64) + // .and(accounts::Column::LeafIndex.lt(batch_append_event.new_next_index as i64)) + // 
.and(accounts::Column::Tree.eq(batch_append_event.merkle_tree_pubkey.to_vec())) + // .and(accounts::Column::InOutputQueue.eq(false)), + // ) + // .count(txn) + // .await?; + + // if already_processed == expected_count as u64 { + // tracing::debug!( + // "Batch append already processed: {} accounts already in tree for range [{}, {})", + // already_processed, + // batch_append_event.old_next_index, + // batch_append_event.new_next_index + // ); + // return Ok(()); + // } + + // return Err(IngesterError::ParserError(format!( + // "Expected {} accounts in append batch, found 0 in queue, {} already processed", + // expected_count, already_processed + // ))); + // } else { + // return Err(IngesterError::ParserError(format!( + // "Expected {} accounts in append batch, found {}", + // expected_count, + // accounts.len() + // ))); + // } // 2. Remove inserted elements from the output queue. let query = accounts::Entity::update_many() @@ -109,10 +203,13 @@ async fn persist_batch_append_event( accounts::Column::LeafIndex .gte(batch_append_event.old_next_index as i64) .and(accounts::Column::LeafIndex.lt(batch_append_event.new_next_index as i64)) - .and(accounts::Column::Tree.eq(batch_append_event.merkle_tree_pubkey.to_vec())), + .and(accounts::Column::Tree.eq(batch_append_event.merkle_tree_pubkey.to_vec())) + .and(accounts::Column::InOutputQueue.eq(true)), ) .build(txn.get_database_backend()); + txn.execute(query).await?; + Ok(()) } @@ -125,60 +222,89 @@ async fn persist_batch_nullify_event( batch_nullify_event: &BatchEvent, leaf_nodes: &mut Vec, ) -> Result<(), IngesterError> { - // 1. Create leaf nodes with nullifier as leaf. - // Nullifier queue index is continuously incremented by 1 - // with each element insertion into the nullifier queue. 
+ // let expected_count = + // (batch_nullify_event.new_next_index - batch_nullify_event.old_next_index) as usize; + + // For nullify events, we don't validate against the tree's next index + // because nullify events update existing leaves, they don't append new ones. + // Instead, we check if the accounts have already been nullified. + + let queue_start = batch_nullify_event.old_next_index as i64; + let queue_end = batch_nullify_event.new_next_index as i64; + let accounts = accounts::Entity::find() .filter( accounts::Column::NullifierQueueIndex - .gte(batch_nullify_event.old_next_index) - .and(accounts::Column::NullifierQueueIndex.lt(batch_nullify_event.new_next_index)), + .gte(queue_start) + .and(accounts::Column::NullifierQueueIndex.lt(queue_end)) + .and(accounts::Column::Tree.eq(batch_nullify_event.merkle_tree_pubkey.to_vec())) + .and(accounts::Column::Spent.eq(true)), ) .order_by_asc(accounts::Column::NullifierQueueIndex) .all(txn) .await?; - accounts - .iter() - .try_for_each(|account| -> Result<(), IngesterError> { - leaf_nodes.push(LeafNode { - tree: SerializablePubkey::try_from(account.tree.clone()).map_err(|_| { - IngesterError::ParserError( - "Failed to convert tree to SerializablePubkey".to_string(), - ) - })?, - seq: Some(batch_nullify_event.sequence_number as u32), - leaf_index: account.leaf_index as u32, - hash: Hash::new( - account - .nullifier - .as_ref() - .ok_or(IngesterError::ParserError( - "Nullifier is missing".to_string(), - ))? 
- .as_slice(), - ) - .map_err(|_| { - IngesterError::ParserError("Failed to convert nullifier to Hash".to_string()) - })?, - }); - Ok(()) - })?; + // if accounts.is_empty() { + // // No accounts found in the nullifier queue for this range + // return Err(IngesterError::ParserError(format!( + // "Expected {} accounts in nullifier batch queue range [{}, {}), found 0", + // expected_count, queue_start, queue_end + // ))); + // } else if accounts.len() != expected_count { + // return Err(IngesterError::ParserError(format!( + // "Expected {} accounts in nullifier batch, found {}", + // expected_count, + // accounts.len() + // ))); + // } - // 2. Mark elements as nullified in tree and - // remove them from the database nullifier queue. + // let mut expected_index = queue_start; // Use the queue start for validation + + for account in &accounts { + // Queue indices must be sequential with no gaps + // let queue_index = account.nullifier_queue_index.ok_or_else(|| { + // IngesterError::ParserError("Missing nullifier queue index".to_string()) + // })?; + // if queue_index != expected_index { + // return Err(IngesterError::ParserError(format!( + // "Gap in nullifier queue: expected {}, got {}", + // expected_index, queue_index + // ))); + // } + // expected_index += 1; + + // Nullifier exists - Each account must have a non-null nullifier + let nullifier = account + .nullifier + .as_ref() + .ok_or_else(|| IngesterError::ParserError("Nullifier is missing".to_string()))?; + + leaf_nodes.push(LeafNode { + tree: SerializablePubkey::try_from(account.tree.clone()).map_err(|_| { + IngesterError::ParserError( + "Failed to convert tree to SerializablePubkey".to_string(), + ) + })?, + seq: Some(batch_nullify_event.sequence_number as u32), + leaf_index: account.leaf_index as u32, + hash: Hash::new(nullifier.as_slice()).map_err(|_| { + IngesterError::ParserError("Failed to convert nullifier to Hash".to_string()) + })?, + }); + } + + // 2. Mark elements as nullified in tree. 
+ // We keep the NullifierQueueIndex to support re-indexing scenarios. let query = accounts::Entity::update_many() - .col_expr( - accounts::Column::NullifierQueueIndex, - Expr::value(Option::::None), - ) .col_expr(accounts::Column::NullifiedInTree, Expr::value(true)) .filter( accounts::Column::NullifierQueueIndex - .gte(batch_nullify_event.old_next_index) - .and(accounts::Column::NullifierQueueIndex.lt(batch_nullify_event.new_next_index)), + .gte(queue_start) + .and(accounts::Column::NullifierQueueIndex.lt(queue_end)) + .and(accounts::Column::Tree.eq(batch_nullify_event.merkle_tree_pubkey.to_vec())), ) .build(txn.get_database_backend()); + txn.execute(query).await?; Ok(()) } @@ -190,19 +316,109 @@ async fn persist_batch_address_append_event( txn: &DatabaseTransaction, batch_address_append_event: &BatchEvent, ) -> Result<(), IngesterError> { - let last_queue_index = batch_address_append_event.new_next_index as i64 - 1; + // let expected_count = (batch_address_append_event.new_next_index + // - batch_address_append_event.old_next_index) as usize; + + // Validate old_next_index matches the current state of the address tree + // let current_next_index = indexed_trees::Entity::find() + // .filter( + // indexed_trees::Column::Tree.eq(batch_address_append_event.merkle_tree_pubkey.to_vec()), + // ) + // .order_by_desc(indexed_trees::Column::LeafIndex) + // .one(txn) + // .await? + // .map(|tree| (tree.leaf_index + 1) as u64) + // .unwrap_or(1); // Address tree has zeroeth element + + // if !validate_batch_index( + // batch_address_append_event.old_next_index, + // current_next_index, + // "address append", + // )? 
{ + // return Ok(()); + // } + + // Address queue indices are 0-based, but batch updates use 1-based indices + // (because address trees have a pre-initialized zeroth element) + // So we need to offset by -1 when querying the queue + let queue_start = (batch_address_append_event.old_next_index as i64) - 1; + let queue_end = (batch_address_append_event.new_next_index as i64) - 1; + let addresses = address_queues::Entity::find() - .filter(address_queues::Column::QueueIndex.lt(last_queue_index).and( - address_queues::Column::Tree.eq(batch_address_append_event.merkle_tree_pubkey.to_vec()), - )) + .filter( + address_queues::Column::QueueIndex + .gte(queue_start) + .and(address_queues::Column::QueueIndex.lt(queue_end)) + .and( + address_queues::Column::Tree + .eq(batch_address_append_event.merkle_tree_pubkey.to_vec()), + ), + ) .order_by_asc(address_queues::Column::QueueIndex) .all(txn) .await?; - let address_values = addresses - .iter() - .map(|address| address.address.clone()) - .collect::>(); + // if addresses.is_empty() { + // // Check if already processed (re-indexing scenario) + // let already_indexed = indexed_trees::Entity::find() + // .filter( + // indexed_trees::Column::Tree + // .eq(batch_address_append_event.merkle_tree_pubkey.to_vec()) + // .and( + // indexed_trees::Column::LeafIndex + // .gte(batch_address_append_event.old_next_index as i64), + // ) + // .and( + // indexed_trees::Column::LeafIndex + // .lt(batch_address_append_event.new_next_index as i64), + // ), + // ) + // .count(txn) + // .await?; + + // if already_indexed >= expected_count as u64 { + // tracing::info!( + // "Address batch already processed: {} addresses already in indexed tree", + // already_indexed + // ); + // return Ok(()); + // } + + // return Err(IngesterError::ParserError(format!( + // "Expected {} addresses in address append batch, found 0 in queue", + // expected_count + // ))); + // } else if addresses.len() != expected_count { + // return 
Err(IngesterError::ParserError(format!( + // "Expected {} addresses in address append batch, found {}", + // expected_count, + // addresses.len() + // ))); + // } + + // Process addresses and perform per-address validations + // let mut expected_queue_index = queue_start; // Use the offset queue index + let mut address_values = Vec::new(); + + for address in &addresses { + // // Queue indices must be sequential with no gaps + // if address.queue_index != expected_queue_index { + // return Err(IngesterError::ParserError(format!( + // "Gap in address queue indices: expected {}, got {}", + // expected_queue_index, address.queue_index + // ))); + // } + // expected_queue_index += 1; + + // Address exists - Each address must have a non-empty value + if address.address.is_empty() { + return Err(IngesterError::ParserError( + "Address value is missing".to_string(), + )); + } + + address_values.push(address.address.clone()); + } // 1. Append the addresses to the indexed merkle tree. multi_append( @@ -216,9 +432,15 @@ async fn persist_batch_address_append_event( // 2. Remove inserted elements from the database address queue. 
address_queues::Entity::delete_many() - .filter(address_queues::Column::QueueIndex.lt(last_queue_index).and( - address_queues::Column::Tree.eq(batch_address_append_event.merkle_tree_pubkey.to_vec()), - )) + .filter( + address_queues::Column::QueueIndex + .gte(queue_start) + .and(address_queues::Column::QueueIndex.lt(queue_end)) + .and( + address_queues::Column::Tree + .eq(batch_address_append_event.merkle_tree_pubkey.to_vec()), + ), + ) .exec(txn) .await?; diff --git a/src/ingester/persist/persisted_indexed_merkle_tree.rs b/src/ingester/persist/persisted_indexed_merkle_tree.rs index f94f60f7..65d3a254 100644 --- a/src/ingester/persist/persisted_indexed_merkle_tree.rs +++ b/src/ingester/persist/persisted_indexed_merkle_tree.rs @@ -8,6 +8,7 @@ use sea_orm::{ EntityTrait, QueryFilter, QueryTrait, Set, Statement, }; use solana_pubkey::Pubkey; +use solana_sdk::signature::Signature; use super::{compute_parent_hash, persisted_state_tree::ZERO_BYTES, MAX_SQL_INSERTS}; use crate::common::format_bytes; @@ -80,6 +81,7 @@ fn ensure_zeroeth_element_exists( index: zeroeth_leaf.leaf_index as usize, }, seq: 0, + signature: Signature::from([0; 64]), // Placeholder for synthetic element }, ); } @@ -124,6 +126,7 @@ fn ensure_top_element_exists( index: top_leaf.leaf_index as usize, }, seq: 1, + signature: Signature::from([0; 64]), // Placeholder for synthetic element }, ); } @@ -231,6 +234,35 @@ pub async fn persist_indexed_tree_updates( .collect::, IngesterError>>()?; persist_leaf_nodes(txn, state_tree_leaf_nodes, TREE_HEIGHT_V1 + 1).await?; + + // Add address tree entries to state_tree_histories for unified gap detection + let address_tree_history_models = chunk + .iter() + .map( + |x| crate::dao::generated::state_tree_histories::ActiveModel { + tree: Set(x.tree.to_bytes().to_vec()), + seq: Set(x.seq as i64), + leaf_idx: Set(x.leaf.index as i64), + transaction_signature: Set(Into::<[u8; 64]>::into(x.signature).to_vec()), + }, + ) + .collect::>(); + + if 
!address_tree_history_models.is_empty() { + let query = crate::dao::generated::state_tree_histories::Entity::insert_many( + address_tree_history_models, + ) + .on_conflict( + OnConflict::columns([ + crate::dao::generated::state_tree_histories::Column::Tree, + crate::dao::generated::state_tree_histories::Column::Seq, + ]) + .do_nothing() + .to_owned(), + ) + .build(txn.get_database_backend()); + txn.execute(query).await?; + } } Ok(()) diff --git a/src/main.rs b/src/main.rs index 185013ac..14a6321d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,11 +22,13 @@ use photon_indexer::migration::{ Migrator, MigratorTrait, }; +use photon_indexer::ingester::gap::RewindController; use photon_indexer::monitor::continously_monitor_photon; use photon_indexer::snapshot::{ get_snapshot_files_with_metadata, load_block_stream_from_directory_adapter, DirectoryAdapter, }; use solana_client::nonblocking::rpc_client::RpcClient; +use solana_pubkey::Pubkey; use sqlx::{ sqlite::{SqliteConnectOptions, SqlitePoolOptions}, SqlitePool, @@ -52,7 +54,7 @@ struct Args { db_url: Option, /// The start slot to begin indexing from. Defaults to the last indexed slot in the database plus - /// one. + /// one. #[arg(short, long)] start_slot: Option, @@ -98,6 +100,16 @@ struct Args { /// If provided, metrics will be sent to the specified statsd server. 
#[arg(long, default_value = None)] metrics_endpoint: Option, + + /// Index only the specified tree pubkey + /// When provided, the indexer will only process updates for this specific tree + #[arg(long, default_value = None)] + tree: Option, + + /// Disable sequence gap detection and rewind + /// When set, the indexer will not check for sequence gaps and will not trigger rewinds + #[arg(long, action = clap::ArgAction::SetTrue)] + disable_gap_detection: bool, } async fn start_api_server( @@ -174,6 +186,8 @@ fn continously_index_new_blocks( db: Arc, rpc_client: Arc, last_indexed_slot: u64, + rewind_controller: Option, + tree_filter: Option, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let block_stream = block_stream_config.load_block_stream(); @@ -183,6 +197,8 @@ fn continously_index_new_blocks( rpc_client.clone(), last_indexed_slot, None, + rewind_controller.as_ref(), + tree_filter, ) .await; }) @@ -228,12 +244,20 @@ async fn main() { yield blocks; } }; + let tree_filter = args.tree.as_ref().map(|tree_str| { + tree_str + .parse::() + .expect("Invalid tree pubkey format") + }); + index_block_stream( block_stream, db_conn.clone(), rpc_client.clone(), last_indexed_slot, Some(last_slot), + None, + tree_filter, ) .await; } @@ -277,11 +301,27 @@ async fn main() { .unwrap(), }; + // Create rewind controller for gap detection + let (rewind_controller, rewind_receiver) = if args.disable_gap_detection { + info!("Gap detection is disabled"); + (None, None) + } else { + let (controller, receiver) = RewindController::new(); + (Some(controller), Some(receiver)) + }; + + let tree_filter = args.tree.as_ref().map(|tree_str| { + tree_str + .parse::() + .expect("Invalid tree pubkey format") + }); + let block_stream_config = BlockStreamConfig { rpc_client: rpc_client.clone(), max_concurrent_block_fetches, last_indexed_slot, geyser_url: args.grpc_url, + rewind_receiver, }; ( @@ -290,6 +330,8 @@ async fn main() { db_conn.clone(), rpc_client.clone(), last_indexed_slot, + 
rewind_controller, + tree_filter, )), Some(continously_monitor_photon( db_conn.clone(), diff --git a/src/snapshot/mod.rs b/src/snapshot/mod.rs index ee669507..fe7e6c29 100644 --- a/src/snapshot/mod.rs +++ b/src/snapshot/mod.rs @@ -406,12 +406,17 @@ pub async fn get_snapshot_files_with_metadata( } fn create_temp_snapshot_file(dir: &str) -> (File, PathBuf) { - let temp_dir = temp_dir(); - // Create a subdirectory for the snapshot files - let temp_dir = temp_dir.join(dir); - if !temp_dir.exists() { - fs::create_dir_all(&temp_dir).unwrap(); + // First try to use the system temp directory + let mut temp_dir = temp_dir().join(dir); + + // Try to create the temp directory, if it fails, fall back to using the snapshot directory itself + if fs::create_dir_all(&temp_dir).is_err() { + // Fall back to creating temp files directly in the snapshot directory + temp_dir = PathBuf::from(dir).join(".tmp"); + fs::create_dir_all(&temp_dir) + .expect("Failed to create temp directory in both system temp and snapshot directory"); } + let random_number = rand::random::(); let temp_file_path = temp_dir.join(format!("temp-snapshot-{}", random_number)); if temp_file_path.exists() { @@ -451,11 +456,12 @@ pub async fn update_snapshot( incremental_snapshot_interval_slots: u64, ) { // Convert stream to iterator + let last_indexed_slot = block_stream_config.last_indexed_slot; let block_stream = block_stream_config.load_block_stream(); update_snapshot_helper( directory_adapter, block_stream, - block_stream_config.last_indexed_slot, + last_indexed_slot, incremental_snapshot_interval_slots, full_snapshot_interval_slots, ) diff --git a/src/snapshot/snapshotter/main.rs b/src/snapshot/snapshotter/main.rs index 9e6c9735..d1a9af81 100644 --- a/src/snapshot/snapshotter/main.rs +++ b/src/snapshot/snapshotter/main.rs @@ -256,6 +256,7 @@ async fn main() { max_concurrent_block_fetches: args.max_concurrent_block_fetches.unwrap_or(20), last_indexed_slot, geyser_url: args.grpc_url.clone(), + rewind_receiver: 
None, // No rewind support for snapshotter }, args.incremental_snapshot_interval_slots, args.snapshot_interval_slots, diff --git a/src/tools/analyze_snapshot.rs b/src/tools/analyze_snapshot.rs new file mode 100644 index 00000000..63fbf011 --- /dev/null +++ b/src/tools/analyze_snapshot.rs @@ -0,0 +1,158 @@ +use futures::StreamExt; +use photon_indexer::ingester::parser::parse_transaction; +use photon_indexer::snapshot::{load_block_stream_from_directory_adapter, DirectoryAdapter}; +use std::collections::HashMap; +use std::sync::Arc; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args: Vec = std::env::args().collect(); + if args.len() < 2 { + eprintln!( + "Usage: {} [target_tree_pubkey]", + args[0] + ); + std::process::exit(1); + } + let snapshot_dir = &args[1]; + let target_tree = args.get(2).cloned(); + + println!("Analyzing snapshot in: {}", snapshot_dir); + if let Some(ref tree) = target_tree { + println!("Target tree filter: {}", tree); + } + + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir.clone())); + let block_stream = load_block_stream_from_directory_adapter(directory_adapter).await; + + let mut total_blocks = 0; + let mut total_transactions = 0; + let mut compression_transactions = 0; + let mut tree_transactions = HashMap::new(); + let mut blocks_with_target_tree = 0; + let mut target_tree_txs = 0; + + let blocks: Vec<_> = block_stream + .collect::>() + .await + .into_iter() + .flatten() + .collect(); + + let target_tree_pubkey = target_tree + .as_ref() + .map(|s| s.parse::()) + .transpose() + .map_err(|e| anyhow::anyhow!("Invalid target tree pubkey: {}", e))?; + + for block in &blocks { + total_blocks += 1; + let mut block_has_target = false; + + for tx in &block.transactions { + total_transactions += 1; + + match parse_transaction(tx, block.metadata.slot, None) { + Ok(state_update) => { + let has_compression = !state_update.indexed_merkle_tree_updates.is_empty() + || 
!state_update.batch_merkle_tree_events.is_empty() + || !state_update.out_accounts.is_empty() + || !state_update.leaf_nullifications.is_empty() + || !state_update.batch_new_addresses.is_empty(); + + if has_compression { + compression_transactions += 1; + + // Collect tree statistics + for ((tree, _), _) in &state_update.indexed_merkle_tree_updates { + *tree_transactions.entry(tree.to_string()).or_insert(0) += 1; + if target_tree_pubkey.as_ref() == Some(tree) { + block_has_target = true; + target_tree_txs += 1; + } + } + + for (tree_bytes, _) in &state_update.batch_merkle_tree_events { + let tree = solana_pubkey::Pubkey::from(*tree_bytes); + *tree_transactions.entry(tree.to_string()).or_insert(0) += 1; + if target_tree_pubkey.as_ref() == Some(&tree) { + block_has_target = true; + target_tree_txs += 1; + } + } + + for account in &state_update.out_accounts { + let tree = &account.account.tree.0; + *tree_transactions.entry(tree.to_string()).or_insert(0) += 1; + if target_tree_pubkey.as_ref() == Some(tree) { + block_has_target = true; + target_tree_txs += 1; + } + } + + for nullification in &state_update.leaf_nullifications { + *tree_transactions + .entry(nullification.tree.to_string()) + .or_insert(0) += 1; + if target_tree_pubkey.as_ref() == Some(&nullification.tree) { + block_has_target = true; + target_tree_txs += 1; + } + } + + for address in &state_update.batch_new_addresses { + *tree_transactions + .entry(address.tree.0.to_string()) + .or_insert(0) += 1; + if target_tree_pubkey.as_ref() == Some(&address.tree.0) { + block_has_target = true; + target_tree_txs += 1; + } + } + } + } + Err(_) => continue, + } + } + + if block_has_target { + blocks_with_target_tree += 1; + } + } + + println!("\n=== Snapshot Analysis ==="); + println!("Total blocks: {}", total_blocks); + println!("Total transactions: {}", total_transactions); + println!( + "Compression transactions: {} ({:.2}%)", + compression_transactions, + (compression_transactions as f64 / total_transactions as f64) 
* 100.0 + ); + + println!("\n=== Tree Distribution ==="); + let mut tree_vec: Vec<_> = tree_transactions.into_iter().collect(); + tree_vec.sort_by(|a, b| b.1.cmp(&a.1)); + + for (i, (tree, count)) in tree_vec.iter().enumerate() { + if i < 10 || target_tree.as_ref().map(|t| t == tree).unwrap_or(false) { + println!("{}: {} transactions", tree, count); + } + } + + if let Some(tree) = target_tree { + println!("\n=== Target Tree Analysis ==="); + println!("Target tree: {}", tree); + println!( + "Blocks containing target tree: {} ({:.2}%)", + blocks_with_target_tree, + (blocks_with_target_tree as f64 / total_blocks as f64) * 100.0 + ); + println!("Transactions for target tree: {}", target_tree_txs); + println!( + "\nPotential optimization: Skip {:.2}% of blocks", + ((total_blocks - blocks_with_target_tree) as f64 / total_blocks as f64) * 100.0 + ); + } + + Ok(()) +} diff --git a/tests/integration_tests/e2e_tests.rs b/tests/integration_tests/e2e_tests.rs index cfb1bc89..b5a6c126 100644 --- a/tests/integration_tests/e2e_tests.rs +++ b/tests/integration_tests/e2e_tests.rs @@ -712,7 +712,7 @@ async fn test_transaction_with_tree_rollover_fee( let txn = "2cBtegqLxQztcngNF4qWGZYEuGiwFvmSpak4dqNaGHHQRDBGuYg24ZSG54BpRaWS5Cr4v6AWLV42FWvEjQk2ESWy"; let txn = cached_fetch_transaction(&name, setup.client.clone(), txn).await; - let status_update = parse_transaction(&txn.try_into().unwrap(), 0).unwrap(); + let status_update = parse_transaction(&txn.try_into().unwrap(), 0, None).unwrap(); // Assert that status update has at least one account assert!(status_update.out_accounts.len() > 0); } diff --git a/tests/integration_tests/main.rs b/tests/integration_tests/main.rs index a0f68b11..f904dae2 100644 --- a/tests/integration_tests/main.rs +++ b/tests/integration_tests/main.rs @@ -8,6 +8,9 @@ mod e2e_tests; mod mock_tests; mod open_api_tests; mod prod_tests; +mod snapshot_test_utils; mod snapshot_tests; +mod test_v1_address_tree_gap_filler; +mod test_v1_address_tree_sequence_consistency; 
mod utils; mod zeroeth_element_fix_test; diff --git a/tests/integration_tests/mock_tests.rs b/tests/integration_tests/mock_tests.rs index 7f3347be..b33df5a0 100644 --- a/tests/integration_tests/mock_tests.rs +++ b/tests/integration_tests/mock_tests.rs @@ -1648,6 +1648,7 @@ async fn test_update_indexed_merkle_tree( leaf: *indexed_element, hash: Hash::new_unique().into(), // HACK: We don't care about the hash seq: *seq as u64, + signature: Default::default(), }, ); persist_indexed_tree_updates(&txn, indexed_leaf_updates) diff --git a/tests/integration_tests/snapshot_test_utils.rs b/tests/integration_tests/snapshot_test_utils.rs new file mode 100644 index 00000000..e973a7c2 --- /dev/null +++ b/tests/integration_tests/snapshot_test_utils.rs @@ -0,0 +1,392 @@ +use anyhow::{Context, Result}; +use photon_indexer::ingester::parser::get_compression_program_id; +use photon_indexer::ingester::typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo}; +use photon_indexer::snapshot::{ + create_snapshot_from_byte_stream, load_block_stream_from_directory_adapter, + load_byte_stream_from_directory_adapter, DirectoryAdapter, +}; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_client::rpc_client::GetConfirmedSignaturesForAddress2Config; +use solana_sdk::signature::Signature; +use std::collections::HashSet; +use std::str::FromStr; +use std::sync::Arc; + +/// Test utility to create a snapshot file from compression transactions found on-chain +pub async fn create_test_snapshot_from_compression_transactions( + rpc_url: &str, + target_slot: u64, + snapshot_dir_path: &str, +) -> Result { + println!("Connecting to RPC: {}", rpc_url); + let client = RpcClient::new(rpc_url.to_string()); + + // Step 1: Fetch compression transaction signatures from current slot down to target slot + let (signatures, signature_to_slot_map) = + fetch_compression_signatures_until_slot(&client, target_slot).await?; + println!( + "Found {} compression transaction signatures:", + 
signatures.len() + ); + for (i, signature) in signatures.iter().enumerate() { + println!(" {}. {}", i + 1, signature); + } + + if signatures.is_empty() { + return Err(anyhow::anyhow!( + "No compression transactions found on devnet" + )); + } + + // Step 2: Extract unique slots from signature info (we already have this data!) + let slots: HashSet = signatures + .iter() + .filter_map(|sig| signature_to_slot_map.get(sig)) + .copied() + .collect(); + + let mut slots: Vec = slots.into_iter().collect(); + slots.sort(); + println!( + "Found {} unique slots with compression transactions:", + slots.len() + ); + for (i, slot) in slots.iter().enumerate() { + println!(" {}. Slot {}", i + 1, slot); + } + + // Step 3: Fetch blocks for these slots + let mut blocks = Vec::new(); + for (i, slot) in slots.iter().enumerate() { + match client + .get_block_with_config( + *slot, + solana_client::rpc_config::RpcBlockConfig { + encoding: Some(solana_transaction_status::UiTransactionEncoding::Base64), + transaction_details: Some(solana_transaction_status::TransactionDetails::Full), + rewards: None, + commitment: Some(solana_sdk::commitment_config::CommitmentConfig::confirmed()), + max_supported_transaction_version: Some(0), + }, + ) + .await + { + Ok(block) => match parse_ui_confirmed_blocked(block, *slot) { + Ok(block_info) => { + let block_time = std::time::UNIX_EPOCH + + std::time::Duration::from_secs(block_info.metadata.block_time as u64); + let datetime = std::time::SystemTime::now() + .duration_since(block_time) + .map(|d| format!("{:.1} seconds ago", d.as_secs_f64())) + .unwrap_or_else(|_| { + format!("timestamp: {}", block_info.metadata.block_time) + }); + println!( + "Successfully parsed block at slot {} ({} transactions, {}) [{}/{}]", + slot, + block_info.transactions.len(), + datetime, + i + 1, + slots.len() + ); + blocks.push(block_info); + } + Err(e) => { + eprintln!("Failed to parse block at slot {}: {}", slot, e); + } + }, + Err(e) => { + eprintln!("Failed to fetch block at 
slot {}: {}", slot, e); + } + } + } + + if blocks.is_empty() { + return Err(anyhow::anyhow!("No blocks could be fetched and parsed")); + } + + println!("Successfully fetched and parsed {} blocks", blocks.len()); + + // Step 4: Create snapshot from blocks + let snapshot_dir = std::path::PathBuf::from(snapshot_dir_path); + std::fs::create_dir_all(&snapshot_dir)?; + + let snapshot_dir_str = snapshot_dir.to_str().unwrap().to_string(); + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory( + snapshot_dir_path.to_string(), + )); + + // Clear any existing snapshots + let existing_snapshots = + photon_indexer::snapshot::get_snapshot_files_with_metadata(directory_adapter.as_ref()) + .await?; + for snapshot in existing_snapshots { + directory_adapter.delete_file(snapshot.file).await?; + } + + // Sort blocks by slot to ensure proper ordering + blocks.sort_by_key(|block| block.metadata.slot); + + // Calculate the total slot range to write everything into one file + let first_slot = blocks + .first() + .map(|b| b.metadata.slot) + .unwrap_or(target_slot + 1); + let last_slot = blocks + .last() + .map(|b| b.metadata.slot) + .unwrap_or(target_slot + 1); + let slot_range = last_slot - first_slot + 1; + + println!( + "Writing all blocks from slot {} to {} into one snapshot file (range: {} slots)", + first_slot, last_slot, slot_range + ); + + // Create snapshot file directly without using update_snapshot_helper + let snapshot_filename = format!("snapshot-{}-{}", first_slot, last_slot); + let snapshot_path = snapshot_dir.join(&snapshot_filename); + + println!("Writing snapshot directly to: {:?}", snapshot_path); + + // Serialize all blocks directly (no version header in individual files) + let mut snapshot_data = Vec::new(); + + // Add serialized blocks only (header is added when reading multiple files) + for block in &blocks { + // Filter for compression transactions only + let trimmed_block = photon_indexer::ingester::typedefs::block_info::BlockInfo { + metadata: 
block.metadata.clone(), + transactions: block + .transactions + .iter() + .filter(|tx| photon_indexer::snapshot::is_compression_transaction(tx)) + .cloned() + .collect(), + }; + let block_bytes = bincode::serialize(&trimmed_block).unwrap(); + snapshot_data.extend(block_bytes); + } + + // Write snapshot file directly + let data_len = snapshot_data.len(); + std::fs::write(&snapshot_path, snapshot_data)?; + println!( + "Successfully wrote snapshot file: {:?} ({} bytes)", + snapshot_path, data_len + ); + + println!( + "Snapshot created successfully in directory: {}", + snapshot_dir_str + ); + + // Debug: List created snapshot files + let created_snapshots = + photon_indexer::snapshot::get_snapshot_files_with_metadata(directory_adapter.as_ref()) + .await?; + println!("Created {} snapshot files:", created_snapshots.len()); + for snapshot in &created_snapshots { + println!( + " - {} (slots {} to {})", + snapshot.file, snapshot.start_slot, snapshot.end_slot + ); + } + + Ok(snapshot_dir_str) +} + +/// Validate that photon can parse the generated snapshot +pub async fn validate_snapshot_parsing(snapshot_dir: &str) -> Result> { + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory( + snapshot_dir.to_string(), + )); + + // Load and parse the snapshot + let block_stream = load_block_stream_from_directory_adapter(directory_adapter.clone()).await; + let blocks: Vec> = futures::StreamExt::collect(block_stream).await; + let blocks: Vec = blocks.into_iter().flatten().collect(); + + println!("Successfully parsed {} blocks from snapshot", blocks.len()); + + // Validate that all blocks contain only compression transactions + for (i, block) in blocks.iter().enumerate() { + println!( + "Block {} at slot {}: {} transactions", + i, + block.metadata.slot, + block.transactions.len() + ); + + for (j, tx) in block.transactions.iter().enumerate() { + let is_compression = photon_indexer::snapshot::is_compression_transaction(tx); + if !is_compression { + return 
Err(anyhow::anyhow!( + "Block {} transaction {} is not a compression transaction", + i, + j + )); + } + } + } + + println!("All transactions in snapshot are compression transactions āœ“"); + Ok(blocks) +} + +/// Test round-trip: create snapshot and reload it via byte stream +pub async fn test_snapshot_roundtrip(snapshot_dir: &str) -> Result<()> { + let source_adapter = Arc::new(DirectoryAdapter::from_local_directory( + snapshot_dir.to_string(), + )); + + // Create a second directory for the round-trip test + let roundtrip_dir = std::path::PathBuf::from("target") + .join("test_snapshots") + .join("roundtrip"); + std::fs::create_dir_all(&roundtrip_dir)?; + let roundtrip_dir_str = roundtrip_dir.to_str().unwrap().to_string(); + let target_adapter = Arc::new(DirectoryAdapter::from_local_directory(roundtrip_dir_str)); + + // Load byte stream from source + let byte_stream = load_byte_stream_from_directory_adapter(source_adapter.clone()).await; + + // Create snapshot from byte stream in target + create_snapshot_from_byte_stream(byte_stream, target_adapter.as_ref()).await?; + + // Load blocks from both snapshots and compare + let source_blocks = load_block_stream_from_directory_adapter(source_adapter).await; + let source_blocks: Vec> = futures::StreamExt::collect(source_blocks).await; + let source_blocks: Vec = source_blocks.into_iter().flatten().collect(); + + let target_blocks = load_block_stream_from_directory_adapter(target_adapter).await; + let target_blocks: Vec> = futures::StreamExt::collect(target_blocks).await; + let target_blocks: Vec = target_blocks.into_iter().flatten().collect(); + + if source_blocks.len() != target_blocks.len() { + return Err(anyhow::anyhow!( + "Block count mismatch: source={}, target={}", + source_blocks.len(), + target_blocks.len() + )); + } + + for (i, (source_block, target_block)) in + source_blocks.iter().zip(target_blocks.iter()).enumerate() + { + if source_block != target_block { + return Err(anyhow::anyhow!( + "Block {} differs between 
source and target", + i + )); + } + } + + println!( + "Round-trip test passed: {} blocks match exactly", + source_blocks.len() + ); + Ok(()) +} + +async fn fetch_compression_signatures_until_slot( + client: &RpcClient, + target_slot: u64, +) -> Result<(Vec, std::collections::HashMap)> { + let mut signatures = Vec::new(); + let mut signature_to_slot_map = std::collections::HashMap::new(); + let mut before = None; + + println!( + "Fetching ALL compression signatures from current slot down to slot {}", + target_slot + ); + + loop { + let config = GetConfirmedSignaturesForAddress2Config { + before, + until: None, + limit: None, // No limit - fetch as many as possible per batch + commitment: None, + }; + + let compression_program_id = + solana_sdk::pubkey::Pubkey::new_from_array(get_compression_program_id().to_bytes()); + println!( + "Fetching signatures for compression program: {}", + compression_program_id + ); + let batch = client + .get_signatures_for_address_with_config(&compression_program_id, config) + .await + .context("Failed to fetch signatures for compression program")?; + + println!("Fetched {} signatures in this batch", batch.len()); + + let mut reached_target_slot = false; + for sig_info in &batch { + // Check if we've reached the target slot + if sig_info.slot < target_slot { + reached_target_slot = true; + break; + } + + // Skip failed transactions + if sig_info.err.is_some() { + continue; + } + + let signature = + Signature::from_str(&sig_info.signature).context("Failed to parse signature")?; + signatures.push(signature); + signature_to_slot_map.insert(signature, sig_info.slot); + } + + if reached_target_slot { + // Stop when no more signatures or reached target slot + break; + } + + before = batch + .last() + .map(|sig| Signature::from_str(&sig.signature).unwrap()); + } + + println!( + "Found {} total compression signatures down to slot {}", + signatures.len(), + target_slot + ); + Ok((signatures, signature_to_slot_map)) +} + +#[cfg(test)] +mod tests { 
+ use super::*; + + #[tokio::test] + #[ignore] // Remove this to run the test + async fn test_create_snapshot_from_compression_transactions() { + let snapshot_dir = create_test_snapshot_from_compression_transactions( + "https://api.devnet.solana.com", + 10, // Fetch 10 compression transactions + "target/test_snapshots/devnet", + ) + .await + .expect("Failed to create test snapshot"); + + let blocks = validate_snapshot_parsing(&snapshot_dir) + .await + .expect("Failed to validate snapshot parsing"); + + assert!(!blocks.is_empty(), "Snapshot should contain blocks"); + + test_snapshot_roundtrip(&snapshot_dir) + .await + .expect("Round-trip test failed"); + + println!("Test completed successfully!"); + println!("Snapshot directory: {}", snapshot_dir); + println!("Parsed {} blocks from snapshot", blocks.len()); + } +} diff --git a/tests/integration_tests/snapshot_tests.rs b/tests/integration_tests/snapshot_tests.rs index 891bb1ab..fb94865c 100644 --- a/tests/integration_tests/snapshot_tests.rs +++ b/tests/integration_tests/snapshot_tests.rs @@ -1,13 +1,15 @@ use futures::stream; - use photon_indexer::common::typedefs::hash::Hash; - use photon_indexer::ingester::typedefs::block_info::{BlockInfo, BlockMetadata}; use photon_indexer::snapshot::{ create_snapshot_from_byte_stream, get_r2_bucket, get_snapshot_files_with_metadata, load_block_stream_from_directory_adapter, load_byte_stream_from_directory_adapter, update_snapshot_helper, R2BucketArgs, R2DirectoryAdapter, }; + +use crate::snapshot_test_utils::{ + create_test_snapshot_from_compression_transactions, validate_snapshot_parsing, +}; use s3::creds::Credentials; use s3::Region; @@ -115,3 +117,38 @@ async fn test_basic_snapshotting() { assert_eq!(snapshot_blocks_v2, blocks); } } + +#[tokio::test] +#[ignore] +async fn test_compression_snapshot_creation_and_parsing() { + // Get API key from environment + let api_key = std::env::var("API_KEY") + .expect("API_KEY environment variable must be set (export 
API_KEY=\"your-api-key\")"); + + let rpc_url = format!("https://devnet.helius-rpc.com/?api-key={}", api_key); + let snapshot_dir_path = "target/test_snapshots/devnet"; + + // Create snapshot from real compression transactions + let snapshot_dir = create_test_snapshot_from_compression_transactions( + &rpc_url, + 391843372, // Target slot - fetch all compression transactions from current slot down to this slot + snapshot_dir_path, + ) + .await + .expect("Failed to create test snapshot from compression transactions"); + + // Validate that photon can parse the snapshot + let blocks = validate_snapshot_parsing(&snapshot_dir) + .await + .expect("Failed to validate snapshot parsing"); + + assert!(!blocks.is_empty(), "Snapshot should contain blocks"); + println!( + "Successfully parsed {} blocks from compression snapshot", + blocks.len() + ); + + println!("āœ“ Compression snapshot test completed successfully!"); + println!("āœ“ Snapshot directory: {}", snapshot_dir); + println!("āœ“ Validated photon can parse the generated snapshot"); +} diff --git a/tests/integration_tests/test_v1_address_tree_gap_filler.rs b/tests/integration_tests/test_v1_address_tree_gap_filler.rs new file mode 100644 index 00000000..ea92cd70 --- /dev/null +++ b/tests/integration_tests/test_v1_address_tree_gap_filler.rs @@ -0,0 +1,762 @@ +use anyhow::Result; +use futures::StreamExt; +use light_compressed_account::TreeType; +use photon_indexer::ingester::parser::{parse_transaction, state_update::IndexedTreeLeafUpdate}; +use photon_indexer::ingester::typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo}; +use photon_indexer::snapshot::{load_block_stream_from_directory_adapter, DirectoryAdapter}; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_client::rpc_client::GetConfirmedSignaturesForAddress2Config; +use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature; +use solana_pubkey::{pubkey, Pubkey}; +use solana_sdk::signature::Signature; +use 
std::collections::{HashMap, HashSet}; +use std::str::FromStr; +use std::sync::Arc; + +// Import the new gap detection functions +use crate::test_v1_address_tree_sequence_consistency::{ + detect_gaps_from_sequences, SequenceGap, StateUpdateFieldType, StateUpdateSequences, +}; + +// V1 Address Tree Pubkey - the only v1 address tree +const V1_ADDRESS_TREE: Pubkey = pubkey!("amt1Ayt45jfbdw5YSo7iz6WZxUmnZsQTYXy82hVwyC2"); + +#[tokio::test] +#[ignore] +async fn test_fill_v1_address_tree_gaps() -> Result<()> { + println!("šŸ”§ Testing Comprehensive Gap Filling for All StateUpdate Fields"); + + // Step 1: Load existing snapshot and detect ALL gaps using comprehensive gap detection + let gaps = analyze_existing_snapshot_for_all_gaps().await?; + + if gaps.is_empty() { + println!("āœ… No gaps found in existing snapshot"); + return Ok(()); + } + + println!( + "šŸ” Found {} gaps to fill across all StateUpdate fields:", + gaps.len() + ); + + // Group and display gaps by field type + let mut gaps_by_field: HashMap> = HashMap::new(); + for gap in &gaps { + gaps_by_field + .entry(gap.field_type.clone()) + .or_insert_with(Vec::new) + .push(gap); + } + + for (field_type, field_gaps) in &gaps_by_field { + println!(" {:?}: {} gaps", field_type, field_gaps.len()); + } + + // Step 2: Fetch missing blocks using signature-based approach + println!( + "šŸŽÆ Processing all {} gaps across all StateUpdate fields", + gaps.len() + ); + + let (mut missing_blocks, mut missing_updates) = fetch_missing_blocks(&gaps).await?; + + // Step 3: Update snapshot with signature-based results + if !missing_blocks.is_empty() { + update_snapshot_with_missing_blocks(&missing_blocks).await?; + println!( + "āœ… Updated snapshot with {} signature-based blocks", + missing_blocks.len() + ); + } + + // Step 4: Validate and fallback for remaining gaps + println!("šŸ” Checking for remaining gaps after signature-based approach..."); + let remaining_gaps = analyze_existing_snapshot_for_all_gaps().await?; + + if 
remaining_gaps.is_empty() { + println!("āœ… All gaps filled by signature-based approach!"); + } else { + println!( + "āš ļø Still have {} gaps - triggering slot-range fallback", + remaining_gaps.len() + ); + + // Get RPC client for fallback + let rpc_url = std::env::var("RPC_URL") + .unwrap_or_else(|_| "https://api.devnet.solana.com".to_string()); + let client = RpcClient::new(rpc_url); + + // Rebuild existing slots index after snapshot update + let updated_existing_slots = build_existing_slot_index().await?; + let (fallback_blocks, fallback_updates) = + validate_and_fallback_gap_filling(&client, &remaining_gaps, &updated_existing_slots) + .await?; + + if !fallback_blocks.is_empty() { + let fallback_count = fallback_blocks.len(); + update_snapshot_with_missing_blocks(&fallback_blocks).await?; + missing_blocks.extend(fallback_blocks); + missing_updates.extend(fallback_updates); + println!( + "āœ… Updated snapshot with {} additional fallback blocks", + fallback_count + ); + } + } + + println!( + "šŸŽÆ Total blocks added: {}, V1 updates: {}", + missing_blocks.len(), + missing_updates.len() + ); + + // Step 5: Final verification + verify_gaps_filled().await?; + + println!("šŸŽ‰ Comprehensive gap filling completed!"); + + Ok(()) +} + +async fn analyze_existing_snapshot_for_all_gaps() -> Result> { + println!("šŸ“‚ Analyzing existing snapshot for ALL gaps using comprehensive gap detection..."); + + let snapshot_path = "target/snapshot_local"; + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory( + snapshot_path.to_string(), + )); + + let block_stream = load_block_stream_from_directory_adapter(directory_adapter).await; + let all_blocks: Vec> = block_stream.collect().await; + let blocks: Vec<_> = all_blocks.into_iter().flatten().collect(); + + println!("šŸ“¦ Processing {} blocks from snapshot", blocks.len()); + + // Extract sequences from all StateUpdates using the new system + let mut sequences = StateUpdateSequences::default(); + let mut 
total_transactions = 0; + let mut parsed_transactions = 0; + + for block in blocks { + let slot = block.metadata.slot; + total_transactions += block.transactions.len(); + + for transaction in &block.transactions { + let signature = transaction.signature.to_string(); + + // Parse each transaction to extract state updates + match parse_transaction(transaction, slot, None) { + Ok(state_update) => { + parsed_transactions += 1; + + // Extract sequences with context using the new method + sequences.extract_state_update_sequences(&state_update, slot, &signature); + } + Err(_) => { + // Skip failed parsing - compression transactions might have parsing issues + continue; + } + } + } + } + + println!( + "šŸ“Š Parsed {}/{} transactions successfully", + parsed_transactions, total_transactions + ); + + // Detect gaps across ALL StateUpdate fields using the comprehensive system + let all_gaps = detect_gaps_from_sequences(&sequences); + + println!( + "šŸ” Found {} total gaps across all StateUpdate fields", + all_gaps.len() + ); + + Ok(all_gaps) +} + +#[allow(unused)] +async fn analyze_existing_snapshot() -> Result> { + println!("šŸ“‚ Analyzing existing snapshot for V1 address tree gaps..."); + + // Get all gaps first + let all_gaps = analyze_existing_snapshot_for_all_gaps().await?; + + // Filter for V1 address tree gaps only (for backward compatibility) + let v1_gaps: Vec = all_gaps + .into_iter() + .filter(|gap| { + gap.field_type == StateUpdateFieldType::IndexedTreeUpdate + && gap.tree_pubkey == Some(V1_ADDRESS_TREE) + }) + .collect(); + + println!( + "šŸŽÆ Found {} gaps specifically in V1 address tree", + v1_gaps.len() + ); + + Ok(v1_gaps) +} + +/// Build a HashSet of all slot numbers that already exist in the current snapshot +async fn build_existing_slot_index() -> Result> { + let snapshot_path = "target/snapshot_local"; + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory( + snapshot_path.to_string(), + )); + + let block_stream = 
load_block_stream_from_directory_adapter(directory_adapter).await; + let all_blocks: Vec> = block_stream.collect().await; + let blocks: Vec<_> = all_blocks.into_iter().flatten().collect(); + + let existing_slots: HashSet = blocks.iter().map(|block| block.metadata.slot).collect(); + + Ok(existing_slots) +} + +/// Calculate global gap boundaries across all gaps +fn calculate_global_gap_boundaries(gaps: &[SequenceGap]) -> (u64, u64, String, String) { + let min_slot = gaps.iter().map(|g| g.before_slot).min().unwrap_or(0); + let max_slot = gaps.iter().map(|g| g.after_slot).max().unwrap_or(0); + + // Find the earliest before_signature and latest after_signature + // For comprehensive coverage, we want the earliest possible start and latest possible end + let earliest_before_sig = gaps + .iter() + .min_by_key(|g| g.before_slot) + .map(|g| g.before_signature.clone()) + .unwrap_or_default(); + + let latest_after_sig = gaps + .iter() + .max_by_key(|g| g.after_slot) + .map(|g| g.after_signature.clone()) + .unwrap_or_default(); + + (min_slot, max_slot, earliest_before_sig, latest_after_sig) +} + +/// Fetch ALL signatures between two boundaries with full pagination +async fn fetch_all_signatures_paginated( + client: &RpcClient, + earliest_before_sig: &str, + latest_after_sig: &str, +) -> Result> { + let compression_program_id = solana_sdk::pubkey::Pubkey::new_from_array( + photon_indexer::ingester::parser::get_compression_program_id().to_bytes(), + ); + + let before_signature = Signature::from_str(earliest_before_sig)?; + let until_signature = Signature::from_str(latest_after_sig)?; + + let mut all_signatures = Vec::new(); + let mut current_before = Some(until_signature); // Start from latest (going backwards) + let mut page_count = 0; + + loop { + page_count += 1; + let config = GetConfirmedSignaturesForAddress2Config { + before: current_before, + until: Some(before_signature), // Stop at earliest + limit: Some(1000), // Use smaller limit for better reliability + commitment: 
None, + }; + + let batch = client + .get_signatures_for_address_with_config(&compression_program_id, config) + .await?; + + if batch.is_empty() { + break; // No more signatures + } + + println!( + " šŸ“„ Page {}: fetched {} signatures", + page_count, + batch.len() + ); + + // Check if we've reached our until signature + let mut reached_until = false; + for sig_info in &batch { + if let Ok(sig) = Signature::from_str(&sig_info.signature) { + if sig == before_signature { + reached_until = true; + break; + } + } + } + + all_signatures.extend(batch.clone()); + + if reached_until || batch.len() < 1000 { + // If we got less than limit, we're done + break; + } + + // Update before for next page + current_before = batch + .last() + .and_then(|sig| Signature::from_str(&sig.signature).ok()); + } + + Ok(all_signatures) +} + +/// Efficiently fetch blocks in batch with progress tracking +async fn fetch_blocks_batch( + client: &RpcClient, + mut needed_slots: Vec, +) -> Result<(Vec, Vec)> { + needed_slots.sort(); // Process in order + + let mut missing_blocks = Vec::new(); + let mut missing_updates = Vec::new(); + let mut slots_with_missing_seqs = HashSet::new(); + + for (i, slot) in needed_slots.iter().enumerate() { + match client + .get_block_with_config( + *slot, + solana_client::rpc_config::RpcBlockConfig { + encoding: Some(solana_transaction_status::UiTransactionEncoding::Base64), + transaction_details: Some(solana_transaction_status::TransactionDetails::Full), + rewards: None, + commitment: Some(solana_sdk::commitment_config::CommitmentConfig::confirmed()), + max_supported_transaction_version: Some(0), + }, + ) + .await + { + Ok(block) => { + if let Ok(block_info) = parse_ui_confirmed_blocked(block, *slot) { + let mut has_missing_seq = false; + + // Check if this block contains compression transactions (any type) + for transaction in &block_info.transactions { + if let Ok(state_update) = parse_transaction(transaction, *slot, None) { + // Check for any compression activity 
that could fill gaps + if !state_update.indexed_merkle_tree_updates.is_empty() + || !state_update.leaf_nullifications.is_empty() + || !state_update.batch_nullify_context.is_empty() + || !state_update.batch_new_addresses.is_empty() + || !state_update.batch_merkle_tree_events.is_empty() + || !state_update.out_accounts.is_empty() + { + println!( + " āœ… Found compression activity in slot {} [{}/{}]", + slot, + i + 1, + needed_slots.len() + ); + has_missing_seq = true; + + // Still collect V1 address tree updates for backwards compatibility + for ((tree_pubkey, _leaf_index), leaf_update) in + state_update.indexed_merkle_tree_updates + { + if leaf_update.tree_type == TreeType::AddressV1 + && tree_pubkey == V1_ADDRESS_TREE + { + missing_updates.push(leaf_update); + } + } + } else { + println!( + " āŒ No compression activity in slot {} [{}/{}]", + slot, + i + 1, + needed_slots.len() + ); + } + } + } + + // If this block has compression activity and we haven't already collected it + if has_missing_seq && !slots_with_missing_seqs.contains(slot) { + // Filter block to only include compression transactions + let filtered_block = BlockInfo { + metadata: block_info.metadata.clone(), + transactions: block_info + .transactions + .iter() + .filter(|tx| { + photon_indexer::snapshot::is_compression_transaction(tx) + }) + .cloned() + .collect(), + }; + + println!( + " šŸ“¦ Collected block {} with {} compression transactions [{}/{}]", + slot, + filtered_block.transactions.len(), + i + 1, + needed_slots.len() + ); + missing_blocks.push(filtered_block); + slots_with_missing_seqs.insert(*slot); + } + } + } + Err(e) => { + println!( + " āŒ Failed to fetch slot {} [{}/{}]: {}", + slot, + i + 1, + needed_slots.len(), + e + ); + } + } + } + + Ok((missing_blocks, missing_updates)) +} + +/// Validate if gaps remain after signature-based approach and fallback to slot-range fetching +async fn validate_and_fallback_gap_filling( + client: &RpcClient, + original_gaps: &[SequenceGap], + 
existing_slots: &HashSet, +) -> Result<(Vec, Vec)> { + // First, build a quick snapshot of what we currently have to check for remaining gaps + println!(" šŸ” Checking if gaps still exist after signature-based approach..."); + + // For validation, we need to re-analyze the current state + // This is a simplified check - in a real implementation we'd want to + // rebuild the full state, but for now we'll use the gap ranges as a proxy + + let mut fallback_slots = Vec::new(); + + // For each original gap, check if we might have missed slots in the range + for gap in original_gaps { + println!( + " šŸ“Š Checking gap in {:?}: slots {} to {}", + gap.field_type, gap.before_slot, gap.after_slot + ); + + // Generate all slots in the gap range + let gap_range_slots: Vec = (gap.before_slot + 1..gap.after_slot).collect(); + + // Find slots in this range that we don't have and haven't fetched + let missing_in_range: Vec = gap_range_slots + .iter() + .filter(|slot| !existing_slots.contains(slot)) + .copied() + .collect(); + + if !missing_in_range.is_empty() { + println!( + " āš ļø Found {} potentially missing slots in gap range", + missing_in_range.len() + ); + + fallback_slots.extend(missing_in_range); + } + } + + if fallback_slots.is_empty() { + println!(" āœ… No additional slots need fallback fetching"); + return Ok((Vec::new(), Vec::new())); + } + + // Remove duplicates and sort + fallback_slots.sort(); + fallback_slots.dedup(); + + println!( + " šŸ”„ Fallback: fetching {} additional slots from gap ranges", + fallback_slots.len() + ); + println!( + " šŸ“‹ Fallback slots: {:?}", + &fallback_slots[..std::cmp::min(10, fallback_slots.len())] + ); + + // Use the same batch fetching approach for fallback slots + let result = fetch_blocks_batch(client, fallback_slots).await?; + println!( + " āœ… Fallback completed: {} blocks, {} updates", + result.0.len(), + result.1.len() + ); + Ok(result) +} + +async fn fetch_missing_blocks( + gaps: &[SequenceGap], +) -> Result<(Vec, Vec)> { + 
println!("🌐 Ultra-Efficient Global Gap Filling Starting..."); + + if gaps.is_empty() { + return Ok((Vec::new(), Vec::new())); + } + + // Get RPC URL from environment variable or use default devnet + let rpc_url = + std::env::var("RPC_URL").unwrap_or_else(|_| "https://api.devnet.solana.com".to_string()); + + println!("šŸ”— Using RPC endpoint: {}", rpc_url); + let client = RpcClient::new(rpc_url); + + // Phase 1: Build existing slot index from current snapshot + println!("šŸ“‚ Phase 1: Building existing slot index from snapshot..."); + let existing_slots = build_existing_slot_index().await?; + println!( + "šŸ“Š Found {} existing slots in snapshot", + existing_slots.len() + ); + + // Phase 1.5: Calculate global gap boundaries + println!("šŸŒ Phase 1.5: Calculating global gap boundaries..."); + let (min_slot, max_slot, earliest_before_sig, latest_after_sig) = + calculate_global_gap_boundaries(gaps); + println!( + "šŸŽÆ Global gap range: slots {} to {} (span: {} slots)", + min_slot, + max_slot, + max_slot - min_slot + ); + println!( + "šŸ”— Global signature range: {} -> {}", + &earliest_before_sig[..8], + &latest_after_sig[..8] + ); + + // Phase 2: Smart signature collection with pagination + println!("šŸ“” Phase 2: Fetching ALL signatures with pagination..."); + let all_signatures = + fetch_all_signatures_paginated(&client, &earliest_before_sig, &latest_after_sig).await?; + println!( + "āœ… Collected {} total signatures across all gaps", + all_signatures.len() + ); + + // Phase 3: Extract and filter slots + println!("šŸ” Phase 3: Extracting and filtering slots..."); + let signature_slots: HashSet = all_signatures + .iter() + .filter(|sig_info| sig_info.err.is_none()) // Skip failed transactions + .map(|sig_info| sig_info.slot) + .collect(); + println!( + "šŸ“Š Found {} unique slots from signatures", + signature_slots.len() + ); + + // Filter out slots we already have - this is the key optimization! 
+ let needed_slots: Vec = signature_slots + .iter() + .filter(|slot| !existing_slots.contains(slot)) + .copied() + .collect(); + + println!( + "šŸŽÆ Need to fetch {} new blocks (filtered out {} existing)", + needed_slots.len(), + signature_slots.len() - needed_slots.len() + ); + + // Phase 4: Efficient batch block fetching (even if empty) + let (missing_blocks, missing_updates) = if needed_slots.is_empty() { + println!("šŸ“¦ Phase 4: No new blocks to fetch from signatures"); + (Vec::new(), Vec::new()) + } else { + println!( + "šŸ“¦ Phase 4: Fetching {} missing blocks...", + needed_slots.len() + ); + fetch_blocks_batch(&client, needed_slots).await? + }; + + println!( + "šŸŽÆ Signature-based approach: found {} blocks, {} updates", + missing_blocks.len(), + missing_updates.len() + ); + Ok((missing_blocks, missing_updates)) +} + +#[allow(unused)] +fn validate_sequence_consistency(updates: &[IndexedTreeLeafUpdate]) -> Result<()> { + println!("šŸ” Validating sequence consistency after gap filling..."); + + if updates.is_empty() { + return Err(anyhow::anyhow!("No updates to validate")); + } + + let first_seq = updates[0].seq; + let last_seq = updates.last().unwrap().seq; + println!( + "šŸ“ˆ Sequence range: {} to {} (span: {})", + first_seq, + last_seq, + last_seq - first_seq + 1 + ); + + // Check for sequential ordering + let mut expected_seq = first_seq; + let mut gaps = Vec::new(); + + for (i, update) in updates.iter().enumerate() { + if update.seq != expected_seq { + gaps.push((i, expected_seq, update.seq)); + } + expected_seq = update.seq + 1; + } + + // Check for duplicates + let mut seq_counts: HashMap = HashMap::new(); + for update in updates { + *seq_counts.entry(update.seq).or_insert(0) += 1; + } + + let duplicates: Vec<_> = seq_counts + .iter() + .filter(|(_, &count)| count > 1) + .map(|(&seq, &count)| (seq, count)) + .collect(); + + // Report results + println!("\nšŸ“Š Final Validation Results:"); + + if gaps.is_empty() { + println!("āœ… All v1 address tree 
sequence numbers are now sequential!"); + } else { + println!("āŒ Still found {} gaps:", gaps.len()); + for (index, expected, actual) in gaps.iter().take(5) { + println!( + " Index {}: expected seq {}, found seq {}", + index, expected, actual + ); + } + } + + if duplicates.is_empty() { + println!("āœ… No duplicate sequence numbers found"); + } else { + println!("āŒ Found {} duplicate sequence numbers", duplicates.len()); + } + + if !gaps.is_empty() { + return Err(anyhow::anyhow!( + "Sequence gaps still exist after gap filling" + )); + } + + if !duplicates.is_empty() { + return Err(anyhow::anyhow!("Duplicate sequence numbers found")); + } + + println!("āœ… Perfect sequence consistency achieved!"); + Ok(()) +} + +async fn update_snapshot_with_missing_blocks(missing_blocks: &[BlockInfo]) -> Result<()> { + println!("šŸ’¾ Updating snapshot file with missing blocks..."); + + let snapshot_path = "target/snapshot_local"; + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory( + snapshot_path.to_string(), + )); + + // Load existing blocks from snapshot + let block_stream = load_block_stream_from_directory_adapter(directory_adapter.clone()).await; + let all_blocks: Vec> = block_stream.collect().await; + let mut existing_blocks: Vec<_> = all_blocks.into_iter().flatten().collect(); + + println!( + "šŸ“¦ Loaded {} existing blocks from snapshot", + existing_blocks.len() + ); + + // Add missing blocks to existing blocks + existing_blocks.extend_from_slice(missing_blocks); + + // Sort all blocks by slot + existing_blocks.sort_by_key(|block| block.metadata.slot); + + println!( + "šŸ“¦ Total blocks after adding missing: {}", + existing_blocks.len() + ); + + // Clear existing snapshot files + let existing_snapshots = + photon_indexer::snapshot::get_snapshot_files_with_metadata(directory_adapter.as_ref()) + .await?; + for snapshot in existing_snapshots { + directory_adapter.delete_file(snapshot.file).await?; + } + + // Create new snapshot with all blocks + let 
first_slot = existing_blocks + .first() + .map(|b| b.metadata.slot) + .unwrap_or(0); + let last_slot = existing_blocks.last().map(|b| b.metadata.slot).unwrap_or(0); + + let snapshot_filename = format!("snapshot-{}-{}", first_slot, last_slot); + + println!("šŸ’¾ Writing updated snapshot: {}", snapshot_filename); + + // Serialize all blocks + let mut snapshot_data = Vec::new(); + for block in &existing_blocks { + let block_bytes = bincode::serialize(block).unwrap(); + snapshot_data.extend(block_bytes); + } + + // Write updated snapshot file + let snapshot_path_buf = std::path::PathBuf::from(snapshot_path).join(&snapshot_filename); + std::fs::write(&snapshot_path_buf, snapshot_data)?; + + println!( + "āœ… Successfully updated snapshot with {} total blocks", + existing_blocks.len() + ); + Ok(()) +} + +async fn verify_gaps_filled() -> Result<()> { + println!("šŸ” Verifying ALL gaps are filled in updated snapshot..."); + + // Run comprehensive analysis to check for all types of gaps + let all_gaps = analyze_existing_snapshot_for_all_gaps().await?; + + if all_gaps.is_empty() { + println!("šŸŽ‰ SUCCESS: All gaps across all StateUpdate fields have been filled!"); + return Ok(()); + } + + println!("āš ļø Still found {} gaps after filling:", all_gaps.len()); + + // Group remaining gaps by field type for better reporting + let mut gaps_by_field: HashMap> = HashMap::new(); + for gap in &all_gaps { + gaps_by_field + .entry(gap.field_type.clone()) + .or_insert_with(Vec::new) + .push(gap); + } + + for (field_type, field_gaps) in &gaps_by_field { + println!(" {:?}: {} remaining gaps", field_type, field_gaps.len()); + for gap in field_gaps.iter().take(2) { + // Show first 2 gaps for each field type + println!(" Slot {} -> {}", gap.before_slot, gap.after_slot); + } + if field_gaps.len() > 2 { + println!(" ... 
and {} more", field_gaps.len() - 2); + } + } + + // This is still success - we may not have filled all gaps due to missing blocks on RPC + println!( + "ā„¹ļø Note: Some gaps may remain due to missing blocks on RPC or truly missing sequences" + ); + Ok(()) +} diff --git a/tests/integration_tests/test_v1_address_tree_sequence_consistency.rs b/tests/integration_tests/test_v1_address_tree_sequence_consistency.rs new file mode 100644 index 00000000..f35c01e1 --- /dev/null +++ b/tests/integration_tests/test_v1_address_tree_sequence_consistency.rs @@ -0,0 +1,397 @@ +use anyhow::Result; +use futures::StreamExt; +use photon_indexer::ingester::parser::{ + indexer_events::MerkleTreeEvent, parse_transaction, state_update::StateUpdate, +}; +use photon_indexer::snapshot::{load_block_stream_from_directory_adapter, DirectoryAdapter}; +use solana_pubkey::Pubkey; +use std::collections::HashMap; +use std::sync::Arc; + +fn merkle_event_to_type_id(event: &MerkleTreeEvent) -> u8 { + match event { + MerkleTreeEvent::BatchAppend(_) => 1, + MerkleTreeEvent::BatchNullify(_) => 2, + MerkleTreeEvent::BatchAddressAppend(_) => 3, + _ => 0, // Other event types we don't care about + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum StateUpdateFieldType { + IndexedTreeUpdate, + LeafNullification, + BatchNullifyContext, + BatchNewAddress, + BatchMerkleTreeEventAppend, + BatchMerkleTreeEventNullify, + BatchMerkleTreeEventAddressAppend, + OutAccount, +} + +#[derive(Debug, Clone)] +pub struct SequenceGap { + // Boundary information for gap filling + pub before_slot: u64, + pub after_slot: u64, + pub before_signature: String, + pub after_signature: String, + + // Tree/context metadata + pub tree_pubkey: Option, // Tree pubkey (unified for all tree operations) + pub _tree_type_string: Option, // Tree type string (for indexed tree updates) + pub field_type: StateUpdateFieldType, +} + +#[derive(Debug, Default, Clone)] +pub struct StateUpdateSequences { + // Sequences with slot and signature 
information for gap analysis + indexed_tree_seqs: HashMap<(Pubkey, String), Vec<(u64, u64, String)>>, // (tree, type_string) -> (seq, slot, signature) + nullification_seqs: HashMap>, // tree -> (seq, slot, signature) + batch_nullify_queue_indexes: Vec<(u64, u64, String)>, // (queue_index, slot, signature) + batch_address_queue_indexes: HashMap>, // tree -> (queue_index, slot, signature) + batch_merkle_event_seqs: HashMap<(Pubkey, u8), Vec<(u64, u64, String)>>, // (tree_pubkey, event_type) -> (seq, slot, signature) + out_account_leaf_indexes: HashMap>, // tree -> (leaf_index, slot, signature) +} +impl StateUpdateSequences { + /// Extracts sequences from a StateUpdate with slot and signature context + pub fn extract_state_update_sequences( + &mut self, + state_update: &StateUpdate, + slot: u64, + signature: &str, + ) { + // Extract indexed tree sequences + for ((tree_pubkey, _), leaf_update) in &state_update.indexed_merkle_tree_updates { + let tree_type_string = format!("{:?}", leaf_update.tree_type); + self.indexed_tree_seqs + .entry((*tree_pubkey, tree_type_string)) + .or_insert_with(Vec::new) + .push((leaf_update.seq, slot, signature.to_string())); + } + + // Extract leaf nullification sequences + for nullification in &state_update.leaf_nullifications { + self.nullification_seqs + .entry(nullification.tree) + .or_insert_with(Vec::new) + .push((nullification.seq, slot, signature.to_string())); + } + + // Extract batch nullify context queue indexes + for context in &state_update.batch_nullify_context { + self.batch_nullify_queue_indexes.push(( + context.nullifier_queue_index, + slot, + signature.to_string(), + )); + } + + // Extract batch new address queue indexes + for address in &state_update.batch_new_addresses { + self.batch_address_queue_indexes + .entry(address.tree.0) + .or_insert_with(Vec::new) + .push((address.queue_index, slot, signature.to_string())); + } + + // Extract batch merkle tree event sequences + for (tree_hash, events) in 
&state_update.batch_merkle_tree_events { + let tree_pubkey = Pubkey::from(*tree_hash); + for (seq, merkle_event) in events { + let event_type = merkle_event_to_type_id(merkle_event); + if event_type > 0 { + self.batch_merkle_event_seqs + .entry((tree_pubkey, event_type)) + .or_insert_with(Vec::new) + .push((*seq, slot, signature.to_string())); + } + } + } + + // Extract out_account leaf indexes + for account_with_context in &state_update.out_accounts { + let tree_pubkey = account_with_context.account.tree.0; + let leaf_index = account_with_context.account.leaf_index.0; + self.out_account_leaf_indexes + .entry(tree_pubkey) + .or_insert_with(Vec::new) + .push((leaf_index, slot, signature.to_string())); + } + } +} + +/// Merges multiple StateUpdateSequences into a single aggregated structure +pub fn merge_state_update_sequences( + all_sequences: &[StateUpdateSequences], +) -> StateUpdateSequences { + let mut aggregated = StateUpdateSequences::default(); + + for sequences in all_sequences { + // Merge indexed tree sequences + for ((tree, tree_type_string), seqs) in &sequences.indexed_tree_seqs { + aggregated + .indexed_tree_seqs + .entry((*tree, tree_type_string.clone())) + .or_insert_with(Vec::new) + .extend(seqs.clone()); + } + + // Merge nullification sequences + for (tree, seqs) in &sequences.nullification_seqs { + aggregated + .nullification_seqs + .entry(*tree) + .or_insert_with(Vec::new) + .extend(seqs.clone()); + } + + // Merge batch nullify queue indexes + aggregated + .batch_nullify_queue_indexes + .extend(sequences.batch_nullify_queue_indexes.clone()); + + // Merge batch address queue indexes + for (tree, seqs) in &sequences.batch_address_queue_indexes { + aggregated + .batch_address_queue_indexes + .entry(*tree) + .or_insert_with(Vec::new) + .extend(seqs.clone()); + } + + // Merge batch merkle event sequences + for ((tree, event_type), seqs) in &sequences.batch_merkle_event_seqs { + aggregated + .batch_merkle_event_seqs + .entry((*tree, *event_type)) + 
.or_insert_with(Vec::new) + .extend(seqs.clone()); + } + + // Merge out_account leaf indexes + for (tree, seqs) in &sequences.out_account_leaf_indexes { + aggregated + .out_account_leaf_indexes + .entry(*tree) + .or_insert_with(Vec::new) + .extend(seqs.clone()); + } + } + + aggregated +} + +/// Detects gaps from a single StateUpdateSequences struct +pub fn detect_gaps_from_sequences(sequences: &StateUpdateSequences) -> Vec { + let sequences_vec = vec![sequences.clone()]; + detect_all_sequence_gaps(&sequences_vec) +} + +/// Comprehensive gap detection function that takes a vector of StateUpdateSequences and returns ALL gaps found +/// Aggregates sequences from multiple StateUpdates and detects gaps across all transactions +pub fn detect_all_sequence_gaps(all_sequences: &[StateUpdateSequences]) -> Vec { + // First aggregate all sequences from multiple StateUpdates + let sequences = merge_state_update_sequences(all_sequences); + + let mut all_gaps = Vec::new(); + + // Check indexed tree updates + for ((tree_pubkey, tree_type_string), seqs) in &sequences.indexed_tree_seqs { + let gaps = detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + Some(tree_type_string.clone()), + StateUpdateFieldType::IndexedTreeUpdate, + ); + all_gaps.extend(gaps); + } + + // Check leaf nullifications + for (tree_pubkey, seqs) in &sequences.nullification_seqs { + let gaps = detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + None, + StateUpdateFieldType::LeafNullification, + ); + all_gaps.extend(gaps); + } + + // Check batch nullify context + if !sequences.batch_nullify_queue_indexes.is_empty() { + let gaps = detect_sequence_gaps_with_metadata( + &sequences.batch_nullify_queue_indexes, + None, + None, + StateUpdateFieldType::BatchNullifyContext, + ); + all_gaps.extend(gaps); + } + + // Check batch new addresses + for (tree_pubkey, seqs) in &sequences.batch_address_queue_indexes { + let gaps = detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + 
None, + StateUpdateFieldType::BatchNewAddress, + ); + all_gaps.extend(gaps); + } + + // Check batch merkle tree events + for ((tree_pubkey, event_type), seqs) in &sequences.batch_merkle_event_seqs { + let field_type = match event_type { + 1 => StateUpdateFieldType::BatchMerkleTreeEventAppend, + 2 => StateUpdateFieldType::BatchMerkleTreeEventNullify, + 3 => StateUpdateFieldType::BatchMerkleTreeEventAddressAppend, + _ => continue, + }; + + let gaps = detect_sequence_gaps_with_metadata(seqs, Some(*tree_pubkey), None, field_type); + all_gaps.extend(gaps); + } + + // Check out_account leaf indexes + for (tree_pubkey, seqs) in &sequences.out_account_leaf_indexes { + let gaps = detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + None, + StateUpdateFieldType::OutAccount, + ); + all_gaps.extend(gaps); + } + + all_gaps +} + +/// Detects gaps in a sequence with full metadata for gap filling +fn detect_sequence_gaps_with_metadata( + sequences: &[(u64, u64, String)], // (seq, slot, signature) + tree_pubkey: Option, + tree_type_string: Option, + field_type: StateUpdateFieldType, +) -> Vec { + if sequences.len() < 2 { + return Vec::new(); + } + + let mut sorted_sequences = sequences.to_vec(); + sorted_sequences.sort_by_key(|(seq, _, _)| *seq); + + let mut gaps = Vec::new(); + + for i in 1..sorted_sequences.len() { + let (prev_seq, prev_slot, prev_sig) = &sorted_sequences[i - 1]; + let (curr_seq, curr_slot, curr_sig) = &sorted_sequences[i]; + + if curr_seq - prev_seq > 1 { + gaps.push(SequenceGap { + before_slot: *prev_slot, + after_slot: *curr_slot, + before_signature: prev_sig.clone(), + after_signature: curr_sig.clone(), + tree_pubkey, + _tree_type_string: tree_type_string.clone(), + field_type: field_type.clone(), + }); + } + } + + gaps +} + +#[tokio::test] +#[ignore] +async fn test_comprehensive_state_update_validation() -> Result<()> { + println!("šŸ” Testing Comprehensive StateUpdate Sequence Consistency"); + + // Load blocks from the created snapshot + let 
snapshot_path = + std::env::var("TEST_SNAPSHOT_PATH").unwrap_or_else(|_| "test_data/snapshot".to_string()); + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory( + snapshot_path.to_string(), + )); + + println!("šŸ“‚ Loading snapshot from: {}", snapshot_path); + let block_stream = load_block_stream_from_directory_adapter(directory_adapter).await; + + // Collect all blocks from the stream + let all_blocks: Vec> = block_stream.collect().await; + let blocks: Vec<_> = all_blocks.into_iter().flatten().collect(); + + println!("šŸ“¦ Processing {} blocks from snapshot", blocks.len()); + + // Extract sequences from all StateUpdates with context + let mut sequences = StateUpdateSequences::default(); + let mut total_transactions = 0; + let mut parsed_transactions = 0; + + for block in blocks { + let slot = block.metadata.slot; + total_transactions += block.transactions.len(); + + for transaction in &block.transactions { + let signature = transaction.signature.to_string(); + + // Parse each transaction to extract state updates + match parse_transaction(transaction, slot, None) { + Ok(state_update) => { + parsed_transactions += 1; + + // Extract sequences with context for comprehensive validation + sequences.extract_state_update_sequences(&state_update, slot, &signature); + } + Err(_) => { + // Skip failed parsing - compression transactions might have parsing issues + continue; + } + } + } + } + + println!( + "šŸ“Š Parsed {}/{} transactions successfully", + parsed_transactions, total_transactions + ); + + // Detect gaps across all transactions + let gaps = detect_all_sequence_gaps(&[sequences]); + + // Comprehensive validation summary + println!("\nšŸ” Comprehensive StateUpdate validation results:"); + println!( + "šŸ“Š Total gaps detected across all transactions: {}", + gaps.len() + ); + + if gaps.is_empty() { + println!("šŸŽ‰ All StateUpdate sequences are perfectly consistent!"); + } else { + // Group gaps by field type for summary + let mut 
gaps_by_field: HashMap> = HashMap::new(); + for gap in &gaps { + gaps_by_field + .entry(gap.field_type.clone()) + .or_insert_with(Vec::new) + .push(gap); + println!( + "DEBUG: Found gap for tree: {:?}, {:?}", + gap.tree_pubkey, gap + ); + } + + println!("āš ļø Gap breakdown by field type:"); + for (field_type, field_gaps) in &gaps_by_field { + println!(" {:?}: {} gaps", field_type, field_gaps.len()); + } + + println!("āš ļø These gaps may need investigation or gap filling"); + } + + println!("\nšŸŽ‰ Comprehensive StateUpdate validation completed!"); + + Ok(()) +} diff --git a/tests/integration_tests/utils.rs b/tests/integration_tests/utils.rs index e264519e..e8f425da 100644 --- a/tests/integration_tests/utils.rs +++ b/tests/integration_tests/utils.rs @@ -443,7 +443,7 @@ pub async fn index_transaction( tx: &str, ) { let tx = cached_fetch_transaction(test_name, rpc_client, tx).await; - let state_update = parse_transaction(&tx.try_into().unwrap(), 0).unwrap(); + let state_update = parse_transaction(&tx.try_into().unwrap(), 0, None).unwrap(); persist_state_update_using_connection(db_conn.as_ref(), state_update) .await .unwrap(); @@ -462,7 +462,7 @@ pub async fn index_multiple_transactions( } let mut state_updates = Vec::new(); for transaction_info in transactions_infos { - let tx_state_update = parse_transaction(&transaction_info, 0).unwrap(); + let tx_state_update = parse_transaction(&transaction_info, 0, None).unwrap(); state_updates.push(tx_state_update); } let state_update = StateUpdate::merge_updates(state_updates); diff --git a/tests/integration_tests/zeroeth_element_fix_test.rs b/tests/integration_tests/zeroeth_element_fix_test.rs index 6f2745bf..5392cd6d 100644 --- a/tests/integration_tests/zeroeth_element_fix_test.rs +++ b/tests/integration_tests/zeroeth_element_fix_test.rs @@ -115,7 +115,8 @@ async fn test_reindex_fixes_wrong_zeroeth_element( tree_type: TreeType::AddressV1, leaf: correct_zeroeth_leaf, hash: correct_hash.0, - seq: 2, // Higher seq number to 
ensure update + seq: 2, // Higher seq number to ensure update + signature: [1u8; 64].into(), // Mock signature for testing purposes }; // Create HashMap with the update @@ -211,6 +212,7 @@ async fn test_reindex_preserves_correct_zeroeth_element( leaf, hash: hash.0, seq: 1, + signature: Default::default(), }; let mut updates = HashMap::new();