Skip to content

Commit 950ccdd

Browse files
wip
1 parent a82c13d commit 950ccdd

8 files changed

Lines changed: 593 additions & 43 deletions

File tree

Cargo.lock

Lines changed: 40 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ rand = "0.8.5"
133133
bincode = "1.3.3"
134134
rust-s3 = "0.34.0"
135135
cloud-storage = "0.11.1"
136+
jsonwebtoken = "9"
136137

137138
[dev-dependencies]
138139
function_name = "0.3.0"

src/api/method/get_queue_elements.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::api::error::PhotonApiError;
22
use crate::api::method::get_multiple_new_address_proofs::{
3-
get_multiple_new_address_proofs_helper, AddressWithTree, MAX_ADDRESSES,
3+
get_multiple_new_address_proofs_helper, AddressWithTree,
44
};
55
use crate::common::format_bytes;
66
use crate::common::typedefs::context::Context;
@@ -26,6 +26,10 @@ use std::collections::HashMap;
2626
use utoipa::ToSchema;
2727

2828
const MAX_QUEUE_ELEMENTS: u16 = 30_000;
29+
// SQLite has a limit of 999 SQL variables. Each address proof requires ~26 nodes (tree height),
30+
// and each node needs 2 params (tree, node_idx). So max addresses ≈ 999 / (26 * 2) ≈ 19.
31+
// We use 15 to be safe and account for other query overhead.
32+
const MAX_QUEUE_ELEMENTS_SQLITE: u16 = 15;
2933

3034
/// Encode tree node position as a single u64
3135
/// Format: [level: u8][position: 56 bits]
@@ -674,10 +678,14 @@ async fn fetch_address_queue_v2(
674678
limit: u16,
675679
zkp_batch_size: u16,
676680
) -> Result<AddressQueueData, PhotonApiError> {
677-
if limit as usize > MAX_ADDRESSES {
681+
let max_allowed = match tx.get_database_backend() {
682+
sea_orm::DatabaseBackend::Sqlite => MAX_QUEUE_ELEMENTS_SQLITE,
683+
_ => MAX_QUEUE_ELEMENTS,
684+
};
685+
if limit > max_allowed {
678686
return Err(PhotonApiError::ValidationError(format!(
679687
"Too many addresses requested {}. Maximum allowed: {}",
680-
limit, MAX_ADDRESSES
688+
limit, max_allowed
681689
)));
682690
}
683691

@@ -761,9 +769,13 @@ async fn fetch_address_queue_v2(
761769
});
762770
}
763771

764-
let non_inclusion_proofs =
765-
get_multiple_new_address_proofs_helper(tx, addresses_with_trees, MAX_ADDRESSES, false)
766-
.await?;
772+
let non_inclusion_proofs = get_multiple_new_address_proofs_helper(
773+
tx,
774+
addresses_with_trees,
775+
max_allowed as usize,
776+
false,
777+
)
778+
.await?;
767779

768780
if non_inclusion_proofs.len() != queue_results.len() {
769781
return Err(PhotonApiError::ValidationError(format!(

src/monitor/queue_hash_cache.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,31 @@ where
100100
chains.sort_by_key(|c| c.zkp_batch_index);
101101
Ok(chains)
102102
}
103+
104+
pub async fn delete_hash_chains<C>(
105+
db: &C,
106+
tree_pubkey: Pubkey,
107+
queue_type: QueueType,
108+
batch_start_index: u64,
109+
zkp_batch_indices: Vec<i32>,
110+
) -> Result<u64, DbErr>
111+
where
112+
C: ConnectionTrait,
113+
{
114+
if zkp_batch_indices.is_empty() {
115+
return Ok(0);
116+
}
117+
118+
let queue_type_int = queue_type as i32;
119+
let tree_bytes = tree_pubkey.to_bytes().to_vec();
120+
121+
let result = queue_hash_chains::Entity::delete_many()
122+
.filter(queue_hash_chains::Column::TreePubkey.eq(tree_bytes))
123+
.filter(queue_hash_chains::Column::QueueType.eq(queue_type_int))
124+
.filter(queue_hash_chains::Column::BatchStartIndex.eq(batch_start_index as i64))
125+
.filter(queue_hash_chains::Column::ZkpBatchIndex.is_in(zkp_batch_indices))
126+
.exec(db)
127+
.await?;
128+
129+
Ok(result.rows_affected)
130+
}

src/monitor/queue_monitor.rs

Lines changed: 67 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,15 @@ async fn verify_queue_hash_chains(
234234
let batch_start_index = on_chain_batches
235235
.map(|batches| batches[pending_batch_index].start_index)
236236
.unwrap_or(0);
237-
let start_offset = batch_start_index + (num_inserted_zkps * zkp_batch_size);
237+
238+
// For AddressV2 queues, batch.start_index is 1-based (tree leaf index) but
239+
// address_queues.queue_index is 0-based. Apply -1 offset when querying.
240+
// See: src/ingester/persist/persisted_batch_event/address.rs lines 51-55
241+
let start_offset = if queue_type == QueueType::AddressV2 {
242+
batch_start_index.saturating_sub(1) + (num_inserted_zkps * zkp_batch_size)
243+
} else {
244+
batch_start_index + (num_inserted_zkps * zkp_batch_size)
245+
};
238246

239247
let cached_chains =
240248
queue_hash_cache::get_cached_hash_chains(db, tree_pubkey, queue_type, batch_start_index)
@@ -249,13 +257,15 @@ async fn verify_queue_hash_chains(
249257
let start_zkp_batch_idx = num_inserted_zkps as usize;
250258

251259
let mut computed_chains = Vec::with_capacity(on_chain_chains.len());
252-
let mut chains_to_cache = Vec::new();
260+
let mut newly_computed: Vec<(usize, u64, [u8; 32])> = Vec::new();
261+
let mut used_cached_indices: Vec<i32> = Vec::new();
253262

254263
for zkp_batch_idx in 0..on_chain_chains.len() {
255264
let actual_zkp_idx = start_zkp_batch_idx + zkp_batch_idx;
256265

257266
if let Some(&cached_chain) = cached_map.get(&(actual_zkp_idx as i32)) {
258267
computed_chains.push(cached_chain);
268+
used_cached_indices.push(actual_zkp_idx as i32);
259269
} else {
260270
let chain_offset = start_offset + (zkp_batch_idx as u64 * zkp_batch_size);
261271
let chains = compute_hash_chains_from_db(
@@ -270,30 +280,22 @@ async fn verify_queue_hash_chains(
270280

271281
if !chains.is_empty() {
272282
computed_chains.push(chains[0]);
273-
chains_to_cache.push((actual_zkp_idx, chain_offset, chains[0]));
283+
newly_computed.push((actual_zkp_idx, chain_offset, chains[0]));
274284
}
275285
}
276286
}
277287

278-
if !chains_to_cache.is_empty() {
279-
if let Err(e) = queue_hash_cache::store_hash_chains_batch(
280-
db,
281-
tree_pubkey,
282-
queue_type,
283-
batch_start_index,
284-
chains_to_cache,
285-
)
286-
.await
287-
{
288-
error!("Failed to cache hash chains: {:?}", e);
289-
}
290-
}
288+
// Validate computed chains against on-chain values BEFORE caching
289+
let mut valid_chains_to_cache: Vec<(usize, u64, [u8; 32])> = Vec::new();
290+
let mut invalid_cached_indices: Vec<i32> = Vec::new();
291291

292292
for (zkp_batch_idx, (on_chain, computed)) in on_chain_chains
293293
.iter()
294294
.zip(computed_chains.iter())
295295
.enumerate()
296296
{
297+
let actual_zkp_idx = start_zkp_batch_idx + zkp_batch_idx;
298+
297299
if on_chain != computed {
298300
divergences.push(HashChainDivergence {
299301
queue_info: QueueHashChainInfo {
@@ -306,6 +308,55 @@ async fn verify_queue_hash_chains(
306308
actual_hash_chain: *on_chain,
307309
zkp_batch_index: zkp_batch_idx,
308310
});
311+
312+
// If this was from cache, mark for deletion
313+
if used_cached_indices.contains(&(actual_zkp_idx as i32)) {
314+
invalid_cached_indices.push(actual_zkp_idx as i32);
315+
}
316+
} else {
317+
// Only cache newly computed chains that match on-chain
318+
if let Some(entry) = newly_computed
319+
.iter()
320+
.find(|(idx, _, _)| *idx == actual_zkp_idx)
321+
{
322+
valid_chains_to_cache.push(*entry);
323+
}
324+
}
325+
}
326+
327+
// Delete invalid cached chains
328+
if !invalid_cached_indices.is_empty() {
329+
debug!(
330+
"Deleting {} invalid cached hash chains for tree {} type {:?}",
331+
invalid_cached_indices.len(),
332+
tree_pubkey,
333+
queue_type
334+
);
335+
if let Err(e) = queue_hash_cache::delete_hash_chains(
336+
db,
337+
tree_pubkey,
338+
queue_type,
339+
batch_start_index,
340+
invalid_cached_indices,
341+
)
342+
.await
343+
{
344+
error!("Failed to delete invalid cached hash chains: {:?}", e);
345+
}
346+
}
347+
348+
// Only cache validated chains
349+
if !valid_chains_to_cache.is_empty() {
350+
if let Err(e) = queue_hash_cache::store_hash_chains_batch(
351+
db,
352+
tree_pubkey,
353+
queue_type,
354+
batch_start_index,
355+
valid_chains_to_cache,
356+
)
357+
.await
358+
{
359+
error!("Failed to cache hash chains: {:?}", e);
309360
}
310361
}
311362

src/snapshot/gcs_utils/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod resumable_upload;

0 commit comments

Comments
 (0)