From bf3a8e50107784e2d7e890adfc93acdaa77b2d6d Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Thu, 6 Nov 2025 15:57:14 +0000 Subject: [PATCH 01/12] feat: `--grpc-port` cli flag - add `--grpc-port` flag for test-validator CLI command. - Enabled configurable logging via `RUST_LOG` for spawned processes. --- cli/src/commands/test-validator/index.ts | 6 ++++++ cli/src/utils/initTestEnv.ts | 2 +- cli/src/utils/process.ts | 4 ++++ cli/src/utils/processPhotonIndexer.ts | 3 +++ 4 files changed, 14 insertions(+), 1 deletion(-) diff --git a/cli/src/commands/test-validator/index.ts b/cli/src/commands/test-validator/index.ts index f201ee2e6e..10558b1c26 100644 --- a/cli/src/commands/test-validator/index.ts +++ b/cli/src/commands/test-validator/index.ts @@ -69,6 +69,12 @@ class SetupCommand extends Command { default: 8784, exclusive: ["skip-indexer"], }), + "grpc-port": Flags.integer({ + description: "Enable Photon indexer gRPC on this port.", + required: false, + default: 50051, + exclusive: ["skip-indexer"], + }), "prover-port": Flags.integer({ description: "Enable Light Prover server on this port.", required: false, diff --git a/cli/src/utils/initTestEnv.ts b/cli/src/utils/initTestEnv.ts index e28994b107..b361f52591 100644 --- a/cli/src/utils/initTestEnv.ts +++ b/cli/src/utils/initTestEnv.ts @@ -1,4 +1,3 @@ -import { airdropSol } from "@lightprotocol/stateless.js"; import { getConfig, getPayer, setAnchorProvider, setConfig } from "./utils"; import { BASE_PATH, @@ -131,6 +130,7 @@ export async function initTestEnv({ await startIndexer( `http://127.0.0.1:${rpcPort}`, indexerPort, + grpcPort, checkPhotonVersion, photonDatabaseUrl, grpcPort, diff --git a/cli/src/utils/process.ts b/cli/src/utils/process.ts index 9956b15058..ffdc4003e5 100644 --- a/cli/src/utils/process.ts +++ b/cli/src/utils/process.ts @@ -212,6 +212,10 @@ export function spawnBinary(command: string, args: string[] = []) { stdio: ["ignore", out, err], shell: false, detached: true, + env: { + ...process.env, + RUST_LOG: process.env.RUST_LOG || "debug", + }, }); spawnedProcess.on("close", async (code) => { diff --git a/cli/src/utils/processPhotonIndexer.ts b/cli/src/utils/processPhotonIndexer.ts index aa75bb37ba..7a03ebcc40 100644 --- a/cli/src/utils/processPhotonIndexer.ts +++ b/cli/src/utils/processPhotonIndexer.ts @@ -39,6 +39,7 @@ function getPhotonInstallMessage(): string { export async function startIndexer( rpcUrl: string, indexerPort: number, + grpcPort: number = 50051, checkPhotonVersion: boolean = true, photonDatabaseUrl?: string, grpcPort: number = 50051, @@ -56,6 +57,8 @@ export async function startIndexer( const args: string[] = [ "--port", indexerPort.toString(), + "--grpc-port", + grpcPort.toString(), "--rpc-url", rpcUrl, "--grpc-port", From dea0920cebe64062abfbe771162d74dcbc1753af Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Fri, 7 Nov 2025 09:45:55 +0000 Subject: [PATCH 02/12] feat: `get_queue_elements` input + output queue --- .../src/instructions/state_batch_append.rs | 11 +- .../src/instructions/state_batch_nullify.rs | 10 +- program-tests/utils/src/e2e_test_env.rs | 8 +- .../utils/src/test_batch_forester.rs | 9 +- sdk-libs/client/src/indexer/indexer_trait.rs | 13 +- sdk-libs/client/src/indexer/photon_indexer.rs | 82 ++++++-- sdk-libs/client/src/indexer/types.rs | 6 +- sdk-libs/client/src/rpc/indexer.rs | 15 +- ...queue_elements_post_200_response_result.rs | 44 +++- ..._get_queue_elements_post_request_params.rs | 45 +++-- .../program-test/src/indexer/test_indexer.rs | 190 +++++++++++------- .../program-test/src/program_test/indexer.rs | 15 +- 12 files changed, 293 insertions(+), 155 deletions(-) diff --git a/forester-utils/src/instructions/state_batch_append.rs b/forester-utils/src/instructions/state_batch_append.rs index 25f2787666..32d72d14ff 100644 --- a/forester-utils/src/instructions/state_batch_append.rs +++ b/forester-utils/src/instructions/state_batch_append.rs @@ -9,7 +9,6 @@ use light_batched_merkle_tree::{ use light_client::{indexer::Indexer, rpc::Rpc}; use light_compressed_account::instruction_data::compressed_proof::CompressedProof; use light_hasher::bigint::bigint_to_be_bytes_array; -use light_merkle_tree_metadata::QueueType; use light_prover_client::{ proof_client::ProofClient, proof_types::batch_append::{get_batch_append_inputs, BatchAppendsCircuitInputs}, @@ -120,9 +119,10 @@ pub async fn get_append_instruction_stream<'a, R: Rpc>( indexer .get_queue_elements( merkle_tree_pubkey.to_bytes(), - QueueType::OutputStateV2, - zkp_batch_size, next_queue_index, + Some(zkp_batch_size), + None, + None, None, ) .await @@ -130,8 +130,8 @@ pub async fn get_append_instruction_stream<'a, R: Rpc>( let (batch_elements, batch_first_queue_idx) = match queue_elements_result { Ok(res) => { - let items = res.value.elements; - let first_idx = res.value.first_value_queue_index; + let items = res.value.output_queue_elements.unwrap_or_default(); + let first_idx = res.value.output_queue_index; if items.len() != zkp_batch_size as usize { warn!( "Got {} elements but expected {}, stopping", @@ -238,6 +238,5 @@ pub async fn get_append_instruction_stream<'a, R: Rpc>( yield Ok(proofs_buffer); } }; - Ok((Box::pin(stream), zkp_batch_size)) } diff --git a/forester-utils/src/instructions/state_batch_nullify.rs b/forester-utils/src/instructions/state_batch_nullify.rs index 35aab74a6e..b7bc5bd4d6 100644 --- a/forester-utils/src/instructions/state_batch_nullify.rs +++ b/forester-utils/src/instructions/state_batch_nullify.rs @@ -9,7 +9,6 @@ use light_batched_merkle_tree::{ use light_client::{indexer::Indexer, rpc::Rpc}; use light_compressed_account::instruction_data::compressed_proof::CompressedProof; use light_hasher::bigint::bigint_to_be_bytes_array; -use light_merkle_tree_metadata::QueueType; use light_prover_client::{ proof_client::ProofClient, proof_types::batch_update::{get_batch_update_inputs, BatchUpdateCircuitInputs}, @@ -113,9 +112,10 @@ pub async fn get_nullify_instruction_stream<'a, R: Rpc>( let indexer = connection.indexer_mut()?; indexer.get_queue_elements( merkle_tree_pubkey.to_bytes(), - QueueType::InputStateV2, - zkp_batch_size, + None, + None, next_queue_index, + Some(zkp_batch_size), None, ) .await @@ -123,8 +123,8 @@ pub async fn get_nullify_instruction_stream<'a, R: Rpc>( let (batch_elements, batch_first_queue_idx) = match queue_elements_result { Ok(res) => { - let items = res.value.elements; - let first_idx = res.value.first_value_queue_index; + let items = res.value.input_queue_elements.unwrap_or_default(); + let first_idx = res.value.input_queue_index; if items.len() != zkp_batch_size as usize { warn!( "Got {} elements but expected {}, stopping", diff --git a/program-tests/utils/src/e2e_test_env.rs b/program-tests/utils/src/e2e_test_env.rs index c912e7e2aa..aca8fc6596 100644 --- a/program-tests/utils/src/e2e_test_env.rs +++ b/program-tests/utils/src/e2e_test_env.rs @@ -121,7 +121,6 @@ use light_hasher::{bigint::bigint_to_be_bytes_array, Poseidon}; use light_indexed_merkle_tree::{ array::IndexedArray, reference::IndexedMerkleTree, HIGHEST_ADDRESS_PLUS_ONE, }; -use light_merkle_tree_metadata::QueueType; use light_program_test::{ accounts::{ state_tree::create_state_merkle_tree_and_queue_account, test_accounts::TestAccounts, @@ -746,15 +745,16 @@ where .indexer .get_queue_elements( merkle_tree_pubkey.to_bytes(), - QueueType::AddressV2, - batch.batch_size as u16, + None, + Some(batch.batch_size as u16), + None, None, None, ) .await .unwrap(); let addresses = - addresses.value.elements.iter().map(|x| x.account_hash).collect::>(); + addresses.value.output_queue_elements.unwrap_or_default().iter().map(|x| x.account_hash).collect::>(); // // local_leaves_hash_chain is only used for a test assertion. // let local_nullifier_hash_chain = create_hash_chain_from_array(&addresses); // assert_eq!(leaves_hash_chain, local_nullifier_hash_chain); diff --git a/program-tests/utils/src/test_batch_forester.rs b/program-tests/utils/src/test_batch_forester.rs index 6bc14569a5..f737207ad9 100644 --- a/program-tests/utils/src/test_batch_forester.rs +++ b/program-tests/utils/src/test_batch_forester.rs @@ -23,7 +23,6 @@ use light_batched_merkle_tree::{ use light_client::rpc::{Rpc, RpcError}; use light_compressed_account::{ hash_chain::create_hash_chain_from_slice, instruction_data::compressed_proof::CompressedProof, - QueueType, }; use light_hasher::{bigint::bigint_to_be_bytes_array, Poseidon}; use light_prover_client::{ @@ -654,8 +653,9 @@ pub async fn create_batch_update_address_tree_instruction_data_with_proof>(); diff --git a/sdk-libs/client/src/indexer/indexer_trait.rs b/sdk-libs/client/src/indexer/indexer_trait.rs index 7e94ebd0d2..ab08965ecf 100644 --- a/sdk-libs/client/src/indexer/indexer_trait.rs +++ b/sdk-libs/client/src/indexer/indexer_trait.rs @@ -1,5 +1,4 @@ use async_trait::async_trait; -use light_merkle_tree_metadata::QueueType; use solana_pubkey::Pubkey; use super::{ @@ -187,16 +186,18 @@ pub trait Indexer: std::marker::Send + std::marker::Sync { // TODO: in different pr: // replace num_elements & start_queue_index with PaginatedOptions // - return type should be ItemsWithCursor - /// Returns queue elements from the queue with the given merkle tree pubkey. For input - /// queues account compression program does not store queue elements in the + /// Returns queue elements from the queue with the given merkle tree pubkey. + /// Can fetch from output queue (append), input queue (nullify), or both atomically. + /// For input queues account compression program does not store queue elements in the /// account data but only emits these in the public transaction event. The /// indexer needs the queue elements to create batch update proofs. async fn get_queue_elements( &mut self, merkle_tree_pubkey: [u8; 32], - queue_type: QueueType, - num_elements: u16, - start_queue_index: Option, + output_queue_start_index: Option, + output_queue_limit: Option, + input_queue_start_index: Option, + input_queue_limit: Option, config: Option, ) -> Result, IndexerError>; diff --git a/sdk-libs/client/src/indexer/photon_indexer.rs b/sdk-libs/client/src/indexer/photon_indexer.rs index 8d80086990..b2dcb4984b 100644 --- a/sdk-libs/client/src/indexer/photon_indexer.rs +++ b/sdk-libs/client/src/indexer/photon_indexer.rs @@ -2,7 +2,6 @@ use std::{fmt::Debug, time::Duration}; use async_trait::async_trait; use bs58; -use light_merkle_tree_metadata::QueueType; use photon_api::{ apis::configuration::{ApiKey, Configuration}, models::GetCompressedAccountsByOwnerPostRequestParams, @@ -1581,9 +1580,10 @@ impl Indexer for PhotonIndexer { async fn get_queue_elements( &mut self, _pubkey: [u8; 32], - _queue_type: QueueType, - _num_elements: u16, - _start_queue_index: Option, + _output_queue_start_index: Option, + _output_queue_limit: Option, + _input_queue_start_index: Option, + _input_queue_limit: Option, _config: Option, ) -> Result, IndexerError> { #[cfg(not(feature = "v2"))] @@ -1592,18 +1592,20 @@ impl Indexer for PhotonIndexer { { use super::MerkleProofWithContext; let pubkey = _pubkey; - let queue_type = _queue_type; - let limit = _num_elements; - let start_queue_index = _start_queue_index; + let output_queue_start_index = _output_queue_start_index; + let output_queue_limit = _output_queue_limit; + let input_queue_start_index = _input_queue_start_index; + let input_queue_limit = _input_queue_limit; let config = _config.unwrap_or_default(); self.retry(config.retry_config, || async { let request: photon_api::models::GetQueueElementsPostRequest = photon_api::models::GetQueueElementsPostRequest { params: Box::from(photon_api::models::GetQueueElementsPostRequestParams { tree: bs58::encode(pubkey).into_string(), - queue_type: queue_type as u16, - limit, - start_queue_index, + output_queue_start_index, + output_queue_limit, + input_queue_start_index, + input_queue_limit, }), ..Default::default() }; @@ -1619,8 +1621,11 @@ impl Indexer for PhotonIndexer { if api_result.context.slot < config.slot { return Err(IndexerError::IndexerNotSyncedToSlot); } - let response = api_result.value; - let proofs: Vec = response + + // Parse output queue elements + let output_queue_elements = + api_result.output_queue_elements.map(|elements| { + elements .iter() .map(|x| { let proof = x @@ -1631,9 +1636,12 @@ impl Indexer for PhotonIndexer { let root = Hash::from_base58(&x.root).unwrap(); let leaf = Hash::from_base58(&x.leaf).unwrap(); let merkle_tree = Hash::from_base58(&x.tree).unwrap(); - let tx_hash = - x.tx_hash.as_ref().map(|x| Hash::from_base58(x).unwrap()); - let account_hash = Hash::from_base58(&x.account_hash).unwrap(); + let tx_hash = x + .tx_hash + .as_ref() + .map(|x| Hash::from_base58(x).unwrap()); + let account_hash = + Hash::from_base58(&x.account_hash).unwrap(); MerkleProofWithContext { proof, @@ -1646,17 +1654,53 @@ impl Indexer for PhotonIndexer { account_hash, } }) + .collect() + }); + + // Parse input queue elements + let input_queue_elements = + api_result.input_queue_elements.map(|elements| { + elements + .iter() + .map(|x| { + let proof = x + .proof + .iter() + .map(|x| Hash::from_base58(x).unwrap()) .collect(); + let root = Hash::from_base58(&x.root).unwrap(); + let leaf = Hash::from_base58(&x.leaf).unwrap(); + let merkle_tree = Hash::from_base58(&x.tree).unwrap(); + let tx_hash = x + .tx_hash + .as_ref() + .map(|x| Hash::from_base58(x).unwrap()); + let account_hash = + Hash::from_base58(&x.account_hash).unwrap(); + + MerkleProofWithContext { + proof, + root, + leaf_index: x.leaf_index, + leaf, + merkle_tree, + root_seq: x.root_seq, + tx_hash, + account_hash, + } + }) + .collect() + }); Ok(Response { context: Context { slot: api_result.context.slot, }, value: QueueElementsResult { - elements: proofs, - first_value_queue_index: Some( - api_result.first_value_queue_index, - ), + output_queue_elements, + output_queue_index: api_result.output_queue_index, + input_queue_elements, + input_queue_index: api_result.input_queue_index, }, }) } diff --git a/sdk-libs/client/src/indexer/types.rs b/sdk-libs/client/src/indexer/types.rs index d0c062a5bb..d88d51d1cf 100644 --- a/sdk-libs/client/src/indexer/types.rs +++ b/sdk-libs/client/src/indexer/types.rs @@ -31,8 +31,10 @@ pub type Hash = [u8; 32]; #[derive(Debug, Clone, PartialEq, Default)] pub struct QueueElementsResult { - pub elements: Vec, - pub first_value_queue_index: Option, + pub output_queue_elements: Option>, + pub output_queue_index: Option, + pub input_queue_elements: Option>, + pub input_queue_index: Option, } #[derive(Debug, Clone, PartialEq, Default)] diff --git a/sdk-libs/client/src/rpc/indexer.rs b/sdk-libs/client/src/rpc/indexer.rs index 8548ef7206..acafeac398 100644 --- a/sdk-libs/client/src/rpc/indexer.rs +++ b/sdk-libs/client/src/rpc/indexer.rs @@ -1,5 +1,4 @@ use async_trait::async_trait; -use light_compressed_account::QueueType; use solana_pubkey::Pubkey; use super::LightClient; @@ -205,9 +204,10 @@ impl Indexer for LightClient { async fn get_queue_elements( &mut self, merkle_tree_pubkey: [u8; 32], - queue_type: QueueType, - num_elements: u16, - start_queue_index: Option, + output_queue_start_index: Option, + output_queue_limit: Option, + input_queue_start_index: Option, + input_queue_limit: Option, config: Option, ) -> Result, IndexerError> { Ok(self @@ -216,9 +216,10 @@ impl Indexer for LightClient { .ok_or(IndexerError::NotInitialized)? .get_queue_elements( merkle_tree_pubkey, - queue_type, - num_elements, - start_queue_index, + output_queue_start_index, + output_queue_limit, + input_queue_start_index, + input_queue_limit, config, ) .await?) diff --git a/sdk-libs/photon-api/src/models/_get_queue_elements_post_200_response_result.rs b/sdk-libs/photon-api/src/models/_get_queue_elements_post_200_response_result.rs index d09de2c0cb..b37d9cda17 100644 --- a/sdk-libs/photon-api/src/models/_get_queue_elements_post_200_response_result.rs +++ b/sdk-libs/photon-api/src/models/_get_queue_elements_post_200_response_result.rs @@ -14,22 +14,44 @@ use crate::models; pub struct GetQueueElementsPost200ResponseResult { #[serde(rename = "context")] pub context: Box, - #[serde(rename = "firstValueQueueIndex")] - pub first_value_queue_index: u64, - #[serde(rename = "value")] - pub value: Vec, + + #[serde( + rename = "outputQueueElements", + default, + skip_serializing_if = "Option::is_none" + )] + pub output_queue_elements: Option>, + + #[serde( + rename = "outputQueueIndex", + default, + skip_serializing_if = "Option::is_none" + )] + pub output_queue_index: Option, + + #[serde( + rename = "inputQueueElements", + default, + skip_serializing_if = "Option::is_none" + )] + pub input_queue_elements: Option>, + + #[serde( + rename = "inputQueueIndex", + default, + skip_serializing_if = "Option::is_none" + )] + pub input_queue_index: Option, } impl GetQueueElementsPost200ResponseResult { - pub fn new( - context: models::Context, - first_value_queue_index: u64, - value: Vec, - ) -> GetQueueElementsPost200ResponseResult { + pub fn new(context: models::Context) -> GetQueueElementsPost200ResponseResult { GetQueueElementsPost200ResponseResult { context: Box::new(context), - first_value_queue_index, - value, + output_queue_elements: None, + output_queue_index: None, + input_queue_elements: None, + input_queue_index: None, } } } diff --git a/sdk-libs/photon-api/src/models/_get_queue_elements_post_request_params.rs b/sdk-libs/photon-api/src/models/_get_queue_elements_post_request_params.rs index 778a50578a..64e34e74ef 100644 --- a/sdk-libs/photon-api/src/models/_get_queue_elements_post_request_params.rs +++ b/sdk-libs/photon-api/src/models/_get_queue_elements_post_request_params.rs @@ -12,28 +12,47 @@ use crate::models; #[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] pub struct GetQueueElementsPostRequestParams { - #[serde(rename = "limit")] - pub limit: u16, - #[serde(rename = "queueType")] - pub queue_type: u16, + /// A 32-byte hash represented as a base58 string. + #[serde(rename = "tree")] + pub tree: String, + #[serde( - rename = "startQueueIndex", + rename = "outputQueueStartIndex", default, skip_serializing_if = "Option::is_none" )] - pub start_queue_index: Option, - /// A 32-byte hash represented as a base58 string. - #[serde(rename = "tree")] - pub tree: String, + pub output_queue_start_index: Option, + + #[serde( + rename = "outputQueueLimit", + default, + skip_serializing_if = "Option::is_none" + )] + pub output_queue_limit: Option, + + #[serde( + rename = "inputQueueStartIndex", + default, + skip_serializing_if = "Option::is_none" + )] + pub input_queue_start_index: Option, + + #[serde( + rename = "inputQueueLimit", + default, + skip_serializing_if = "Option::is_none" + )] + pub input_queue_limit: Option, } impl GetQueueElementsPostRequestParams { - pub fn new(limit: u16, queue_type: u16, tree: String) -> GetQueueElementsPostRequestParams { + pub fn new(tree: String) -> GetQueueElementsPostRequestParams { GetQueueElementsPostRequestParams { - limit, - queue_type, - start_queue_index: None, tree, + output_queue_start_index: None, + output_queue_limit: None, + input_queue_start_index: None, + input_queue_limit: None, } } } diff --git a/sdk-libs/program-test/src/indexer/test_indexer.rs b/sdk-libs/program-test/src/indexer/test_indexer.rs index 0a91049ba3..7dfa69cd32 100644 --- a/sdk-libs/program-test/src/indexer/test_indexer.rs +++ b/sdk-libs/program-test/src/indexer/test_indexer.rs @@ -42,7 +42,6 @@ use light_compressed_account::{ }; use light_event::event::PublicTransactionEvent; use light_hasher::{bigint::bigint_to_be_bytes_array, Poseidon}; -use light_merkle_tree_metadata::QueueType; use light_merkle_tree_reference::MerkleTree; use light_prover_client::{ constants::{PROVE_PATH, SERVER_ADDRESS}, @@ -619,9 +618,10 @@ impl Indexer for TestIndexer { async fn get_queue_elements( &mut self, _merkle_tree_pubkey: [u8; 32], - _queue_type: QueueType, - _num_elements: u16, - _start_offset: Option, + _output_queue_start_index: Option, + _output_queue_limit: Option, + _input_queue_start_index: Option, + _input_queue_limit: Option, _config: Option, ) -> Result, IndexerError> { #[cfg(not(feature = "v2"))] @@ -629,56 +629,83 @@ impl Indexer for TestIndexer { #[cfg(feature = "v2")] { let merkle_tree_pubkey = _merkle_tree_pubkey; - let queue_type = _queue_type; - let num_elements = _num_elements; + let output_queue_start_index = _output_queue_start_index.unwrap_or(0); + let output_queue_limit = _output_queue_limit; + let input_queue_start_index = _input_queue_start_index.unwrap_or(0); + let input_queue_limit = _input_queue_limit; let pubkey = Pubkey::new_from_array(merkle_tree_pubkey); + + // Check if this is an address tree let address_tree_bundle = self .address_merkle_trees .iter() .find(|x| x.accounts.merkle_tree == pubkey); if let Some(address_tree_bundle) = address_tree_bundle { - let end_offset = std::cmp::min( - num_elements as usize, - address_tree_bundle.queue_elements.len(), - ); - let queue_elements = address_tree_bundle.queue_elements[0..end_offset].to_vec(); + // For address trees, return output queue only + let output_queue_elements = if let Some(limit) = output_queue_limit { + let start = output_queue_start_index as usize; + let end = std::cmp::min( + start + limit as usize, + address_tree_bundle.queue_elements.len(), + ); + let queue_elements = address_tree_bundle.queue_elements[start..end].to_vec(); + + let merkle_proofs_with_context = queue_elements + .iter() + .enumerate() + .map(|(i, element)| MerkleProofWithContext { + proof: Vec::new(), + leaf: [0u8; 32], + leaf_index: 0, + merkle_tree: address_tree_bundle.accounts.merkle_tree.to_bytes(), + root: address_tree_bundle.root(), + tx_hash: None, + root_seq: output_queue_start_index + i as u64, + account_hash: *element, + }) + .collect(); + Some(merkle_proofs_with_context) + } else { + None + }; + + let output_queue_index = if output_queue_elements.is_some() { + Some(output_queue_start_index) + } else { + None + }; - let merkle_proofs_with_context = queue_elements - .iter() - .map(|element| MerkleProofWithContext { - proof: Vec::new(), - leaf: [0u8; 32], - leaf_index: 0, - merkle_tree: address_tree_bundle.accounts.merkle_tree.to_bytes(), - root: address_tree_bundle.root(), - tx_hash: None, - root_seq: 0, - account_hash: *element, - }) - .collect(); return Ok(Response { context: Context { slot: self.get_current_slot(), }, value: QueueElementsResult { - elements: merkle_proofs_with_context, - first_value_queue_index: None, + output_queue_elements, + output_queue_index, + input_queue_elements: None, + input_queue_index: None, }, }); } + // Check if this is a state tree let state_tree_bundle = self .state_merkle_trees .iter_mut() .find(|x| x.accounts.merkle_tree == pubkey); - if queue_type == QueueType::InputStateV2 { - if let Some(state_tree_bundle) = state_tree_bundle { - let end_offset = std::cmp::min( - num_elements as usize, + + if let Some(state_tree_bundle) = state_tree_bundle { + // For state trees, return both input and output queues + + // Build input queue elements if requested + let input_queue_elements = if let Some(limit) = input_queue_limit { + let start = input_queue_start_index as usize; + let end = std::cmp::min( + start + limit as usize, state_tree_bundle.input_leaf_indices.len(), ); - let queue_elements = - state_tree_bundle.input_leaf_indices[0..end_offset].to_vec(); + let queue_elements = state_tree_bundle.input_leaf_indices[start..end].to_vec(); + let merkle_proofs = queue_elements .iter() .map(|leaf_info| { @@ -705,6 +732,7 @@ impl Indexer for TestIndexer { } }) .collect::>(); + let leaves = queue_elements .iter() .map(|leaf_info| { @@ -714,6 +742,7 @@ impl Indexer for TestIndexer { .unwrap_or_default() }) .collect::>(); + let merkle_proofs_with_context = merkle_proofs .iter() .zip(queue_elements.iter()) @@ -730,30 +759,26 @@ impl Indexer for TestIndexer { }) .collect(); - return Ok(Response { - context: Context { - slot: self.get_current_slot(), - }, - value: QueueElementsResult { - elements: merkle_proofs_with_context, - first_value_queue_index: None, - }, - }); - } - } + Some(merkle_proofs_with_context) + } else { + None + }; - if queue_type == QueueType::OutputStateV2 { - if let Some(state_tree_bundle) = state_tree_bundle { - let end_offset = std::cmp::min( - num_elements as usize, + // Build output queue elements if requested + let output_queue_elements = if let Some(limit) = output_queue_limit { + let start = output_queue_start_index as usize; + let end = std::cmp::min( + start + limit as usize, state_tree_bundle.output_queue_elements.len(), ); let queue_elements = - state_tree_bundle.output_queue_elements[0..end_offset].to_vec(); + state_tree_bundle.output_queue_elements[start..end].to_vec(); + let indices = queue_elements .iter() .map(|(_, index)| index) .collect::>(); + let merkle_proofs = indices .iter() .map(|index| { @@ -780,6 +805,7 @@ impl Indexer for TestIndexer { } }) .collect::>(); + let leaves = indices .iter() .map(|index| { @@ -789,6 +815,7 @@ impl Indexer for TestIndexer { .unwrap_or_default() }) .collect::>(); + let merkle_proofs_with_context = merkle_proofs .iter() .zip(queue_elements.iter()) @@ -804,20 +831,41 @@ impl Indexer for TestIndexer { account_hash: *element, }) .collect(); - return Ok(Response { - context: Context { - slot: self.get_current_slot(), - }, - value: QueueElementsResult { - elements: merkle_proofs_with_context, - first_value_queue_index: if queue_elements.is_empty() { - None - } else { - Some(queue_elements[0].1) - }, - }, - }); - } + + Some(merkle_proofs_with_context) + } else { + None + }; + + let output_queue_index = if output_queue_elements.is_some() + && output_queue_start_index + < state_tree_bundle.output_queue_elements.len() as u64 + { + Some( + state_tree_bundle.output_queue_elements[output_queue_start_index as usize] + .1, + ) + } else { + None + }; + + let input_queue_index = if input_queue_elements.is_some() { + Some(input_queue_start_index) + } else { + None + }; + + let slot = self.get_current_slot(); + + return Ok(Response { + context: Context { slot }, + value: QueueElementsResult { + output_queue_elements, + output_queue_index, + input_queue_elements, + input_queue_index, + }, + }); } Err(IndexerError::InvalidParameters( @@ -902,8 +950,9 @@ impl Indexer for TestIndexer { let address_proof_items = self .get_queue_elements( merkle_tree_pubkey.to_bytes(), - QueueType::AddressV2, - zkp_batch_size, + Some(0), + Some(zkp_batch_size), + None, None, None, ) @@ -911,8 +960,11 @@ impl Indexer for TestIndexer { .map_err(|_| IndexerError::Unknown("Failed to get queue elements".into()))? .value; - let addresses: Vec = address_proof_items - .elements + let output_elements = address_proof_items + .output_queue_elements + .ok_or(IndexerError::Unknown("No output queue elements".into()))?; + + let addresses: Vec = output_elements .iter() .enumerate() .map(|(i, proof)| AddressQueueIndex { @@ -923,11 +975,7 @@ impl Indexer for TestIndexer { let non_inclusion_proofs = self .get_multiple_new_address_proofs( merkle_tree_pubkey.to_bytes(), - address_proof_items - .elements - .iter() - .map(|x| x.account_hash) - .collect(), + output_elements.iter().map(|x| x.account_hash).collect(), None, ) .await diff --git a/sdk-libs/program-test/src/program_test/indexer.rs b/sdk-libs/program-test/src/program_test/indexer.rs index 9020961b19..86191b1972 100644 --- a/sdk-libs/program-test/src/program_test/indexer.rs +++ b/sdk-libs/program-test/src/program_test/indexer.rs @@ -7,7 +7,6 @@ use light_client::indexer::{ OwnerBalance, PaginatedOptions, QueueElementsResult, Response, RetryConfig, SignatureWithMetadata, TokenBalance, ValidityProofWithContext, }; -use light_compressed_account::QueueType; use solana_sdk::pubkey::Pubkey; use crate::program_test::LightProgramTest; @@ -201,9 +200,10 @@ impl Indexer for LightProgramTest { async fn get_queue_elements( &mut self, merkle_tree_pubkey: [u8; 32], - queue_type: QueueType, - num_elements: u16, - start_queue_index: Option, + output_queue_start_index: Option, + output_queue_limit: Option, + input_queue_start_index: Option, + input_queue_limit: Option, config: Option, ) -> Result, IndexerError> { Ok(self @@ -212,9 +212,10 @@ impl Indexer for LightProgramTest { .ok_or(IndexerError::NotInitialized)? .get_queue_elements( merkle_tree_pubkey, - queue_type, - num_elements, - start_queue_index, + output_queue_start_index, + output_queue_limit, + input_queue_start_index, + input_queue_limit, config, ) .await?) From 0c35d3892d608e0f9c09c5818e1d6b65a1647073 Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Sat, 8 Nov 2025 12:55:12 +0000 Subject: [PATCH 03/12] feat: add grpc_port configuration to tests --- sdk-libs/client/src/indexer/photon_indexer.rs | 90 ++++++++++--------- .../program-test/src/indexer/test_indexer.rs | 15 +--- 2 files changed, 51 insertions(+), 54 deletions(-) diff --git a/sdk-libs/client/src/indexer/photon_indexer.rs b/sdk-libs/client/src/indexer/photon_indexer.rs index b2dcb4984b..ca9fcc64c3 100644 --- a/sdk-libs/client/src/indexer/photon_indexer.rs +++ b/sdk-libs/client/src/indexer/photon_indexer.rs @@ -1623,62 +1623,65 @@ impl Indexer for PhotonIndexer { } // Parse output queue elements - let output_queue_elements = - api_result.output_queue_elements.map(|elements| { + let output_queue_elements = api_result + .output_queue_elements + .map(|elements| { elements - .iter() - .map(|x| { - let proof = x - .proof .iter() - .map(|x| Hash::from_base58(x).unwrap()) - .collect(); - let root = Hash::from_base58(&x.root).unwrap(); - let leaf = Hash::from_base58(&x.leaf).unwrap(); - let merkle_tree = Hash::from_base58(&x.tree).unwrap(); + .map(|x| -> Result<_, IndexerError> { + let proof: Vec = x + .proof + .iter() + .map(|p| Hash::from_base58(p)) + .collect::, _>>()?; + let root = Hash::from_base58(&x.root)?; + let leaf = Hash::from_base58(&x.leaf)?; + let merkle_tree = Hash::from_base58(&x.tree)?; let tx_hash = x .tx_hash .as_ref() - .map(|x| Hash::from_base58(x).unwrap()); - let account_hash = - Hash::from_base58(&x.account_hash).unwrap(); - - MerkleProofWithContext { - proof, - root, - leaf_index: x.leaf_index, - leaf, - merkle_tree, - root_seq: x.root_seq, - tx_hash, - account_hash, - } + .map(|h| Hash::from_base58(h)) + .transpose()?; + let account_hash = Hash::from_base58(&x.account_hash)?; + + Ok(MerkleProofWithContext { + proof, + root, + leaf_index: x.leaf_index, + leaf, + merkle_tree, + root_seq: x.root_seq, + tx_hash, + account_hash, + }) + }) + .collect::, _>>() }) - .collect() - }); + .transpose()?; // Parse input queue elements - let input_queue_elements = - api_result.input_queue_elements.map(|elements| { + let input_queue_elements = api_result + .input_queue_elements + .map(|elements| { elements .iter() - .map(|x| { - let proof = x + .map(|x| -> Result<_, IndexerError> { + let proof: Vec = x .proof .iter() - .map(|x| Hash::from_base58(x).unwrap()) - .collect(); - let root = Hash::from_base58(&x.root).unwrap(); - let leaf = Hash::from_base58(&x.leaf).unwrap(); - let merkle_tree = Hash::from_base58(&x.tree).unwrap(); + .map(|p| Hash::from_base58(p)) + .collect::, _>>()?; + let root = Hash::from_base58(&x.root)?; + let leaf = Hash::from_base58(&x.leaf)?; + let merkle_tree = Hash::from_base58(&x.tree)?; let tx_hash = x .tx_hash .as_ref() - .map(|x| Hash::from_base58(x).unwrap()); - let account_hash = - Hash::from_base58(&x.account_hash).unwrap(); + .map(|h| Hash::from_base58(h)) + .transpose()?; + let account_hash = Hash::from_base58(&x.account_hash)?; - MerkleProofWithContext { + Ok(MerkleProofWithContext { proof, root, leaf_index: x.leaf_index, @@ -1687,10 +1690,11 @@ impl Indexer for PhotonIndexer { root_seq: x.root_seq, tx_hash, account_hash, - } }) - .collect() - }); + }) + .collect::, _>>() + }) + .transpose()?; Ok(Response { context: Context { diff --git a/sdk-libs/program-test/src/indexer/test_indexer.rs b/sdk-libs/program-test/src/indexer/test_indexer.rs index 7dfa69cd32..b5ae74e3d0 100644 --- a/sdk-libs/program-test/src/indexer/test_indexer.rs +++ b/sdk-libs/program-test/src/indexer/test_indexer.rs @@ -652,15 +652,14 @@ impl Indexer for TestIndexer { let merkle_proofs_with_context = queue_elements .iter() - .enumerate() - .map(|(i, element)| MerkleProofWithContext { + .map(|element| MerkleProofWithContext { proof: Vec::new(), leaf: [0u8; 32], leaf_index: 0, merkle_tree: address_tree_bundle.accounts.merkle_tree.to_bytes(), root: address_tree_bundle.root(), tx_hash: None, - root_seq: output_queue_start_index + i as u64, + root_seq: output_queue_start_index, account_hash: *element, }) .collect(); @@ -837,14 +836,8 @@ impl Indexer for TestIndexer { None }; - let output_queue_index = if output_queue_elements.is_some() - && output_queue_start_index - < state_tree_bundle.output_queue_elements.len() as u64 - { - Some( - state_tree_bundle.output_queue_elements[output_queue_start_index as usize] - .1, - ) + let output_queue_index = if output_queue_elements.is_some() { + Some(output_queue_start_index) } else { None }; From d877458cda8ab1f9797cdf4d711534897166873f Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Sat, 8 Nov 2025 13:13:47 +0000 Subject: [PATCH 04/12] cleanup --- cli/src/commands/test-validator/index.ts | 6 ------ cli/src/utils/processPhotonIndexer.ts | 3 --- sdk-libs/client/src/indexer/photon_indexer.rs | 4 ++-- 3 files changed, 2 insertions(+), 11 deletions(-) diff --git a/cli/src/commands/test-validator/index.ts b/cli/src/commands/test-validator/index.ts index 10558b1c26..dee831c578 100644 --- a/cli/src/commands/test-validator/index.ts +++ b/cli/src/commands/test-validator/index.ts @@ -81,12 +81,6 @@ class SetupCommand extends Command { default: 3001, exclusive: ["skip-prover"], }), - "grpc-port": Flags.integer({ - description: "Enable Photon indexer gRPC on this port.", - required: false, - default: 50051, - exclusive: ["skip-indexer"], - }), "limit-ledger-size": Flags.integer({ description: "Keep this amount of shreds in root slots.", required: false, diff --git a/cli/src/utils/processPhotonIndexer.ts b/cli/src/utils/processPhotonIndexer.ts index 7a03ebcc40..5e31ec861a 100644 --- a/cli/src/utils/processPhotonIndexer.ts +++ b/cli/src/utils/processPhotonIndexer.ts @@ -42,7 +42,6 @@ export async function startIndexer( grpcPort: number = 50051, checkPhotonVersion: boolean = true, photonDatabaseUrl?: string, - grpcPort: number = 50051, ) { await killIndexer(); const resolvedOrNull = which.sync("photon", { nothrow: true }); @@ -61,8 +60,6 @@ export async function startIndexer( grpcPort.toString(), "--rpc-url", rpcUrl, - "--grpc-port", - grpcPort.toString(), ]; if (photonDatabaseUrl) { args.push("--db-url", photonDatabaseUrl); diff --git a/sdk-libs/client/src/indexer/photon_indexer.rs b/sdk-libs/client/src/indexer/photon_indexer.rs index ca9fcc64c3..ec70157acd 100644 --- a/sdk-libs/client/src/indexer/photon_indexer.rs +++ b/sdk-libs/client/src/indexer/photon_indexer.rs @@ -1653,7 +1653,7 @@ impl Indexer for PhotonIndexer { root_seq: x.root_seq, tx_hash, account_hash, - }) + }) }) .collect::, _>>() }) @@ -1690,7 +1690,7 @@ impl Indexer for PhotonIndexer { root_seq: x.root_seq, tx_hash, account_hash, - }) + }) }) .collect::, _>>() }) From 023568c9fc5e188a5953e6974a03662be644391e Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Sat, 8 Nov 2025 13:44:09 +0000 Subject: [PATCH 05/12] cleanup --- cli/src/utils/initTestEnv.ts | 1 - cli/src/utils/processPhotonIndexer.ts | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/cli/src/utils/initTestEnv.ts b/cli/src/utils/initTestEnv.ts index b361f52591..e298f7c2cc 100644 --- a/cli/src/utils/initTestEnv.ts +++ b/cli/src/utils/initTestEnv.ts @@ -133,7 +133,6 @@ export async function initTestEnv({ grpcPort, checkPhotonVersion, photonDatabaseUrl, - grpcPort, ); } diff --git a/cli/src/utils/processPhotonIndexer.ts b/cli/src/utils/processPhotonIndexer.ts index 5e31ec861a..2f7b1c33df 100644 --- a/cli/src/utils/processPhotonIndexer.ts +++ b/cli/src/utils/processPhotonIndexer.ts @@ -56,10 +56,10 @@ export async function startIndexer( const args: string[] = [ "--port", indexerPort.toString(), - "--grpc-port", - grpcPort.toString(), "--rpc-url", rpcUrl, + "--grpc-port", + grpcPort.toString(), ]; if (photonDatabaseUrl) { args.push("--db-url", photonDatabaseUrl); From e78a06065b198f46818ebef6bb8b4d59c28c5bca Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Sat, 8 Nov 2025 14:25:33 +0000 Subject: [PATCH 06/12] feat: add sleep configuration options to GeneralConfig --- forester/src/config.rs | 12 ++++++++++++ forester/src/epoch_manager.rs | 11 ++++------- forester/tests/e2e_test.rs | 2 ++ forester/tests/legacy/test_utils.rs | 3 +++ forester/tests/test_utils.rs | 2 ++ 5 files changed, 23 insertions(+), 7 deletions(-) diff --git a/forester/src/config.rs b/forester/src/config.rs index d4369ff31d..9cd2eab8e2 100644 --- a/forester/src/config.rs +++ b/forester/src/config.rs @@ -83,6 +83,8 @@ pub struct GeneralConfig { pub skip_v2_state_trees: bool, pub skip_v2_address_trees: bool, pub tree_id: Option, + pub sleep_after_processing_ms: u64, + pub sleep_when_idle_ms: u64, } impl Default for GeneralConfig { @@ -96,6 +98,8 @@ impl Default for GeneralConfig { skip_v2_state_trees: false, skip_v2_address_trees: false, tree_id: None, + sleep_after_processing_ms: 10_000, + sleep_when_idle_ms: 45_000, } } } @@ -111,6 +115,8 @@ impl GeneralConfig { skip_v2_state_trees: true, skip_v2_address_trees: false, tree_id: None, + sleep_after_processing_ms: 50, + sleep_when_idle_ms: 100, } } @@ -124,6 +130,8 @@ impl GeneralConfig { skip_v2_state_trees: false, skip_v2_address_trees: true, tree_id: None, + sleep_after_processing_ms: 50, + sleep_when_idle_ms: 100, } } } @@ -276,6 +284,8 @@ impl ForesterConfig { .tree_id .as_ref() .and_then(|id| Pubkey::from_str(id).ok()), + sleep_after_processing_ms: 10_000, + sleep_when_idle_ms: 45_000, }, rpc_pool_config: RpcPoolConfig { max_size: args.rpc_pool_size, @@ -332,6 +342,8 @@ impl ForesterConfig { skip_v1_address_trees: false, skip_v2_address_trees: false, tree_id: None, + sleep_after_processing_ms: 10_000, + sleep_when_idle_ms: 45_000, }, rpc_pool_config: RpcPoolConfig { max_size: 10, diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 4d2fcaa752..c40b7d00d5 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -1260,11 +1260,6 @@ impl EpochManager { forester_slot_details.start_solana_slot, ) .await?; - info!( - "Slot {} started, beginning processing", - forester_slot_details.slot - ); - let mut estimated_slot = self.slot_tracker.estimated_current_slot(); 'inner_processing_loop: loop { @@ -1334,9 +1329,9 @@ impl EpochManager { estimated_slot = self.slot_tracker.estimated_current_slot(); let sleep_duration_ms = if items_processed_this_iteration > 0 { - 10_000 + self.config.general_config.sleep_after_processing_ms } else { - 45_000 + self.config.general_config.sleep_when_idle_ms }; tokio::time::sleep(Duration::from_millis(sleep_duration_ms)).await; @@ -2184,6 +2179,8 @@ mod tests { skip_v2_state_trees: skip_v2_state, skip_v2_address_trees: skip_v2_address, tree_id: None, + sleep_after_processing_ms: 50, + sleep_when_idle_ms: 100, }, rpc_pool_config: Default::default(), registry_pubkey: Pubkey::default(), diff --git a/forester/tests/e2e_test.rs b/forester/tests/e2e_test.rs index 873b8f4261..9c3b3047d9 100644 --- a/forester/tests/e2e_test.rs +++ b/forester/tests/e2e_test.rs @@ -231,6 +231,8 @@ async fn e2e_test() { skip_v1_address_trees: false, skip_v2_address_trees: false, tree_id: None, + sleep_after_processing_ms: 50, + sleep_when_idle_ms: 100, }, rpc_pool_config: RpcPoolConfig { max_size: 50, diff --git a/forester/tests/legacy/test_utils.rs b/forester/tests/legacy/test_utils.rs index a93a410b64..ad0d7a6666 100644 --- a/forester/tests/legacy/test_utils.rs +++ b/forester/tests/legacy/test_utils.rs @@ -94,6 +94,9 @@ pub fn forester_config() -> ForesterConfig { skip_v2_state_trees: false, skip_v1_address_trees: false, skip_v2_address_trees: false, + tree_id: None, + sleep_after_processing_ms: 50, + sleep_when_idle_ms: 100, }, rpc_pool_config: RpcPoolConfig { max_size: 50, diff --git a/forester/tests/test_utils.rs b/forester/tests/test_utils.rs index 409df4d339..c60030a1f1 100644 --- a/forester/tests/test_utils.rs +++ b/forester/tests/test_utils.rs @@ -109,6 +109,8 @@ pub fn forester_config() -> ForesterConfig { skip_v1_address_trees: false, skip_v2_address_trees: false, tree_id: None, + sleep_after_processing_ms: 50, + sleep_when_idle_ms: 100, }, rpc_pool_config: RpcPoolConfig { max_size: 50, From cf16f62dc355a563ecffd703bd4aa8696f4d1e74 Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Sat, 8 Nov 2025 14:53:15 +0000 Subject: [PATCH 07/12] update prover version to 2.0.6 --- prover/server/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prover/server/main.go b/prover/server/main.go index b0eac6e726..1b7122ffb7 100644 --- a/prover/server/main.go +++ b/prover/server/main.go @@ -21,7 +21,7 @@ import ( "github.com/urfave/cli/v2" ) -const Version = "2.0.0" +const Version = "2.0.6" func main() { runCli() From b292967717cee116e916f820ace66cc7536d9c7d Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Sat, 8 Nov 2025 14:53:22 +0000 Subject: [PATCH 08/12] chore: update PHOTON_COMMIT to the latest version --- scripts/devenv/versions.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/devenv/versions.sh b/scripts/devenv/versions.sh index b38cc9c033..37dc48a1ad 100755 --- a/scripts/devenv/versions.sh +++ b/scripts/devenv/versions.sh @@ -13,7 +13,7 @@ export SOLANA_VERSION="2.2.15" export ANCHOR_VERSION="0.31.1" export JQ_VERSION="1.8.0" export PHOTON_VERSION="0.51.0" -export PHOTON_COMMIT="06862b290f32025bc150f82a4acba4961ee24178" +export PHOTON_COMMIT="94b3688b08477668bb946a689b0267319f5c1ae1" export REDIS_VERSION="8.0.1" export ANCHOR_TAG="anchor-v${ANCHOR_VERSION}" From 8a0fd3c5a5425aca8e65952fdf5b733229d386bb Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Fri, 7 Nov 2025 09:45:55 +0000 Subject: [PATCH 09/12] feat: `get_queue_elements` input + output queue --- sdk-libs/client/src/indexer/photon_indexer.rs | 24 ++++---- .../program-test/src/indexer/test_indexer.rs | 58 ++++++------------- 2 files changed, 31 insertions(+), 51 deletions(-) diff --git a/sdk-libs/client/src/indexer/photon_indexer.rs b/sdk-libs/client/src/indexer/photon_indexer.rs index ec70157acd..b05c7b1173 100644 --- a/sdk-libs/client/src/indexer/photon_indexer.rs +++ b/sdk-libs/client/src/indexer/photon_indexer.rs @@ -1664,11 +1664,11 @@ impl Indexer for PhotonIndexer { .input_queue_elements .map(|elements| { elements - .iter() + .iter() .map(|x| -> Result<_, IndexerError> { let proof: Vec = x - .proof - .iter() + .proof + .iter() .map(|p| Hash::from_base58(p)) .collect::, _>>()?; let root = Hash::from_base58(&x.root)?; @@ -1682,15 +1682,15 @@ impl Indexer for PhotonIndexer { let account_hash = Hash::from_base58(&x.account_hash)?; Ok(MerkleProofWithContext { - proof, - root, - leaf_index: x.leaf_index, - leaf, - merkle_tree, - root_seq: x.root_seq, - tx_hash, - account_hash, - }) + proof, + root, + leaf_index: x.leaf_index, + leaf, + merkle_tree, + root_seq: x.root_seq, + tx_hash, + account_hash, + }) }) .collect::, _>>() }) diff --git a/sdk-libs/program-test/src/indexer/test_indexer.rs b/sdk-libs/program-test/src/indexer/test_indexer.rs index b5ae74e3d0..284f03fba8 100644 --- a/sdk-libs/program-test/src/indexer/test_indexer.rs +++ b/sdk-libs/program-test/src/indexer/test_indexer.rs @@ -646,23 +646,23 @@ impl Indexer for TestIndexer { let start = output_queue_start_index as usize; let end = std::cmp::min( start + limit as usize, - address_tree_bundle.queue_elements.len(), - ); + address_tree_bundle.queue_elements.len(), + ); let queue_elements = address_tree_bundle.queue_elements[start..end].to_vec(); - let merkle_proofs_with_context = queue_elements - .iter() + let merkle_proofs_with_context = queue_elements + .iter() .map(|element| MerkleProofWithContext { - proof: Vec::new(), - leaf: [0u8; 32], - leaf_index: 0, - merkle_tree: address_tree_bundle.accounts.merkle_tree.to_bytes(), - root: address_tree_bundle.root(), - tx_hash: None, + proof: Vec::new(), + leaf: [0u8; 32], + leaf_index: 0, + merkle_tree: address_tree_bundle.accounts.merkle_tree.to_bytes(), + root: address_tree_bundle.root(), + tx_hash: None, root_seq: output_queue_start_index, - account_hash: *element, - }) - .collect(); + account_hash: *element, + }) + .collect(); Some(merkle_proofs_with_context) } else { None @@ -693,7 +693,7 @@ impl Indexer for TestIndexer { .iter_mut() .find(|x| x.accounts.merkle_tree == pubkey); - if let Some(state_tree_bundle) = state_tree_bundle { + if let Some(state_tree_bundle) = state_tree_bundle { // For state trees, return both input and output queues // Build input queue elements if requested @@ -830,36 +830,16 @@ impl Indexer for TestIndexer { account_hash: *element, }) .collect(); - - Some(merkle_proofs_with_context) - } else { - None - }; - - let output_queue_index = if output_queue_elements.is_some() { - Some(output_queue_start_index) - } else { - None - }; - - let input_queue_index = if input_queue_elements.is_some() { - Some(input_queue_start_index) - } else { - None - }; - - let slot = self.get_current_slot(); - - return Ok(Response { + return Ok(Response { context: Context { slot }, - value: QueueElementsResult { + value: QueueElementsResult { output_queue_elements, output_queue_index, input_queue_elements, input_queue_index, - }, - }); - } + }, + }); + } Err(IndexerError::InvalidParameters( "Merkle tree not found".to_string(), From dacb13a165bf8af158017585b7980b5bc5fdf90e Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Fri, 7 Nov 2025 10:37:10 +0000 Subject: [PATCH 10/12] feat: forester subscriber for gRPC queue updates --- Cargo.lock | 2 + forester/Cargo.toml | 4 +- forester/package.json | 2 +- forester/proto/photon.proto | 60 ++++++++ forester/src/epoch_manager.rs | 101 ++++++------- forester/src/grpc/mod.rs | 3 + forester/src/grpc/router.rs | 214 ++++++++++++++++++++++++++++ forester/src/lib.rs | 1 + forester/src/processor/v2/common.rs | 4 + 9 files changed, 332 insertions(+), 59 deletions(-) create mode 100644 forester/src/grpc/mod.rs create mode 100644 forester/src/grpc/router.rs diff --git a/Cargo.lock b/Cargo.lock index 2a86a3f4d2..b6b17aa310 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2297,9 +2297,11 @@ dependencies = [ "light-prover-client", "light-registry", "light-sdk", + "light-sparse-merkle-tree", "light-system-program-anchor", "light-test-utils", "num-bigint 0.4.6", + "once_cell", "photon-api", "prometheus", "prost", diff --git a/forester/Cargo.toml b/forester/Cargo.toml index 40d9cb802b..a3922aff95 100644 --- a/forester/Cargo.toml +++ b/forester/Cargo.toml @@ -19,11 +19,13 @@ light-system-program-anchor = { workspace = true, features = ["cpi"] } light-hash-set = { workspace = true, features = ["solana"] } light-hasher = { workspace = true, features = ["poseidon"] } light-merkle-tree-reference = { workspace = true } +light-prover-client = { workspace = true } light-registry = { workspace = true } photon-api = { workspace = true } forester-utils = { workspace = true } light-client = { workspace = true, features = ["v2"] } light-merkle-tree-metadata = { workspace = true } +light-sparse-merkle-tree = { workspace = true } light-sdk = { workspace = true, features = ["anchor"] } light-program-test = { workspace = true } solana-transaction-status = { workspace = true } @@ -53,12 +55,12 @@ scopeguard = "1.2.0" itertools = "0.14.0" num-bigint = { workspace = true } -# gRPC client for Photon queue subscriptions (match Photon versions) tonic = "0.14.2" prost = "0.14.1" prost-types = "0.14.1" tonic-prost = "0.14.2" tokio-stream = { version = "0.1", features = ["sync"] } +once_cell = "1.21.3" [build-dependencies] tonic-prost-build = "0.14.2" diff --git a/forester/package.json b/forester/package.json index 667f754e84..cbe6f3613d 100644 --- a/forester/package.json +++ b/forester/package.json @@ -4,7 +4,7 @@ "license": "GPL-3.0", "scripts": { "build": "cargo build", - "test": "redis-start && RUST_LOG=forester=debug,forester_utils=debug cargo test --package forester e2e_test -- --nocapture", + "test": "redis-start && TEST_MODE=local TEST_V1_STATE=true TEST_V2_STATE=true TEST_V1_ADDRESS=true TEST_V2_ADDRESS=true RUST_LOG=forester=debug,forester_utils=debug cargo test --package forester e2e_test -- --nocapture", "docker:build": "docker build --tag forester -f Dockerfile .." }, "devDependencies": { diff --git a/forester/proto/photon.proto b/forester/proto/photon.proto index 0069b4a09d..ca37ac6a41 100644 --- a/forester/proto/photon.proto +++ b/forester/proto/photon.proto @@ -4,28 +4,48 @@ package photon; // Queue information service service QueueService { +<<<<<<< HEAD // Get current queue information for all or specific trees rpc GetQueueInfo(GetQueueInfoRequest) returns (GetQueueInfoResponse); // Subscribe to queue updates rpc SubscribeQueueUpdates(SubscribeQueueUpdatesRequest) returns (stream QueueUpdate); +======= + // Get current queue information for all or specific trees + rpc GetQueueInfo(GetQueueInfoRequest) returns (GetQueueInfoResponse); + + // Subscribe to queue updates + rpc SubscribeQueueUpdates(SubscribeQueueUpdatesRequest) returns (stream QueueUpdate); +>>>>>>> 18e1c9a57 (feat: forester subscriber for gRPC queue updates) } // Request message for GetQueueInfo message GetQueueInfoRequest { +<<<<<<< HEAD // Optional list of tree pubkeys to filter by (base58 encoded) // If empty, returns info for all trees repeated string trees = 1; +======= + // Optional list of tree pubkeys to filter by (base58 encoded) + // If empty, returns info for all trees + repeated string trees = 1; +>>>>>>> 18e1c9a57 (feat: forester subscriber for gRPC queue updates) } // Response message for GetQueueInfo message GetQueueInfoResponse { +<<<<<<< HEAD repeated QueueInfo queues = 1; uint64 slot = 2; +======= + repeated QueueInfo queues = 1; + uint64 slot = 2; +>>>>>>> 18e1c9a57 (feat: forester subscriber for gRPC queue updates) } // Information about a single queue message QueueInfo { +<<<<<<< HEAD // Tree public key (base58 encoded) string tree = 1; @@ -37,20 +57,43 @@ message QueueInfo { // Current number of items in the queue uint64 queue_size = 4; +======= + // Tree public key (base58 encoded) + string tree = 1; + + // Queue public key (base58 encoded) + string queue = 2; + + // Queue type: 3 = InputStateV2, 4 = AddressV2, 5 = OutputStateV2 + uint32 queue_type = 3; + + // Current number of items in the queue + uint64 queue_size = 4; +>>>>>>> 18e1c9a57 (feat: forester subscriber for gRPC queue updates) } // Request message for SubscribeQueueUpdates message SubscribeQueueUpdatesRequest { +<<<<<<< HEAD // Optional list of tree pubkeys to subscribe to (base58 encoded) // If empty, subscribes to all trees repeated string trees = 1; // Whether to send initial state before streaming updates bool send_initial_state = 2; +======= + // Optional list of tree pubkeys to subscribe to (base58 encoded) + // If empty, subscribes to all trees + repeated string trees = 1; + + // Whether to send initial state before streaming updates + bool send_initial_state = 2; +>>>>>>> 18e1c9a57 (feat: forester subscriber for gRPC queue updates) } // Streamed queue update message message QueueUpdate { +<<<<<<< HEAD // The queue that was updated QueueInfo queue_info = 1; @@ -59,12 +102,29 @@ message QueueUpdate { // Type of update UpdateType update_type = 3; +======= + // The queue that was updated + QueueInfo queue_info = 1; + + // Slot at which the update occurred + uint64 slot = 2; + + // Type of update + UpdateType update_type = 3; +>>>>>>> 18e1c9a57 (feat: forester subscriber for gRPC queue updates) } // Type of queue update enum UpdateType { +<<<<<<< HEAD UPDATE_TYPE_UNSPECIFIED = 0; UPDATE_TYPE_INITIAL = 1; // Initial state sent at subscription UPDATE_TYPE_ITEM_ADDED = 2; // Item added to queue UPDATE_TYPE_ITEM_REMOVED = 3; // Item removed from queue +======= + UPDATE_TYPE_UNSPECIFIED = 0; + UPDATE_TYPE_INITIAL = 1; // Initial state sent at subscription + UPDATE_TYPE_ITEM_ADDED = 2; // Item added to queue + UPDATE_TYPE_ITEM_REMOVED = 3; // Item removed from queue +>>>>>>> 18e1c9a57 (feat: forester subscriber for gRPC queue updates) } diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index c40b7d00d5..27702539f4 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -43,6 +43,7 @@ use crate::{ errors::{ ChannelError, ForesterError, InitializationError, RegistrationError, WorkReportError, }, + grpc::{QueueUpdateMessage, QueueEventRouter}, metrics::{push_metrics, queue_metric_update, update_forester_sol_balance}, pagerduty::send_pagerduty_alert, processor::{ @@ -62,7 +63,6 @@ use crate::{ slot_tracker::{slot_duration, wait_until_slot_reached, SlotTracker}, tree_data_sync::fetch_trees, tree_finder::TreeFinder, - work_coordinator::{QueueUpdateMessage, WorkCoordinator}, ForesterConfig, ForesterEpochInfo, Result, }; @@ -107,8 +107,7 @@ pub struct EpochManager { new_tree_sender: broadcast::Sender, tx_cache: Arc>, ops_cache: Arc>, - /// Persistent WorkCoordinator for event-driven V2 processing (created at startup) - coordinator: Option>, + coordinator: Option>, } impl Clone for EpochManager { @@ -144,30 +143,24 @@ impl EpochManager { ops_cache: Arc>, ) -> Result { let coordinator = if let Some(url) = &config.external_services.photon_grpc_url { - info!( - "Creating persistent WorkCoordinator at startup for gRPC URL: {}", - url - ); - match WorkCoordinator::new(url.clone()).await { + match QueueEventRouter::new(url.clone()).await { Ok(coord) => { let coord_arc = Arc::new(coord); tokio::spawn({ let coord_clone = Arc::clone(&coord_arc); async move { - info!("Starting persistent WorkCoordinator dispatcher"); if let Err(e) = coord_clone.run_dispatcher().await { - error!("WorkCoordinator dispatcher error: {:?}", e); + error!("dispatcher error: {:?}", e); } } }); - info!("Persistent WorkCoordinator created successfully"); Some(coord_arc) } Err(e) => { warn!( - "Failed to create WorkCoordinator at startup: {:?}. V2 trees will use polling fallback.", + "{:?}. V2 trees will use polling fallback.", e ); None @@ -987,7 +980,7 @@ impl EpochManager { let has_channel = queue_update_rx.is_some(); info!( - "Creating thread for tree {} (type: {:?}, event_driven: {})", + "Creating thread for tree {} (type: {:?}, event: {})", tree.tree_accounts.merkle_tree, tree.tree_accounts.tree_type, has_channel ); @@ -997,7 +990,7 @@ impl EpochManager { let coordinator_clone = coordinator.clone(); let handle = tokio::spawn(async move { - let result = self_clone + self_clone .process_queue_v2( &epoch_info_clone.epoch, &epoch_info_clone.forester_epoch_pda, @@ -1005,13 +998,7 @@ impl EpochManager { queue_update_rx, coordinator_clone.clone(), ) - .await; - - if let Some(coord) = coordinator_clone { - coord.unregister_tree(&tree.tree_accounts.merkle_tree).await; - } - - result + .await }); handles.push(handle); @@ -1053,38 +1040,13 @@ impl EpochManager { mut tree_schedule: TreeForesterSchedule, ) -> Result<()> { let mut current_slot = self.slot_tracker.estimated_current_slot(); - - let total_slots = tree_schedule.slots.len(); - let eligible_slots = tree_schedule.slots.iter().filter(|s| s.is_some()).count(); - info!( - "Starting process_queue for tree {}: total_slots={}, eligible_slots={}, current_slot={}, active_phase_end={}", - tree_schedule.tree_accounts.merkle_tree, - total_slots, - eligible_slots, - current_slot, - epoch_info.phases.active.end - ); 'outer_slot_loop: while current_slot < epoch_info.phases.active.end { - info!( - "Searching for next slot to process. Current slot: {}", - current_slot - ); let next_slot_to_process = tree_schedule .slots .iter_mut() .enumerate() .find_map(|(idx, opt_slot)| opt_slot.as_ref().map(|s| (idx, s.clone()))); - info!( - "Next slot to process: {:?}", - next_slot_to_process.as_ref().map(|(idx, slot)| ( - idx, - slot.slot, - slot.start_solana_slot, - slot.end_solana_slot - )) - ); - if let Some((slot_idx, light_slot_details)) = next_slot_to_process { match self .process_light_slot( @@ -1139,7 +1101,7 @@ impl EpochManager { epoch_pda: &ForesterEpochPda, mut tree_schedule: TreeForesterSchedule, mut queue_update_rx: Option>, - coordinator: Option>, + coordinator: Option>, ) -> Result<()> { let mut current_slot = self.slot_tracker.estimated_current_slot(); @@ -1148,12 +1110,10 @@ impl EpochManager { let tree_type = tree_schedule.tree_accounts.tree_type; info!( - "process_queue_v2 tree={}: type={:?}, total_slots={}, eligible_slots={}, event_driven={}, current_slot={}, active_phase_end={}", + "process_queue_v2 tree={}: type={:?}, total_slots={}, eligible_slots={}, event={}, current_slot={}, active_phase_end={}", tree_schedule.tree_accounts.merkle_tree, - tree_type, total_slots, eligible_slots, - queue_update_rx.is_some(), current_slot, epoch_info.phases.active.end ); @@ -1274,7 +1234,7 @@ impl EpochManager { let current_light_slot = (estimated_slot - epoch_info.phases.active.start) / epoch_pda.protocol_config.slot_length; if current_light_slot != forester_slot_details.slot { - warn!("Slot mismatch. Exiting processing for this slot."); + warn!("Light slot mismatch. Exiting processing for this slot."); break 'inner_processing_loop; } @@ -1299,6 +1259,7 @@ impl EpochManager { tree_accounts, forester_slot_details, estimated_slot, + None, ) .await { @@ -1351,7 +1312,7 @@ impl EpochManager { tree_accounts: &TreeAccounts, forester_slot_details: &ForesterSlot, queue_update_rx: &mut mpsc::Receiver, - coordinator: Option>, + coordinator: Option>, ) -> Result<()> { info!( "Processing V2 light slot {} ({}-{})", @@ -1418,6 +1379,7 @@ impl EpochManager { tree_accounts, forester_slot_details, estimated_slot, + Some(&update), // Pass gRPC queue update hint ).await { Ok(count) => { if count > 0 { @@ -1453,6 +1415,7 @@ impl EpochManager { tree_accounts, forester_slot_details, estimated_slot, + None, // No queue update hint in fallback path ).await { Ok(count) if count > 0 => { info!("V2 fallback found {} items", count); @@ -1540,6 +1503,7 @@ impl EpochManager { tree_accounts, forester_slot_details, estimated_slot, + None, // No queue update hint for regular processing ) .await { @@ -1635,6 +1599,7 @@ impl EpochManager { tree_accounts: &TreeAccounts, forester_slot_details: &ForesterSlot, current_solana_slot: u64, + queue_update: Option<&QueueUpdateMessage>, ) -> Result { match tree_accounts.tree_type { TreeType::StateV1 | TreeType::AddressV1 => { @@ -1648,7 +1613,8 @@ impl EpochManager { .await } TreeType::StateV2 | TreeType::AddressV2 => { - self.process_v2(epoch_info, tree_accounts).await + self.process_v2(epoch_info, tree_accounts, queue_update) + .await } } } @@ -1702,8 +1668,8 @@ impl EpochManager { .await?; if num_sent > 0 { - info!( - "Processed {} items from tree {}", + debug!( + "processed {} items v1 tree {}", num_sent, tree_accounts.merkle_tree ); } @@ -1717,8 +1683,28 @@ impl EpochManager { } } - async fn process_v2(&self, epoch_info: &Epoch, tree_accounts: &TreeAccounts) -> Result { + async fn process_v2( + &self, + epoch_info: &Epoch, + tree_accounts: &TreeAccounts, + queue_update: Option<&QueueUpdateMessage>, + ) -> Result { let default_prover_url = "http://127.0.0.1:3001".to_string(); + + let (input_queue_hint, output_queue_hint) = if let Some(update) = queue_update { + match update.queue_type { + light_compressed_account::QueueType::InputStateV2 => { + (Some(update.queue_size), None) + } + light_compressed_account::QueueType::OutputStateV2 => { + (None, Some(update.queue_size)) + } + _ => (None, None), + } + } else { + (None, None) + }; + let batch_context = BatchContext { rpc_pool: self.rpc_pool.clone(), authority: self.config.payer_keypair.insecure_clone(), @@ -1750,6 +1736,8 @@ impl EpochManager { ops_cache: self.ops_cache.clone(), epoch_phases: epoch_info.phases.clone(), slot_tracker: self.slot_tracker.clone(), + input_queue_hint, + output_queue_hint, }; process_batched_operations(batch_context, tree_accounts.tree_type) @@ -1982,7 +1970,6 @@ fn calculate_remaining_time_or_default( .unwrap_or(Duration::ZERO) } -/// Helper function to check if a tree should be skipped based on configuration fn should_skip_tree(config: &ForesterConfig, tree_type: &TreeType) -> bool { match tree_type { TreeType::AddressV1 => config.general_config.skip_v1_address_trees, diff --git a/forester/src/grpc/mod.rs b/forester/src/grpc/mod.rs new file mode 100644 index 0000000000..b9ff82f951 --- /dev/null +++ b/forester/src/grpc/mod.rs @@ -0,0 +1,3 @@ +mod router; + +pub use router::{QueueUpdateMessage, QueueEventRouter}; diff --git a/forester/src/grpc/router.rs b/forester/src/grpc/router.rs new file mode 100644 index 0000000000..e2b41d5408 --- /dev/null +++ b/forester/src/grpc/router.rs @@ -0,0 +1,214 @@ +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; + +use anyhow::{anyhow, Context, Result}; +use light_compressed_account::QueueType; +use proto::{queue_service_client::QueueServiceClient, SubscribeQueueUpdatesRequest}; +use solana_sdk::pubkey::Pubkey; +use tokio::sync::{mpsc, RwLock}; +use tokio_stream::StreamExt; +use tonic::transport::Channel; +use tracing::{debug, error, info, trace, warn}; + +// Generated protobuf code +pub mod proto { + tonic::include_proto!("photon"); +} + +/// Message sent to tree tasks when queue updates occur +#[derive(Debug, Clone)] +pub struct QueueUpdateMessage { + pub tree: Pubkey, + pub queue: Pubkey, + pub queue_type: QueueType, + pub queue_size: u64, + pub slot: u64, + pub update_type: proto::UpdateType, +} + +#[derive(Debug)] +pub struct QueueEventRouter { + grpc_client: RwLock>, + tree_notifiers: Arc>>>, + connection_healthy: Arc, + photon_grpc_url: String, +} + +impl QueueEventRouter { + pub async fn new(photon_grpc_url: String) -> Result { + info!("Connecting to Photon gRPC at {}", photon_grpc_url); + + let grpc_client = QueueServiceClient::connect(photon_grpc_url.clone()) + .await + .context("Failed to connect to Photon gRPC service")?; + + info!("Successfully connected to Photon gRPC"); + + Ok(Self { + grpc_client: RwLock::new(grpc_client), + tree_notifiers: Arc::new(RwLock::new(HashMap::new())), + // Initialize as healthy since connection just succeeded + // Will be set to false if subscription fails in run_dispatcher + connection_healthy: Arc::new(AtomicBool::new(true)), + photon_grpc_url, + }) + } + + pub async fn register_tree(&self, tree_pubkey: Pubkey) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(100); + self.tree_notifiers.write().await.insert(tree_pubkey, tx); + debug!("Registered tree {} for queue updates", tree_pubkey); + rx + } + + pub async fn unregister_tree(&self, tree_pubkey: &Pubkey) { + self.tree_notifiers.write().await.remove(tree_pubkey); + debug!("Unregistered tree {}", tree_pubkey); + } + + pub async fn run_dispatcher(self: Arc) -> Result<()> { + let mut reconnect_delay = Duration::from_secs(1); + const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30); + + loop { + match self.dispatch_loop().await { + Ok(()) => { + warn!("gRPC stream ended; attempting to reconnect…"); + self.connection_healthy.store(false, Ordering::Relaxed); + tokio::time::sleep(reconnect_delay).await; + let _ = self.reconnect().await; + reconnect_delay = Duration::from_secs(1); + continue; + } + Err(e) => { + error!("gRPC dispatcher error: {:?}", e); + self.connection_healthy.store(false, Ordering::Relaxed); + + warn!("Reconnecting in {:?}...", reconnect_delay); + tokio::time::sleep(reconnect_delay).await; + reconnect_delay = std::cmp::min(reconnect_delay * 2, MAX_RECONNECT_DELAY); + + match self.reconnect().await { + Ok(()) => { + info!("Successfully reconnected to Photon gRPC"); + reconnect_delay = Duration::from_secs(1); + } + Err(e) => { + error!("Failed to reconnect: {:?}", e); + } + } + } + } + } + } + + async fn reconnect(&self) -> Result<()> { + let new_client = QueueServiceClient::connect(self.photon_grpc_url.clone()) + .await + .context("Failed to reconnect to Photon gRPC service")?; + *self.grpc_client.write().await = new_client; + Ok(()) + } + + async fn dispatch_loop(&self) -> Result<()> { + info!("Starting gRPC queue update subscription"); + + let request = SubscribeQueueUpdatesRequest { + trees: vec![], + send_initial_state: true, + }; + + let mut stream = self + .grpc_client + .read() + .await + .clone() + .subscribe_queue_updates(request) + .await + .context("Failed to subscribe to queue updates")? + .into_inner(); + + self.connection_healthy.store(true, Ordering::Relaxed); + info!("gRPC subscription established successfully"); + + while let Some(update_result) = stream.next().await { + let update = update_result.context("Error receiving queue update")?; + + let queue_info = update + .queue_info + .ok_or_else(|| anyhow!("Missing queue_info in update"))?; + + let tree_pubkey = queue_info + .tree + .parse::() + .context("Failed to parse tree pubkey")?; + + let queue_pubkey = queue_info + .queue + .parse::() + .context("Failed to parse queue pubkey")?; + + let queue_type = QueueType::from(queue_info.queue_type as u64); + + let update_type = proto::UpdateType::try_from(update.update_type) + .unwrap_or(proto::UpdateType::Unspecified); + + let message = QueueUpdateMessage { + tree: tree_pubkey, + queue: queue_pubkey, + queue_type, + queue_size: queue_info.queue_size, + slot: update.slot, + update_type, + }; + + let notifiers = self.tree_notifiers.read().await; + if let Some(tx) = notifiers.get(&tree_pubkey) { + match tx.try_send(message.clone()) { + Ok(()) => { + trace!( + "Routed update to tree {}: {} items (type: {:?})", + tree_pubkey, + message.queue_size, + queue_type + ); + } + Err(mpsc::error::TrySendError::Full(_)) => { + warn!( + "Tree {} channel full, dropping update (tree processing slower than updates)", + tree_pubkey + ); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + debug!("Tree {} channel closed (task likely finished)", tree_pubkey); + } + } + } else { + trace!("Received update for unregistered tree {}", tree_pubkey); + } + } + + warn!("gRPC stream ended"); + self.connection_healthy.store(false, Ordering::Relaxed); + Ok(()) + } + + pub fn is_healthy(&self) -> bool { + self.connection_healthy.load(Ordering::Relaxed) + } + + pub async fn registered_tree_count(&self) -> usize { + self.tree_notifiers.read().await.len() + } + + pub async fn shutdown(&self) { + info!("Shutting down WorkCoordinator"); + self.connection_healthy.store(false, Ordering::Relaxed); + } +} diff --git a/forester/src/lib.rs b/forester/src/lib.rs index f3e61a8768..ed7cde6971 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -5,6 +5,7 @@ pub mod config; pub mod epoch_manager; pub mod errors; pub mod forester_status; +pub mod grpc; pub mod health_check; pub mod helius_priority_fee_types; pub mod metrics; diff --git a/forester/src/processor/v2/common.rs b/forester/src/processor/v2/common.rs index 08464a0e66..5e1906027d 100644 --- a/forester/src/processor/v2/common.rs +++ b/forester/src/processor/v2/common.rs @@ -54,6 +54,10 @@ pub struct BatchContext { pub ops_cache: Arc>, pub epoch_phases: EpochPhases, pub slot_tracker: Arc, + /// input queue size from gRPC + pub input_queue_hint: Option, + /// output queue size from gRPC + pub output_queue_hint: Option, } #[derive(Debug)] From 78b8b841491f2eb63b21d49596182b02cd7bec16 Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Sat, 8 Nov 2025 13:41:44 +0000 Subject: [PATCH 11/12] cleanup --- forester/proto/photon.proto | 60 ------------------- forester/src/epoch_manager.rs | 41 ++++++------- forester/src/grpc/mod.rs | 2 +- sdk-libs/client/src/indexer/photon_indexer.rs | 24 ++++---- .../program-test/src/indexer/test_indexer.rs | 58 ++++++++++++------ 5 files changed, 71 insertions(+), 114 deletions(-) diff --git a/forester/proto/photon.proto b/forester/proto/photon.proto index ca37ac6a41..8ba5a52c0e 100644 --- a/forester/proto/photon.proto +++ b/forester/proto/photon.proto @@ -4,60 +4,28 @@ package photon; // Queue information service service QueueService { -<<<<<<< HEAD - // Get current queue information for all or specific trees - rpc GetQueueInfo(GetQueueInfoRequest) returns (GetQueueInfoResponse); - - // Subscribe to queue updates - rpc SubscribeQueueUpdates(SubscribeQueueUpdatesRequest) returns (stream QueueUpdate); -======= // Get current queue information for all or specific trees rpc GetQueueInfo(GetQueueInfoRequest) returns (GetQueueInfoResponse); // Subscribe to queue updates rpc SubscribeQueueUpdates(SubscribeQueueUpdatesRequest) returns (stream QueueUpdate); ->>>>>>> 18e1c9a57 (feat: forester subscriber for gRPC queue updates) } // Request message for GetQueueInfo message GetQueueInfoRequest { -<<<<<<< HEAD - // Optional list of tree pubkeys to filter by (base58 encoded) - // If empty, returns info for all trees - repeated string trees = 1; -======= // Optional list of tree pubkeys to filter by (base58 encoded) // If empty, returns info for all trees repeated string trees = 1; ->>>>>>> 18e1c9a57 (feat: forester subscriber for gRPC queue updates) } // Response message for GetQueueInfo message GetQueueInfoResponse { -<<<<<<< HEAD - repeated QueueInfo queues = 1; - uint64 slot = 2; -======= repeated QueueInfo queues = 1; uint64 slot = 2; ->>>>>>> 18e1c9a57 (feat: forester subscriber for gRPC queue updates) } // Information about a single queue message QueueInfo { -<<<<<<< HEAD - // Tree public key (base58 encoded) - string tree = 1; - - // Queue public key (base58 encoded) - string queue = 2; - - // Queue type: 3 = InputStateV2, 4 = AddressV2, 5 = OutputStateV2 - uint32 queue_type = 3; - - // Current number of items in the queue - uint64 queue_size = 4; -======= // Tree public key (base58 encoded) string tree = 1; @@ -69,40 +37,20 @@ message QueueInfo { // Current number of items in the queue uint64 queue_size = 4; ->>>>>>> 18e1c9a57 (feat: forester subscriber for gRPC queue updates) } // Request message for SubscribeQueueUpdates message SubscribeQueueUpdatesRequest { -<<<<<<< HEAD - // Optional list of tree pubkeys to subscribe to (base58 encoded) - // If empty, subscribes to all trees - repeated string trees = 1; - - // Whether to send initial state before streaming updates - bool send_initial_state = 2; -======= // Optional list of tree pubkeys to subscribe to (base58 encoded) // If empty, subscribes to all trees repeated string trees = 1; // Whether to send initial state before streaming updates bool send_initial_state = 2; ->>>>>>> 18e1c9a57 (feat: forester subscriber for gRPC queue updates) } // Streamed queue update message message QueueUpdate { -<<<<<<< HEAD - // The queue that was updated - QueueInfo queue_info = 1; - - // Slot at which the update occurred - uint64 slot = 2; - - // Type of update - UpdateType update_type = 3; -======= // The queue that was updated QueueInfo queue_info = 1; @@ -111,20 +59,12 @@ message QueueUpdate { // Type of update UpdateType update_type = 3; ->>>>>>> 18e1c9a57 (feat: forester subscriber for gRPC queue updates) } // Type of queue update enum UpdateType { -<<<<<<< HEAD - UPDATE_TYPE_UNSPECIFIED = 0; - UPDATE_TYPE_INITIAL = 1; // Initial state sent at subscription - UPDATE_TYPE_ITEM_ADDED = 2; // Item added to queue - UPDATE_TYPE_ITEM_REMOVED = 3; // Item removed from queue -======= UPDATE_TYPE_UNSPECIFIED = 0; UPDATE_TYPE_INITIAL = 1; // Initial state sent at subscription UPDATE_TYPE_ITEM_ADDED = 2; // Item added to queue UPDATE_TYPE_ITEM_REMOVED = 3; // Item removed from queue ->>>>>>> 18e1c9a57 (feat: forester subscriber for gRPC queue updates) } diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 27702539f4..d8834e678e 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -43,7 +43,7 @@ use crate::{ errors::{ ChannelError, ForesterError, InitializationError, RegistrationError, WorkReportError, }, - grpc::{QueueUpdateMessage, QueueEventRouter}, + grpc::{QueueEventRouter, QueueUpdateMessage}, metrics::{push_metrics, queue_metric_update, update_forester_sol_balance}, pagerduty::send_pagerduty_alert, processor::{ @@ -159,10 +159,7 @@ impl EpochManager { Some(coord_arc) } Err(e) => { - warn!( - "{:?}. V2 trees will use polling fallback.", - e - ); + warn!("{:?}. V2 trees will use polling fallback.", e); None } } @@ -556,23 +553,23 @@ impl EpochManager { { Ok(info) => info, Err(ForesterError::Registration( - RegistrationError::RegistrationPhaseEnded { - epoch: failed_epoch, - current_slot, - registration_end, - }, + RegistrationError::RegistrationPhaseEnded { + epoch: failed_epoch, + current_slot, + registration_end, + }, )) => { let next_epoch = failed_epoch + 1; let next_phases = get_epoch_phases(&self.protocol_config, next_epoch); let slots_to_wait = next_phases.registration.start.saturating_sub(current_slot); - info!( + info!( "Too late to register for epoch {} (registration ended at slot {}, current slot: {}). Next available epoch: {}. Registration opens at slot {} ({} slots to wait).", failed_epoch, registration_end, current_slot, next_epoch, next_phases.registration.start, slots_to_wait - ); - return Ok(()); - } + ); + return Ok(()); + } Err(e) => return Err(e.into()), } } @@ -973,7 +970,7 @@ impl EpochManager { Some(coord.register_tree(tree.tree_accounts.merkle_tree).await) } else { None - } + } } else { None }; @@ -1110,7 +1107,7 @@ impl EpochManager { let tree_type = tree_schedule.tree_accounts.tree_type; info!( - "process_queue_v2 tree={}: type={:?}, total_slots={}, eligible_slots={}, event={}, current_slot={}, active_phase_end={}", + "process_queue_v2 tree={}, total_slots={}, eligible_slots={}, current_slot={}, active_phase_end={}", tree_schedule.tree_accounts.merkle_tree, total_slots, eligible_slots, @@ -1151,12 +1148,12 @@ impl EpochManager { .await } else { self.process_light_slot_v2_fallback( - epoch_info, - epoch_pda, - &tree_schedule.tree_accounts, - &light_slot_details, - ) - .await + epoch_info, + epoch_pda, + &tree_schedule.tree_accounts, + &light_slot_details, + ) + .await } } }; diff --git a/forester/src/grpc/mod.rs b/forester/src/grpc/mod.rs index b9ff82f951..7ec8ce7b16 100644 --- a/forester/src/grpc/mod.rs +++ b/forester/src/grpc/mod.rs @@ -1,3 +1,3 @@ mod router; -pub use router::{QueueUpdateMessage, QueueEventRouter}; +pub use router::{QueueEventRouter, QueueUpdateMessage}; diff --git a/sdk-libs/client/src/indexer/photon_indexer.rs b/sdk-libs/client/src/indexer/photon_indexer.rs index b05c7b1173..ec70157acd 100644 --- a/sdk-libs/client/src/indexer/photon_indexer.rs +++ b/sdk-libs/client/src/indexer/photon_indexer.rs @@ -1664,11 +1664,11 @@ impl Indexer for PhotonIndexer { .input_queue_elements .map(|elements| { elements - .iter() + .iter() .map(|x| -> Result<_, IndexerError> { let proof: Vec = x - .proof - .iter() + .proof + .iter() .map(|p| Hash::from_base58(p)) .collect::, _>>()?; let root = Hash::from_base58(&x.root)?; @@ -1682,15 +1682,15 @@ impl Indexer for PhotonIndexer { let account_hash = Hash::from_base58(&x.account_hash)?; Ok(MerkleProofWithContext { - proof, - root, - leaf_index: x.leaf_index, - leaf, - merkle_tree, - root_seq: x.root_seq, - tx_hash, - account_hash, - }) + proof, + root, + leaf_index: x.leaf_index, + leaf, + merkle_tree, + root_seq: x.root_seq, + tx_hash, + account_hash, + }) }) .collect::, _>>() }) diff --git a/sdk-libs/program-test/src/indexer/test_indexer.rs b/sdk-libs/program-test/src/indexer/test_indexer.rs index 284f03fba8..b5ae74e3d0 100644 --- a/sdk-libs/program-test/src/indexer/test_indexer.rs +++ b/sdk-libs/program-test/src/indexer/test_indexer.rs @@ -646,23 +646,23 @@ impl Indexer for TestIndexer { let start = output_queue_start_index as usize; let end = std::cmp::min( start + limit as usize, - address_tree_bundle.queue_elements.len(), - ); + address_tree_bundle.queue_elements.len(), + ); let queue_elements = address_tree_bundle.queue_elements[start..end].to_vec(); - let merkle_proofs_with_context = queue_elements - .iter() + let merkle_proofs_with_context = queue_elements + .iter() .map(|element| MerkleProofWithContext { - proof: Vec::new(), - leaf: [0u8; 32], - leaf_index: 0, - merkle_tree: address_tree_bundle.accounts.merkle_tree.to_bytes(), - root: address_tree_bundle.root(), - tx_hash: None, + proof: Vec::new(), + leaf: [0u8; 32], + leaf_index: 0, + merkle_tree: address_tree_bundle.accounts.merkle_tree.to_bytes(), + root: address_tree_bundle.root(), + tx_hash: None, root_seq: output_queue_start_index, - account_hash: *element, - }) - .collect(); + account_hash: *element, + }) + .collect(); Some(merkle_proofs_with_context) } else { None @@ -693,7 +693,7 @@ impl Indexer for TestIndexer { .iter_mut() .find(|x| x.accounts.merkle_tree == pubkey); - if let Some(state_tree_bundle) = state_tree_bundle { + if let Some(state_tree_bundle) = state_tree_bundle { // For state trees, return both input and output queues // Build input queue elements if requested @@ -830,16 +830,36 @@ impl Indexer for TestIndexer { account_hash: *element, }) .collect(); - return Ok(Response { + + Some(merkle_proofs_with_context) + } else { + None + }; + + let output_queue_index = if output_queue_elements.is_some() { + Some(output_queue_start_index) + } else { + None + }; + + let input_queue_index = if input_queue_elements.is_some() { + Some(input_queue_start_index) + } else { + None + }; + + let slot = self.get_current_slot(); + + return Ok(Response { context: Context { slot }, - value: QueueElementsResult { + value: QueueElementsResult { output_queue_elements, output_queue_index, input_queue_elements, input_queue_index, - }, - }); - } + }, + }); + } Err(IndexerError::InvalidParameters( "Merkle tree not found".to_string(), From 01a9ccf1f6aeea805d774d05c92afdb435248929 Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Sat, 8 Nov 2025 14:29:30 +0000 Subject: [PATCH 12/12] format --- forester/src/epoch_manager.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index d8834e678e..bf6d299e54 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -553,23 +553,23 @@ impl EpochManager { { Ok(info) => info, Err(ForesterError::Registration( - RegistrationError::RegistrationPhaseEnded { - epoch: failed_epoch, - current_slot, - registration_end, - }, + RegistrationError::RegistrationPhaseEnded { + epoch: failed_epoch, + current_slot, + registration_end, + }, )) => { let next_epoch = failed_epoch + 1; let next_phases = get_epoch_phases(&self.protocol_config, next_epoch); let slots_to_wait = next_phases.registration.start.saturating_sub(current_slot); - info!( + info!( "Too late to register for epoch {} (registration ended at slot {}, current slot: {}). Next available epoch: {}. Registration opens at slot {} ({} slots to wait).", failed_epoch, registration_end, current_slot, next_epoch, next_phases.registration.start, slots_to_wait ); - return Ok(()); - } + return Ok(()); + } Err(e) => return Err(e.into()), } } @@ -970,7 +970,7 @@ impl EpochManager { Some(coord.register_tree(tree.tree_accounts.merkle_tree).await) } else { None - } + } } else { None }; @@ -1148,12 +1148,12 @@ impl EpochManager { .await } else { self.process_light_slot_v2_fallback( - epoch_info, - epoch_pda, - &tree_schedule.tree_accounts, - &light_slot_details, - ) - .await + epoch_info, + epoch_pda, + &tree_schedule.tree_accounts, + &light_slot_details, + ) + .await } } };