diff --git a/.github/actions/setup-and-build/action.yml b/.github/actions/setup-and-build/action.yml index 15a62eef3e..38477abab2 100644 --- a/.github/actions/setup-and-build/action.yml +++ b/.github/actions/setup-and-build/action.yml @@ -27,7 +27,7 @@ runs: shell: bash run: | sudo apt-get update - sudo apt-get install -y libudev-dev pkg-config build-essential + sudo apt-get install -y libudev-dev pkg-config build-essential protobuf-compiler - name: Load versions id: versions diff --git a/Cargo.lock b/Cargo.lock index 5c2130e879..5cce5ff1f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -893,6 +893,49 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "axum" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18ed336352031311f4e0b4dd2ff392d4fbb370777c9d18d7fc9d7359f73871" +dependencies = [ + "axum-core", + "bytes", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "sync_wrapper 1.0.2", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59446ce19cd142f8833f856eb31f3eb097812d1479ab224f54d72428ca21ea22" +dependencies = [ + "bytes", + "futures-core", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper 1.0.2", + "tower-layer", + "tower-service", +] + [[package]] name = "base64" version = "0.12.3" @@ -2117,6 +2160,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + [[package]] name = "fastbloom" version = "0.14.0" @@ -2177,6 +2226,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2551bf44bc5f776c15044b9b94153a00198be06743e262afaaa61f11ac7523a5" +[[package]] +name = "fixedbitset" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" + [[package]] name = "flate2" version = "1.1.4" @@ -2229,6 +2284,7 @@ dependencies = [ "futures", "itertools 0.14.0", "lazy_static", + "light-account-checks", "light-batched-merkle-tree", "light-client", "light-compressed-account", @@ -2246,6 +2302,8 @@ dependencies = [ "num-bigint 0.4.6", "photon-api", "prometheus", + "prost", + "prost-types", "rand 0.8.5", "reqwest 0.12.24", "scopeguard", @@ -2259,6 +2317,10 @@ dependencies = [ "solana-transaction-status", "thiserror 2.0.17", "tokio", + "tokio-stream", + "tonic", + "tonic-prost", + "tonic-prost-build", "tracing", "tracing-appender", "tracing-subscriber", @@ -2274,6 +2336,7 @@ dependencies = [ "async-stream", "async-trait", "bb8", + "bs58", "futures", "governor 0.8.1", "light-account-checks", @@ -2293,6 +2356,7 @@ dependencies = [ "solana-sdk", "thiserror 2.0.17", "tokio", + "tokio-postgres", "tracing", ] @@ -2834,6 +2898,7 @@ dependencies = [ "http 1.3.1", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "pin-utils", @@ -2873,6 +2938,19 @@ dependencies = [ "webpki-roots 1.0.3", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.7.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -3300,6 +3378,7 @@ checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" dependencies = [ "bitflags 2.9.4", "libc", + "redox_syscall", ] [[package]] @@ -4255,6 +4334,22 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest 0.10.7", +] + [[package]] name = "memchr" version = "2.7.6" @@ -4352,6 +4447,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "native-tls" version = "0.2.14" @@ -4750,6 +4851,35 @@ dependencies = [ "num", ] +[[package]] +name = "petgraph" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" +dependencies = [ + "fixedbitset", + "indexmap 2.11.4", +] + +[[package]] +name = "phf" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1562dc717473dbaa4c1f85a36410e03c047b2e7df7f45ee938fbef64ae7fadf" +dependencies = [ + "phf_shared", + "serde", +] + +[[package]] +name = "phf_shared" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e57fef6bc5981e38c2ce2d63bfa546861309f875b8a75f092d1d54ae2d64f266" +dependencies = [ + "siphasher 1.0.1", +] + [[package]] name = "photon-api" version = "0.52.0" @@ -4897,6 +5027,35 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "postgres-protocol" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbef655056b916eb868048276cfd5d6a7dea4f81560dfd047f97c8c6fe3fcfd4" +dependencies = [ + "base64 0.22.1", + "byteorder", + "bytes", + "fallible-iterator", + "hmac 0.12.1", + "md-5", + "memchr", + "rand 0.9.2", + "sha2 0.10.9", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef4605b7c057056dd35baeb6ac0c0338e4975b1f2bef0f65da953285eb007095" +dependencies = [ + "bytes", + "fallible-iterator", + "postgres-protocol", +] + [[package]] name = "potential_utf" version = "0.1.3" @@ -5001,12 +5160,86 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "prost" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac6c3320f9abac597dcbc668774ef006702672474aad53c6d596b62e487b40b1" +dependencies = [ + "heck 0.5.0", + "itertools 0.14.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "pulldown-cmark", + "pulldown-cmark-to-cmark", + "regex", + "syn 2.0.106", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "prost-types" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9b4db3d6da204ed77bb26ba83b6122a73aeb2e87e25fbf7ad2e84c4ccbf8f72" +dependencies = [ + "prost", +] + [[package]] name = "protobuf" version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "pulldown-cmark" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e8bbe1a966bd2f362681a44f6edce3c2310ac21e4d5067a6e7ec396297a6ea0" +dependencies = [ + "bitflags 2.9.4", + "memchr", + "unicase", +] + +[[package]] +name = "pulldown-cmark-to-cmark" +version = "21.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5b6a0769a491a08b31ea5c62494a8f144ee0987d86d670a8af4df1e1b7cde75" +dependencies = [ + "pulldown-cmark", +] + [[package]] name = "qstring" version = "0.7.2" @@ -9783,6 +10016,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "strsim" version = "0.8.0" @@ -10265,6 +10509,32 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-postgres" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b40d66d9b2cfe04b628173409368e58247e8eddbbd3b0e6c6ba1d09f20f6c9e" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator", + "futures-channel", + "futures-util", + "log", + "parking_lot", + "percent-encoding", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "rand 0.9.2", + "socket2 0.6.1", + "tokio", + "tokio-util 0.7.16", + "whoami", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -10310,6 +10580,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util 0.7.16", ] [[package]] @@ -10468,6 +10739,74 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df8b2b54733674ad286d16267dcfc7a71ed5c776e4ac7aa3c3e2561f7c637bf2" +[[package]] +name = "tonic" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" +dependencies = [ + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "h2 0.4.12", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.7.0", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "socket2 0.6.1", + "sync_wrapper 1.0.2", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c40aaccc9f9eccf2cd82ebc111adc13030d23e887244bc9cfa5d1d636049de3" +dependencies = [ + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "tonic-prost" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" +dependencies = [ + "bytes", + "prost", + "tonic", +] + +[[package]] +name = "tonic-prost-build" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4a16cba4043dc3ff43fcb3f96b4c5c154c64cbd18ca8dce2ab2c6a451d058a2" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn 2.0.106", + "tempfile", + "tonic-build", +] + [[package]] name = "tower" version = "0.5.2" @@ -10476,11 +10815,15 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", + "indexmap 2.11.4", "pin-project-lite", + "slab", "sync_wrapper 1.0.2", "tokio", + "tokio-util 0.7.16", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -10686,6 +11029,12 @@ version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" +[[package]] +name = "unicode-bidi" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" + [[package]] name = "unicode-ident" version = "1.0.19" @@ -10701,6 +11050,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-properties" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" + [[package]] name = "unicode-segmentation" version = "1.12.0" @@ -10907,6 +11262,12 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.104" @@ -11032,6 +11393,17 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "whoami" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d4a4db5077702ca3015d3d02d74974948aba2ad9e12ab7df718ee64ccd7e97d" +dependencies = [ + "libredox", + "wasite", + "web-sys", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/cli/src/commands/test-validator/index.ts b/cli/src/commands/test-validator/index.ts index 332fadf278..f201ee2e6e 100644 --- a/cli/src/commands/test-validator/index.ts +++ b/cli/src/commands/test-validator/index.ts @@ -75,6 +75,12 @@ class SetupCommand extends Command { default: 3001, exclusive: ["skip-prover"], }), + "grpc-port": Flags.integer({ + description: "Enable Photon indexer gRPC on this port.", + required: false, + default: 50051, + exclusive: ["skip-indexer"], + }), "limit-ledger-size": Flags.integer({ description: "Keep this amount of shreds in root slots.", required: false, @@ -194,6 +200,7 @@ class SetupCommand extends Command { rpcPort: flags["rpc-port"], gossipHost: flags["gossip-host"], indexerPort: flags["indexer-port"], + grpcPort: flags["grpc-port"], proverPort: flags["prover-port"], prover: !flags["skip-prover"], skipSystemAccounts: flags["skip-system-accounts"], diff --git a/cli/src/utils/initTestEnv.ts b/cli/src/utils/initTestEnv.ts index 4367b23c5c..e28994b107 100644 --- a/cli/src/utils/initTestEnv.ts +++ b/cli/src/utils/initTestEnv.ts @@ -87,6 +87,7 @@ export async function initTestEnv({ prover = true, rpcPort = 8899, indexerPort = 8784, + grpcPort = 50051, proverPort = 3001, gossipHost = "127.0.0.1", checkPhotonVersion = true, @@ -101,6 +102,7 @@ export async function initTestEnv({ prover: boolean; rpcPort?: number; indexerPort?: number; + grpcPort?: number; proverPort?: number; gossipHost?: string; checkPhotonVersion?: boolean; @@ -131,6 +133,7 @@ export async function initTestEnv({ indexerPort, checkPhotonVersion, photonDatabaseUrl, + grpcPort, ); } diff --git a/cli/src/utils/processPhotonIndexer.ts b/cli/src/utils/processPhotonIndexer.ts index 1945818bc1..aa75bb37ba 100644 --- a/cli/src/utils/processPhotonIndexer.ts +++ b/cli/src/utils/processPhotonIndexer.ts @@ -41,6 +41,7 @@ export async function startIndexer( indexerPort: number, checkPhotonVersion: boolean = true, photonDatabaseUrl?: string, + grpcPort: number = 50051, ) { await killIndexer(); const resolvedOrNull = which.sync("photon", { nothrow: true }); @@ -57,6 +58,8 @@ export async function startIndexer( indexerPort.toString(), "--rpc-url", rpcUrl, + "--grpc-port", + grpcPort.toString(), ]; if (photonDatabaseUrl) { args.push("--db-url", photonDatabaseUrl); diff --git a/forester-utils/Cargo.toml b/forester-utils/Cargo.toml index 5d22cf7d5c..9c60fed55a 100644 --- a/forester-utils/Cargo.toml +++ b/forester-utils/Cargo.toml @@ -47,3 +47,7 @@ num-traits = { workspace = true } bb8 = { workspace = true } async-trait = { workspace = true } governor = { workspace = true } + +[dev-dependencies] +tokio-postgres = "0.7" +bs58 = { workspace = true } diff --git a/forester-utils/src/instructions/address_batch_update.rs b/forester-utils/src/instructions/address_batch_update.rs index bbd35ceb12..97a8cd24b4 100644 --- a/forester-utils/src/instructions/address_batch_update.rs +++ b/forester-utils/src/instructions/address_batch_update.rs @@ -2,10 +2,7 @@ use std::{pin::Pin, sync::Arc, time::Duration}; use account_compression::processor::initialize_address_merkle_tree::Pubkey; use async_stream::stream; -use futures::{ - stream::{FuturesOrdered, Stream}, - StreamExt, -}; +use futures::stream::Stream; use light_batched_merkle_tree::{ constants::DEFAULT_BATCH_ADDRESS_TREE_HEIGHT, merkle_tree::InstructionDataAddressAppendInputs, }; @@ -23,8 +20,8 @@ use tracing::{debug, error, info, warn}; use crate::{error::ForesterUtilsError, rpc_pool::SolanaRpcPool, utils::wait_for_indexer}; -const MAX_PHOTON_ELEMENTS_PER_CALL: usize = 500; -const MAX_PROOFS_PER_TX: usize = 3; +const MAX_PHOTON_ELEMENTS_PER_CALL: usize = 1000; +const MAX_PROOFS_PER_TX: usize = 4; pub struct AddressUpdateConfig { pub rpc_pool: Arc>, @@ -54,13 +51,14 @@ async fn stream_instruction_data<'a, R: Rpc>( let max_zkp_batches_per_call = calculate_max_zkp_batches_per_call(zkp_batch_size); let total_chunks = leaves_hash_chains.len().div_ceil(max_zkp_batches_per_call); + let mut next_queue_index: Option = None; + for chunk_idx in 0..total_chunks { let chunk_start = chunk_idx * max_zkp_batches_per_call; let chunk_end = std::cmp::min(chunk_start + max_zkp_batches_per_call, leaves_hash_chains.len()); let chunk_hash_chains = &leaves_hash_chains[chunk_start..chunk_end]; let elements_for_chunk = chunk_hash_chains.len() * zkp_batch_size as usize; - let processed_items_offset = chunk_start * zkp_batch_size as usize; { if chunk_idx > 0 { @@ -76,11 +74,15 @@ async fn stream_instruction_data<'a, R: Rpc>( let indexer_update_info = { let mut connection = rpc_pool.get_connection().await?; let indexer = connection.indexer_mut()?; + debug!( + "Requesting {} addresses from Photon for chunk {} with start_queue_index={:?}", + elements_for_chunk, chunk_idx, next_queue_index + ); match indexer .get_address_queue_with_proofs( &merkle_tree_pubkey, elements_for_chunk as u16, - Some(processed_items_offset as u64), + next_queue_index, None, ) .await { @@ -92,6 +94,26 @@ async fn stream_instruction_data<'a, R: Rpc>( } }; + // Log Photon response details + debug!( + "Photon response for chunk {}: received {} addresses, batch_start_index={}, first_queue_index={:?}, last_queue_index={:?}", + chunk_idx, + indexer_update_info.value.addresses.len(), + indexer_update_info.value.batch_start_index, + indexer_update_info.value.addresses.first().map(|a| a.queue_index), + indexer_update_info.value.addresses.last().map(|a| a.queue_index) + ); + + // Update next_queue_index for the next chunk based on the last address returned + if let Some(last_address) = indexer_update_info.value.addresses.last() { + next_queue_index = Some(last_address.queue_index + 1); + debug!( + "Setting next_queue_index={} for chunk {}", + next_queue_index.unwrap(), + chunk_idx + 1 + ); + } + if chunk_idx == 0 { if let Some(first_proof) = indexer_update_info.value.non_inclusion_proofs.first() { if first_proof.root != current_root { @@ -121,56 +143,23 @@ async fn stream_instruction_data<'a, R: Rpc>( }; current_root = new_current_root; - info!("Generating {} ZK proofs with hybrid approach for chunk {}", all_inputs.len(), chunk_idx + 1); - - let mut futures_ordered = FuturesOrdered::new(); - let mut proof_buffer = Vec::new(); - let mut pending_count = 0; + info!("Generating {} zk proofs for batch_address chunk {} (parallel)", all_inputs.len(), chunk_idx + 1); - for (i, inputs) in all_inputs.into_iter().enumerate() { + // Generate ALL proofs in parallel using join_all + let proof_futures: Vec<_> = all_inputs.into_iter().enumerate().map(|(i, inputs)| { let client = Arc::clone(&proof_client); - futures_ordered.push_back(async move { + async move { let result = client.generate_batch_address_append_proof(inputs).await; (i, result) - }); - pending_count += 1; - - if pending_count >= MAX_PROOFS_PER_TX { - for _ in 0..MAX_PROOFS_PER_TX.min(pending_count) { - if let Some((idx, result)) = futures_ordered.next().await { - match result { - Ok((compressed_proof, new_root)) => { - let instruction_data = InstructionDataAddressAppendInputs { - new_root, - compressed_proof: CompressedProof { - a: compressed_proof.a, - b: compressed_proof.b, - c: compressed_proof.c, - }, - }; - proof_buffer.push(instruction_data); - }, - Err(e) => { - error!("Address proof failed to generate at index {}: {:?}", idx, e); - yield Err(ForesterUtilsError::Prover(format!( - "Address proof generation failed at batch {} in chunk {}: {}", - idx, chunk_idx, e - ))); - return; - } - } - pending_count -= 1; - } - } - - if !proof_buffer.is_empty() { - yield Ok(proof_buffer.clone()); - proof_buffer.clear(); - } } - } + }).collect(); + + // Wait for all proofs to complete in parallel + let proof_results = futures::future::join_all(proof_futures).await; - while let Some((idx, result)) = futures_ordered.next().await { + // Process results and batch them into groups of MAX_PROOFS_PER_TX + let mut proof_buffer = Vec::new(); + for (idx, result) in proof_results { match result { Ok((compressed_proof, new_root)) => { let instruction_data = InstructionDataAddressAppendInputs { @@ -183,6 +172,7 @@ async fn stream_instruction_data<'a, R: Rpc>( }; proof_buffer.push(instruction_data); + // Yield when we have MAX_PROOFS_PER_TX proofs ready if proof_buffer.len() >= MAX_PROOFS_PER_TX { yield Ok(proof_buffer.clone()); proof_buffer.clear(); @@ -199,6 +189,7 @@ async fn stream_instruction_data<'a, R: Rpc>( } } + // Yield any remaining proofs if !proof_buffer.is_empty() { yield Ok(proof_buffer); } @@ -249,19 +240,51 @@ fn get_all_circuit_inputs_for_chunk( for (batch_idx, leaves_hash_chain) in chunk_hash_chains.iter().enumerate() { let start_idx = batch_idx * batch_size as usize; let end_idx = start_idx + batch_size as usize; + + let addresses_len = indexer_update_info.value.addresses.len(); + if start_idx >= addresses_len { + return Err(ForesterUtilsError::Indexer(format!( + "Insufficient addresses: batch {} requires start_idx {} but only {} addresses available", + batch_idx, start_idx, addresses_len + ))); + } + let safe_end_idx = std::cmp::min(end_idx, addresses_len); + if safe_end_idx - start_idx != batch_size as usize { + return Err(ForesterUtilsError::Indexer(format!( + "Insufficient addresses: batch {} requires {} addresses (indices {}..{}) but only {} available", + batch_idx, batch_size, start_idx, end_idx, safe_end_idx - start_idx + ))); + } + let batch_addresses: Vec<[u8; 32]> = indexer_update_info.value.addresses - [start_idx..end_idx] + [start_idx..safe_end_idx] .iter() .map(|x| x.address) .collect(); + let proofs_len = indexer_update_info.value.non_inclusion_proofs.len(); + if start_idx >= proofs_len { + return Err(ForesterUtilsError::Indexer(format!( + "Insufficient non-inclusion proofs: batch {} requires start_idx {} but only {} proofs available", + batch_idx, start_idx, proofs_len + ))); + } + let safe_proofs_end_idx = std::cmp::min(end_idx, proofs_len); + if safe_proofs_end_idx - start_idx != batch_size as usize { + return Err(ForesterUtilsError::Indexer(format!( + "Insufficient non-inclusion proofs: batch {} requires {} proofs (indices {}..{}) but only {} available", + batch_idx, batch_size, start_idx, end_idx, safe_proofs_end_idx - start_idx + ))); + } + let mut low_element_values = Vec::new(); let mut low_element_next_values = Vec::new(); let mut low_element_indices = Vec::new(); let mut low_element_next_indices = Vec::new(); let mut low_element_proofs = Vec::new(); - for proof in &indexer_update_info.value.non_inclusion_proofs[start_idx..end_idx] { + for proof in &indexer_update_info.value.non_inclusion_proofs[start_idx..safe_proofs_end_idx] + { low_element_values.push(proof.low_address_value); low_element_indices.push(proof.low_address_index as usize); low_element_next_indices.push(proof.low_address_next_index as usize); @@ -269,7 +292,8 @@ fn get_all_circuit_inputs_for_chunk( low_element_proofs.push(proof.low_address_proof.to_vec()); } - if create_hash_chain_from_slice(&batch_addresses)? != *leaves_hash_chain { + let computed_hash_chain = create_hash_chain_from_slice(&batch_addresses)?; + if computed_hash_chain != *leaves_hash_chain { return Err(ForesterUtilsError::Prover( "Addresses hash chain does not match".into(), )); @@ -323,6 +347,7 @@ pub async fn get_address_update_instruction_stream<'a, R: Rpc>( let (current_root, leaves_hash_chains, start_index, zkp_batch_size) = ( merkle_tree_data.current_root, merkle_tree_data.leaves_hash_chains, + // merkle_tree_data.batch_start_index, merkle_tree_data.next_index, merkle_tree_data.zkp_batch_size, ); diff --git a/forester-utils/src/lib.rs b/forester-utils/src/lib.rs index 93a0fff089..f77fd9cb8a 100644 --- a/forester-utils/src/lib.rs +++ b/forester-utils/src/lib.rs @@ -22,6 +22,7 @@ pub struct ParsedMerkleTreeData { pub pending_batch_index: u32, pub num_inserted_zkps: u64, pub current_zkp_batch_index: u64, + pub batch_start_index: u64, pub leaves_hash_chains: Vec<[u8; 32]>, } diff --git a/forester/Cargo.toml b/forester/Cargo.toml index 55406669f5..40d9cb802b 100644 --- a/forester/Cargo.toml +++ b/forester/Cargo.toml @@ -12,6 +12,7 @@ solana-client = { workspace = true } solana-account-decoder = { workspace = true } solana-program = { workspace = true } account-compression = { workspace = true } +light-account-checks = { workspace = true } light-batched-merkle-tree = { workspace = true } light-compressed-account = { workspace = true, features = ["std"] } light-system-program-anchor = { workspace = true, features = ["cpi"] } @@ -52,6 +53,16 @@ scopeguard = "1.2.0" itertools = "0.14.0" num-bigint = { workspace = true } +# gRPC client for Photon queue subscriptions (match Photon versions) +tonic = "0.14.2" +prost = "0.14.1" +prost-types = "0.14.1" +tonic-prost = "0.14.2" +tokio-stream = { version = "0.1", features = ["sync"] } + +[build-dependencies] +tonic-prost-build = "0.14.2" + [dev-dependencies] serial_test = { workspace = true } light-prover-client = { workspace = true, features = ["devenv"] } diff --git a/forester/build.rs b/forester/build.rs new file mode 100644 index 0000000000..2760c2372a --- /dev/null +++ b/forester/build.rs @@ -0,0 +1,6 @@ +fn main() -> Result<(), Box> { + tonic_prost_build::configure().compile_protos(&["proto/photon.proto"], &["proto"])?; + println!("cargo:rerun-if-changed=proto/photon.proto"); + + Ok(()) +} diff --git a/forester/proto/photon.proto b/forester/proto/photon.proto new file mode 100644 index 0000000000..0069b4a09d --- /dev/null +++ b/forester/proto/photon.proto @@ -0,0 +1,70 @@ +syntax = "proto3"; + +package photon; + +// Queue information service +service QueueService { + // Get current queue information for all or specific trees + rpc GetQueueInfo(GetQueueInfoRequest) returns (GetQueueInfoResponse); + + // Subscribe to queue updates + rpc SubscribeQueueUpdates(SubscribeQueueUpdatesRequest) returns (stream QueueUpdate); +} + +// Request message for GetQueueInfo +message GetQueueInfoRequest { + // Optional list of tree pubkeys to filter by (base58 encoded) + // If empty, returns info for all trees + repeated string trees = 1; +} + +// Response message for GetQueueInfo +message GetQueueInfoResponse { + repeated QueueInfo queues = 1; + uint64 slot = 2; +} + +// Information about a single queue +message QueueInfo { + // Tree public key (base58 encoded) + string tree = 1; + + // Queue public key (base58 encoded) + string queue = 2; + + // Queue type: 3 = InputStateV2, 4 = AddressV2, 5 = OutputStateV2 + uint32 queue_type = 3; + + // Current number of items in the queue + uint64 queue_size = 4; +} + +// Request message for SubscribeQueueUpdates +message SubscribeQueueUpdatesRequest { + // Optional list of tree pubkeys to subscribe to (base58 encoded) + // If empty, subscribes to all trees + repeated string trees = 1; + + // Whether to send initial state before streaming updates + bool send_initial_state = 2; +} + +// Streamed queue update message +message QueueUpdate { + // The queue that was updated + QueueInfo queue_info = 1; + + // Slot at which the update occurred + uint64 slot = 2; + + // Type of update + UpdateType update_type = 3; +} + +// Type of queue update +enum UpdateType { + UPDATE_TYPE_UNSPECIFIED = 0; + UPDATE_TYPE_INITIAL = 1; // Initial state sent at subscription + UPDATE_TYPE_ITEM_ADDED = 2; // Item added to queue + UPDATE_TYPE_ITEM_REMOVED = 3; // Item removed from queue +} diff --git a/forester/src/cli.rs b/forester/src/cli.rs index 28d81c7cc2..d8a0739e80 100644 --- a/forester/src/cli.rs +++ b/forester/src/cli.rs @@ -68,6 +68,9 @@ pub struct StartArgs { #[arg(long, env = "FORESTER_PHOTON_API_KEY")] pub photon_api_key: Option, + #[arg(long, env = "FORESTER_PHOTON_GRPC_URL")] + pub photon_grpc_url: Option, + #[arg(long, env = "FORESTER_INDEXER_BATCH_SIZE", default_value = "50")] pub indexer_batch_size: usize, diff --git a/forester/src/config.rs b/forester/src/config.rs index d90c6a63f9..d4369ff31d 100644 --- a/forester/src/config.rs +++ b/forester/src/config.rs @@ -40,6 +40,7 @@ pub struct ExternalServicesConfig { pub prover_address_append_url: Option, pub prover_api_key: Option, pub photon_api_key: Option, + pub photon_grpc_url: Option, pub pushgateway_url: Option, pub pagerduty_routing_key: Option, pub rpc_rate_limit: Option, @@ -127,7 +128,7 @@ impl GeneralConfig { } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Default)] pub struct RpcPoolConfig { pub max_size: u32, pub connection_timeout_secs: u64, @@ -232,6 +233,7 @@ impl ForesterConfig { .or_else(|| args.prover_url.clone()), prover_api_key: args.prover_api_key.clone(), photon_api_key: args.photon_api_key.clone(), + photon_grpc_url: args.photon_grpc_url.clone(), pushgateway_url: args.push_gateway_url.clone(), pagerduty_routing_key: args.pagerduty_routing_key.clone(), rpc_rate_limit: args.rpc_rate_limit, @@ -310,6 +312,7 @@ impl ForesterConfig { prover_address_append_url: None, prover_api_key: None, photon_api_key: None, + photon_grpc_url: None, pushgateway_url: args.push_gateway_url.clone(), pagerduty_routing_key: args.pagerduty_routing_key.clone(), rpc_rate_limit: None, diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index bff6495ff1..4d2fcaa752 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -62,6 +62,7 @@ use crate::{ slot_tracker::{slot_duration, wait_until_slot_reached, SlotTracker}, tree_data_sync::fetch_trees, tree_finder::TreeFinder, + work_coordinator::{QueueUpdateMessage, WorkCoordinator}, ForesterConfig, ForesterEpochInfo, Result, }; @@ -106,6 +107,8 @@ pub struct EpochManager { new_tree_sender: broadcast::Sender, tx_cache: Arc>, ops_cache: Arc>, + /// Persistent WorkCoordinator for event-driven V2 processing (created at startup) + coordinator: Option>, } impl Clone for EpochManager { @@ -122,6 +125,7 @@ impl Clone for EpochManager { new_tree_sender: self.new_tree_sender.clone(), tx_cache: self.tx_cache.clone(), ops_cache: self.ops_cache.clone(), + coordinator: self.coordinator.clone(), } } } @@ -139,6 +143,41 @@ impl EpochManager { tx_cache: Arc>, ops_cache: Arc>, ) -> Result { + let coordinator = if let Some(url) = &config.external_services.photon_grpc_url { + info!( + "Creating persistent WorkCoordinator at startup for gRPC URL: {}", + url + ); + match WorkCoordinator::new(url.clone()).await { + Ok(coord) => { + let coord_arc = Arc::new(coord); + + tokio::spawn({ + let coord_clone = Arc::clone(&coord_arc); + async move { + info!("Starting persistent WorkCoordinator dispatcher"); + if let Err(e) = coord_clone.run_dispatcher().await { + error!("WorkCoordinator dispatcher error: {:?}", e); + } + } + }); + + info!("Persistent WorkCoordinator created successfully"); + Some(coord_arc) + } + Err(e) => { + warn!( + "Failed to create WorkCoordinator at startup: {:?}. V2 trees will use polling fallback.", + e + ); + None + } + } + } else { + info!("photon_grpc_url not configured, V2 trees will use polling mode"); + None + }; + Ok(Self { config, protocol_config, @@ -151,6 +190,7 @@ impl EpochManager { new_tree_sender, tx_cache, ops_cache, + coordinator, }) } @@ -522,24 +562,25 @@ impl EpochManager { .await { Ok(info) => info, - Err(e) => { - // Check if this is a RegistrationPhaseEnded error by downcasting - if let Some(ForesterError::Registration( - RegistrationError::RegistrationPhaseEnded { - epoch: failed_epoch, - current_slot, - registration_end, - }, - )) = e.downcast_ref::() - { - info!( - "Registration period ended for epoch {} (current slot: {}, registration ended at: {}). Will retry when next epoch registration opens.", - failed_epoch, current_slot, registration_end - ); - return Ok(()); - } - return Err(e); + Err(ForesterError::Registration( + RegistrationError::RegistrationPhaseEnded { + epoch: failed_epoch, + current_slot, + registration_end, + }, + )) => { + let next_epoch = failed_epoch + 1; + let next_phases = get_epoch_phases(&self.protocol_config, next_epoch); + let slots_to_wait = + next_phases.registration.start.saturating_sub(current_slot); + + info!( + "Too late to register for epoch {} (registration ended at slot {}, current slot: {}). Next available epoch: {}. Registration opens at slot {} ({} slots to wait).", + failed_epoch, registration_end, current_slot, next_epoch, next_phases.registration.start, slots_to_wait + ); + return Ok(()); } + Err(e) => return Err(e.into()), } } }; @@ -586,16 +627,17 @@ impl EpochManager { epoch: u64, max_retries: u32, retry_delay: Duration, - ) -> Result { + ) -> std::result::Result { let rpc = LightClient::new(LightClientConfig { url: self.config.external_services.rpc_url.to_string(), photon_url: self.config.external_services.indexer_url.clone(), api_key: self.config.external_services.photon_api_key.clone(), - commitment_config: None, + commitment_config: Some(solana_sdk::commitment_config::CommitmentConfig::confirmed()), fetch_active_tree: false, }) - .await?; - let slot = rpc.get_slot().await?; + .await + .map_err(ForesterError::Rpc)?; + let slot = rpc.get_slot().await.map_err(ForesterError::Rpc)?; let phases = get_epoch_phases(&self.protocol_config, epoch); // Check if it's already too late to register @@ -638,7 +680,7 @@ impl EpochManager { error!("Failed to send PagerDuty alert: {:?}", alert_err); } } - return Err(e); + return Err(ForesterError::Other(e)); } } } @@ -658,7 +700,7 @@ impl EpochManager { url: self.config.external_services.rpc_url.to_string(), photon_url: self.config.external_services.indexer_url.clone(), api_key: self.config.external_services.photon_api_key.clone(), - commitment_config: None, + commitment_config: Some(solana_sdk::commitment_config::CommitmentConfig::processed()), fetch_active_tree: false, }) .await?; @@ -809,12 +851,13 @@ impl EpochManager { ) -> Result { let mut rpc = self.rpc_pool.get_connection().await?; let active_phase_start_slot = epoch_info.epoch.phases.active.start; + let active_phase_end_slot = epoch_info.epoch.phases.active.end; let current_slot = self.slot_tracker.estimated_current_slot(); if current_slot >= active_phase_start_slot { info!( - "Active phase has already started. Current slot: {}. Active phase start slot: {}", - current_slot, active_phase_start_slot + "Active phase has already started. Current slot: {}. Active phase start slot: {}. Slots left: {}.", + current_slot, active_phase_start_slot, active_phase_end_slot.saturating_sub(current_slot) ); } else { let waiting_slots = active_phase_start_slot - current_slot; @@ -895,58 +938,85 @@ impl EpochManager { self.sync_slot().await?; + let (_, v2_trees): (Vec<_>, Vec<_>) = epoch_info + .trees + .iter() + .filter(|tree| !should_skip_tree(&self.config, &tree.tree_accounts.tree_type)) + .partition(|tree| { + matches!( + tree.tree_accounts.tree_type, + TreeType::StateV1 | TreeType::AddressV1 + ) + }); + + let coordinator = self.coordinator.clone(); + + if let Some(ref coord) = coordinator { + if coord.is_healthy() { + info!("Using WorkCoordinator for {} V2 trees", v2_trees.len()); + } else { + info!( + "WorkCoordinator exists but not yet healthy. V2 trees will use polling fallback until connection establishes." + ); + } + } else if !v2_trees.is_empty() { + info!("No WorkCoordinator available. V2 trees will use polling mode."); + } + let self_arc = Arc::new(self.clone()); let epoch_info_arc = Arc::new(epoch_info.clone()); - let mut handles: Vec>> = Vec::new(); for tree in epoch_info.trees.iter() { - if self.config.general_config.skip_v1_address_trees - && tree.tree_accounts.tree_type == TreeType::AddressV1 - { - info!("skipping address v1"); - continue; - } else if self.config.general_config.skip_v2_address_trees - && tree.tree_accounts.tree_type == TreeType::AddressV2 - { - info!("skipping address v2"); - - continue; - } else if self.config.general_config.skip_v1_state_trees - && tree.tree_accounts.tree_type == TreeType::StateV1 - { - info!("skipping state v1"); - continue; - } else if self.config.general_config.skip_v2_state_trees - && tree.tree_accounts.tree_type == TreeType::StateV2 - { - info!("skipping state v2"); + if should_skip_tree(&self.config, &tree.tree_accounts.tree_type) { continue; } + let queue_update_rx = if matches!( + tree.tree_accounts.tree_type, + TreeType::StateV2 | TreeType::AddressV2 + ) { + if let Some(ref coord) = coordinator { + Some(coord.register_tree(tree.tree_accounts.merkle_tree).await) + } else { + None + } + } else { + None + }; + + let has_channel = queue_update_rx.is_some(); info!( - "Creating thread for tree {}", - tree.tree_accounts.merkle_tree + "Creating thread for tree {} (type: {:?}, event_driven: {})", + tree.tree_accounts.merkle_tree, tree.tree_accounts.tree_type, has_channel ); + let self_clone = self_arc.clone(); let epoch_info_clone = epoch_info_arc.clone(); let tree = tree.clone(); + let coordinator_clone = coordinator.clone(); + let handle = tokio::spawn(async move { - self_clone - .process_queue( + let result = self_clone + .process_queue_v2( &epoch_info_clone.epoch, &epoch_info_clone.forester_epoch_pda, - tree, + tree.clone(), + queue_update_rx, + coordinator_clone.clone(), ) - .await + .await; + + if let Some(coord) = coordinator_clone { + coord.unregister_tree(&tree.tree_accounts.merkle_tree).await; + } + + result }); handles.push(handle); } - trace!("Threads created. Waiting for active phase to end"); - - // Wait for all tasks to complete for result in join_all(handles).await { match result { Ok(Ok(())) => { @@ -983,13 +1053,38 @@ impl EpochManager { mut tree_schedule: TreeForesterSchedule, ) -> Result<()> { let mut current_slot = self.slot_tracker.estimated_current_slot(); + + let total_slots = tree_schedule.slots.len(); + let eligible_slots = tree_schedule.slots.iter().filter(|s| s.is_some()).count(); + info!( + "Starting process_queue for tree {}: total_slots={}, eligible_slots={}, current_slot={}, active_phase_end={}", + tree_schedule.tree_accounts.merkle_tree, + total_slots, + eligible_slots, + current_slot, + epoch_info.phases.active.end + ); 'outer_slot_loop: while current_slot < epoch_info.phases.active.end { + info!( + "Searching for next slot to process. Current slot: {}", + current_slot + ); let next_slot_to_process = tree_schedule .slots .iter_mut() .enumerate() .find_map(|(idx, opt_slot)| opt_slot.as_ref().map(|s| (idx, s.clone()))); + info!( + "Next slot to process: {:?}", + next_slot_to_process.as_ref().map(|(idx, slot)| ( + idx, + slot.slot, + slot.start_solana_slot, + slot.end_solana_slot + )) + ); + if let Some((slot_idx, light_slot_details)) = next_slot_to_process { match self .process_light_slot( @@ -1032,6 +1127,113 @@ impl EpochManager { Ok(()) } + #[instrument( + level = "debug", + skip(self, epoch_info, epoch_pda, tree_schedule, queue_update_rx, coordinator), + fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch_info.epoch, + tree = %tree_schedule.tree_accounts.merkle_tree) + )] + pub async fn process_queue_v2( + &self, + epoch_info: &Epoch, + epoch_pda: &ForesterEpochPda, + mut tree_schedule: TreeForesterSchedule, + mut queue_update_rx: Option>, + coordinator: Option>, + ) -> Result<()> { + let mut current_slot = self.slot_tracker.estimated_current_slot(); + + let total_slots = tree_schedule.slots.len(); + let eligible_slots = tree_schedule.slots.iter().filter(|s| s.is_some()).count(); + let tree_type = tree_schedule.tree_accounts.tree_type; + + info!( + "process_queue_v2 tree={}: type={:?}, total_slots={}, eligible_slots={}, event_driven={}, current_slot={}, active_phase_end={}", + tree_schedule.tree_accounts.merkle_tree, + tree_type, + total_slots, + eligible_slots, + queue_update_rx.is_some(), + current_slot, + epoch_info.phases.active.end + ); + + let use_events = queue_update_rx.is_some(); + + 'outer_slot_loop: while current_slot < epoch_info.phases.active.end { + let next_slot_to_process = tree_schedule + .slots + .iter_mut() + .enumerate() + .find_map(|(idx, opt_slot)| opt_slot.as_ref().map(|s| (idx, s.clone()))); + + if let Some((slot_idx, light_slot_details)) = next_slot_to_process { + let result = match tree_type { + TreeType::StateV1 | TreeType::AddressV1 => { + self.process_light_slot( + epoch_info, + epoch_pda, + &tree_schedule.tree_accounts, + &light_slot_details, + ) + .await + } + TreeType::StateV2 | TreeType::AddressV2 => { + if use_events && queue_update_rx.is_some() { + self.process_light_slot_v2_event( + epoch_info, + epoch_pda, + &tree_schedule.tree_accounts, + &light_slot_details, + queue_update_rx.as_mut().unwrap(), + coordinator.clone(), + ) + .await + } else { + self.process_light_slot_v2_fallback( + epoch_info, + epoch_pda, + &tree_schedule.tree_accounts, + &light_slot_details, + ) + .await + } + } + }; + + match result { + Ok(_) => { + trace!( + "Successfully processed light slot {:?}", + light_slot_details.slot + ); + } + Err(e) => { + error!( + "Error processing light slot {:?}: {:?}", + light_slot_details.slot, e + ); + } + } + tree_schedule.slots[slot_idx] = None; + } else { + info!( + "No further eligible slots in schedule for tree {}", + tree_schedule.tree_accounts.merkle_tree + ); + break 'outer_slot_loop; + } + + current_slot = self.slot_tracker.estimated_current_slot(); + } + + info!( + "Exiting process_queue_v2 for tree {}", + tree_schedule.tree_accounts.merkle_tree + ); + Ok(()) + } + #[instrument( level = "debug", skip(self, epoch_info, epoch_pda, tree_accounts, forester_slot_details), @@ -1045,8 +1247,8 @@ impl EpochManager { tree_accounts: &TreeAccounts, forester_slot_details: &ForesterSlot, ) -> Result<()> { - trace!( - "Found eligible slot: {:?}. Target start: {}, Target end: {}", + info!( + "Processing slot {} ({}-{})", forester_slot_details.slot, forester_slot_details.start_solana_slot, forester_slot_details.end_solana_slot @@ -1058,6 +1260,10 @@ impl EpochManager { forester_slot_details.start_solana_slot, ) .await?; + info!( + "Slot {} started, beginning processing", + forester_slot_details.slot + ); let mut estimated_slot = self.slot_tracker.estimated_current_slot(); @@ -1073,7 +1279,7 @@ impl EpochManager { let current_light_slot = (estimated_slot - epoch_info.phases.active.start) / epoch_pda.protocol_config.slot_length; if current_light_slot != forester_slot_details.slot { - warn!("Light slot mismatch. Exiting processing for this slot."); + warn!("Slot mismatch. Exiting processing for this slot."); break 'inner_processing_loop; } @@ -1127,8 +1333,252 @@ impl EpochManager { push_metrics(&self.config.external_services.pushgateway_url).await?; estimated_slot = self.slot_tracker.estimated_current_slot(); - // Add polling interval to reduce RPC pressure and improve response time - tokio::time::sleep(std::time::Duration::from_millis(200)).await; + let sleep_duration_ms = if items_processed_this_iteration > 0 { + 10_000 + } else { + 45_000 + }; + + tokio::time::sleep(Duration::from_millis(sleep_duration_ms)).await; + } + Ok(()) + } + + #[instrument( + level = "debug", + skip(self, epoch_info, epoch_pda, tree_accounts, forester_slot_details, queue_update_rx, coordinator), + fields(tree = %tree_accounts.merkle_tree) + )] + async fn process_light_slot_v2_event( + &self, + epoch_info: &Epoch, + epoch_pda: &ForesterEpochPda, + tree_accounts: &TreeAccounts, + forester_slot_details: &ForesterSlot, + queue_update_rx: &mut mpsc::Receiver, + coordinator: Option>, + ) -> Result<()> { + info!( + "Processing V2 light slot {} ({}-{})", + forester_slot_details.slot, + forester_slot_details.start_solana_slot, + forester_slot_details.end_solana_slot + ); + + let mut rpc = self.rpc_pool.get_connection().await?; + wait_until_slot_reached( + &mut *rpc, + &self.slot_tracker, + forester_slot_details.start_solana_slot, + ) + .await?; + + let tree_pubkey = tree_accounts.merkle_tree; + let mut estimated_slot = self.slot_tracker.estimated_current_slot(); + + let mut fallback_timer = tokio::time::interval(Duration::from_secs(5)); + fallback_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + 'inner_processing_loop: loop { + if estimated_slot >= forester_slot_details.end_solana_slot { + trace!( + "Ending V2 event processing for slot {:?}", + forester_slot_details.slot + ); + break 'inner_processing_loop; + } + + let current_light_slot = (estimated_slot - epoch_info.phases.active.start) + / epoch_pda.protocol_config.slot_length; + if current_light_slot != forester_slot_details.slot { + warn!("V2 slot mismatch. Exiting processing."); + break 'inner_processing_loop; + } + + if !self + .check_forester_eligibility( + epoch_pda, + current_light_slot, + &tree_accounts.queue, + epoch_info.epoch, + epoch_info, + ) + .await? + { + break 'inner_processing_loop; + } + + tokio::select! { + Some(update) = queue_update_rx.recv() => { + if update.queue_size > 0 { + info!( + "V2 Queue update received for tree {}: {} items (type: {:?})", + tree_pubkey, update.queue_size, update.queue_type + ); + + let processing_start_time = Instant::now(); + match self.dispatch_tree_processing( + epoch_info, + epoch_pda, + tree_accounts, + forester_slot_details, + estimated_slot, + ).await { + Ok(count) => { + if count > 0 { + info!("V2 event processed {} items", count); + self.update_metrics_and_counts( + epoch_info.epoch, + count, + processing_start_time.elapsed(), + ).await; + } + } + Err(e) => { + error!("V2 event processing failed: {:?}", e); + } + } + } else { + trace!("V2 received empty queue update for tree {}", tree_pubkey); + } + } + + _ = fallback_timer.tick() => { + let is_healthy = coordinator + .as_ref() + .map(|c| c.is_healthy()) + .unwrap_or(false); + + if !is_healthy { + warn!("V2 gRPC connection unhealthy, running fallback check for tree {}", tree_pubkey); + let processing_start_time = Instant::now(); + match self.dispatch_tree_processing( + epoch_info, + epoch_pda, + tree_accounts, + forester_slot_details, + estimated_slot, + ).await { + Ok(count) if count > 0 => { + info!("V2 fallback found {} items", count); + self.update_metrics_and_counts( + epoch_info.epoch, + count, + processing_start_time.elapsed(), + ).await; + } + Ok(_) => trace!("V2 fallback check: no work"), + Err(e) => error!("V2 fallback check failed: {:?}", e), + } + } else { + trace!("V2 fallback check skipped (gRPC healthy)"); + } + } + } + + push_metrics(&self.config.external_services.pushgateway_url).await?; + estimated_slot = self.slot_tracker.estimated_current_slot(); + } + + Ok(()) + } + + /// V2 polling fallback (when gRPC unavailable) + #[instrument( + level = "debug", + skip(self, epoch_info, epoch_pda, tree_accounts, forester_slot_details), + fields(tree = %tree_accounts.merkle_tree) + )] + async fn process_light_slot_v2_fallback( + &self, + epoch_info: &Epoch, + epoch_pda: &ForesterEpochPda, + tree_accounts: &TreeAccounts, + forester_slot_details: &ForesterSlot, + ) -> Result<()> { + info!( + "Processing V2 light slot {} fallback ({}-{})", + forester_slot_details.slot, + forester_slot_details.start_solana_slot, + forester_slot_details.end_solana_slot + ); + + let mut rpc = self.rpc_pool.get_connection().await?; + wait_until_slot_reached( + &mut *rpc, + &self.slot_tracker, + forester_slot_details.start_solana_slot, + ) + .await?; + + let mut estimated_slot = self.slot_tracker.estimated_current_slot(); + + 'inner_processing_loop: loop { + if estimated_slot >= forester_slot_details.end_solana_slot { + break 'inner_processing_loop; + } + + let current_light_slot = (estimated_slot - epoch_info.phases.active.start) + / epoch_pda.protocol_config.slot_length; + if current_light_slot != forester_slot_details.slot { + break 'inner_processing_loop; + } + + if !self + .check_forester_eligibility( + epoch_pda, + current_light_slot, + &tree_accounts.queue, + epoch_info.epoch, + epoch_info, + ) + .await? + { + break 'inner_processing_loop; + } + + let processing_start_time = Instant::now(); + let items_processed_this_iteration = match self + .dispatch_tree_processing( + epoch_info, + epoch_pda, + tree_accounts, + forester_slot_details, + estimated_slot, + ) + .await + { + Ok(count) => count, + Err(e) => { + error!("Failed V2 polling fallback: {:?}", e); + break 'inner_processing_loop; + } + }; + + if items_processed_this_iteration > 0 { + info!( + "V2 polling fallback processed {} items", + items_processed_this_iteration + ); + } + + self.update_metrics_and_counts( + epoch_info.epoch, + items_processed_this_iteration, + processing_start_time.elapsed(), + ) + .await; + + push_metrics(&self.config.external_services.pushgateway_url).await?; + estimated_slot = self.slot_tracker.estimated_current_slot(); + + let sleep_duration_ms = if items_processed_this_iteration > 0 { + 1_000 + } else { + 5_000 + }; + + tokio::time::sleep(Duration::from_millis(sleep_duration_ms)).await; } Ok(()) } @@ -1256,6 +1706,13 @@ impl EpochManager { ) .await?; + if num_sent > 0 { + info!( + "Processed {} items from tree {}", + num_sent, tree_accounts.merkle_tree + ); + } + match self.rollover_if_needed(tree_accounts).await { Ok(_) => Ok(num_sent), Err(e) => { @@ -1366,7 +1823,7 @@ impl EpochManager { url: self.config.external_services.rpc_url.to_string(), photon_url: self.config.external_services.indexer_url.clone(), api_key: self.config.external_services.photon_api_key.clone(), - commitment_config: None, + commitment_config: Some(solana_sdk::commitment_config::CommitmentConfig::processed()), fetch_active_tree: false, }) .await?; @@ -1530,6 +1987,16 @@ fn calculate_remaining_time_or_default( .unwrap_or(Duration::ZERO) } +/// Helper function to check if a tree should be skipped based on configuration +fn should_skip_tree(config: &ForesterConfig, tree_type: &TreeType) -> bool { + match tree_type { + TreeType::AddressV1 => config.general_config.skip_v1_address_trees, + TreeType::AddressV2 => config.general_config.skip_v2_address_trees, + TreeType::StateV1 => config.general_config.skip_v1_state_trees, + TreeType::StateV2 => config.general_config.skip_v2_state_trees, + } +} + #[instrument( level = "info", skip(config, protocol_config, rpc_pool, shutdown, work_report_sender, slot_tracker), @@ -1668,3 +2135,196 @@ pub async fn run_service( }) .await } + +#[cfg(test)] +mod tests { + use light_client::rpc::RetryConfig; + use solana_sdk::{pubkey::Pubkey, signature::Keypair}; + + use super::*; + use crate::{ + config::{ExternalServicesConfig, GeneralConfig}, + ForesterConfig, + }; + + fn create_test_config_with_skip_flags( + skip_v1_state: bool, + skip_v1_address: bool, + skip_v2_state: bool, + skip_v2_address: bool, + ) -> ForesterConfig { + ForesterConfig { + external_services: ExternalServicesConfig { + rpc_url: "http://localhost:8899".to_string(), + ws_rpc_url: None, + indexer_url: None, + prover_url: None, + prover_append_url: None, + prover_update_url: None, + prover_address_append_url: None, + prover_api_key: None, + photon_api_key: None, + photon_grpc_url: None, + pushgateway_url: None, + pagerduty_routing_key: None, + rpc_rate_limit: None, + photon_rate_limit: None, + send_tx_rate_limit: None, + }, + retry_config: RetryConfig::default(), + queue_config: Default::default(), + indexer_config: Default::default(), + transaction_config: Default::default(), + general_config: GeneralConfig { + slot_update_interval_seconds: 10, + tree_discovery_interval_seconds: 1, + enable_metrics: false, + skip_v1_state_trees: skip_v1_state, + skip_v1_address_trees: skip_v1_address, + skip_v2_state_trees: skip_v2_state, + skip_v2_address_trees: skip_v2_address, + tree_id: None, + }, + rpc_pool_config: Default::default(), + registry_pubkey: Pubkey::default(), + payer_keypair: Keypair::new(), + derivation_pubkey: Pubkey::default(), + address_tree_data: vec![], + state_tree_data: vec![], + } + } + + #[test] + fn test_should_skip_tree_none_skipped() { + let config = create_test_config_with_skip_flags(false, false, false, false); + + assert!(!should_skip_tree(&config, &TreeType::StateV1)); + assert!(!should_skip_tree(&config, &TreeType::StateV2)); + assert!(!should_skip_tree(&config, &TreeType::AddressV1)); + assert!(!should_skip_tree(&config, &TreeType::AddressV2)); + } + + #[test] + fn test_should_skip_tree_all_v1_skipped() { + let config = create_test_config_with_skip_flags(true, true, false, false); + + assert!(should_skip_tree(&config, &TreeType::StateV1)); + assert!(should_skip_tree(&config, &TreeType::AddressV1)); + assert!(!should_skip_tree(&config, &TreeType::StateV2)); + assert!(!should_skip_tree(&config, &TreeType::AddressV2)); + } + + #[test] + fn test_should_skip_tree_all_v2_skipped() { + let config = create_test_config_with_skip_flags(false, false, true, true); + + assert!(!should_skip_tree(&config, &TreeType::StateV1)); + assert!(!should_skip_tree(&config, &TreeType::AddressV1)); + assert!(should_skip_tree(&config, &TreeType::StateV2)); + assert!(should_skip_tree(&config, &TreeType::AddressV2)); + } + + #[test] + fn test_should_skip_tree_only_state_trees() { + let config = create_test_config_with_skip_flags(true, false, true, false); + + assert!(should_skip_tree(&config, &TreeType::StateV1)); + assert!(should_skip_tree(&config, &TreeType::StateV2)); + assert!(!should_skip_tree(&config, &TreeType::AddressV1)); + assert!(!should_skip_tree(&config, &TreeType::AddressV2)); + } + + #[test] + fn test_should_skip_tree_only_address_trees() { + let config = create_test_config_with_skip_flags(false, true, false, true); + + assert!(!should_skip_tree(&config, &TreeType::StateV1)); + assert!(!should_skip_tree(&config, &TreeType::StateV2)); + assert!(should_skip_tree(&config, &TreeType::AddressV1)); + assert!(should_skip_tree(&config, &TreeType::AddressV2)); + } + + #[test] + fn test_should_skip_tree_mixed_config() { + // Skip V1 state and V2 address + let config = create_test_config_with_skip_flags(true, false, false, true); + + assert!(should_skip_tree(&config, &TreeType::StateV1)); + assert!(!should_skip_tree(&config, &TreeType::StateV2)); + assert!(!should_skip_tree(&config, &TreeType::AddressV1)); + assert!(should_skip_tree(&config, &TreeType::AddressV2)); + } + + #[test] + fn test_general_config_test_address_v2() { + let config = GeneralConfig::test_address_v2(); + + assert!(config.skip_v1_state_trees); + assert!(config.skip_v1_address_trees); + assert!(config.skip_v2_state_trees); + assert!(!config.skip_v2_address_trees); + } + + #[test] + fn test_general_config_test_state_v2() { + let config = GeneralConfig::test_state_v2(); + + assert!(config.skip_v1_state_trees); + assert!(config.skip_v1_address_trees); + assert!(!config.skip_v2_state_trees); + assert!(config.skip_v2_address_trees); + } + + #[test] + fn test_work_item_is_address_tree() { + let tree_account = TreeAccounts { + merkle_tree: Pubkey::new_unique(), + queue: Pubkey::new_unique(), + is_rolledover: false, + tree_type: TreeType::AddressV1, + }; + + let work_item = WorkItem { + tree_account, + queue_item_data: QueueItemData { + hash: [0u8; 32], + index: 0, + }, + }; + + assert!(work_item.is_address_tree()); + assert!(!work_item.is_state_tree()); + } + + #[test] + fn test_work_item_is_state_tree() { + let tree_account = TreeAccounts { + merkle_tree: Pubkey::new_unique(), + queue: Pubkey::new_unique(), + is_rolledover: false, + tree_type: TreeType::StateV1, + }; + + let work_item = WorkItem { + tree_account, + queue_item_data: QueueItemData { + hash: [0u8; 32], + index: 0, + }, + }; + + assert!(!work_item.is_address_tree()); + assert!(work_item.is_state_tree()); + } + + #[test] + fn test_work_report_creation() { + let report = WorkReport { + epoch: 42, + processed_items: 100, + }; + + assert_eq!(report.epoch, 42); + assert_eq!(report.processed_items, 100); + } +} diff --git a/forester/src/forester_status.rs b/forester/src/forester_status.rs index c6f745a526..affc04f4e4 100644 --- a/forester/src/forester_status.rs +++ b/forester/src/forester_status.rs @@ -196,6 +196,11 @@ pub async fn fetch_forester_status(args: &StatusArgs) -> crate::Result<()> { run_queue_info(config.clone(), &trees, TreeType::AddressV2).await?; for tree in &trees { + // Skip rolled-over trees + if tree.is_rolledover { + continue; + } + let tree_type = format!("{}", tree.tree_type); let tree_info = get_tree_fullness(&mut rpc, tree.merkle_tree, tree.tree_type).await?; let fullness_percentage = tree_info.fullness * 100.0; @@ -248,12 +253,16 @@ pub async fn fetch_forester_status(args: &StatusArgs) -> crate::Result<()> { let protocol_config = protocol_config_pdas[0].clone(); + // Filter out rolled-over trees + let active_trees: Vec = + trees.iter().filter(|t| !t.is_rolledover).cloned().collect(); + if !active_epoch_foresters.is_empty() && current_epoch_pda_entry.is_some() { print_current_forester_assignments( slot, current_active_epoch, active_epoch_foresters, - &trees, + &active_trees, current_epoch_pda_entry, &protocol_config, ); diff --git a/forester/src/lib.rs b/forester/src/lib.rs index 006441d63b..f3e61a8768 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -19,6 +19,7 @@ pub mod telemetry; pub mod tree_data_sync; pub mod tree_finder; pub mod utils; +pub mod work_coordinator; use std::{sync::Arc, time::Duration}; @@ -62,7 +63,7 @@ pub async fn run_queue_info( .unwrap(); let trees: Vec<_> = trees .iter() - .filter(|t| t.tree_type == queue_type) + .filter(|t| t.tree_type == queue_type && !t.is_rolledover) .sorted_by_key(|t| t.merkle_tree.to_string()) .cloned() .collect(); diff --git a/forester/src/processor/v1/helpers.rs b/forester/src/processor/v1/helpers.rs index 753014dd5d..ddd22362c4 100644 --- a/forester/src/processor/v1/helpers.rs +++ b/forester/src/processor/v1/helpers.rs @@ -168,6 +168,97 @@ pub async fn fetch_proofs_and_create_instructions( // Process state proofs and create instructions for (item, proof) in state_items.iter().zip(state_proofs.into_iter()) { proofs.push(MerkleProofType::StateProof(proof.clone())); + + let _debug = false; + if _debug { + let onchain_account = rpc + .get_account(item.tree_account.merkle_tree) + .await? + .ok_or_else(|| { + anyhow::anyhow!("Tree account {} not found", item.tree_account.merkle_tree) + })?; + let onchain_tree = match account_compression::state_merkle_tree_from_bytes_zero_copy( + &onchain_account.data, + ) { + Ok(tree) => tree, + Err(e) => { + tracing::error!( + "Failed to deserialize onchain tree {}: {}", + item.tree_account.merkle_tree, + e + ); + return Err(anyhow::anyhow!("Failed to deserialize onchain tree: {}", e)); + } + }; + + let onchain_root = onchain_tree.root(); + let onchain_root_index = onchain_tree.root_index(); + let onchain_changelog_index = onchain_tree.changelog_index(); + + tracing::info!( + "Creating nullify instruction for tree {}: hash={}, leaf_index={}, root_seq={}, changelog_index={}, indexer_root={}", + item.tree_account.merkle_tree, + bs58::encode(&item.queue_item_data.hash).into_string(), + proof.leaf_index, + proof.root_seq, + proof.root_seq % STATE_MERKLE_TREE_CHANGELOG, + bs58::encode(&proof.root).into_string() + ); + + tracing::info!( + "Onchain tree {} state: current_root={}, root_index={}, changelog_index={}", + item.tree_account.merkle_tree, + bs58::encode(&onchain_root).into_string(), + onchain_root_index, + onchain_changelog_index + ); + + let capacity = onchain_tree.roots.capacity(); + let first_index = onchain_tree.roots.first_index(); + + let root_history: Vec = onchain_tree + .roots + .iter() + .enumerate() + .map(|(offset, root)| { + let buffer_index = (first_index + offset) % capacity.max(1); + format!("#{buffer_index}: {}", bs58::encode(root).into_string()) + }) + .collect(); + + tracing::info!( + "Onchain root history (len={}, capacity={}): {:?}", + onchain_tree.roots.len(), + capacity, + root_history, + ); + + let indexer_root_position = + onchain_tree + .roots + .iter() + .enumerate() + .find_map(|(offset, root)| { + (root == &proof.root).then_some((first_index + offset) % capacity.max(1)) + }); + + tracing::info!( + "Indexer root {} present_at_buffer_index={:?}", + bs58::encode(&proof.root).into_string(), + indexer_root_position, + ); + + if indexer_root_position.is_none() { + return Err(anyhow::anyhow!( + "Indexer root {} not found in onchain root history for tree {}. Current root: {}, root_index: {}", + bs58::encode(&proof.root).into_string(), + item.tree_account.merkle_tree, + bs58::encode(&onchain_root).into_string(), + onchain_root_index + )); + } + } + let instruction = create_nullify_instruction( CreateNullifyInstructionInputs { nullifier_queue: item.tree_account.queue, diff --git a/forester/src/processor/v1/send_transaction.rs b/forester/src/processor/v1/send_transaction.rs index a3ed6704d8..555710c8d9 100644 --- a/forester/src/processor/v1/send_transaction.rs +++ b/forester/src/processor/v1/send_transaction.rs @@ -351,7 +351,7 @@ async fn execute_transaction_chunk_sending( } TransactionSendResult::Failure(err, sig_opt) => { if let Some(sig) = sig_opt { - warn!(tx.signature = %sig, error = ?err, "Transaction failed to send"); + error!(tx.signature = %sig, error = ?err, "Transaction failed to send"); } else { error!(error = ?err, "Transaction failed to send, no signature available"); } diff --git a/forester/src/processor/v2/common.rs b/forester/src/processor/v2/common.rs index 1784f2c39d..08464a0e66 100644 --- a/forester/src/processor/v2/common.rs +++ b/forester/src/processor/v2/common.rs @@ -519,6 +519,18 @@ impl BatchProcessor { .push(merkle_tree.hash_chain_stores[batch_index as usize][i as usize]); } + debug!( + "Extracted {} hash chains from on-chain merkle tree. batch_index={}, num_inserted_zkps={}, current_zkp_batch_index={}", + leaves_hash_chains.len(), + batch_index, + num_inserted_zkps, + current_zkp_batch_index + ); + if !leaves_hash_chains.is_empty() { + debug!("First hash chain: {:?}", leaves_hash_chains.first()); + debug!("Last hash chain: {:?}", leaves_hash_chains.last()); + } + let parsed_data = ParsedMerkleTreeData { next_index: merkle_tree.next_index, current_root: *merkle_tree.root_history.last().unwrap(), @@ -527,6 +539,7 @@ impl BatchProcessor { pending_batch_index: batch_index as u32, num_inserted_zkps, current_zkp_batch_index, + batch_start_index: batch.start_index, leaves_hash_chains, }; diff --git a/forester/src/slot_tracker.rs b/forester/src/slot_tracker.rs index 49c96cad71..8fb6179c75 100644 --- a/forester/src/slot_tracker.rs +++ b/forester/src/slot_tracker.rs @@ -83,33 +83,34 @@ pub async fn wait_until_slot_reached( ) -> crate::Result<()> { trace!("Waiting for slot {}", target_slot); + const MAX_SLEEP_SLOTS: u64 = 50; // ~20 seconds max sleep between checks + loop { - let current_estimated_slot = slot_tracker.estimated_current_slot(); + let actual_slot = rpc.get_slot().await?; + slot_tracker.update(actual_slot); - if current_estimated_slot >= target_slot { - // Double-check with actual RPC call - let actual_slot = rpc.get_slot().await?; - if actual_slot >= target_slot { - break; - } + if actual_slot >= target_slot { + trace!("Slot {} reached (actual: {})", target_slot, actual_slot); + break; } - let sleep_duration = if current_estimated_slot < target_slot { - let slots_to_wait = target_slot - current_estimated_slot; - Duration::from_secs_f64(slots_to_wait as f64 * slot_duration().as_secs_f64()) - } else { - slot_duration() - }; + let slots_remaining = target_slot.saturating_sub(actual_slot); + + let sleep_slots = slots_remaining.min(MAX_SLEEP_SLOTS); + let sleep_duration = + Duration::from_secs_f64(sleep_slots as f64 * slot_duration().as_secs_f64()); trace!( - "Estimated slot: {}, waiting for {} seconds", - current_estimated_slot, + "Current slot: {}, target slot: {}, sleeping for {} slots ({:.1} seconds)", + actual_slot, + target_slot, + sleep_slots, sleep_duration.as_secs_f64() ); + tokio::task::yield_now().await; sleep(sleep_duration).await; } - trace!("Slot {} reached", target_slot); Ok(()) } diff --git a/forester/src/tree_data_sync.rs b/forester/src/tree_data_sync.rs index 8e939047f9..16203077cb 100644 --- a/forester/src/tree_data_sync.rs +++ b/forester/src/tree_data_sync.rs @@ -53,6 +53,11 @@ fn process_address_account(account: &Account, pubkey: Pubkey) -> Result Result { + light_account_checks::checks::check_discriminator::(&account.data) + .map_err(|_| AccountDeserializationError::BatchStateMerkleTree { + error: "Invalid discriminator".to_string(), + })?; + let tree_account = BatchedMerkleTreeAccount::state_from_bytes(&mut account.data, &pubkey.into()).map_err( |e| AccountDeserializationError::BatchStateMerkleTree { @@ -67,6 +72,11 @@ fn process_batch_state_account(account: &mut Account, pubkey: Pubkey) -> Result< } fn process_batch_address_account(account: &mut Account, pubkey: Pubkey) -> Result { + light_account_checks::checks::check_discriminator::(&account.data) + .map_err(|_| AccountDeserializationError::BatchAddressMerkleTree { + error: "Invalid discriminator".to_string(), + })?; + let tree_account = BatchedMerkleTreeAccount::address_from_bytes(&mut account.data, &pubkey.into()).map_err( |e| AccountDeserializationError::BatchAddressMerkleTree { diff --git a/forester/src/work_coordinator.rs b/forester/src/work_coordinator.rs new file mode 100644 index 0000000000..0c6bd529b3 --- /dev/null +++ b/forester/src/work_coordinator.rs @@ -0,0 +1,213 @@ +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; + +use anyhow::{anyhow, Context, Result}; +use light_compressed_account::QueueType; +use solana_sdk::pubkey::Pubkey; +use tokio::sync::{mpsc, RwLock}; +use tokio_stream::StreamExt; +use tonic::transport::Channel; +use tracing::{debug, error, info, trace, warn}; + +// Generated protobuf code +pub mod proto { + tonic::include_proto!("photon"); +} + +use proto::{queue_service_client::QueueServiceClient, SubscribeQueueUpdatesRequest}; + +/// Message sent to tree tasks when queue updates occur +#[derive(Debug, Clone)] +pub struct QueueUpdateMessage { + pub tree: Pubkey, + pub queue: Pubkey, + pub queue_type: QueueType, + pub queue_size: u64, + pub slot: u64, + pub update_type: proto::UpdateType, +} + +#[derive(Debug)] +pub struct WorkCoordinator { + grpc_client: RwLock>, + tree_notifiers: Arc>>>, + connection_healthy: Arc, + photon_grpc_url: String, +} + +impl WorkCoordinator { + pub async fn new(photon_grpc_url: String) -> Result { + info!("Connecting to Photon gRPC at {}", photon_grpc_url); + + let grpc_client = QueueServiceClient::connect(photon_grpc_url.clone()) + .await + .context("Failed to connect to Photon gRPC service")?; + + info!("Successfully connected to Photon gRPC"); + + Ok(Self { + grpc_client: RwLock::new(grpc_client), + tree_notifiers: Arc::new(RwLock::new(HashMap::new())), + connection_healthy: Arc::new(AtomicBool::new(false)), + photon_grpc_url, + }) + } + + pub async fn register_tree(&self, tree_pubkey: Pubkey) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(100); + self.tree_notifiers.write().await.insert(tree_pubkey, tx); + debug!("Registered tree {} for queue updates", tree_pubkey); + rx + } + + pub async fn unregister_tree(&self, tree_pubkey: &Pubkey) { + self.tree_notifiers.write().await.remove(tree_pubkey); + debug!("Unregistered tree {}", tree_pubkey); + } + + pub async fn run_dispatcher(self: Arc) -> Result<()> { + let mut reconnect_delay = Duration::from_secs(1); + const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30); + + loop { + match self.dispatch_loop().await { + Ok(()) => { + warn!("gRPC stream ended; attempting to reconnect…"); + self.connection_healthy.store(false, Ordering::Relaxed); + tokio::time::sleep(reconnect_delay).await; + let _ = self.reconnect().await; + reconnect_delay = Duration::from_secs(1); + continue; + } + Err(e) => { + error!("gRPC dispatcher error: {:?}", e); + self.connection_healthy.store(false, Ordering::Relaxed); + + warn!("Reconnecting in {:?}...", reconnect_delay); + tokio::time::sleep(reconnect_delay).await; + reconnect_delay = std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY); + + match self.reconnect().await { + Ok(()) => { + info!("Successfully reconnected to Photon gRPC"); + reconnect_delay = Duration::from_secs(1); + } + Err(e) => { + error!("Failed to reconnect: {:?}", e); + } + } + } + } + } + } + + async fn reconnect(&self) -> Result<()> { + let new_client = QueueServiceClient::connect(self.photon_grpc_url.clone()) + .await + .context("Failed to reconnect to Photon gRPC service")?; + *self.grpc_client.write().await = new_client; + Ok(()) + } + + async fn dispatch_loop(&self) -> Result<()> { + info!("Starting gRPC queue update subscription"); + + let request = SubscribeQueueUpdatesRequest { + trees: vec![], + send_initial_state: true, + }; + + let mut stream = self + .grpc_client + .read() + .await + .clone() + .subscribe_queue_updates(request) + .await + .context("Failed to subscribe to queue updates")? + .into_inner(); + + self.connection_healthy.store(true, Ordering::Relaxed); + info!("gRPC subscription established successfully"); + + while let Some(update_result) = stream.next().await { + let update = update_result.context("Error receiving queue update")?; + + let queue_info = update + .queue_info + .ok_or_else(|| anyhow!("Missing queue_info in update"))?; + + let tree_pubkey = queue_info + .tree + .parse::() + .context("Failed to parse tree pubkey")?; + + let queue_pubkey = queue_info + .queue + .parse::() + .context("Failed to parse queue pubkey")?; + + let queue_type = QueueType::from(queue_info.queue_type as u64); + + let update_type = proto::UpdateType::try_from(update.update_type) + .unwrap_or(proto::UpdateType::Unspecified); + + let message = QueueUpdateMessage { + tree: tree_pubkey, + queue: queue_pubkey, + queue_type, + queue_size: queue_info.queue_size, + slot: update.slot, + update_type, + }; + + let notifiers = self.tree_notifiers.read().await; + if let Some(tx) = notifiers.get(&tree_pubkey) { + match tx.try_send(message.clone()) { + Ok(()) => { + trace!( + "Routed update to tree {}: {} items (type: {:?})", + tree_pubkey, + message.queue_size, + queue_type + ); + } + Err(mpsc::error::TrySendError::Full(_)) => { + warn!( + "Tree {} channel full, dropping update (tree processing slower than updates)", + tree_pubkey + ); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + debug!("Tree {} channel closed (task likely finished)", tree_pubkey); + } + } + } else { + trace!("Received update for unregistered tree {}", tree_pubkey); + } + } + + warn!("gRPC stream ended"); + self.connection_healthy.store(false, Ordering::Relaxed); + Ok(()) + } + + pub fn is_healthy(&self) -> bool { + self.connection_healthy.load(Ordering::Relaxed) + } + + pub async fn registered_tree_count(&self) -> usize { + self.tree_notifiers.read().await.len() + } + + pub async fn shutdown(&self) { + info!("Shutting down WorkCoordinator"); + self.connection_healthy.store(false, Ordering::Relaxed); + } +} diff --git a/forester/tests/e2e_test.rs b/forester/tests/e2e_test.rs index 526533970f..873b8f4261 100644 --- a/forester/tests/e2e_test.rs +++ b/forester/tests/e2e_test.rs @@ -121,6 +121,13 @@ fn get_photon_api_key() -> Option { } } +fn get_photon_grpc_url() -> Option { + match TestMode::from_env() { + TestMode::Local => Some("http://localhost:50051".to_string()), + TestMode::Devnet => env::var("PHOTON_GRPC_URL").ok(), + } +} + fn get_prover_api_key() -> Option { match TestMode::from_env() { TestMode::Local => None, @@ -202,6 +209,7 @@ async fn e2e_test() { prover_address_append_url: None, prover_api_key: get_prover_api_key(), photon_api_key: get_photon_api_key(), + photon_grpc_url: get_photon_grpc_url(), pushgateway_url: None, pagerduty_routing_key: None, rpc_rate_limit: None, @@ -250,6 +258,7 @@ async fn e2e_test() { "../target/deploy/create_address_test_program.so".to_string(), )], limit_ledger_size: None, + grpc_port: Some(50051), })) .await; spawn_prover().await; diff --git a/forester/tests/legacy/batched_address_test.rs b/forester/tests/legacy/batched_address_test.rs index 7b6db499d7..fcf328b960 100644 --- a/forester/tests/legacy/batched_address_test.rs +++ b/forester/tests/legacy/batched_address_test.rs @@ -40,6 +40,7 @@ async fn test_address_batched() { "../target/deploy/create_address_test_program.so".to_string(), )], limit_ledger_size: None, + grpc_port: None, })) .await; let tree_params = InitAddressTreeAccountsInstructionData::test_default(); diff --git a/forester/tests/legacy/batched_state_async_indexer_test.rs b/forester/tests/legacy/batched_state_async_indexer_test.rs index 38c7b95362..935d7e6492 100644 --- a/forester/tests/legacy/batched_state_async_indexer_test.rs +++ b/forester/tests/legacy/batched_state_async_indexer_test.rs @@ -82,6 +82,7 @@ async fn test_state_indexer_async_batched() { wait_time: 30, sbf_programs: vec![], limit_ledger_size: None, + grpc_port: None, })) .await; spawn_prover().await; diff --git a/forester/tests/legacy/batched_state_indexer_test.rs b/forester/tests/legacy/batched_state_indexer_test.rs index 32d3a229a8..fe3ca8d3ab 100644 --- a/forester/tests/legacy/batched_state_indexer_test.rs +++ b/forester/tests/legacy/batched_state_indexer_test.rs @@ -43,6 +43,7 @@ async fn test_state_indexer_batched() { wait_time: 90, sbf_programs: vec![], limit_ledger_size: None, + grpc_port: None, })) .await; diff --git a/forester/tests/legacy/batched_state_test.rs b/forester/tests/legacy/batched_state_test.rs index 559af5b181..2559954e59 100644 --- a/forester/tests/legacy/batched_state_test.rs +++ b/forester/tests/legacy/batched_state_test.rs @@ -47,6 +47,7 @@ async fn test_state_batched() { wait_time: 30, sbf_programs: vec![], limit_ledger_size: None, + grpc_port: None, })) .await; diff --git a/forester/tests/legacy/e2e_test.rs b/forester/tests/legacy/e2e_test.rs index 9dc712eb3f..8c0789ddd1 100644 --- a/forester/tests/legacy/e2e_test.rs +++ b/forester/tests/legacy/e2e_test.rs @@ -39,6 +39,7 @@ async fn test_epoch_monitor_with_2_foresters() { wait_time: 90, sbf_programs: vec![], limit_ledger_size: None, + grpc_port: None, })) .await; let forester_keypair1 = Keypair::new(); @@ -385,6 +386,7 @@ async fn test_epoch_double_registration() { wait_time: 90, sbf_programs: vec![], limit_ledger_size: None, + grpc_port: None, })) .await; diff --git a/forester/tests/legacy/e2e_v1_test.rs b/forester/tests/legacy/e2e_v1_test.rs index 050ece14af..092e4dd419 100644 --- a/forester/tests/legacy/e2e_v1_test.rs +++ b/forester/tests/legacy/e2e_v1_test.rs @@ -40,6 +40,7 @@ async fn test_e2e_v1() { wait_time: 90, sbf_programs: vec![], limit_ledger_size: None, + grpc_port: None, })) .await; let forester_keypair1 = Keypair::new(); @@ -382,6 +383,7 @@ async fn test_epoch_double_registration() { wait_time: 90, sbf_programs: vec![], limit_ledger_size: None, + grpc_port: None, })) .await; diff --git a/forester/tests/priority_fee_test.rs b/forester/tests/priority_fee_test.rs index bd502d527a..d7f8358414 100644 --- a/forester/tests/priority_fee_test.rs +++ b/forester/tests/priority_fee_test.rs @@ -52,6 +52,7 @@ async fn test_priority_fee_request() { photon_api_key: Some( std::env::var("PHOTON_API_KEY").expect("PHOTON_API_KEY must be set in environment"), ), + photon_grpc_url: None, indexer_batch_size: 50, indexer_max_concurrent_batches: 10, legacy_ixs_per_tx: 1, diff --git a/forester/tests/test_batch_append_spent.rs b/forester/tests/test_batch_append_spent.rs index 38caf3ab02..015a44ba34 100644 --- a/forester/tests/test_batch_append_spent.rs +++ b/forester/tests/test_batch_append_spent.rs @@ -50,6 +50,7 @@ async fn test_batch_sequence() { wait_time: 10, sbf_programs: vec![], limit_ledger_size: None, + grpc_port: None, })) .await; diff --git a/forester/tests/test_utils.rs b/forester/tests/test_utils.rs index c97bf8c28d..409df4d339 100644 --- a/forester/tests/test_utils.rs +++ b/forester/tests/test_utils.rs @@ -89,6 +89,7 @@ pub fn forester_config() -> ForesterConfig { prover_address_append_url: None, prover_api_key: None, photon_api_key: None, + photon_grpc_url: None, pushgateway_url: None, pagerduty_routing_key: None, rpc_rate_limit: None, diff --git a/program-tests/compressed-token-test/tests/v1.rs b/program-tests/compressed-token-test/tests/v1.rs index 8a3d86527e..287aa2089d 100644 --- a/program-tests/compressed-token-test/tests/v1.rs +++ b/program-tests/compressed-token-test/tests/v1.rs @@ -5443,6 +5443,7 @@ async fn test_transfer_with_photon_and_batched_tree() { wait_time: 15, sbf_programs: vec![], limit_ledger_size: None, + grpc_port: None, }) .await; diff --git a/program-tests/system-cpi-v2-test/tests/event.rs b/program-tests/system-cpi-v2-test/tests/event.rs index 47625f9ef9..6c7c13a0fe 100644 --- a/program-tests/system-cpi-v2-test/tests/event.rs +++ b/program-tests/system-cpi-v2-test/tests/event.rs @@ -538,6 +538,7 @@ async fn generate_photon_test_data_multiple_events() { "../../target/deploy/create_address_test_program.so".to_string(), )], limit_ledger_size: None, + grpc_port: None, }) .await; diff --git a/prover/client/src/proof_types/batch_address_append/proof_inputs.rs b/prover/client/src/proof_types/batch_address_append/proof_inputs.rs index 0b61a3e199..baeba1bb03 100644 --- a/prover/client/src/proof_types/batch_address_append/proof_inputs.rs +++ b/prover/client/src/proof_types/batch_address_append/proof_inputs.rs @@ -191,6 +191,24 @@ pub fn get_batch_address_append_circuit_inputs( bigint_to_be_bytes_array::<32>(&next_index.into()).unwrap(), ]; + for (idx, ((low_value, new_value), high_value)) in patched_low_element_values + .iter() + .zip(new_element_values.iter()) + .zip(patched_low_element_next_values.iter()) + .enumerate() + { + let low = BigUint::from_bytes_be(low_value); + let new = BigUint::from_bytes_be(new_value); + let high = BigUint::from_bytes_be(high_value); + + if !(low < new && new < high) { + return Err(ProverClientError::GenericError(format!( + "Invalid address ordering at batch position {} (low = {:#x}, new = {:#x}, high = {:#x})", + idx, low, new, high + ))); + } + } + let public_input_hash = create_hash_chain_from_array(hash_chain_inputs)?; Ok(BatchAddressAppendInputs { diff --git a/scripts/devenv/versions.sh b/scripts/devenv/versions.sh index a9204c1667..b38cc9c033 100755 --- a/scripts/devenv/versions.sh +++ b/scripts/devenv/versions.sh @@ -13,7 +13,7 @@ export SOLANA_VERSION="2.2.15" export ANCHOR_VERSION="0.31.1" export JQ_VERSION="1.8.0" export PHOTON_VERSION="0.51.0" -export PHOTON_COMMIT="ad49094e59195c664a683b7c7814e26563640d57" +export PHOTON_COMMIT="06862b290f32025bc150f82a4acba4961ee24178" export REDIS_VERSION="8.0.1" export ANCHOR_TAG="anchor-v${ANCHOR_VERSION}" diff --git a/sdk-libs/client/src/indexer/photon_indexer.rs b/sdk-libs/client/src/indexer/photon_indexer.rs index 0d886dab5f..8d80086990 100644 --- a/sdk-libs/client/src/indexer/photon_indexer.rs +++ b/sdk-libs/client/src/indexer/photon_indexer.rs @@ -1655,7 +1655,7 @@ impl Indexer for PhotonIndexer { value: QueueElementsResult { elements: proofs, first_value_queue_index: Some( - api_result.first_value_queue_index as u64, + api_result.first_value_queue_index, ), }, }) diff --git a/sdk-libs/client/src/lib.rs b/sdk-libs/client/src/lib.rs index 5ab761c25e..39394c9e63 100644 --- a/sdk-libs/client/src/lib.rs +++ b/sdk-libs/client/src/lib.rs @@ -41,6 +41,7 @@ //! wait_time: 75, //! sbf_programs: vec![], //! limit_ledger_size: None, +//! grpc_port: None, //! }; //! spawn_validator(config).await; //! diff --git a/sdk-libs/client/src/local_test_validator.rs b/sdk-libs/client/src/local_test_validator.rs index 2d46ba7e72..0418cf5de1 100644 --- a/sdk-libs/client/src/local_test_validator.rs +++ b/sdk-libs/client/src/local_test_validator.rs @@ -9,6 +9,7 @@ pub struct LightValidatorConfig { pub wait_time: u64, pub sbf_programs: Vec<(String, String)>, pub limit_ledger_size: Option, + pub grpc_port: Option, } impl Default for LightValidatorConfig { @@ -19,6 +20,7 @@ impl Default for LightValidatorConfig { wait_time: 35, sbf_programs: vec![], limit_ledger_size: None, + grpc_port: None, } } } @@ -46,6 +48,10 @@ pub async fn spawn_validator(config: LightValidatorConfig) { path.push_str(" --skip-prover"); } + if let Some(grpc_port) = config.grpc_port { + path.push_str(&format!(" --grpc-port {}", grpc_port)); + } + println!("Starting validator with command: {}", path); let child = Command::new("sh") diff --git a/sdk-libs/photon-api/src/models/_get_queue_elements_post_200_response_result.rs b/sdk-libs/photon-api/src/models/_get_queue_elements_post_200_response_result.rs index 656c76e39f..d09de2c0cb 100644 --- a/sdk-libs/photon-api/src/models/_get_queue_elements_post_200_response_result.rs +++ b/sdk-libs/photon-api/src/models/_get_queue_elements_post_200_response_result.rs @@ -15,7 +15,7 @@ pub struct GetQueueElementsPost200ResponseResult { #[serde(rename = "context")] pub context: Box, #[serde(rename = "firstValueQueueIndex")] - pub first_value_queue_index: u16, + pub first_value_queue_index: u64, #[serde(rename = "value")] pub value: Vec, } @@ -23,7 +23,7 @@ pub struct GetQueueElementsPost200ResponseResult { impl GetQueueElementsPost200ResponseResult { pub fn new( context: models::Context, - first_value_queue_index: u16, + first_value_queue_index: u64, value: Vec, ) -> GetQueueElementsPost200ResponseResult { GetQueueElementsPost200ResponseResult { diff --git a/sdk-libs/photon-api/src/models/account_context.rs b/sdk-libs/photon-api/src/models/account_context.rs index 971342d90e..b8cc31ca2a 100644 --- a/sdk-libs/photon-api/src/models/account_context.rs +++ b/sdk-libs/photon-api/src/models/account_context.rs @@ -24,7 +24,7 @@ pub struct AccountContext { rename = "nullifierQueueIndex", skip_serializing_if = "Option::is_none" )] - pub nullifier_queue_index: Option, + pub nullifier_queue_index: Option, /// A Solana public key represented as a base58 string. #[serde(rename = "queue")] pub queue: String, diff --git a/sdk-tests/client-test/tests/light_client.rs b/sdk-tests/client-test/tests/light_client.rs index 7ee8ba84a2..7f82532f10 100644 --- a/sdk-tests/client-test/tests/light_client.rs +++ b/sdk-tests/client-test/tests/light_client.rs @@ -56,6 +56,7 @@ async fn test_all_endpoints() { wait_time: 10, sbf_programs: vec![], limit_ledger_size: None, + grpc_port: None, }; spawn_validator(config).await;