diff --git a/.gitignore b/.gitignore index 8196c671..f2ba007b 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ test-ledger/ minio test.db docker-compose.yml +*.txt diff --git a/Cargo.lock b/Cargo.lock index ae825d0c..7edd9674 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -105,8 +105,7 @@ checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" [[package]] name = "aligned-sized" version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48a526ec4434d531d488af59fe866f36b310fe8906691c75dffa664450a3800a" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "proc-macro2", "quote", @@ -3681,8 +3680,7 @@ dependencies = [ [[package]] name = "light-account-checks" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3fd000a2b8e0cc9d0b7b7712964870df51f2114f1693b9d8f0414f6f3ec16bd" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "solana-account-info", "solana-program-error", @@ -3694,15 +3692,14 @@ dependencies = [ [[package]] name = "light-batched-merkle-tree" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81c7e179246468b09bf5c6882ef33043e178ff90eb6eab0c1c4c3623ef84b154" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "aligned-sized", "borsh 0.10.4", "light-account-checks", "light-bloom-filter", "light-compressed-account", - "light-hasher", + "light-hasher 3.1.0 (git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19)", "light-macros", "light-merkle-tree-metadata", "light-verifier", @@ -3719,8 +3716,7 @@ dependencies = [ [[package]] name = "light-bloom-filter" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44abcb5554e1c15cefa9ac17e4ceda6f5afb039db25ab1fd777f012356d0f964" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "bitvec", "num-bigint 0.4.6", @@ -3744,13 +3740,12 @@ dependencies = [ [[package]] name = "light-compressed-account" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f15113babaca9efb592631ec1e7e78c1c83413818a6e1e4248b7df53d88fe65" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "anchor-lang 0.31.1", "borsh 0.10.4", "bytemuck", - "light-hasher", + "light-hasher 3.1.0 (git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19)", "light-macros", "light-zero-copy", "solana-program-error", @@ -3767,7 +3762,7 @@ checksum = "9b4f878301620df78ba7e7758c5fd720f28040f5c157375f88d310f15ddb1746" dependencies = [ "borsh 0.10.4", "light-bounded-vec", - "light-hasher", + "light-hasher 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "memoffset 0.9.1", "thiserror 2.0.12", ] @@ -3777,6 +3772,23 @@ name = "light-hasher" version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6445937ea244bebae0558e2aaec375791895d08c785b87cc45b62cd80d69139" +dependencies = [ + "ark-bn254 0.5.0", + "ark-ff 0.5.0", + "arrayvec", + "borsh 0.10.4", + "light-poseidon 0.3.0", + "num-bigint 0.4.6", + "sha2 0.10.9", + "sha3 0.10.8", + "solana-nostd-keccak", + "thiserror 2.0.12", +] + +[[package]] +name = "light-hasher" +version = "3.1.0" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "ark-bn254 0.5.0", "ark-ff 0.5.0", @@ -3798,7 +3810,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc786d8df68ef64493fea04914a7a7745f8122f2efbae043cd4ba4eaffa9e6db" dependencies = [ - "light-hasher", + "light-hasher 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "num-bigint 0.4.6", "num-traits", "thiserror 2.0.12", @@ -3807,8 +3819,7 @@ dependencies = [ [[package]] name = "light-macros" version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "861c0817697c1201c2235cd831fcbaa2564a5f778e5229e9f5cc21035e97c273" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "bs58 0.5.1", "proc-macro2", @@ -3819,8 +3830,7 @@ dependencies = [ [[package]] name = "light-merkle-tree-metadata" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "544048fa95ea95fc1e952a2b9b1d6f09340c8decaffd1ad239fe1f6eb905ae76" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "borsh 0.10.4", "bytemuck", @@ -3838,7 +3848,7 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1650701feac958261b2c3ab4da361ad8548985ee3ee496a17e76db44d2d3c9e3" dependencies = [ - "light-hasher", + "light-hasher 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "light-indexed-array", "num-bigint 0.4.6", "num-traits", @@ -3872,8 +3882,7 @@ dependencies = [ [[package]] name = "light-verifier" version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85fdf317ec3cfcd3a8e6556a5b5e7fbcc207a40264700f9a5271876838f26f58" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "groth16-solana", "light-compressed-account", @@ -3883,8 +3892,7 @@ dependencies = [ [[package]] name = "light-zero-copy" version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a34d759f65547a6540db7047f38f4cb2c3f01658deca95a1dd06f26b578de947" +source = "git+https://github.com/lightprotocol/light-protocol?rev=341aae4dfc89a27913e6ff1af65572d626b0cc19#341aae4dfc89a27913e6ff1af65572d626b0cc19" dependencies = [ "solana-program-error", "thiserror 2.0.12", @@ -4564,7 +4572,7 @@ dependencies = [ "light-batched-merkle-tree", "light-compressed-account", "light-concurrent-merkle-tree", - "light-hasher", + "light-hasher 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "light-merkle-tree-metadata", "light-merkle-tree-reference", "light-poseidon 0.3.0", diff --git a/Cargo.toml b/Cargo.toml index b0e81c70..ed906cf0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,9 +82,11 @@ solana-pubkey = "2.3.0" solana-transaction-status = "1.18.0" light-concurrent-merkle-tree = "2.1.0" -light-batched-merkle-tree = "0.3.0" -light-merkle-tree-metadata = "0.3.0" -light-compressed-account = { version = "0.3.0", features = ["anchor"] } +light-batched-merkle-tree = { version = "0.3.0", git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19" } +light-merkle-tree-metadata = { version = "0.3.0", git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19" } +light-compressed-account = { version = "0.3.0", features = [ + "anchor", +], git = "https://github.com/lightprotocol/light-protocol", rev = "341aae4dfc89a27913e6ff1af65572d626b0cc19" } light-hasher = { version = "3.1.0" } light-poseidon = "0.3.0" diff --git a/src/api/method/get_multiple_compressed_accounts.rs b/src/api/method/get_multiple_compressed_accounts.rs index 9d925765..3b418563 100644 --- a/src/api/method/get_multiple_compressed_accounts.rs +++ b/src/api/method/get_multiple_compressed_accounts.rs @@ -148,7 +148,11 @@ pub async fn get_multiple_compressed_accounts( } fetch_account_from_addresses(conn, addresses).await? } - _ => panic!("Either hashes or addresses must be provided"), + _ => { + return Err(PhotonApiError::ValidationError( + "Either hashes or addresses must be provided".to_string(), + )); + } }; Ok(GetMultipleCompressedAccountsResponse { diff --git a/src/ingester/detect_gaps.rs b/src/ingester/detect_gaps.rs new file mode 100644 index 00000000..d2d3d803 --- /dev/null +++ b/src/ingester/detect_gaps.rs @@ -0,0 +1,569 @@ +use crate::ingester::parser::{ + indexer_events::MerkleTreeEvent, + state_update::StateUpdate, + tree_info::{TreeTypeSeq, QUEUE_TREE_MAPPING}, +}; +use lazy_static::lazy_static; +use solana_pubkey::Pubkey; +use std::collections::HashMap; +use std::sync::Mutex; +use tracing::warn; + +// Global sequence state tracker to maintain latest observed sequences +lazy_static! { + pub static ref SEQUENCE_STATE: Mutex> = Mutex::new(HashMap::new()); +} + +fn merkle_event_to_type_id(event: &MerkleTreeEvent) -> u8 { + match event { + MerkleTreeEvent::BatchAppend(_) => 1, + MerkleTreeEvent::BatchNullify(_) => 2, + MerkleTreeEvent::BatchAddressAppend(_) => 3, + _ => 0, // Other event types we don't care about + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum StateUpdateFieldType { + IndexedTreeUpdate, + LeafNullification, + BatchNullifyContext, + BatchNewAddress, + BatchMerkleTreeEventAppend, + BatchMerkleTreeEventNullify, + BatchMerkleTreeEventAddressAppend, + OutAccount, +} + +#[derive(Debug, Clone)] +pub struct SequenceGap { + // Boundary information for gap filling + pub before_slot: u64, + pub after_slot: u64, + pub before_signature: String, + pub after_signature: String, + + // Tree/context metadata + pub tree_pubkey: Option, // Tree pubkey (unified for all tree operations) + // pub tree_type_string: Option, // Tree type string (for indexed tree updates) + pub field_type: StateUpdateFieldType, +} + +#[derive(Debug, Default, Clone)] +pub struct SequenceEntry { + pub sequence: u64, + pub slot: u64, + pub signature: String, +} + +#[derive(Debug, Default, Clone)] +pub struct StateUpdateSequences { + // Sequences with slot and signature information for gap analysis + indexed_tree_seqs: HashMap<(Pubkey, u64), Vec>, // (tree, tree_type_id) -> entries + nullification_seqs: HashMap>, // tree -> entries + batch_nullify_queue_indexes: HashMap>, // tree -> queue_index entries + batch_address_queue_indexes: HashMap>, // tree -> queue_index entries + batch_merkle_event_seqs: HashMap<(Pubkey, u8), Vec>, // (tree_pubkey, event_type) -> entries + out_account_leaf_indexes: HashMap>, // tree -> leaf_index entries +} + +/// Updates the global sequence state with the latest observed sequences +pub fn update_sequence_state(sequences: &StateUpdateSequences) { + let mut state = SEQUENCE_STATE.lock().unwrap(); + + // Update indexed tree sequences + for ((tree_pubkey, _tree_type_id), entries) in &sequences.indexed_tree_seqs { + if let Some(max_entry) = entries.iter().max_by_key(|e| e.sequence) { + let tree_str = tree_pubkey.to_string(); + // Check the actual tree type from the mapping + if let Some(info) = QUEUE_TREE_MAPPING.get(&tree_str) { + match info.tree_type { + light_compressed_account::TreeType::AddressV1 => { + state.insert(tree_str, TreeTypeSeq::AddressV1(max_entry.clone())); + } + light_compressed_account::TreeType::StateV1 => { + state.insert(tree_str, TreeTypeSeq::StateV1(max_entry.clone())); + } + _ => { + // Other tree types not handled in indexed_tree_seqs + } + } + } + } + } + + // Update nullification sequences + for (tree_pubkey, entries) in &sequences.nullification_seqs { + if let Some(max_entry) = entries.iter().max_by_key(|e| e.sequence) { + let tree_str = tree_pubkey.to_string(); + state.insert(tree_str, TreeTypeSeq::StateV1(max_entry.clone())); + } + } + + // Update batch address queue indexes + for (tree_pubkey, entries) in &sequences.batch_address_queue_indexes { + if let Some(max_entry) = entries.iter().max_by_key(|e| e.sequence) { + let tree_str = tree_pubkey.to_string(); + println!( + "DEBUG: Updating batch_address_queue_indexes for tree: {}, sequence: {}", + tree_str, max_entry.sequence + ); + let input_queue_entry = if let Some(current_seq) = state.get(&tree_str) { + if let TreeTypeSeq::AddressV2(input_queue_entry, _) = current_seq { + input_queue_entry.clone() + } else { + SequenceEntry { + sequence: 0, + slot: 0, + signature: String::new(), + } + } + } else { + SequenceEntry { + sequence: 0, + slot: 0, + signature: String::new(), + } + }; + state.insert( + tree_str, + TreeTypeSeq::AddressV2(input_queue_entry, max_entry.clone()), + ); + } + } + + // Update out account leaf indexes for StateV2 trees + for (tree_pubkey, entries) in &sequences.out_account_leaf_indexes { + if let Some(max_entry) = entries.iter().max_by_key(|e| e.sequence) { + let tree_str = tree_pubkey.to_string(); + if let Some(info) = QUEUE_TREE_MAPPING.get(&tree_str) { + match info.tree_type { + light_compressed_account::TreeType::StateV2 => { + let mut seq_context = if let Some(current_seq) = state.get(&tree_str) { + if let TreeTypeSeq::StateV2(seq_context) = current_seq { + seq_context.clone() + } else { + crate::ingester::parser::tree_info::StateV2SeqWithContext::default() + } + } else { + crate::ingester::parser::tree_info::StateV2SeqWithContext::default() + }; + seq_context.output_queue_entry = Some(max_entry.clone()); + state.insert(tree_str, TreeTypeSeq::StateV2(seq_context)); + } + _ => { + state.insert(tree_str, TreeTypeSeq::StateV1(max_entry.clone())); + } + } + } + } + } +} + +impl StateUpdateSequences { + /// Extracts sequences from a StateUpdate with slot and signature context + pub fn extract_state_update_sequences( + &mut self, + state_update: &StateUpdate, + slot: u64, + signature: &str, + ) { + // Extract indexed tree sequences + for ((tree_pubkey, _), leaf_update) in &state_update.indexed_merkle_tree_updates { + self.indexed_tree_seqs + .entry((*tree_pubkey, leaf_update.tree_type as u64)) + .or_insert_with(Vec::new) + .push(SequenceEntry { + sequence: leaf_update.seq, + slot, + signature: signature.to_string(), + }); + } + + // Extract leaf nullification sequences + for nullification in &state_update.leaf_nullifications { + self.nullification_seqs + .entry(nullification.tree) + .or_insert_with(Vec::new) + .push(SequenceEntry { + sequence: nullification.seq, + slot, + signature: signature.to_string(), + }); + } + + // Extract batch nullify context queue indexes + for context in &state_update.batch_nullify_context { + let tree = Pubkey::new_from_array(context.tree_pubkey.to_bytes()); + self.batch_nullify_queue_indexes + .entry(tree) + .or_insert_with(Vec::new) + .push(SequenceEntry { + sequence: context.nullifier_queue_index, + slot, + signature: signature.to_string(), + }); + } + + // Extract batch new address queue indexes + for address in &state_update.batch_new_addresses { + let tree_str = address.tree.0.to_string(); + println!( + "DEBUG: Extracting batch_new_address for tree: {}, queue_index: {}", + tree_str, address.queue_index + ); + + // Check if this is an AddressV1 tree incorrectly in batch operations + if let Some(info) = QUEUE_TREE_MAPPING.get(&tree_str) { + if info.tree_type == light_compressed_account::TreeType::AddressV1 { + println!("ERROR: AddressV1 tree {} found in batch_new_addresses - this should not happen!", tree_str); + println!( + " queue_index: {}, slot: {}, signature: {}", + address.queue_index, slot, signature + ); + // Skip this invalid data + continue; + } + } + + self.batch_address_queue_indexes + .entry(address.tree.0) + .or_insert_with(Vec::new) + .push(SequenceEntry { + sequence: address.queue_index, + slot, + signature: signature.to_string(), + }); + } + + // Extract batch merkle tree event sequences + for (tree_hash, events) in &state_update.batch_merkle_tree_events { + let tree_pubkey = Pubkey::from(*tree_hash); + for (seq, merkle_event) in events { + let event_type = merkle_event_to_type_id(merkle_event); + if event_type > 0 { + self.batch_merkle_event_seqs + .entry((tree_pubkey, event_type)) + .or_insert_with(Vec::new) + .push(SequenceEntry { + sequence: *seq, + slot, + signature: signature.to_string(), + }); + } + } + } + + // Extract out_account leaf indexes + for account_with_context in &state_update.out_accounts { + let tree_pubkey = account_with_context.account.tree.0; + let leaf_index = account_with_context.account.leaf_index.0; + self.out_account_leaf_indexes + .entry(tree_pubkey) + .or_insert_with(Vec::new) + .push(SequenceEntry { + sequence: leaf_index, + slot, + signature: signature.to_string(), + }); + } + } +} + +/// Detects gaps from a single StateUpdateSequences struct +pub fn detect_gaps_from_sequences(sequences: &StateUpdateSequences) -> Vec { + detect_all_sequence_gaps(sequences) +} + +/// Comprehensive gap detection function that takes a vector of StateUpdateSequences and returns ALL gaps found +/// Aggregates sequences from multiple StateUpdates and detects gaps across all transactions +pub fn detect_all_sequence_gaps(sequences: &StateUpdateSequences) -> Vec { + let mut all_gaps = Vec::new(); + + // Check indexed tree updates + for ((tree_pubkey, tree_type_id), seqs) in &sequences.indexed_tree_seqs { + println!( + "DEBUG: Processing indexed_tree_seqs - tree: {}, tree_type_id: {}", + tree_pubkey, tree_type_id + ); + let gaps = detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + None, // TODO: use queue pubkey if we only have queue pubkey such as for outputs of batched trees + StateUpdateFieldType::IndexedTreeUpdate, + ); + all_gaps.extend(gaps); + } + + // Check leaf nullifications + for (tree_pubkey, seqs) in &sequences.nullification_seqs { + let gaps = detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + None, + StateUpdateFieldType::LeafNullification, + ); + all_gaps.extend(gaps); + } + + // Check batch nullify context + for (tree_pubkey, entries) in &sequences.batch_nullify_queue_indexes { + if !entries.is_empty() { + let gaps = detect_sequence_gaps_with_metadata( + entries, + Some(*tree_pubkey), + None, + StateUpdateFieldType::BatchNullifyContext, + ); + all_gaps.extend(gaps); + } + } + + // Check batch new addresses + for (tree_pubkey, seqs) in &sequences.batch_address_queue_indexes { + let gaps = detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + None, + StateUpdateFieldType::BatchNewAddress, + ); + all_gaps.extend(gaps); + } + + // Check batch merkle tree events + for ((tree_pubkey, event_type), seqs) in &sequences.batch_merkle_event_seqs { + let field_type = match event_type { + 1 => StateUpdateFieldType::BatchMerkleTreeEventAppend, + 2 => StateUpdateFieldType::BatchMerkleTreeEventNullify, + 3 => StateUpdateFieldType::BatchMerkleTreeEventAddressAppend, + _ => continue, + }; + + let gaps = detect_sequence_gaps_with_metadata(seqs, Some(*tree_pubkey), None, field_type); + all_gaps.extend(gaps); + } + + // Check out_account leaf indexes + for (tree_pubkey, seqs) in &sequences.out_account_leaf_indexes { + let gaps = detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + None, + StateUpdateFieldType::OutAccount, + ); + all_gaps.extend(gaps); + } + + all_gaps +} + +/// Detects gaps in a sequence with full metadata for gap filling +fn detect_sequence_gaps_with_metadata( + sequences: &[SequenceEntry], + tree_pubkey: Option, + queue_pubkey: Option, + field_type: StateUpdateFieldType, +) -> Vec { + if field_type == StateUpdateFieldType::BatchNullifyContext { + // For batch nullify context, we don't have tree or queue pubkey, so we can't detect gaps + return Vec::new(); + } + if sequences.len() < 2 { + return Vec::new(); + } + + let mut sorted_sequences = sequences.to_vec(); + sorted_sequences.sort_by_key(|entry| entry.sequence); + let mut gaps = Vec::new(); + let start_seq = if let Some(tree) = tree_pubkey { + let tree_str = tree.to_string(); + + let state = SEQUENCE_STATE.lock().unwrap(); + if let Some(current_seq) = state.get(&tree_str) { + println!( + "DEBUG: Using current sequence state for tree {}: {:?}", + tree_str, current_seq + ); + current_seq.clone() + } else { + warn!("No current sequence state found for tree {}", tree_str); + TreeTypeSeq::default() + } + } else if let Some(queue_pubkey) = queue_pubkey { + let queue_str = queue_pubkey.to_string(); + // This could be an issue in case of batched output queue updates. + let state = SEQUENCE_STATE.lock().unwrap(); + if let Some(current_seq) = state.get(&queue_str) { + current_seq.clone() + } else { + warn!("No current sequence state found for queue {}", queue_str); + TreeTypeSeq::default() + } + } else { + println!("field_type: {:?}", field_type); + println!( + "tree_pubkey: {:?}, queue_pubkey: {:?}", + tree_pubkey, queue_pubkey + ); + warn!( + "No current sequence state found for queue {:?} and tree {:?}", + queue_pubkey, tree_pubkey + ); + TreeTypeSeq::default() + }; + + let (unpacked_start_seq, start_entry) = match field_type { + StateUpdateFieldType::IndexedTreeUpdate => match start_seq { + TreeTypeSeq::AddressV1(entry) => { + println!( + "DEBUG: IndexedTreeUpdate with AddressV1, seq: {}", + entry.sequence + ); + (entry.sequence, Some(entry)) + } + _ => { + println!( + "DEBUG: IndexedTreeUpdate with unsupported tree type: {:?}", + start_seq + ); + warn!( + "No current sequence state found for queue {:?} and tree {:?}", + queue_pubkey, tree_pubkey + ); + (u64::MAX, None) + } + }, + StateUpdateFieldType::BatchMerkleTreeEventAddressAppend => { + if let TreeTypeSeq::AddressV2(_, entry) = start_seq { + (entry.sequence, Some(entry)) + } else { + warn!( + "No current sequence state found for queue {:?} and tree {:?}", + queue_pubkey, tree_pubkey + ); + (u64::MAX, None) + } + } + StateUpdateFieldType::BatchNewAddress => { + if let TreeTypeSeq::AddressV2(_, entry) = start_seq { + (entry.sequence, Some(entry)) + } else { + warn!( + "No current sequence state found for queue {:?} and tree {:?}", + queue_pubkey, tree_pubkey + ); + (u64::MAX, None) + } + } + StateUpdateFieldType::BatchMerkleTreeEventAppend => { + if let TreeTypeSeq::StateV2(seq_context) = start_seq { + if let Some(entry) = &seq_context.batch_event_entry { + (entry.sequence, Some(entry.clone())) + } else { + (0, None) + } + } else { + warn!( + "No current sequence state found for queue {:?} and tree {:?}", + queue_pubkey, tree_pubkey + ); + (u64::MAX, None) + } + } + StateUpdateFieldType::BatchMerkleTreeEventNullify => { + if let TreeTypeSeq::StateV2(seq_context) = start_seq { + if let Some(entry) = &seq_context.batch_event_entry { + (entry.sequence, Some(entry.clone())) + } else { + (0, None) + } + } else { + warn!( + "No current sequence state found for queue {:?} and tree {:?}", + queue_pubkey, tree_pubkey + ); + (u64::MAX, None) + } + } + StateUpdateFieldType::LeafNullification => { + if let TreeTypeSeq::StateV1(entry) = start_seq { + (entry.sequence, Some(entry)) + } else { + warn!( + "No current sequence state found for queue {:?} and tree {:?}", + queue_pubkey, tree_pubkey + ); + (u64::MAX, None) + } + } + StateUpdateFieldType::OutAccount => { + if let TreeTypeSeq::StateV1(entry) = start_seq { + (entry.sequence, Some(entry)) + } else if let TreeTypeSeq::StateV2(seq_context) = start_seq { + if let Some(entry) = &seq_context.output_queue_entry { + (entry.sequence, Some(entry.clone())) + } else { + (0, None) + } + } else { + warn!( + "No current sequence state found for queue {:?} and tree {:?}", + queue_pubkey, tree_pubkey + ); + (u64::MAX, None) + } + } + StateUpdateFieldType::BatchNullifyContext => { + if let TreeTypeSeq::StateV2(seq_context) = start_seq { + if let Some(entry) = &seq_context.input_queue_entry { + (entry.sequence, Some(entry.clone())) + } else { + (0, None) + } + } else { + warn!( + "No current sequence state found for queue {:?} and tree {:?}", + queue_pubkey, tree_pubkey + ); + (u64::MAX, None) + } + } + }; + + // Skip gap detection for tree initialization (when unpacked_start_seq == 0) + // because there's no previous sequence to compare against + // Also skip if unpacked_start_seq is u64::MAX (no state found) + if unpacked_start_seq > 0 && unpacked_start_seq != u64::MAX && sorted_sequences[0].sequence > unpacked_start_seq.saturating_add(1) { + let (before_slot, before_signature) = if let Some(entry) = start_entry { + (entry.slot, entry.signature) + } else { + (0, String::new()) + }; + + gaps.push(SequenceGap { + before_slot, + after_slot: sorted_sequences[0].slot, + before_signature, + after_signature: sorted_sequences[0].signature.clone(), + tree_pubkey, + field_type: field_type.clone(), + }); + } + for i in 1..sorted_sequences.len() { + let prev_entry = &sorted_sequences[i - 1]; + let curr_entry = &sorted_sequences[i]; + + if curr_entry.sequence - prev_entry.sequence > 1 { + gaps.push(SequenceGap { + before_slot: prev_entry.slot, + after_slot: curr_entry.slot, + before_signature: prev_entry.signature.clone(), + after_signature: curr_entry.signature.clone(), + tree_pubkey, + field_type: field_type.clone(), + }); + } + } + + gaps +} diff --git a/src/ingester/error.rs b/src/ingester/error.rs index 12b87ef1..a97404b2 100644 --- a/src/ingester/error.rs +++ b/src/ingester/error.rs @@ -14,6 +14,8 @@ pub enum IngesterError { EmptyBatchEvent, #[error("Invalid event.")] InvalidEvent, + #[error("Custom error: {0}")] + CustomError(String), } impl From for IngesterError { diff --git a/src/ingester/fetchers/grpc.rs b/src/ingester/fetchers/grpc.rs index 5a4a6764..caf3271d 100644 --- a/src/ingester/fetchers/grpc.rs +++ b/src/ingester/fetchers/grpc.rs @@ -16,6 +16,7 @@ use solana_pubkey::Pubkey; use solana_sdk::pubkey::Pubkey as SdkPubkey; use solana_sdk::signature::Signature; use tokio::time::sleep; +use tokio::sync::mpsc; use tracing::error; use yellowstone_grpc_client::{GeyserGrpcBuilderResult, GeyserGrpcClient, Interceptor}; use yellowstone_grpc_proto::convert_from::create_tx_error; @@ -30,6 +31,7 @@ use yellowstone_grpc_proto::solana::storage::confirmed_block::InnerInstructions; use crate::api::method::get_indexer_health::HEALTH_CHECK_SLOT_DISTANCE; use crate::common::typedefs::hash::Hash; use crate::ingester::fetchers::poller::get_block_poller_stream; +use crate::ingester::rewind_controller::RewindCommand; use crate::ingester::typedefs::block_info::{ BlockInfo, BlockMetadata, Instruction, InstructionGroup, TransactionInfo, }; @@ -43,6 +45,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client: Arc, mut last_indexed_slot: u64, max_concurrent_block_fetches: usize, + rewind_receiver: Option>, ) -> impl Stream> { stream! { start_latest_slot_updater(rpc_client.clone()).await; @@ -53,6 +56,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + rewind_receiver, )) ); @@ -115,6 +119,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + None, // No rewind receiver for timeout fallback ))); continue; } @@ -132,6 +137,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + None, // No rewind receiver for out-of-order fallback ))); continue; } @@ -144,6 +150,7 @@ pub fn get_grpc_stream_with_rpc_fallback( rpc_client.clone(), last_indexed_slot, max_concurrent_block_fetches, + None, // No rewind receiver for unhealthy fallback ))); } } diff --git a/src/ingester/fetchers/mod.rs b/src/ingester/fetchers/mod.rs index cc3235da..1e787440 100644 --- a/src/ingester/fetchers/mod.rs +++ b/src/ingester/fetchers/mod.rs @@ -3,8 +3,9 @@ use std::sync::Arc; use async_stream::stream; use futures::{pin_mut, Stream, StreamExt}; use solana_client::nonblocking::rpc_client::RpcClient; +use tokio::sync::mpsc; -use super::typedefs::block_info::BlockInfo; +use super::{typedefs::block_info::BlockInfo, rewind_controller::RewindCommand}; pub mod grpc; pub mod poller; @@ -17,10 +18,11 @@ pub struct BlockStreamConfig { pub geyser_url: Option, pub max_concurrent_block_fetches: usize, pub last_indexed_slot: u64, + pub rewind_receiver: Option>, } impl BlockStreamConfig { - pub fn load_block_stream(&self) -> impl Stream> { + pub fn load_block_stream(mut self) -> impl Stream> { let grpc_stream = self.geyser_url.as_ref().map(|geyser_url| { let auth_header = std::env::var("GRPC_X_TOKEN").unwrap(); get_grpc_stream_with_rpc_fallback( @@ -29,6 +31,7 @@ impl BlockStreamConfig { self.rpc_client.clone(), self.last_indexed_slot, self.max_concurrent_block_fetches, + self.rewind_receiver.take(), ) }); @@ -37,6 +40,7 @@ impl BlockStreamConfig { self.rpc_client.clone(), self.last_indexed_slot, self.max_concurrent_block_fetches, + self.rewind_receiver.take(), )) } else { None diff --git a/src/ingester/fetchers/poller.rs b/src/ingester/fetchers/poller.rs index 729d20e5..424e3c1b 100644 --- a/src/ingester/fetchers/poller.rs +++ b/src/ingester/fetchers/poller.rs @@ -9,12 +9,16 @@ use futures::{pin_mut, Stream, StreamExt}; use solana_client::{ nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig, rpc_request::RpcError, }; +use tokio::sync::mpsc; use solana_sdk::commitment_config::CommitmentConfig; use solana_transaction_status::{TransactionDetails, UiTransactionEncoding}; use crate::{ - ingester::typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo}, + ingester::{ + rewind_controller::RewindCommand, + typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo}, + }, metric, monitor::{start_latest_slot_updater, LATEST_SLOT}, }; @@ -40,33 +44,66 @@ pub fn get_block_poller_stream( rpc_client: Arc, mut last_indexed_slot: u64, max_concurrent_block_fetches: usize, + mut rewind_receiver: Option>, ) -> impl Stream> { stream! { - let start_slot = match last_indexed_slot { + let mut current_start_slot = match last_indexed_slot { 0 => 0, last_indexed_slot => last_indexed_slot + 1 }; - let slot_stream = get_slot_stream(rpc_client.clone(), start_slot); - pin_mut!(slot_stream); - let block_stream = slot_stream - .map(|slot| { - let rpc_client = rpc_client.clone(); - async move { fetch_block_with_infinite_retries(rpc_client.clone(), slot).await } - }) - .buffer_unordered(max_concurrent_block_fetches); - pin_mut!(block_stream); - let mut block_cache: BTreeMap = BTreeMap::new(); - while let Some(block) = block_stream.next().await { - if let Some(block) = block { - block_cache.insert(block.metadata.slot, block); - } - let (blocks_to_index, last_indexed_slot_from_cache) = pop_cached_blocks_to_index(&mut block_cache, last_indexed_slot); - last_indexed_slot = last_indexed_slot_from_cache; - metric! { - statsd_count!("rpc_block_emitted", blocks_to_index.len() as i64); + + loop { + let slot_stream = get_slot_stream(rpc_client.clone(), current_start_slot); + pin_mut!(slot_stream); + let block_stream = slot_stream + .map(|slot| { + let rpc_client = rpc_client.clone(); + async move { fetch_block_with_infinite_retries(rpc_client.clone(), slot).await } + }) + .buffer_unordered(max_concurrent_block_fetches); + pin_mut!(block_stream); + let mut block_cache: BTreeMap = BTreeMap::new(); + let mut rewind_occurred = false; + + while let Some(block) = block_stream.next().await { + // Check for rewind commands before processing blocks + if let Some(ref mut receiver) = rewind_receiver { + while let Ok(command) = receiver.try_recv() { + match command { + RewindCommand::Rewind { to_slot, reason } => { + log::error!("Rewinding block stream to {}: {}", to_slot, reason); + // Clear cached blocks + block_cache.clear(); + // Reset positions + last_indexed_slot = to_slot - 1; + current_start_slot = to_slot; + rewind_occurred = true; + log::info!("Cleared cache, restarting from slot {}", current_start_slot); + break; + } + } + } + } + + if rewind_occurred { + break; // Exit inner loop to restart streams + } + + if let Some(block) = block { + block_cache.insert(block.metadata.slot, block); + } + let (blocks_to_index, last_indexed_slot_from_cache) = pop_cached_blocks_to_index(&mut block_cache, last_indexed_slot); + last_indexed_slot = last_indexed_slot_from_cache; + metric! { + statsd_count!("rpc_block_emitted", blocks_to_index.len() as i64); + } + if !blocks_to_index.is_empty() { + yield blocks_to_index; + } } - if !blocks_to_index.is_empty() { - yield blocks_to_index; + + if !rewind_occurred { + break; // Normal termination } } } diff --git a/src/ingester/indexer/mod.rs b/src/ingester/indexer/mod.rs index fa696d56..05101e03 100644 --- a/src/ingester/indexer/mod.rs +++ b/src/ingester/indexer/mod.rs @@ -8,7 +8,7 @@ use solana_client::nonblocking::rpc_client::RpcClient; use crate::{ common::fetch_current_slot_with_infinite_retry, dao::generated::blocks, - ingester::index_block_batch_with_infinite_retries, + ingester::{index_block_batch_with_infinite_retries, rewind_controller::RewindController}, }; use super::typedefs::block_info::BlockInfo; @@ -52,6 +52,7 @@ pub async fn index_block_stream( rpc_client: Arc, last_indexed_slot_at_start: u64, end_slot: Option, + rewind_controller: Option<&RewindController>, ) { pin_mut!(block_stream); let current_slot = @@ -71,28 +72,40 @@ pub async fn index_block_stream( while let Some(blocks) = block_stream.next().await { let last_slot_in_block = blocks.last().unwrap().metadata.slot; - index_block_batch_with_infinite_retries(db.as_ref(), blocks).await; - - for slot in (last_indexed_slot + 1)..(last_slot_in_block + 1) { - let blocks_indexed = slot - last_indexed_slot_at_start; - if blocks_indexed < number_of_blocks_to_backfill { - if blocks_indexed % PRE_BACKFILL_FREQUENCY == 0 { - info!( - "Backfilled {} / {} blocks", - blocks_indexed, number_of_blocks_to_backfill - ); - } - } else { - if finished_backfill_slot.is_none() { - info!("Finished backfilling historical blocks!"); - info!("Starting to index new blocks..."); - finished_backfill_slot = Some(slot); + match index_block_batch_with_infinite_retries(db.as_ref(), blocks, rewind_controller).await { + Ok(()) => { + for slot in (last_indexed_slot + 1)..(last_slot_in_block + 1) { + let blocks_indexed = slot - last_indexed_slot_at_start; + if blocks_indexed < number_of_blocks_to_backfill { + if blocks_indexed % PRE_BACKFILL_FREQUENCY == 0 { + info!( + "Backfilled {} / {} blocks", + blocks_indexed, number_of_blocks_to_backfill + ); + } + } else { + if finished_backfill_slot.is_none() { + info!("Finished backfilling historical blocks!"); + info!("Starting to index new blocks..."); + finished_backfill_slot = Some(slot); + } + if slot % POST_BACKFILL_FREQUENCY == 0 { + info!("Indexed slot {}", slot); + } + } + last_indexed_slot = slot; } - if slot % POST_BACKFILL_FREQUENCY == 0 { - info!("Indexed slot {}", slot); + } + Err(e) => { + if e.to_string().contains("Gap detection triggered rewind") { + // Gap detected, rewind triggered - the slot stream should handle repositioning + log::info!("Gap detection triggered rewind"); + continue; + } else { + log::error!("Unexpected error in block processing: {}", e); + sleep(Duration::from_secs(1)); } } - last_indexed_slot = slot; } } } diff --git a/src/ingester/mod.rs b/src/ingester/mod.rs index f2934d47..b7ca6e52 100644 --- a/src/ingester/mod.rs +++ b/src/ingester/mod.rs @@ -6,12 +6,11 @@ use error::IngesterError; use parser::parse_transaction; use sea_orm::sea_query::OnConflict; -use sea_orm::ConnectionTrait; use sea_orm::DatabaseConnection; use sea_orm::DatabaseTransaction; +use sea_orm::{ConnectionTrait, QueryTrait}; use sea_orm::EntityTrait; -use sea_orm::QueryTrait; use sea_orm::Set; use sea_orm::TransactionTrait; @@ -21,26 +20,78 @@ use self::persist::MAX_SQL_INSERTS; use self::typedefs::block_info::BlockInfo; use self::typedefs::block_info::BlockMetadata; use crate::dao::generated::blocks; +use crate::ingester::detect_gaps::SEQUENCE_STATE; use crate::metric; +pub mod detect_gaps; pub mod error; pub mod fetchers; pub mod indexer; pub mod parser; pub mod persist; +pub mod rewind_controller; pub mod typedefs; -fn derive_block_state_update(block: &BlockInfo) -> Result { +fn derive_block_state_update( + block: &BlockInfo, + rewind_controller: Option<&rewind_controller::RewindController>, +) -> Result { + use crate::ingester::detect_gaps::{detect_gaps_from_sequences, StateUpdateSequences}; + let mut state_updates: Vec = Vec::new(); + let mut sequences = StateUpdateSequences::default(); + + // Parse each transaction and extract sequences with proper context for transaction in &block.transactions { - state_updates.push(parse_transaction(transaction, block.metadata.slot)?); + let state_update = parse_transaction(transaction, block.metadata.slot)?; + + // Extract sequences with proper slot and signature context + sequences.extract_state_update_sequences( + &state_update, + block.metadata.slot, + &transaction.signature.to_string(), + ); + + state_updates.push(state_update); } + + // Check for gaps with proper context + let gaps = detect_gaps_from_sequences(&sequences); + if !gaps.is_empty() { + tracing::warn!( + "Gaps detected in block {} sequences: {gaps:?}", + block.metadata.slot + ); + + // Request rewind if controller is available + if let Some(controller) = rewind_controller { + if let Err(e) = controller.request_rewind_for_gaps(&gaps) { + tracing::error!( + "Failed to request rewind for gaps in block {}: {}", + block.metadata.slot, + e + ); + return Err(IngesterError::CustomError( + "Gap detection triggered rewind failure".to_string(), + )); + } + // Return early after requesting rewind - don't continue processing + return Err(IngesterError::CustomError( + "Gap detection triggered rewind".to_string(), + )); + } + } + + // Update sequence state with latest observed sequences + crate::ingester::detect_gaps::update_sequence_state(&sequences); + Ok(StateUpdate::merge_updates(state_updates)) } pub async fn index_block(db: &DatabaseConnection, block: &BlockInfo) -> Result<(), IngesterError> { let txn = db.begin().await?; index_block_metadatas(&txn, vec![&block.metadata]).await?; - persist_state_update(&txn, derive_block_state_update(block)?).await?; + derive_block_state_update(block, None)?; + persist_state_update(&txn, derive_block_state_update(block, None)?).await?; txn.commit().await?; Ok(()) } @@ -81,6 +132,7 @@ async fn index_block_metadatas( pub async fn index_block_batch( db: &DatabaseConnection, block_batch: &Vec, + rewind_controller: Option<&rewind_controller::RewindController>, ) -> Result<(), IngesterError> { let blocks_len = block_batch.len(); let tx = db.begin().await?; @@ -88,7 +140,7 @@ pub async fn index_block_batch( index_block_metadatas(&tx, block_metadatas).await?; let mut state_updates = Vec::new(); for block in block_batch { - state_updates.push(derive_block_state_update(block)?); + state_updates.push(derive_block_state_update(block, rewind_controller)?); } persist::persist_state_update(&tx, StateUpdate::merge_updates(state_updates)).await?; metric! { @@ -101,11 +153,32 @@ pub async fn index_block_batch( pub async fn index_block_batch_with_infinite_retries( db: &DatabaseConnection, block_batch: Vec, -) { + rewind_controller: Option<&rewind_controller::RewindController>, +) -> Result<(), IngesterError> { loop { - match index_block_batch(db, &block_batch).await { - Ok(()) => return, + log::info!( + "amt sequence state {:?}", + SEQUENCE_STATE + .lock() + .unwrap() + .get("amt1Ayt45jfbdw5YSo7iz6WZxUmnZsQTYXy82hVwyC2") + ); + log::info!( + "smt sequence state {:?}", + SEQUENCE_STATE + .lock() + .unwrap() + .get("smt1NamzXdq4AMqS2fS2F1i5KTYPZRhoHgWx38d8WsT") + ); + match index_block_batch(db, &block_batch, rewind_controller).await { + Ok(()) => return Ok(()), Err(e) => { + // Check if this is a gap-triggered rewind error + if e.to_string().contains("Gap detection triggered rewind") { + // Don't retry, propagate the rewind error up + return Err(e); + } + let start_block = block_batch.first().unwrap().metadata.slot; let end_block = block_batch.last().unwrap().metadata.slot; log::error!( diff --git a/src/ingester/parser/state_update.rs b/src/ingester/parser/state_update.rs index 549a2284..a5f7c5da 100644 --- a/src/ingester/parser/state_update.rs +++ b/src/ingester/parser/state_update.rs @@ -76,7 +76,7 @@ pub struct AddressQueueUpdate { impl From for AddressQueueUpdate { fn from(new_address: NewAddress) -> Self { AddressQueueUpdate { - tree: SerializablePubkey::from(new_address.mt_pubkey), + tree: SerializablePubkey::from(new_address.tree_pubkey), address: new_address.address, queue_index: new_address.queue_index, } diff --git a/src/ingester/parser/tree_info.rs b/src/ingester/parser/tree_info.rs index 3c073a7d..e792f0a3 100644 --- a/src/ingester/parser/tree_info.rs +++ b/src/ingester/parser/tree_info.rs @@ -1,3 +1,4 @@ +use crate::ingester::detect_gaps::SequenceEntry; use lazy_static::lazy_static; use light_compressed_account::TreeType; use solana_pubkey::{pubkey, Pubkey}; @@ -11,6 +12,37 @@ pub struct TreeInfo { pub tree_type: TreeType, } +#[derive(Debug, Clone)] +pub enum TreeTypeSeq { + StateV1(SequenceEntry), + // Output queue (leaf index), Input queue index, Batch event seq with context + StateV2(StateV2SeqWithContext), + // event seq with complete context + AddressV1(SequenceEntry), + // Input queue index, Batch event seq with context + AddressV2(SequenceEntry, SequenceEntry), // (input_queue_entry, batch_event_entry) +} + +impl Default for TreeTypeSeq { + fn default() -> Self { + TreeTypeSeq::StateV1(SequenceEntry::default()) + } +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct StateV2Seq { + pub input_queue_index: u64, + pub batch_event_seq: u64, + pub output_queue_index: u64, +} + +#[derive(Debug, Clone, Default)] +pub struct StateV2SeqWithContext { + pub input_queue_entry: Option, + pub batch_event_entry: Option, + pub output_queue_entry: Option, +} + impl TreeInfo { pub fn get(pubkey: &str) -> Option<&TreeInfo> { QUEUE_TREE_MAPPING.get(pubkey) diff --git a/src/ingester/parser/tx_event_parser_v2.rs b/src/ingester/parser/tx_event_parser_v2.rs index 389dfd74..18db1c0d 100644 --- a/src/ingester/parser/tx_event_parser_v2.rs +++ b/src/ingester/parser/tx_event_parser_v2.rs @@ -131,19 +131,18 @@ pub fn create_state_update_v2( .batch_nullify_context .extend(event.batch_input_accounts.clone()); - state_update_event - .batch_new_addresses - .extend( - event - .new_addresses - .clone() - .iter() - .map(|x| AddressQueueUpdate { - tree: SerializablePubkey::from(x.mt_pubkey), - address: x.address, - queue_index: x.queue_index, - }), - ); + state_update_event.batch_new_addresses.extend( + event + .new_addresses + .clone() + .iter() + .filter(|x| x.queue_index != u64::MAX) // Exclude AddressV1 trees + .map(|x| AddressQueueUpdate { + tree: SerializablePubkey::from(x.tree_pubkey), + address: x.address, + queue_index: x.queue_index, + }), + ); state_updates.push(state_update_event); } diff --git a/src/ingester/rewind_controller.rs b/src/ingester/rewind_controller.rs new file mode 100644 index 00000000..5a51b253 --- /dev/null +++ b/src/ingester/rewind_controller.rs @@ -0,0 +1,126 @@ +use tokio::sync::mpsc; +use thiserror::Error; +use crate::ingester::detect_gaps::SequenceGap; + +#[derive(Debug, Clone)] +pub enum RewindCommand { + Rewind { + to_slot: u64, + reason: String, + }, +} + +#[derive(Debug, Error)] +pub enum RewindError { + #[error("Failed to send rewind command: {0}")] + SendError(String), + #[error("Invalid rewind slot: {0}")] + InvalidSlot(u64), +} + +#[derive(Debug, Clone)] +pub struct RewindController { + sender: mpsc::UnboundedSender, +} + +impl RewindController { + pub fn new() -> (Self, mpsc::UnboundedReceiver) { + let (sender, receiver) = mpsc::unbounded_channel(); + (Self { sender }, receiver) + } + + pub fn request_rewind(&self, to_slot: u64, reason: String) -> Result<(), RewindError> { + let command = RewindCommand::Rewind { to_slot, reason }; + self.sender + .send(command) + .map_err(|e| RewindError::SendError(e.to_string()))?; + Ok(()) + } + + pub fn request_rewind_for_gaps(&self, gaps: &[SequenceGap]) -> Result<(), RewindError> { + if gaps.is_empty() { + return Ok(()); + } + + let rewind_slot = determine_rewind_slot_from_gaps(gaps); + let gap_count = gaps.len(); + let reason = format!("Sequence gaps detected: {} gaps found", gap_count); + + tracing::warn!("Requesting rewind to slot {} due to {} sequence gaps", rewind_slot, gap_count); + self.request_rewind(rewind_slot, reason) + } +} + +/// Determines the appropriate rewind slot based on detected gaps +/// Uses the earliest before_slot from all gaps to ensure we capture all missing data +fn determine_rewind_slot_from_gaps(gaps: &[SequenceGap]) -> u64 { + gaps.iter() + .map(|gap| gap.before_slot) + .filter(|&slot| slot > 0) // Filter out zero slots from initialization + .min() + .unwrap_or(0) // Fallback to slot 0 if no valid slots found +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ingester::detect_gaps::{SequenceGap, StateUpdateFieldType}; + use solana_pubkey::Pubkey; + + #[test] + fn test_rewind_controller_creation() { + let (controller, _receiver) = RewindController::new(); + let result = controller.request_rewind(100, "test rewind".to_string()); + assert!(result.is_ok()); + } + + #[test] + fn test_determine_rewind_slot_from_gaps() { + let gaps = vec![ + SequenceGap { + before_slot: 1000, + after_slot: 1002, + before_signature: "sig1".to_string(), + after_signature: "sig2".to_string(), + tree_pubkey: Some(Pubkey::new_unique()), + field_type: StateUpdateFieldType::IndexedTreeUpdate, + }, + SequenceGap { + before_slot: 995, + after_slot: 997, + before_signature: "sig3".to_string(), + after_signature: "sig4".to_string(), + tree_pubkey: Some(Pubkey::new_unique()), + field_type: StateUpdateFieldType::IndexedTreeUpdate, + }, + ]; + + let rewind_slot = determine_rewind_slot_from_gaps(&gaps); + assert_eq!(rewind_slot, 995); // Should pick the earliest before_slot + } + + #[test] + fn test_determine_rewind_slot_filters_zero_slots() { + let gaps = vec![ + SequenceGap { + before_slot: 0, // Should be filtered out + after_slot: 1002, + before_signature: "".to_string(), + after_signature: "sig2".to_string(), + tree_pubkey: Some(Pubkey::new_unique()), + field_type: StateUpdateFieldType::IndexedTreeUpdate, + }, + SequenceGap { + before_slot: 995, + after_slot: 997, + before_signature: "sig3".to_string(), + after_signature: "sig4".to_string(), + tree_pubkey: Some(Pubkey::new_unique()), + field_type: StateUpdateFieldType::IndexedTreeUpdate, + }, + ]; + + let rewind_slot = determine_rewind_slot_from_gaps(&gaps); + assert_eq!(rewind_slot, 995); // Should ignore slot 0 and pick 995 + } +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 185013ac..2385ee6a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -174,6 +174,7 @@ fn continously_index_new_blocks( db: Arc, rpc_client: Arc, last_indexed_slot: u64, + rewind_controller: Option, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let block_stream = block_stream_config.load_block_stream(); @@ -183,6 +184,7 @@ fn continously_index_new_blocks( rpc_client.clone(), last_indexed_slot, None, + rewind_controller.as_ref(), ) .await; }) @@ -234,6 +236,7 @@ async fn main() { rpc_client.clone(), last_indexed_slot, Some(last_slot), + None, ) .await; } @@ -277,11 +280,15 @@ async fn main() { .unwrap(), }; + // Create rewind controller for gap detection + let (rewind_controller, rewind_receiver) = photon_indexer::ingester::rewind_controller::RewindController::new(); + let block_stream_config = BlockStreamConfig { rpc_client: rpc_client.clone(), max_concurrent_block_fetches, last_indexed_slot, geyser_url: args.grpc_url, + rewind_receiver: Some(rewind_receiver), }; ( @@ -290,6 +297,7 @@ async fn main() { db_conn.clone(), rpc_client.clone(), last_indexed_slot, + Some(rewind_controller), )), Some(continously_monitor_photon( db_conn.clone(), diff --git a/src/snapshot/mod.rs b/src/snapshot/mod.rs index ee669507..38045615 100644 --- a/src/snapshot/mod.rs +++ b/src/snapshot/mod.rs @@ -451,11 +451,12 @@ pub async fn update_snapshot( incremental_snapshot_interval_slots: u64, ) { // Convert stream to iterator + let last_indexed_slot = block_stream_config.last_indexed_slot; let block_stream = block_stream_config.load_block_stream(); update_snapshot_helper( directory_adapter, block_stream, - block_stream_config.last_indexed_slot, + last_indexed_slot, incremental_snapshot_interval_slots, full_snapshot_interval_slots, ) diff --git a/src/snapshot/snapshotter/main.rs b/src/snapshot/snapshotter/main.rs index 9e6c9735..d1a9af81 100644 --- a/src/snapshot/snapshotter/main.rs +++ b/src/snapshot/snapshotter/main.rs @@ -256,6 +256,7 @@ async fn main() { max_concurrent_block_fetches: args.max_concurrent_block_fetches.unwrap_or(20), last_indexed_slot, geyser_url: args.grpc_url.clone(), + rewind_receiver: None, // No rewind support for snapshotter }, args.incremental_snapshot_interval_slots, args.snapshot_interval_slots, diff --git a/tests/integration_tests/main.rs b/tests/integration_tests/main.rs index a0f68b11..8909d3f5 100644 --- a/tests/integration_tests/main.rs +++ b/tests/integration_tests/main.rs @@ -9,5 +9,8 @@ mod mock_tests; mod open_api_tests; mod prod_tests; mod snapshot_tests; +mod test_v1_address_tree_sequence_consistency; +mod test_v1_address_tree_gap_filler; mod utils; +mod snapshot_test_utils; mod zeroeth_element_fix_test; diff --git a/tests/integration_tests/snapshot_test_utils.rs b/tests/integration_tests/snapshot_test_utils.rs new file mode 100644 index 00000000..a7838663 --- /dev/null +++ b/tests/integration_tests/snapshot_test_utils.rs @@ -0,0 +1,318 @@ +use anyhow::{Context, Result}; +use futures::stream; +use photon_indexer::ingester::parser::get_compression_program_id; +use photon_indexer::ingester::typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo}; +use photon_indexer::snapshot::{ + create_snapshot_from_byte_stream, load_block_stream_from_directory_adapter, + load_byte_stream_from_directory_adapter, DirectoryAdapter, +}; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_client::rpc_client::GetConfirmedSignaturesForAddress2Config; +use solana_sdk::signature::Signature; +use std::collections::HashSet; +use std::str::FromStr; +use std::sync::Arc; + +/// Test utility to create a snapshot file from compression transactions found on-chain +pub async fn create_test_snapshot_from_compression_transactions( + rpc_url: &str, + target_slot: u64, + snapshot_dir_path: &str, +) -> Result { + println!("Connecting to RPC: {}", rpc_url); + let client = RpcClient::new(rpc_url.to_string()); + + // Step 1: Fetch compression transaction signatures from current slot down to target slot + let (signatures, signature_to_slot_map) = fetch_compression_signatures_until_slot(&client, target_slot).await?; + println!("Found {} compression transaction signatures:", signatures.len()); + for (i, signature) in signatures.iter().enumerate() { + println!(" {}. {}", i + 1, signature); + } + + if signatures.is_empty() { + return Err(anyhow::anyhow!("No compression transactions found on devnet")); + } + + // Step 2: Extract unique slots from signature info (we already have this data!) + let slots: HashSet = signatures.iter() + .filter_map(|sig| signature_to_slot_map.get(sig)) + .copied() + .collect(); + + let mut slots: Vec = slots.into_iter().collect(); + slots.sort(); + println!("Found {} unique slots with compression transactions:", slots.len()); + for (i, slot) in slots.iter().enumerate() { + println!(" {}. Slot {}", i + 1, slot); + } + + // Step 3: Fetch blocks for these slots + let mut blocks = Vec::new(); + for (i, slot) in slots.iter().enumerate() { + match client.get_block_with_config( + *slot, + solana_client::rpc_config::RpcBlockConfig { + encoding: Some(solana_transaction_status::UiTransactionEncoding::Base64), + transaction_details: Some(solana_transaction_status::TransactionDetails::Full), + rewards: None, + commitment: Some(solana_sdk::commitment_config::CommitmentConfig::confirmed()), + max_supported_transaction_version: Some(0), + }, + ).await { + Ok(block) => { + match parse_ui_confirmed_blocked(block, *slot) { + Ok(block_info) => { + let block_time = std::time::UNIX_EPOCH + std::time::Duration::from_secs(block_info.metadata.block_time as u64); + let datetime = std::time::SystemTime::now().duration_since(block_time) + .map(|d| format!("{:.1} seconds ago", d.as_secs_f64())) + .unwrap_or_else(|_| format!("timestamp: {}", block_info.metadata.block_time)); + println!("Successfully parsed block at slot {} ({} transactions, {}) [{}/{}]", + slot, block_info.transactions.len(), datetime, i + 1, slots.len()); + blocks.push(block_info); + } + Err(e) => { + eprintln!("Failed to parse block at slot {}: {}", slot, e); + } + } + } + Err(e) => { + eprintln!("Failed to fetch block at slot {}: {}", slot, e); + } + } + } + + if blocks.is_empty() { + return Err(anyhow::anyhow!("No blocks could be fetched and parsed")); + } + + println!("Successfully fetched and parsed {} blocks", blocks.len()); + + // Step 4: Create snapshot from blocks + let snapshot_dir = std::path::PathBuf::from(snapshot_dir_path); + std::fs::create_dir_all(&snapshot_dir)?; + + let snapshot_dir_str = snapshot_dir.to_str().unwrap().to_string(); + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir_path.to_string())); + + // Clear any existing snapshots + let existing_snapshots = photon_indexer::snapshot::get_snapshot_files_with_metadata(directory_adapter.as_ref()).await?; + for snapshot in existing_snapshots { + directory_adapter.delete_file(snapshot.file).await?; + } + + // Sort blocks by slot to ensure proper ordering + blocks.sort_by_key(|block| block.metadata.slot); + + + // Calculate the total slot range to write everything into one file + let first_slot = blocks.first().map(|b| b.metadata.slot).unwrap_or(target_slot + 1); + let last_slot = blocks.last().map(|b| b.metadata.slot).unwrap_or(target_slot + 1); + let slot_range = last_slot - first_slot + 1; + + println!("Writing all blocks from slot {} to {} into one snapshot file (range: {} slots)", + first_slot, last_slot, slot_range); + + // Create snapshot file directly without using update_snapshot_helper + let snapshot_filename = format!("snapshot-{}-{}", first_slot, last_slot); + let snapshot_path = snapshot_dir.join(&snapshot_filename); + + println!("Writing snapshot directly to: {:?}", snapshot_path); + + // Serialize all blocks directly (no version header in individual files) + let mut snapshot_data = Vec::new(); + + // Add serialized blocks only (header is added when reading multiple files) + for block in &blocks { + // Filter for compression transactions only + let trimmed_block = photon_indexer::ingester::typedefs::block_info::BlockInfo { + metadata: block.metadata.clone(), + transactions: block.transactions.iter() + .filter(|tx| photon_indexer::snapshot::is_compression_transaction(tx)) + .cloned() + .collect(), + }; + let block_bytes = bincode::serialize(&trimmed_block).unwrap(); + snapshot_data.extend(block_bytes); + } + + // Write snapshot file directly + let data_len = snapshot_data.len(); + std::fs::write(&snapshot_path, snapshot_data)?; + println!("Successfully wrote snapshot file: {:?} ({} bytes)", snapshot_path, data_len); + + println!("Snapshot created successfully in directory: {}", snapshot_dir_str); + + // Debug: List created snapshot files + let created_snapshots = photon_indexer::snapshot::get_snapshot_files_with_metadata(directory_adapter.as_ref()).await?; + println!("Created {} snapshot files:", created_snapshots.len()); + for snapshot in &created_snapshots { + println!(" - {} (slots {} to {})", snapshot.file, snapshot.start_slot, snapshot.end_slot); + } + + Ok(snapshot_dir_str) +} + + +/// Validate that photon can parse the generated snapshot +pub async fn validate_snapshot_parsing(snapshot_dir: &str) -> Result> { + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir.to_string())); + + // Load and parse the snapshot + let block_stream = load_block_stream_from_directory_adapter(directory_adapter.clone()).await; + let blocks: Vec> = futures::StreamExt::collect(block_stream).await; + let blocks: Vec = blocks.into_iter().flatten().collect(); + + println!("Successfully parsed {} blocks from snapshot", blocks.len()); + + // Validate that all blocks contain only compression transactions + for (i, block) in blocks.iter().enumerate() { + println!("Block {} at slot {}: {} transactions", + i, block.metadata.slot, block.transactions.len()); + + for (j, tx) in block.transactions.iter().enumerate() { + let is_compression = photon_indexer::snapshot::is_compression_transaction(tx); + if !is_compression { + return Err(anyhow::anyhow!( + "Block {} transaction {} is not a compression transaction", i, j + )); + } + } + } + + println!("All transactions in snapshot are compression transactions āœ“"); + Ok(blocks) +} + +/// Test round-trip: create snapshot and reload it via byte stream +pub async fn test_snapshot_roundtrip(snapshot_dir: &str) -> Result<()> { + let source_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir.to_string())); + + // Create a second directory for the round-trip test + let roundtrip_dir = std::path::PathBuf::from("target").join("test_snapshots").join("roundtrip"); + std::fs::create_dir_all(&roundtrip_dir)?; + let roundtrip_dir_str = roundtrip_dir.to_str().unwrap().to_string(); + let target_adapter = Arc::new(DirectoryAdapter::from_local_directory(roundtrip_dir_str)); + + // Load byte stream from source + let byte_stream = load_byte_stream_from_directory_adapter(source_adapter.clone()).await; + + // Create snapshot from byte stream in target + create_snapshot_from_byte_stream(byte_stream, target_adapter.as_ref()).await?; + + // Load blocks from both snapshots and compare + let source_blocks = load_block_stream_from_directory_adapter(source_adapter).await; + let source_blocks: Vec> = futures::StreamExt::collect(source_blocks).await; + let source_blocks: Vec = source_blocks.into_iter().flatten().collect(); + + let target_blocks = load_block_stream_from_directory_adapter(target_adapter).await; + let target_blocks: Vec> = futures::StreamExt::collect(target_blocks).await; + let target_blocks: Vec = target_blocks.into_iter().flatten().collect(); + + if source_blocks.len() != target_blocks.len() { + return Err(anyhow::anyhow!( + "Block count mismatch: source={}, target={}", + source_blocks.len(), + target_blocks.len() + )); + } + + for (i, (source_block, target_block)) in source_blocks.iter().zip(target_blocks.iter()).enumerate() { + if source_block != target_block { + return Err(anyhow::anyhow!("Block {} differs between source and target", i)); + } + } + + println!("Round-trip test passed: {} blocks match exactly", source_blocks.len()); + Ok(()) +} + +async fn fetch_compression_signatures_until_slot( + client: &RpcClient, + target_slot: u64, +) -> Result<(Vec, std::collections::HashMap)> { + let mut signatures = Vec::new(); + let mut signature_to_slot_map = std::collections::HashMap::new(); + let mut before = None; + + println!("Fetching ALL compression signatures from current slot down to slot {}", target_slot); + + loop { + let config = GetConfirmedSignaturesForAddress2Config { + before, + until: None, + limit: None, // No limit - fetch as many as possible per batch + commitment: None, + }; + + let compression_program_id = solana_sdk::pubkey::Pubkey::new_from_array(get_compression_program_id().to_bytes()); + println!("Fetching signatures for compression program: {}", compression_program_id); + let batch = client + .get_signatures_for_address_with_config(&compression_program_id, config) + .await + .context("Failed to fetch signatures for compression program")?; + + println!("Fetched {} signatures in this batch", batch.len()); + + + + let mut reached_target_slot = false; + for sig_info in &batch { + // Check if we've reached the target slot + if sig_info.slot < target_slot { + reached_target_slot = true; + break; + } + + // Skip failed transactions + if sig_info.err.is_some() { + continue; + } + + let signature = Signature::from_str(&sig_info.signature) + .context("Failed to parse signature")?; + signatures.push(signature); + signature_to_slot_map.insert(signature, sig_info.slot); + } + + if reached_target_slot { + // Stop when no more signatures or reached target slot + break; + } + + before = batch.last().map(|sig| Signature::from_str(&sig.signature).unwrap()); + } + + println!("Found {} total compression signatures down to slot {}", signatures.len(), target_slot); + Ok((signatures, signature_to_slot_map)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + #[ignore] // Remove this to run the test + async fn test_create_snapshot_from_compression_transactions() { + let snapshot_dir = create_test_snapshot_from_compression_transactions( + "https://api.devnet.solana.com", + 10, // Fetch 10 compression transactions + "target/test_snapshots/devnet" + ) + .await + .expect("Failed to create test snapshot"); + + let blocks = validate_snapshot_parsing(&snapshot_dir) + .await + .expect("Failed to validate snapshot parsing"); + + assert!(!blocks.is_empty(), "Snapshot should contain blocks"); + + test_snapshot_roundtrip(&snapshot_dir) + .await + .expect("Round-trip test failed"); + + println!("Test completed successfully!"); + println!("Snapshot directory: {}", snapshot_dir); + println!("Parsed {} blocks from snapshot", blocks.len()); + } +} \ No newline at end of file diff --git a/tests/integration_tests/snapshot_tests.rs b/tests/integration_tests/snapshot_tests.rs index 891bb1ab..97012d88 100644 --- a/tests/integration_tests/snapshot_tests.rs +++ b/tests/integration_tests/snapshot_tests.rs @@ -8,6 +8,11 @@ use photon_indexer::snapshot::{ load_block_stream_from_directory_adapter, load_byte_stream_from_directory_adapter, update_snapshot_helper, R2BucketArgs, R2DirectoryAdapter, }; + +use crate::snapshot_test_utils::{ + create_test_snapshot_from_compression_transactions, + validate_snapshot_parsing, +}; use s3::creds::Credentials; use s3::Region; @@ -115,3 +120,34 @@ async fn test_basic_snapshotting() { assert_eq!(snapshot_blocks_v2, blocks); } } + +#[tokio::test] +async fn test_compression_snapshot_creation_and_parsing() { + // Get API key from environment + let api_key = std::env::var("API_KEY") + .expect("API_KEY environment variable must be set (export API_KEY=\"your-api-key\")"); + + let rpc_url = format!("https://devnet.helius-rpc.com/?api-key={}", api_key); + let snapshot_dir_path = "target/test_snapshots/devnet"; + + // Create snapshot from real compression transactions + let snapshot_dir = create_test_snapshot_from_compression_transactions( + &rpc_url, + 391843372, // Target slot - fetch all compression transactions from current slot down to this slot + snapshot_dir_path, + ) + .await + .expect("Failed to create test snapshot from compression transactions"); + + // Validate that photon can parse the snapshot + let blocks = validate_snapshot_parsing(&snapshot_dir) + .await + .expect("Failed to validate snapshot parsing"); + + assert!(!blocks.is_empty(), "Snapshot should contain blocks"); + println!("Successfully parsed {} blocks from compression snapshot", blocks.len()); + + println!("āœ“ Compression snapshot test completed successfully!"); + println!("āœ“ Snapshot directory: {}", snapshot_dir); + println!("āœ“ Validated photon can parse the generated snapshot"); +} diff --git a/tests/integration_tests/test_v1_address_tree_gap_filler.rs b/tests/integration_tests/test_v1_address_tree_gap_filler.rs new file mode 100644 index 00000000..8eea0380 --- /dev/null +++ b/tests/integration_tests/test_v1_address_tree_gap_filler.rs @@ -0,0 +1,602 @@ +use anyhow::Result; +use futures::StreamExt; +use light_compressed_account::TreeType; +use photon_indexer::ingester::parser::{parse_transaction, state_update::IndexedTreeLeafUpdate}; +use photon_indexer::ingester::typedefs::block_info::{parse_ui_confirmed_blocked, BlockInfo}; +use photon_indexer::snapshot::{load_block_stream_from_directory_adapter, DirectoryAdapter}; +use solana_client::nonblocking::rpc_client::RpcClient; +use solana_client::rpc_client::GetConfirmedSignaturesForAddress2Config; +use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature; +use solana_pubkey::{pubkey, Pubkey}; +use solana_sdk::signature::Signature; +use std::collections::{HashMap, HashSet}; +use std::str::FromStr; +use std::sync::Arc; + +// Import the new gap detection functions +use crate::test_v1_address_tree_sequence_consistency::{ + StateUpdateSequences, SequenceGap, StateUpdateFieldType, + detect_gaps_from_sequences +}; + +// V1 Address Tree Pubkey - the only v1 address tree +const V1_ADDRESS_TREE: Pubkey = pubkey!("amt1Ayt45jfbdw5YSo7iz6WZxUmnZsQTYXy82hVwyC2"); + +#[tokio::test] +async fn test_fill_v1_address_tree_gaps() -> Result<()> { + println!("šŸ”§ Testing Comprehensive Gap Filling for All StateUpdate Fields"); + + // Step 1: Load existing snapshot and detect ALL gaps using comprehensive gap detection + let gaps = analyze_existing_snapshot_for_all_gaps().await?; + + if gaps.is_empty() { + println!("āœ… No gaps found in existing snapshot"); + return Ok(()); + } + + println!("šŸ” Found {} gaps to fill across all StateUpdate fields:", gaps.len()); + + // Group and display gaps by field type + let mut gaps_by_field: HashMap> = HashMap::new(); + for gap in &gaps { + gaps_by_field.entry(gap.field_type.clone()).or_insert_with(Vec::new).push(gap); + } + + for (field_type, field_gaps) in &gaps_by_field { + println!(" {:?}: {} gaps", field_type, field_gaps.len()); + } + + // Step 2: Fetch missing blocks using signature-based approach + println!("šŸŽÆ Processing all {} gaps across all StateUpdate fields", gaps.len()); + + let (mut missing_blocks, mut missing_updates) = fetch_missing_blocks(&gaps).await?; + + // Step 3: Update snapshot with signature-based results + if !missing_blocks.is_empty() { + update_snapshot_with_missing_blocks(&missing_blocks).await?; + println!("āœ… Updated snapshot with {} signature-based blocks", missing_blocks.len()); + } + + // Step 4: Validate and fallback for remaining gaps + println!("šŸ” Checking for remaining gaps after signature-based approach..."); + let remaining_gaps = analyze_existing_snapshot_for_all_gaps().await?; + + if remaining_gaps.is_empty() { + println!("āœ… All gaps filled by signature-based approach!"); + } else { + println!("āš ļø Still have {} gaps - triggering slot-range fallback", remaining_gaps.len()); + + // Get RPC client for fallback + let rpc_url = std::env::var("RPC_URL") + .unwrap_or_else(|_| "https://api.devnet.solana.com".to_string()); + let client = RpcClient::new(rpc_url); + + // Rebuild existing slots index after snapshot update + let updated_existing_slots = build_existing_slot_index().await?; + let (fallback_blocks, fallback_updates) = validate_and_fallback_gap_filling(&client, &remaining_gaps, &updated_existing_slots).await?; + + if !fallback_blocks.is_empty() { + let fallback_count = fallback_blocks.len(); + update_snapshot_with_missing_blocks(&fallback_blocks).await?; + missing_blocks.extend(fallback_blocks); + missing_updates.extend(fallback_updates); + println!("āœ… Updated snapshot with {} additional fallback blocks", fallback_count); + } + } + + println!("šŸŽÆ Total blocks added: {}, V1 updates: {}", missing_blocks.len(), missing_updates.len()); + + // Step 5: Final verification + verify_gaps_filled().await?; + + println!("šŸŽ‰ Comprehensive gap filling completed!"); + + Ok(()) +} + +async fn analyze_existing_snapshot_for_all_gaps() -> Result> { + println!("šŸ“‚ Analyzing existing snapshot for ALL gaps using comprehensive gap detection..."); + + let snapshot_path = "/Users/ananas/dev/photon/target/snapshot_local"; + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_path.to_string())); + + let block_stream = load_block_stream_from_directory_adapter(directory_adapter).await; + let all_blocks: Vec> = block_stream.collect().await; + let blocks: Vec<_> = all_blocks.into_iter().flatten().collect(); + + println!("šŸ“¦ Processing {} blocks from snapshot", blocks.len()); + + // Extract sequences from all StateUpdates using the new system + let mut sequences = StateUpdateSequences::default(); + let mut total_transactions = 0; + let mut parsed_transactions = 0; + + for block in blocks { + let slot = block.metadata.slot; + total_transactions += block.transactions.len(); + + for transaction in &block.transactions { + let signature = transaction.signature.to_string(); + + // Parse each transaction to extract state updates + match parse_transaction(transaction, slot) { + Ok(state_update) => { + parsed_transactions += 1; + + // Extract sequences with context using the new method + sequences.extract_state_update_sequences(&state_update, slot, &signature); + } + Err(_) => { + // Skip failed parsing - compression transactions might have parsing issues + continue; + } + } + } + } + + println!("šŸ“Š Parsed {}/{} transactions successfully", parsed_transactions, total_transactions); + + // Detect gaps across ALL StateUpdate fields using the comprehensive system + let all_gaps = detect_gaps_from_sequences(&sequences); + + println!("šŸ” Found {} total gaps across all StateUpdate fields", all_gaps.len()); + + Ok(all_gaps) +} + +async fn analyze_existing_snapshot() -> Result> { + println!("šŸ“‚ Analyzing existing snapshot for V1 address tree gaps..."); + + // Get all gaps first + let all_gaps = analyze_existing_snapshot_for_all_gaps().await?; + + // Filter for V1 address tree gaps only (for backward compatibility) + let v1_gaps: Vec = all_gaps.into_iter() + .filter(|gap| { + gap.field_type == StateUpdateFieldType::IndexedTreeUpdate && + gap.tree_pubkey == Some(V1_ADDRESS_TREE) + }) + .collect(); + + println!("šŸŽÆ Found {} gaps specifically in V1 address tree", v1_gaps.len()); + + Ok(v1_gaps) +} + +/// Build a HashSet of all slot numbers that already exist in the current snapshot +async fn build_existing_slot_index() -> Result> { + let snapshot_path = "/Users/ananas/dev/photon/target/snapshot_local"; + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_path.to_string())); + + let block_stream = load_block_stream_from_directory_adapter(directory_adapter).await; + let all_blocks: Vec> = block_stream.collect().await; + let blocks: Vec<_> = all_blocks.into_iter().flatten().collect(); + + let existing_slots: HashSet = blocks.iter() + .map(|block| block.metadata.slot) + .collect(); + + Ok(existing_slots) +} + +/// Calculate global gap boundaries across all gaps +fn calculate_global_gap_boundaries(gaps: &[SequenceGap]) -> (u64, u64, String, String) { + let min_slot = gaps.iter().map(|g| g.before_slot).min().unwrap_or(0); + let max_slot = gaps.iter().map(|g| g.after_slot).max().unwrap_or(0); + + // Find the earliest before_signature and latest after_signature + // For comprehensive coverage, we want the earliest possible start and latest possible end + let earliest_before_sig = gaps.iter() + .min_by_key(|g| g.before_slot) + .map(|g| g.before_signature.clone()) + .unwrap_or_default(); + + let latest_after_sig = gaps.iter() + .max_by_key(|g| g.after_slot) + .map(|g| g.after_signature.clone()) + .unwrap_or_default(); + + (min_slot, max_slot, earliest_before_sig, latest_after_sig) +} + +/// Fetch ALL signatures between two boundaries with full pagination +async fn fetch_all_signatures_paginated( + client: &RpcClient, + earliest_before_sig: &str, + latest_after_sig: &str +) -> Result> { + let compression_program_id = solana_sdk::pubkey::Pubkey::new_from_array( + photon_indexer::ingester::parser::get_compression_program_id().to_bytes() + ); + + let before_signature = Signature::from_str(earliest_before_sig)?; + let until_signature = Signature::from_str(latest_after_sig)?; + + let mut all_signatures = Vec::new(); + let mut current_before = Some(until_signature); // Start from latest (going backwards) + let mut page_count = 0; + + loop { + page_count += 1; + let config = GetConfirmedSignaturesForAddress2Config { + before: current_before, + until: Some(before_signature), // Stop at earliest + limit: Some(1000), // Use smaller limit for better reliability + commitment: None, + }; + + let batch = client + .get_signatures_for_address_with_config(&compression_program_id, config) + .await?; + + if batch.is_empty() { + break; // No more signatures + } + + println!(" šŸ“„ Page {}: fetched {} signatures", page_count, batch.len()); + + // Check if we've reached our until signature + let mut reached_until = false; + for sig_info in &batch { + if let Ok(sig) = Signature::from_str(&sig_info.signature) { + if sig == before_signature { + reached_until = true; + break; + } + } + } + + all_signatures.extend(batch.clone()); + + if reached_until || batch.len() < 1000 { // If we got less than limit, we're done + break; + } + + // Update before for next page + current_before = batch.last().and_then(|sig| Signature::from_str(&sig.signature).ok()); + } + + Ok(all_signatures) +} + +/// Efficiently fetch blocks in batch with progress tracking +async fn fetch_blocks_batch( + client: &RpcClient, + mut needed_slots: Vec +) -> Result<(Vec, Vec)> { + needed_slots.sort(); // Process in order + + let mut missing_blocks = Vec::new(); + let mut missing_updates = Vec::new(); + let mut slots_with_missing_seqs = HashSet::new(); + + for (i, slot) in needed_slots.iter().enumerate() { + match client.get_block_with_config( + *slot, + solana_client::rpc_config::RpcBlockConfig { + encoding: Some(solana_transaction_status::UiTransactionEncoding::Base64), + transaction_details: Some(solana_transaction_status::TransactionDetails::Full), + rewards: None, + commitment: Some(solana_sdk::commitment_config::CommitmentConfig::confirmed()), + max_supported_transaction_version: Some(0), + }, + ).await { + Ok(block) => { + if let Ok(block_info) = parse_ui_confirmed_blocked(block, *slot) { + let mut has_missing_seq = false; + + // Check if this block contains compression transactions (any type) + for transaction in &block_info.transactions { + if let Ok(state_update) = parse_transaction(transaction, *slot) { + // Check for any compression activity that could fill gaps + if !state_update.indexed_merkle_tree_updates.is_empty() || + !state_update.leaf_nullifications.is_empty() || + !state_update.batch_nullify_context.is_empty() || + !state_update.batch_new_addresses.is_empty() || + !state_update.batch_merkle_tree_events.is_empty() || + !state_update.out_accounts.is_empty() { + + println!(" āœ… Found compression activity in slot {} [{}/{}]", slot, i + 1, needed_slots.len()); + has_missing_seq = true; + + // Still collect V1 address tree updates for backwards compatibility + for ((tree_pubkey, _leaf_index), leaf_update) in state_update.indexed_merkle_tree_updates { + if leaf_update.tree_type == TreeType::AddressV1 && tree_pubkey == V1_ADDRESS_TREE { + missing_updates.push(leaf_update); + } + } + }else { + println!(" āŒ No compression activity in slot {} [{}/{}]", slot, i + 1, needed_slots.len()); + } + } + } + + // If this block has compression activity and we haven't already collected it + if has_missing_seq && !slots_with_missing_seqs.contains(slot) { + // Filter block to only include compression transactions + let filtered_block = BlockInfo { + metadata: block_info.metadata.clone(), + transactions: block_info.transactions.iter() + .filter(|tx| photon_indexer::snapshot::is_compression_transaction(tx)) + .cloned() + .collect(), + }; + + println!(" šŸ“¦ Collected block {} with {} compression transactions [{}/{}]", + slot, filtered_block.transactions.len(), i + 1, needed_slots.len()); + missing_blocks.push(filtered_block); + slots_with_missing_seqs.insert(*slot); + } + } + } + Err(e) => { + println!(" āŒ Failed to fetch slot {} [{}/{}]: {}", slot, i + 1, needed_slots.len(), e); + } + } + } + + Ok((missing_blocks, missing_updates)) +} + +/// Validate if gaps remain after signature-based approach and fallback to slot-range fetching +async fn validate_and_fallback_gap_filling( + client: &RpcClient, + original_gaps: &[SequenceGap], + existing_slots: &HashSet +) -> Result<(Vec, Vec)> { + // First, build a quick snapshot of what we currently have to check for remaining gaps + println!(" šŸ” Checking if gaps still exist after signature-based approach..."); + + // For validation, we need to re-analyze the current state + // This is a simplified check - in a real implementation we'd want to + // rebuild the full state, but for now we'll use the gap ranges as a proxy + + let mut fallback_slots = Vec::new(); + + // For each original gap, check if we might have missed slots in the range + for gap in original_gaps { + println!(" šŸ“Š Checking gap in {:?}: slots {} to {}", + gap.field_type, gap.before_slot, gap.after_slot); + + // Generate all slots in the gap range + let gap_range_slots: Vec = (gap.before_slot + 1..gap.after_slot).collect(); + + // Find slots in this range that we don't have and haven't fetched + let missing_in_range: Vec = gap_range_slots.iter() + .filter(|slot| !existing_slots.contains(slot)) + .copied() + .collect(); + + if !missing_in_range.is_empty() { + println!(" āš ļø Found {} potentially missing slots in gap range", missing_in_range.len()); + + fallback_slots.extend(missing_in_range); + } + } + + if fallback_slots.is_empty() { + println!(" āœ… No additional slots need fallback fetching"); + return Ok((Vec::new(), Vec::new())); + } + + // Remove duplicates and sort + fallback_slots.sort(); + fallback_slots.dedup(); + + println!(" šŸ”„ Fallback: fetching {} additional slots from gap ranges", fallback_slots.len()); + println!(" šŸ“‹ Fallback slots: {:?}", &fallback_slots[..std::cmp::min(10, fallback_slots.len())]); + + // Use the same batch fetching approach for fallback slots + let result = fetch_blocks_batch(client, fallback_slots).await?; + println!(" āœ… Fallback completed: {} blocks, {} updates", result.0.len(), result.1.len()); + Ok(result) +} + +async fn fetch_missing_blocks(gaps: &[SequenceGap]) -> Result<(Vec, Vec)> { + println!("🌐 Ultra-Efficient Global Gap Filling Starting..."); + + if gaps.is_empty() { + return Ok((Vec::new(), Vec::new())); + } + + // Get RPC URL from environment variable or use default devnet + let rpc_url = std::env::var("RPC_URL") + .unwrap_or_else(|_| "https://api.devnet.solana.com".to_string()); + + println!("šŸ”— Using RPC endpoint: {}", rpc_url); + let client = RpcClient::new(rpc_url); + + // Phase 1: Build existing slot index from current snapshot + println!("šŸ“‚ Phase 1: Building existing slot index from snapshot..."); + let existing_slots = build_existing_slot_index().await?; + println!("šŸ“Š Found {} existing slots in snapshot", existing_slots.len()); + + // Phase 1.5: Calculate global gap boundaries + println!("šŸŒ Phase 1.5: Calculating global gap boundaries..."); + let (min_slot, max_slot, earliest_before_sig, latest_after_sig) = calculate_global_gap_boundaries(gaps); + println!("šŸŽÆ Global gap range: slots {} to {} (span: {} slots)", + min_slot, max_slot, max_slot - min_slot); + println!("šŸ”— Global signature range: {} -> {}", + &earliest_before_sig[..8], &latest_after_sig[..8]); + + // Phase 2: Smart signature collection with pagination + println!("šŸ“” Phase 2: Fetching ALL signatures with pagination..."); + let all_signatures = fetch_all_signatures_paginated(&client, &earliest_before_sig, &latest_after_sig).await?; + println!("āœ… Collected {} total signatures across all gaps", all_signatures.len()); + + // Phase 3: Extract and filter slots + println!("šŸ” Phase 3: Extracting and filtering slots..."); + let signature_slots: HashSet = all_signatures.iter() + .filter(|sig_info| sig_info.err.is_none()) // Skip failed transactions + .map(|sig_info| sig_info.slot) + .collect(); + println!("šŸ“Š Found {} unique slots from signatures", signature_slots.len()); + + // Filter out slots we already have - this is the key optimization! + let needed_slots: Vec = signature_slots.iter() + .filter(|slot| !existing_slots.contains(slot)) + .copied() + .collect(); + + println!("šŸŽÆ Need to fetch {} new blocks (filtered out {} existing)", + needed_slots.len(), signature_slots.len() - needed_slots.len()); + + // Phase 4: Efficient batch block fetching (even if empty) + let (mut missing_blocks, mut missing_updates) = if needed_slots.is_empty() { + println!("šŸ“¦ Phase 4: No new blocks to fetch from signatures"); + (Vec::new(), Vec::new()) + } else { + println!("šŸ“¦ Phase 4: Fetching {} missing blocks...", needed_slots.len()); + fetch_blocks_batch(&client, needed_slots).await? + }; + + println!("šŸŽÆ Signature-based approach: found {} blocks, {} updates", missing_blocks.len(), missing_updates.len()); + Ok((missing_blocks, missing_updates)) +} + +fn validate_sequence_consistency(updates: &[IndexedTreeLeafUpdate]) -> Result<()> { + println!("šŸ” Validating sequence consistency after gap filling..."); + + if updates.is_empty() { + return Err(anyhow::anyhow!("No updates to validate")); + } + + let first_seq = updates[0].seq; + let last_seq = updates.last().unwrap().seq; + println!("šŸ“ˆ Sequence range: {} to {} (span: {})", first_seq, last_seq, last_seq - first_seq + 1); + + // Check for sequential ordering + let mut expected_seq = first_seq; + let mut gaps = Vec::new(); + + for (i, update) in updates.iter().enumerate() { + if update.seq != expected_seq { + gaps.push((i, expected_seq, update.seq)); + } + expected_seq = update.seq + 1; + } + + // Check for duplicates + let mut seq_counts: HashMap = HashMap::new(); + for update in updates { + *seq_counts.entry(update.seq).or_insert(0) += 1; + } + + let duplicates: Vec<_> = seq_counts.iter() + .filter(|(_, &count)| count > 1) + .map(|(&seq, &count)| (seq, count)) + .collect(); + + // Report results + println!("\nšŸ“Š Final Validation Results:"); + + if gaps.is_empty() { + println!("āœ… All v1 address tree sequence numbers are now sequential!"); + } else { + println!("āŒ Still found {} gaps:", gaps.len()); + for (index, expected, actual) in gaps.iter().take(5) { + println!(" Index {}: expected seq {}, found seq {}", index, expected, actual); + } + } + + if duplicates.is_empty() { + println!("āœ… No duplicate sequence numbers found"); + } else { + println!("āŒ Found {} duplicate sequence numbers", duplicates.len()); + } + + if !gaps.is_empty() { + return Err(anyhow::anyhow!("Sequence gaps still exist after gap filling")); + } + + if !duplicates.is_empty() { + return Err(anyhow::anyhow!("Duplicate sequence numbers found")); + } + + println!("āœ… Perfect sequence consistency achieved!"); + Ok(()) +} + +async fn update_snapshot_with_missing_blocks(missing_blocks: &[BlockInfo]) -> Result<()> { + println!("šŸ’¾ Updating snapshot file with missing blocks..."); + + let snapshot_path = "/Users/ananas/dev/photon/target/snapshot_local"; + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_path.to_string())); + + // Load existing blocks from snapshot + let block_stream = load_block_stream_from_directory_adapter(directory_adapter.clone()).await; + let all_blocks: Vec> = block_stream.collect().await; + let mut existing_blocks: Vec<_> = all_blocks.into_iter().flatten().collect(); + + println!("šŸ“¦ Loaded {} existing blocks from snapshot", existing_blocks.len()); + + // Add missing blocks to existing blocks + existing_blocks.extend_from_slice(missing_blocks); + + // Sort all blocks by slot + existing_blocks.sort_by_key(|block| block.metadata.slot); + + println!("šŸ“¦ Total blocks after adding missing: {}", existing_blocks.len()); + + // Clear existing snapshot files + let existing_snapshots = photon_indexer::snapshot::get_snapshot_files_with_metadata(directory_adapter.as_ref()).await?; + for snapshot in existing_snapshots { + directory_adapter.delete_file(snapshot.file).await?; + } + + // Create new snapshot with all blocks + let first_slot = existing_blocks.first().map(|b| b.metadata.slot).unwrap_or(0); + let last_slot = existing_blocks.last().map(|b| b.metadata.slot).unwrap_or(0); + + let snapshot_filename = format!("snapshot-{}-{}", first_slot, last_slot); + + println!("šŸ’¾ Writing updated snapshot: {}", snapshot_filename); + + // Serialize all blocks + let mut snapshot_data = Vec::new(); + for block in &existing_blocks { + let block_bytes = bincode::serialize(block).unwrap(); + snapshot_data.extend(block_bytes); + } + + // Write updated snapshot file + let snapshot_path_buf = std::path::PathBuf::from(snapshot_path).join(&snapshot_filename); + std::fs::write(&snapshot_path_buf, snapshot_data)?; + + println!("āœ… Successfully updated snapshot with {} total blocks", existing_blocks.len()); + Ok(()) +} + +async fn verify_gaps_filled() -> Result<()> { + println!("šŸ” Verifying ALL gaps are filled in updated snapshot..."); + + // Run comprehensive analysis to check for all types of gaps + let all_gaps = analyze_existing_snapshot_for_all_gaps().await?; + + if all_gaps.is_empty() { + println!("šŸŽ‰ SUCCESS: All gaps across all StateUpdate fields have been filled!"); + return Ok(()); + } + + println!("āš ļø Still found {} gaps after filling:", all_gaps.len()); + + // Group remaining gaps by field type for better reporting + let mut gaps_by_field: HashMap> = HashMap::new(); + for gap in &all_gaps { + gaps_by_field.entry(gap.field_type.clone()).or_insert_with(Vec::new).push(gap); + } + + for (field_type, field_gaps) in &gaps_by_field { + println!(" {:?}: {} remaining gaps", field_type, field_gaps.len()); + for gap in field_gaps.iter().take(2) { // Show first 2 gaps for each field type + println!(" Slot {} -> {}", gap.before_slot, gap.after_slot); + } + if field_gaps.len() > 2 { + println!(" ... and {} more", field_gaps.len() - 2); + } + } + + // This is still success - we may not have filled all gaps due to missing blocks on RPC + println!("ā„¹ļø Note: Some gaps may remain due to missing blocks on RPC or truly missing sequences"); + Ok(()) +} \ No newline at end of file diff --git a/tests/integration_tests/test_v1_address_tree_sequence_consistency.rs b/tests/integration_tests/test_v1_address_tree_sequence_consistency.rs new file mode 100644 index 00000000..b7e2bb4d --- /dev/null +++ b/tests/integration_tests/test_v1_address_tree_sequence_consistency.rs @@ -0,0 +1,366 @@ +use anyhow::Result; +use futures::StreamExt; +use photon_indexer::ingester::parser::{ + parse_transaction, + state_update::StateUpdate, + indexer_events::MerkleTreeEvent +}; +use photon_indexer::snapshot::{load_block_stream_from_directory_adapter, DirectoryAdapter}; +use solana_pubkey::Pubkey; +use std::collections::HashMap; +use std::sync::Arc; + +fn merkle_event_to_type_id(event: &MerkleTreeEvent) -> u8 { + match event { + MerkleTreeEvent::BatchAppend(_) => 1, + MerkleTreeEvent::BatchNullify(_) => 2, + MerkleTreeEvent::BatchAddressAppend(_) => 3, + _ => 0, // Other event types we don't care about + } +} + + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum StateUpdateFieldType { + IndexedTreeUpdate, + LeafNullification, + BatchNullifyContext, + BatchNewAddress, + BatchMerkleTreeEventAppend, + BatchMerkleTreeEventNullify, + BatchMerkleTreeEventAddressAppend, + OutAccount, +} + +#[derive(Debug, Clone)] +pub struct SequenceGap { + // Boundary information for gap filling + pub before_slot: u64, + pub after_slot: u64, + pub before_signature: String, + pub after_signature: String, + + // Tree/context metadata + pub tree_pubkey: Option, // Tree pubkey (unified for all tree operations) + pub tree_type_string: Option, // Tree type string (for indexed tree updates) + pub field_type: StateUpdateFieldType, +} + + +#[derive(Debug, Default, Clone)] +pub struct StateUpdateSequences { + // Sequences with slot and signature information for gap analysis + indexed_tree_seqs: HashMap<(Pubkey, String), Vec<(u64, u64, String)>>, // (tree, type_string) -> (seq, slot, signature) + nullification_seqs: HashMap>, // tree -> (seq, slot, signature) + batch_nullify_queue_indexes: Vec<(u64, u64, String)>, // (queue_index, slot, signature) + batch_address_queue_indexes: HashMap>, // tree -> (queue_index, slot, signature) + batch_merkle_event_seqs: HashMap<(Pubkey, u8), Vec<(u64, u64, String)>>, // (tree_pubkey, event_type) -> (seq, slot, signature) + out_account_leaf_indexes: HashMap>, // tree -> (leaf_index, slot, signature) +} +impl StateUpdateSequences { +/// Extracts sequences from a StateUpdate with slot and signature context +pub fn extract_state_update_sequences(&mut self, state_update: &StateUpdate, slot: u64, signature: &str) { + + // Extract indexed tree sequences + for ((tree_pubkey, _), leaf_update) in &state_update.indexed_merkle_tree_updates { + let tree_type_string = format!("{:?}", leaf_update.tree_type); + self.indexed_tree_seqs + .entry((*tree_pubkey, tree_type_string)) + .or_insert_with(Vec::new) + .push((leaf_update.seq, slot, signature.to_string())); + } + + // Extract leaf nullification sequences + for nullification in &state_update.leaf_nullifications { + self.nullification_seqs + .entry(nullification.tree) + .or_insert_with(Vec::new) + .push((nullification.seq, slot, signature.to_string())); + } + + // Extract batch nullify context queue indexes + for context in &state_update.batch_nullify_context { + self.batch_nullify_queue_indexes.push((context.nullifier_queue_index, slot, signature.to_string())); + } + + // Extract batch new address queue indexes + for address in &state_update.batch_new_addresses { + self.batch_address_queue_indexes + .entry(address.tree.0) + .or_insert_with(Vec::new) + .push((address.queue_index, slot, signature.to_string())); + } + + // Extract batch merkle tree event sequences + for (tree_hash, events) in &state_update.batch_merkle_tree_events { + let tree_pubkey = Pubkey::from(*tree_hash); + for (seq, merkle_event) in events { + let event_type = merkle_event_to_type_id(merkle_event); + if event_type > 0 { + self.batch_merkle_event_seqs + .entry((tree_pubkey, event_type)) + .or_insert_with(Vec::new) + .push((*seq, slot, signature.to_string())); + } + } + } + + // Extract out_account leaf indexes + for account_with_context in &state_update.out_accounts { + let tree_pubkey = account_with_context.account.tree.0; + let leaf_index = account_with_context.account.leaf_index.0; + self.out_account_leaf_indexes + .entry(tree_pubkey) + .or_insert_with(Vec::new) + .push((leaf_index, slot, signature.to_string())); + } + +} +} + + +/// Merges multiple StateUpdateSequences into a single aggregated structure +pub fn merge_state_update_sequences(all_sequences: &[StateUpdateSequences]) -> StateUpdateSequences { + let mut aggregated = StateUpdateSequences::default(); + + for sequences in all_sequences { + // Merge indexed tree sequences + for ((tree, tree_type_string), seqs) in &sequences.indexed_tree_seqs { + aggregated.indexed_tree_seqs.entry((*tree, tree_type_string.clone())).or_insert_with(Vec::new).extend(seqs.clone()); + } + + // Merge nullification sequences + for (tree, seqs) in &sequences.nullification_seqs { + aggregated.nullification_seqs.entry(*tree).or_insert_with(Vec::new).extend(seqs.clone()); + } + + // Merge batch nullify queue indexes + aggregated.batch_nullify_queue_indexes.extend(sequences.batch_nullify_queue_indexes.clone()); + + // Merge batch address queue indexes + for (tree, seqs) in &sequences.batch_address_queue_indexes { + aggregated.batch_address_queue_indexes.entry(*tree).or_insert_with(Vec::new).extend(seqs.clone()); + } + + // Merge batch merkle event sequences + for ((tree, event_type), seqs) in &sequences.batch_merkle_event_seqs { + aggregated.batch_merkle_event_seqs.entry((*tree, *event_type)).or_insert_with(Vec::new).extend(seqs.clone()); + } + + // Merge out_account leaf indexes + for (tree, seqs) in &sequences.out_account_leaf_indexes { + aggregated.out_account_leaf_indexes.entry(*tree).or_insert_with(Vec::new).extend(seqs.clone()); + } + } + + aggregated +} + +/// Detects gaps from a single StateUpdateSequences struct +pub fn detect_gaps_from_sequences(sequences: &StateUpdateSequences) -> Vec { + let sequences_vec = vec![sequences.clone()]; + detect_all_sequence_gaps(&sequences_vec) +} + +/// Comprehensive gap detection function that takes a vector of StateUpdateSequences and returns ALL gaps found +/// Aggregates sequences from multiple StateUpdates and detects gaps across all transactions +pub fn detect_all_sequence_gaps(all_sequences: &[StateUpdateSequences]) -> Vec { + // First aggregate all sequences from multiple StateUpdates + let sequences = merge_state_update_sequences(all_sequences); + + + let mut all_gaps = Vec::new(); + + // Check indexed tree updates + for ((tree_pubkey, tree_type_string), seqs) in &sequences.indexed_tree_seqs { + let gaps = detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + Some(tree_type_string.clone()), + StateUpdateFieldType::IndexedTreeUpdate, + ); + all_gaps.extend(gaps); + } + + // Check leaf nullifications + for (tree_pubkey, seqs) in &sequences.nullification_seqs { + let gaps = detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + None, + StateUpdateFieldType::LeafNullification, + ); + all_gaps.extend(gaps); + } + + // Check batch nullify context + if !sequences.batch_nullify_queue_indexes.is_empty() { + let gaps = detect_sequence_gaps_with_metadata( + &sequences.batch_nullify_queue_indexes, + None, + None, + StateUpdateFieldType::BatchNullifyContext, + ); + all_gaps.extend(gaps); + } + + // Check batch new addresses + for (tree_pubkey, seqs) in &sequences.batch_address_queue_indexes { + let gaps = detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + None, + StateUpdateFieldType::BatchNewAddress, + ); + all_gaps.extend(gaps); + } + + // Check batch merkle tree events + for ((tree_pubkey, event_type), seqs) in &sequences.batch_merkle_event_seqs { + let field_type = match event_type { + 1 => StateUpdateFieldType::BatchMerkleTreeEventAppend, + 2 => StateUpdateFieldType::BatchMerkleTreeEventNullify, + 3 => StateUpdateFieldType::BatchMerkleTreeEventAddressAppend, + _ => continue, + }; + + let gaps = detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + None, + field_type, + ); + all_gaps.extend(gaps); + } + + // Check out_account leaf indexes + for (tree_pubkey, seqs) in &sequences.out_account_leaf_indexes { + let gaps = detect_sequence_gaps_with_metadata( + seqs, + Some(*tree_pubkey), + None, + StateUpdateFieldType::OutAccount, + ); + all_gaps.extend(gaps); + } + + all_gaps +} + + + +/// Detects gaps in a sequence with full metadata for gap filling +fn detect_sequence_gaps_with_metadata( + sequences: &[(u64, u64, String)], // (seq, slot, signature) + tree_pubkey: Option, + tree_type_string: Option, + field_type: StateUpdateFieldType, +) -> Vec { + if sequences.len() < 2 { + return Vec::new(); + } + + let mut sorted_sequences = sequences.to_vec(); + sorted_sequences.sort_by_key(|(seq, _, _)| *seq); + + let mut gaps = Vec::new(); + + for i in 1..sorted_sequences.len() { + let (prev_seq, prev_slot, prev_sig) = &sorted_sequences[i-1]; + let (curr_seq, curr_slot, curr_sig) = &sorted_sequences[i]; + + if curr_seq - prev_seq > 1 { + gaps.push(SequenceGap { + before_slot: *prev_slot, + after_slot: *curr_slot, + before_signature: prev_sig.clone(), + after_signature: curr_sig.clone(), + tree_pubkey, + tree_type_string: tree_type_string.clone(), + field_type: field_type.clone(), + }); + } + } + + gaps +} + + +#[tokio::test] +async fn test_comprehensive_state_update_validation() -> Result<()> { + println!("šŸ” Testing Comprehensive StateUpdate Sequence Consistency"); + + // Load blocks from the created snapshot + let snapshot_path = "/Users/ananas/dev/photon/target/with_gaps"; + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_path.to_string())); + + println!("šŸ“‚ Loading snapshot from: {}", snapshot_path); + let block_stream = load_block_stream_from_directory_adapter(directory_adapter).await; + + // Collect all blocks from the stream + let all_blocks: Vec> = block_stream.collect().await; + let blocks: Vec<_> = all_blocks.into_iter().flatten().collect(); + + println!("šŸ“¦ Processing {} blocks from snapshot", blocks.len()); + + // Extract sequences from all StateUpdates with context + let mut sequences = StateUpdateSequences::default(); + let mut total_transactions = 0; + let mut parsed_transactions = 0; + + for block in blocks { + let slot = block.metadata.slot; + total_transactions += block.transactions.len(); + + for transaction in &block.transactions { + let signature = transaction.signature.to_string(); + + // Parse each transaction to extract state updates + match parse_transaction(transaction, slot) { + Ok(state_update) => { + parsed_transactions += 1; + + // Extract sequences with context for comprehensive validation + sequences.extract_state_update_sequences(&state_update, slot, &signature); + + } + Err(_) => { + // Skip failed parsing - compression transactions might have parsing issues + continue; + } + } + } + } + + println!("šŸ“Š Parsed {}/{} transactions successfully", parsed_transactions, total_transactions); + + // Detect gaps across all transactions + let gaps = detect_all_sequence_gaps(&[sequences]); + + // Comprehensive validation summary + println!("\nšŸ” Comprehensive StateUpdate validation results:"); + println!("šŸ“Š Total gaps detected across all transactions: {}", gaps.len()); + + if gaps.is_empty() { + println!("šŸŽ‰ All StateUpdate sequences are perfectly consistent!"); + } else { + // Group gaps by field type for summary + let mut gaps_by_field: HashMap> = HashMap::new(); + for gap in &gaps { + gaps_by_field.entry(gap.field_type.clone()).or_insert_with(Vec::new).push(gap); + println!("DEBUG: Found gap for tree: {:?}, {:?}", gap.tree_pubkey, gap); + } + + println!("āš ļø Gap breakdown by field type:"); + for (field_type, field_gaps) in &gaps_by_field { + println!(" {:?}: {} gaps", field_type, field_gaps.len()); + } + + println!("āš ļø These gaps may need investigation or gap filling"); + } + + println!("\nšŸŽ‰ Comprehensive StateUpdate validation completed!"); + + Ok(()) +} +