Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 138 additions & 85 deletions sdk-libs/client/src/indexer/photon_indexer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt::Debug, str::FromStr};
use std::{fmt::Debug, str::FromStr, time::Duration};

use async_trait::async_trait;
use light_compressed_account::compressed_account::{
Expand All @@ -17,7 +17,7 @@ use photon_api::{
};
use solana_program::pubkey::Pubkey;
use solana_sdk::bs58;
use tracing::{debug, error};
use tracing::{debug, error, warn};

use super::{AddressQueueIndex, BatchAddressUpdateIndexerResponse, MerkleProofWithContext};
use crate::{
Expand Down Expand Up @@ -78,15 +78,86 @@ impl<R: RpcConnection> PhotonIndexer<R> {
&mut self.rpc
}

async fn rate_limited_request<F, Fut, T>(&self, operation: F) -> Result<T, IndexerError>
async fn rate_limited_request_with_retry<F, Fut, T>(
&self,
mut operation: F,
) -> Result<T, IndexerError>
where
F: FnOnce() -> Fut,
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T, IndexerError>>,
{
if let Some(limiter) = &self.rate_limiter {
limiter.acquire_with_wait().await;
let max_retries = 10;
let mut attempts = 0;
let mut delay_ms = 100;
let max_delay_ms = 4000;

loop {
attempts += 1;

if let Some(limiter) = &self.rate_limiter {
debug!(
"Attempt {}/{}: Acquiring rate limiter",
attempts, max_retries
);
limiter.acquire_with_wait().await;
debug!(
"Attempt {}/{}: Rate limiter acquired",
attempts, max_retries
);
} else {
debug!(
"Attempt {}/{}: No rate limiter configured",
attempts, max_retries
);
}

debug!("Attempt {}/{}: Executing operation", attempts, max_retries);
let result = operation().await;

match result {
Ok(value) => {
debug!("Attempt {}/{}: Operation succeeded.", attempts, max_retries);
return Ok(value);
}
Err(e) => {
let is_retryable = match &e {
IndexerError::ApiError(_) => {
warn!("API Error: {}", e);
true
}
IndexerError::PhotonError {
context: _,
message: _,
} => {
warn!("Operation failed, checking if retryable...");
true
}
IndexerError::Base58DecodeError { .. } => false,
IndexerError::AccountNotFound => false,
IndexerError::InvalidParameters(_) => false,
IndexerError::NotImplemented(_) => false,
_ => false,
};

if is_retryable && attempts < max_retries {
warn!(
"Attempt {}/{}: Operation failed. Retrying",
attempts, max_retries
);

tokio::time::sleep(Duration::from_millis(delay_ms)).await;
delay_ms = std::cmp::min(delay_ms * 2, max_delay_ms);
} else {
if is_retryable {
error!("Operation failed after max retries.");
} else {
error!("Operation failed with non-retryable error.");
}
return Err(e);
}
}
}
}
operation().await
}

fn extract_result<T>(context: &str, result: Option<T>) -> Result<T, IndexerError> {
Expand Down Expand Up @@ -130,7 +201,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
num_elements: u16,
start_offset: Option<u64>,
) -> Result<Vec<MerkleProofWithContext>, IndexerError> {
self.rate_limited_request(|| async {
self.rate_limited_request_with_retry(|| async {
let request: photon_api::models::GetQueueElementsPostRequest =
photon_api::models::GetQueueElementsPostRequest {
params: Box::from(photon_api::models::GetQueueElementsPostRequestParams {
Expand Down Expand Up @@ -225,10 +296,12 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
&self,
hashes: Vec<String>,
) -> Result<Vec<MerkleProof>, IndexerError> {
self.rate_limited_request(|| async {
self.rate_limited_request_with_retry(|| async {
let hashes_for_async = hashes.clone();

let request: photon_api::models::GetMultipleCompressedAccountProofsPostRequest =
photon_api::models::GetMultipleCompressedAccountProofsPostRequest {
params: hashes,
params: hashes_for_async,
..Default::default()
};

Expand All @@ -240,7 +313,6 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
request,
)
.await?;
debug!("Raw API response: {:?}", result);

if let Some(error) = &result.error {
let error_msg = error.message.as_deref().unwrap_or("Unknown error");
Expand Down Expand Up @@ -295,7 +367,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
&self,
owner: &Pubkey,
) -> Result<Vec<CompressedAccountWithMerkleContext>, IndexerError> {
self.rate_limited_request(|| async {
self.rate_limited_request_with_retry(|| async {
let request = photon_api::models::GetCompressedAccountsByOwnerV2PostRequest {
params: Box::from(GetCompressedAccountsByOwnerPostRequestParams {
cursor: None,
Expand Down Expand Up @@ -361,7 +433,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
owner: &Pubkey,
mint: Option<Pubkey>,
) -> Result<Vec<TokenDataWithMerkleContext>, IndexerError> {
self.rate_limited_request(|| async {
self.rate_limited_request_with_retry(|| async {
let request = GetCompressedTokenAccountsByOwnerV2PostRequest {
params: Box::from(GetCompressedTokenAccountsByOwnerPostRequestParams {
cursor: None,
Expand Down Expand Up @@ -448,7 +520,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
address: Option<Address>,
hash: Option<Hash>,
) -> Result<Account, IndexerError> {
self.rate_limited_request(|| async {
self.rate_limited_request_with_retry(|| async {
let params = self.build_account_params(address, hash)?;
let request = photon_api::models::GetCompressedAccountPostRequest {
params: Box::new(params),
Expand All @@ -474,16 +546,14 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
owner: &Pubkey,
mint: Option<Pubkey>,
) -> Result<Vec<TokenDataWithMerkleContext>, IndexerError> {
self.rate_limited_request(|| async {
self.rate_limited_request_with_retry(|| async {
let request = photon_api::models::GetCompressedTokenAccountsByOwnerPostRequest {
params: Box::new(
photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
owner: owner.to_string(),
mint: mint.map(|x| x.to_string()),
cursor: None,
limit: None,
},
),
params: Box::new(GetCompressedTokenAccountsByOwnerPostRequestParams {
owner: owner.to_string(),
mint: mint.map(|x| x.to_string()),
cursor: None,
limit: None,
}),
..Default::default()
};

Expand All @@ -506,7 +576,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
address: Option<Address>,
hash: Option<Hash>,
) -> Result<u64, IndexerError> {
self.rate_limited_request(|| async {
self.rate_limited_request_with_retry(|| async {
let params = self.build_account_params(address, hash)?;
let request = photon_api::models::GetCompressedAccountBalancePostRequest {
params: Box::new(params),
Expand All @@ -530,7 +600,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
address: Option<Address>,
hash: Option<Hash>,
) -> Result<u64, IndexerError> {
self.rate_limited_request(|| async {
self.rate_limited_request_with_retry(|| async {
let request = photon_api::models::GetCompressedTokenAccountBalancePostRequest {
params: Box::new(photon_api::models::GetCompressedAccountPostRequestParams {
address: address.map(|x| x.to_base58()),
Expand All @@ -557,12 +627,16 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
addresses: Option<Vec<Address>>,
hashes: Option<Vec<Hash>>,
) -> Result<Vec<Account>, IndexerError> {
self.rate_limited_request(|| async {
self.rate_limited_request_with_retry(|| async {
let addresses_for_async = addresses.clone();
let hashes_for_async = hashes.clone();

let request = photon_api::models::GetMultipleCompressedAccountsPostRequest {
params: Box::new(
photon_api::models::GetMultipleCompressedAccountsPostRequestParams {
addresses: addresses.map(|x| x.iter().map(|x| x.to_base58()).collect()),
hashes: hashes.map(|x| x.iter().map(|x| x.to_base58()).collect()),
addresses: addresses_for_async
.map(|x| x.iter().map(|x| x.to_base58()).collect()),
hashes: hashes_for_async.map(|x| x.iter().map(|x| x.to_base58()).collect()),
},
),
..Default::default()
Expand All @@ -585,16 +659,14 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
owner: &Pubkey,
mint: Option<Pubkey>,
) -> Result<TokenBalanceList, IndexerError> {
self.rate_limited_request(|| async {
self.rate_limited_request_with_retry(|| async {
let request = photon_api::models::GetCompressedTokenBalancesByOwnerPostRequest {
params: Box::new(
photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
owner: owner.to_string(),
mint: mint.map(|x| x.to_string()),
cursor: None,
limit: None,
},
),
params: Box::new(GetCompressedTokenAccountsByOwnerPostRequestParams {
owner: owner.to_string(),
mint: mint.map(|x| x.to_string()),
cursor: None,
limit: None,
}),
..Default::default()
};

Expand All @@ -616,7 +688,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
&self,
hash: Hash,
) -> Result<Vec<String>, IndexerError> {
self.rate_limited_request(|| async {
self.rate_limited_request_with_retry(|| async {
let request = photon_api::models::GetCompressionSignaturesForAccountPostRequest {
params: Box::new(
photon_api::models::GetCompressedAccountProofPostRequestParams {
Expand Down Expand Up @@ -650,7 +722,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
merkle_tree_pubkey: [u8; 32],
addresses: Vec<[u8; 32]>,
) -> Result<Vec<NewAddressProofWithContext<16>>, IndexerError> {
self.rate_limited_request(|| async {
self.rate_limited_request_with_retry(|| async {
let params: Vec<photon_api::models::address_with_tree::AddressWithTree> = addresses
.iter()
.map(|x| photon_api::models::address_with_tree::AddressWithTree {
Expand Down Expand Up @@ -758,7 +830,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
hashes: Vec<Hash>,
new_addresses_with_trees: Vec<AddressWithTree>,
) -> Result<CompressedProofWithContext, IndexerError> {
self.rate_limited_request(|| async {
self.rate_limited_request_with_retry(|| async {
let request = photon_api::models::GetValidityProofPostRequest {
params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
Expand Down Expand Up @@ -792,52 +864,33 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
hashes: Vec<Hash>,
new_addresses_with_trees: Vec<AddressWithTree>,
) -> Result<CompressedProofWithContextV2, IndexerError> {
let max_retries = 4;
let mut retries = 0;
let mut delay = 1000;

loop {
match self
.rate_limited_request(|| async {
let request = photon_api::models::GetValidityProofV2PostRequest {
params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
new_addresses_with_trees: Some(
new_addresses_with_trees
.iter()
.map(|x| photon_api::models::AddressWithTree {
address: x.address.to_base58(),
tree: x.tree.to_string(),
})
.collect(),
),
}),
..Default::default()
};
self.rate_limited_request_with_retry(|| async {
let request = photon_api::models::GetValidityProofV2PostRequest {
params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
new_addresses_with_trees: Some(
new_addresses_with_trees
.iter()
.map(|x| photon_api::models::AddressWithTree {
address: x.address.to_base58(),
tree: x.tree.to_string(),
})
.collect(),
),
}),
..Default::default()
};

let result = photon_api::apis::default_api::get_validity_proof_v2_post(
&self.configuration,
request,
)
.await?;
let result = photon_api::apis::default_api::get_validity_proof_v2_post(
&self.configuration,
request,
)
.await?;

let result = Self::extract_result("get_validity_proof_v2", result.result)?;
Ok(*result.value)
})
.await
{
Ok(result) => return Ok(result),
Err(e) => {
if retries >= max_retries {
return Err(e);
}
tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
retries += 1;
delay *= 2;
continue;
}
}
}
let result = Self::extract_result("get_validity_proof_v2", result.result)?;
Ok(*result.value)
})
.await
}

async fn get_indexer_slot(&self, _r: &mut R) -> Result<u64, IndexerError> {
Expand All @@ -862,7 +915,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
merkle_tree_pubkey: &Pubkey,
zkp_batch_size: u16,
) -> Result<BatchAddressUpdateIndexerResponse, IndexerError> {
self.rate_limited_request(|| async {
self.rate_limited_request_with_retry(|| async {
let merkle_tree = Hash::from_bytes(merkle_tree_pubkey.to_bytes().as_ref())?;
let request = photon_api::models::GetBatchAddressUpdateInfoPostRequest {
params: Box::new(
Expand Down