Skip to content

Commit cd1c3b2

Browse files
committed
feat: disable v1 state multi-nullify when queue exceeds 10,000 items
Adds queue_item_count to BuildTransactionBatchConfig and disables multi-nullify when the queue is too large, falling back to single nullify for more reliable throughput. Renames use_dedup to use_multi_nullify for consistency.
1 parent 82e0a6b commit cd1c3b2

18 files changed

Lines changed: 160 additions & 345 deletions

File tree

forester/src/cli.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,14 @@ pub struct StartArgs {
295295
)]
296296
pub min_queue_items: Option<usize>,
297297

298+
#[arg(
299+
long,
300+
env = "ENABLE_V1_MULTI_NULLIFY",
301+
help = "Enable nullify_state_v1_multi instruction for batching 2-4 V1 state nullifications per instruction. Requires --lookup-table-address.",
302+
default_value = "false"
303+
)]
304+
pub enable_v1_multi_nullify: bool,
305+
298306
#[arg(
299307
long,
300308
env = "API_SERVER_PORT",

forester/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ pub struct ForesterConfig {
3333
/// Minimum queue items before processing V1 state nullifications.
3434
/// Delays processing to allow dedup grouping. Only applies when lookup_table_address is set.
3535
pub min_queue_items: Option<usize>,
36+
/// Enable nullify_state_v1_multi instruction for batching 2-4 V1 state nullifications.
37+
/// Requires lookup_table_address to be set.
38+
pub enable_v1_multi_nullify: bool,
3639
}
3740

3841
#[derive(Debug, Clone)]
@@ -425,6 +428,7 @@ impl ForesterConfig {
425428
})
426429
.transpose()?,
427430
min_queue_items: args.min_queue_items,
431+
enable_v1_multi_nullify: args.enable_v1_multi_nullify,
428432
})
429433
}
430434

@@ -480,6 +484,7 @@ impl ForesterConfig {
480484
compressible_config: None,
481485
lookup_table_address: None,
482486
min_queue_items: None,
487+
enable_v1_multi_nullify: false,
483488
})
484489
}
485490
}
@@ -501,6 +506,7 @@ impl Clone for ForesterConfig {
501506
compressible_config: self.compressible_config.clone(),
502507
lookup_table_address: self.lookup_table_address,
503508
min_queue_items: self.min_queue_items,
509+
enable_v1_multi_nullify: self.enable_v1_multi_nullify,
504510
}
505511
}
506512
}

forester/src/epoch_manager.rs

Lines changed: 31 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ pub struct EpochManager<R: Rpc + Indexer> {
275275
mint_tracker: Option<Arc<crate::compressible::mint::MintAccountTracker>>,
276276
/// Cached zkp_batch_size per tree to filter queue updates below threshold
277277
zkp_batch_sizes: Arc<DashMap<Pubkey, u64>>,
278-
address_lookup_tables: Arc<tokio::sync::RwLock<Vec<AddressLookupTableAccount>>>,
278+
address_lookup_tables: Arc<Vec<AddressLookupTableAccount>>,
279279
heartbeat: Arc<ServiceHeartbeat>,
280280
run_id: Arc<str>,
281281
/// Per-epoch registration trackers to coordinate re-finalization when new foresters register mid-epoch
@@ -328,7 +328,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
328328
compressible_tracker: Option<Arc<CTokenAccountTracker>>,
329329
pda_tracker: Option<Arc<crate::compressible::pda::PdaAccountTracker>>,
330330
mint_tracker: Option<Arc<crate::compressible::mint::MintAccountTracker>>,
331-
address_lookup_tables: Arc<tokio::sync::RwLock<Vec<AddressLookupTableAccount>>>,
331+
address_lookup_tables: Arc<Vec<AddressLookupTableAccount>>,
332332
heartbeat: Arc<ServiceHeartbeat>,
333333
run_id: String,
334334
) -> Result<Self> {
@@ -1096,21 +1096,6 @@ impl<R: Rpc + Indexer> EpochManager<R> {
10961096
debug!("Recovered registration info for epoch {}", epoch);
10971097
update_epoch_registered(epoch);
10981098

1099-
// Extend ALT with new forester epoch PDA if ALT is configured
1100-
let forester_epoch_pda_pubkey =
1101-
get_forester_epoch_pda_from_authority(&self.config.derivation_pubkey, epoch).0;
1102-
if let Err(e) = self
1103-
.extend_alt_with_forester_pda(forester_epoch_pda_pubkey)
1104-
.await
1105-
{
1106-
warn!(
1107-
event = "extend_alt_failed",
1108-
epoch,
1109-
error = ?e,
1110-
"Failed to extend ALT with forester PDA, continuing with static account"
1111-
);
1112-
}
1113-
11141099
// Wait for the active phase
11151100
registration_info = match self.wait_for_active_phase(&registration_info).await? {
11161101
Some(info) => info,
@@ -1395,51 +1380,6 @@ impl<R: Rpc + Indexer> EpochManager<R> {
13951380
}
13961381
}
13971382

1398-
async fn extend_alt_with_forester_pda(&self, forester_epoch_pda: Pubkey) -> anyhow::Result<()> {
1399-
let alt_address = match self.config.lookup_table_address {
1400-
Some(addr) => addr,
1401-
None => return Ok(()),
1402-
};
1403-
1404-
// Check if the PDA is already in the ALT
1405-
{
1406-
let alt = self.address_lookup_tables.read().await;
1407-
if alt
1408-
.iter()
1409-
.any(|t| t.addresses.contains(&forester_epoch_pda))
1410-
{
1411-
return Ok(());
1412-
}
1413-
}
1414-
1415-
let extend_ix = light_client::rpc::lut::instruction::extend_lookup_table(
1416-
alt_address,
1417-
self.config.payer_keypair.pubkey(),
1418-
Some(self.config.payer_keypair.pubkey()),
1419-
vec![forester_epoch_pda],
1420-
);
1421-
let payer_pubkey = self.config.payer_keypair.pubkey();
1422-
let mut rpc = self.rpc_pool.get_connection().await?;
1423-
rpc.create_and_send_transaction(&[extend_ix], &payer_pubkey, &[&self.config.payer_keypair])
1424-
.await
1425-
.map_err(|e| anyhow::anyhow!("Failed to extend ALT: {e}"))?;
1426-
1427-
// Reload the ALT from on-chain
1428-
let updated_lut = load_lookup_table_async(&*rpc, alt_address).await?;
1429-
info!(
1430-
event = "alt_extended",
1431-
lookup_table = %alt_address,
1432-
new_address = %forester_epoch_pda,
1433-
address_count = updated_lut.addresses.len(),
1434-
"Extended ALT with forester epoch PDA"
1435-
);
1436-
1437-
let mut alt = self.address_lookup_tables.write().await;
1438-
*alt = vec![updated_lut];
1439-
1440-
Ok(())
1441-
}
1442-
14431383
async fn recover_registration_info_internal(
14441384
&self,
14451385
epoch: u64,
@@ -1574,14 +1514,13 @@ impl<R: Rpc + Indexer> EpochManager<R> {
15741514
};
15751515
let payer = self.config.payer_keypair.pubkey();
15761516
let signers = [&self.config.payer_keypair];
1577-
let alt_guard = self.address_lookup_tables.read().await;
15781517
send_smart_transaction(
15791518
&mut *rpc,
15801519
SendSmartTransactionConfig {
15811520
instructions: vec![ix],
15821521
payer: &payer,
15831522
signers: &signers,
1584-
address_lookup_tables: &alt_guard,
1523+
address_lookup_tables: &self.address_lookup_tables,
15851524
compute_budget: ComputeBudgetConfig {
15861525
compute_unit_price: priority_fee,
15871526
compute_unit_limit: Some(self.config.transaction_config.cu_limit),
@@ -1986,14 +1925,13 @@ impl<R: Rpc + Indexer> EpochManager<R> {
19861925
};
19871926
let payer = self.config.payer_keypair.pubkey();
19881927
let signers = [&self.config.payer_keypair];
1989-
let alt_guard = self.address_lookup_tables.read().await;
19901928
match send_smart_transaction(
19911929
&mut *rpc,
19921930
SendSmartTransactionConfig {
19931931
instructions: vec![ix],
19941932
payer: &payer,
19951933
signers: &signers,
1996-
address_lookup_tables: &alt_guard,
1934+
address_lookup_tables: &self.address_lookup_tables,
19971935
compute_budget: ComputeBudgetConfig {
19981936
compute_unit_price: priority_fee,
19991937
compute_unit_limit: Some(self.config.transaction_config.cu_limit),
@@ -3055,6 +2993,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
30552993
compute_unit_limit: Some(self.config.transaction_config.cu_limit),
30562994
enable_priority_fees: self.config.transaction_config.enable_priority_fees,
30572995
max_concurrent_sends: Some(self.config.transaction_config.max_concurrent_sends),
2996+
queue_item_count: 0,
30582997
},
30592998
queue_config: self.config.queue_config,
30602999
retry_config: RetryConfig {
@@ -3070,15 +3009,16 @@ impl<R: Rpc + Indexer> EpochManager<R> {
30703009
min_queue_items: None, // set below after reading ALT
30713010
};
30723011

3073-
let alt_snapshot = self.address_lookup_tables.read().await.clone();
3074-
if !alt_snapshot.is_empty() {
3012+
let alt_snapshot = (*self.address_lookup_tables).clone();
3013+
if self.config.enable_v1_multi_nullify && !alt_snapshot.is_empty() {
30753014
batched_tx_config.min_queue_items = self.config.min_queue_items;
30763015
}
30773016
let transaction_builder = Arc::new(EpochManagerTransactions::new(
30783017
self.rpc_pool.clone(),
30793018
epoch_info.epoch,
30803019
self.tx_cache.clone(),
30813020
alt_snapshot,
3021+
self.config.enable_v1_multi_nullify,
30823022
));
30833023

30843024
let num_sent = send_batched_transactions(
@@ -3280,14 +3220,13 @@ impl<R: Rpc + Indexer> EpochManager<R> {
32803220
}
32813221

32823222
// No existing processor - create new one
3283-
let alt_guard = self.address_lookup_tables.read().await;
32843223
let batch_context = self.build_batch_context(
32853224
epoch_info,
32863225
tree_accounts,
32873226
None,
32883227
None,
32893228
None,
3290-
Arc::new(alt_guard.clone()),
3229+
self.address_lookup_tables.clone(),
32913230
);
32923231
let processor = Arc::new(Mutex::new(
32933232
QueueProcessor::new(batch_context, StateTreeStrategy).await?,
@@ -3342,14 +3281,13 @@ impl<R: Rpc + Indexer> EpochManager<R> {
33423281
}
33433282

33443283
// No existing processor - create new one
3345-
let alt_guard = self.address_lookup_tables.read().await;
33463284
let batch_context = self.build_batch_context(
33473285
epoch_info,
33483286
tree_accounts,
33493287
None,
33503288
None,
33513289
None,
3352-
Arc::new(alt_guard.clone()),
3290+
self.address_lookup_tables.clone(),
33533291
);
33543292
let processor = Arc::new(Mutex::new(
33553293
QueueProcessor::new(batch_context, AddressTreeStrategy).await?,
@@ -3927,14 +3865,13 @@ impl<R: Rpc + Indexer> EpochManager<R> {
39273865
let instruction_count = instructions.len();
39283866
let payer = self.config.payer_keypair.pubkey();
39293867
let signers = [&self.config.payer_keypair];
3930-
let alt_guard = self.address_lookup_tables.read().await;
39313868
match send_smart_transaction(
39323869
&mut *rpc,
39333870
SendSmartTransactionConfig {
39343871
instructions,
39353872
payer: &payer,
39363873
signers: &signers,
3937-
address_lookup_tables: &alt_guard,
3874+
address_lookup_tables: &self.address_lookup_tables,
39383875
compute_budget: ComputeBudgetConfig {
39393876
compute_unit_price: priority_fee,
39403877
compute_unit_limit: Some(self.config.transaction_config.cu_limit),
@@ -4103,14 +4040,13 @@ impl<R: Rpc + Indexer> EpochManager<R> {
41034040
.await?;
41044041
let payer = self.config.payer_keypair.pubkey();
41054042
let signers = [&self.config.payer_keypair];
4106-
let alt_guard = self.address_lookup_tables.read().await;
41074043
match send_smart_transaction(
41084044
&mut rpc,
41094045
SendSmartTransactionConfig {
41104046
instructions: vec![ix],
41114047
payer: &payer,
41124048
signers: &signers,
4113-
address_lookup_tables: &alt_guard,
4049+
address_lookup_tables: &self.address_lookup_tables,
41144050
compute_budget: ComputeBudgetConfig {
41154051
compute_unit_price: priority_fee,
41164052
compute_unit_limit: Some(self.config.transaction_config.cu_limit),
@@ -4546,28 +4482,28 @@ pub async fn run_service<R: Rpc + Indexer>(
45464482
let address_lookup_tables = {
45474483
if let Some(lut_address) = config.lookup_table_address {
45484484
let rpc = rpc_pool.get_connection().await?;
4549-
match load_lookup_table_async(&*rpc, lut_address).await {
4550-
Ok(lut) => {
4551-
info!(
4552-
event = "lookup_table_loaded",
4485+
let lut = load_lookup_table_async(&*rpc, lut_address).await
4486+
.map_err(|e| {
4487+
error!(
4488+
event = "lookup_table_load_failed",
45534489
run_id = %run_id_for_logs,
45544490
lookup_table = %lut_address,
4555-
address_count = lut.addresses.len(),
4556-
"Loaded lookup table"
4491+
error = %e,
4492+
"Failed to load lookup table"
45574493
);
4558-
Arc::new(tokio::sync::RwLock::new(vec![lut]))
4559-
}
4560-
Err(e) => {
4561-
debug!(
4562-
"Lookup table {} not available: {}. Using legacy transactions.",
4563-
lut_address, e
4564-
);
4565-
Arc::new(tokio::sync::RwLock::new(Vec::new()))
4566-
}
4567-
}
4494+
e
4495+
})?;
4496+
info!(
4497+
event = "lookup_table_loaded",
4498+
run_id = %run_id_for_logs,
4499+
lookup_table = %lut_address,
4500+
address_count = lut.addresses.len(),
4501+
"Loaded lookup table"
4502+
);
4503+
Arc::new(vec![lut])
45684504
} else {
4569-
debug!("No lookup table address configured. Using legacy transactions.");
4570-
Arc::new(tokio::sync::RwLock::new(Vec::new()))
4505+
debug!("No lookup table address configured. Using v1 state single nullify transactions.");
4506+
Arc::new(Vec::new())
45714507
}
45724508
};
45734509

@@ -4738,6 +4674,7 @@ mod tests {
47384674
compressible_config: None,
47394675
lookup_table_address: None,
47404676
min_queue_items: None,
4677+
enable_v1_multi_nullify: false,
47414678
}
47424679
}
47434680

forester/src/processor/v1/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,7 @@ pub struct BuildTransactionBatchConfig {
3232
pub compute_unit_limit: Option<u32>,
3333
pub enable_priority_fees: bool,
3434
pub max_concurrent_sends: Option<usize>,
35+
/// Number of items in the queue at the time of batch preparation.
36+
/// Used to disable multi-nullify when queue is very large (>10,000 items).
37+
pub queue_item_count: usize,
3538
}

0 commit comments

Comments
 (0)