From 7cf0812aaf4d55b37fd7bc5f4120c81bf4fbda80 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Thu, 26 Feb 2026 18:36:55 +0800 Subject: [PATCH 01/17] feat: add timing calculation --- .../src/payload/flashblocks/payload.rs | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index b41185e2..b923dd96 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -1239,6 +1239,8 @@ fn resolve_zero_state_root( ctx: CalculateStateRootContext, state_provider: Box, ) -> Result { + let resolve_start_time = Instant::now(); + let (state_root, trie_updates, hashed_state) = calculate_state_root_on_resolve(&ctx, state_provider)?; @@ -1276,10 +1278,13 @@ fn resolve_zero_state_root( "Failed to send updated payload" ); } - debug!( + + let resolve_total_time = resolve_start_time.elapsed(); + info!( target: "payload_builder", state_root = %state_root, - "Updated payload with calculated state root" + resolve_total_ms = resolve_total_time.as_millis(), + "resolve_zero_state_root completed" ); Ok(updated_payload) @@ -1290,8 +1295,13 @@ fn calculate_state_root_on_resolve( ctx: &CalculateStateRootContext, state_provider: Box, ) -> Result<(B256, TrieUpdates, HashedPostState), PayloadBuilderError> { - let state_root_start_time = Instant::now(); + let total_start_time = Instant::now(); + + let hashed_state_start = Instant::now(); let hashed_state = state_provider.hashed_post_state(&ctx.best_payload.1); + let hashed_state_time = hashed_state_start.elapsed(); + + let state_root_start = Instant::now(); let state_root_updates = state_provider.state_root_with_updates(hashed_state.clone()).inspect_err(|err| { warn!(target: "payload_builder", @@ -1300,10 +1310,20 @@ fn calculate_state_root_on_resolve( "failed to calculate state root for payload" ); })?; + let state_root_time = state_root_start.elapsed(); + + let total_time = total_start_time.elapsed(); + info!( + target: "payload_builder", + hashed_state_ms = hashed_state_time.as_millis(), + state_root_ms = state_root_time.as_millis(), + total_ms = total_time.as_millis(), + state_root = %state_root_updates.0, + "calculate_state_root_on_resolve timing (cold)" + ); - let state_root_calculation_time = state_root_start_time.elapsed(); - ctx.metrics.state_root_calculation_duration.record(state_root_calculation_time); - ctx.metrics.state_root_calculation_gauge.set(state_root_calculation_time); + ctx.metrics.state_root_calculation_duration.record(total_time); + ctx.metrics.state_root_calculation_gauge.set(total_time); Ok((state_root_updates.0, state_root_updates.1, hashed_state)) } From d1b0f72675d262b2234f890b1c371a6729d57121 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Thu, 26 Feb 2026 18:53:16 +0800 Subject: [PATCH 02/17] feat: async trie update --- crates/builder/src/args/op.rs | 20 ++ .../builder/src/payload/flashblocks/config.rs | 14 + .../src/payload/flashblocks/payload.rs | 247 +++++++++++++++++- 3 files changed, 270 insertions(+), 11 deletions(-) diff --git a/crates/builder/src/args/op.rs b/crates/builder/src/args/op.rs index fe0b61c8..383546f6 100644 --- a/crates/builder/src/args/op.rs +++ b/crates/builder/src/args/op.rs @@ -109,6 +109,26 @@ pub struct FlashblocksArgs { )] pub flashblocks_disable_async_calculate_state_root: bool, + /// Enable async trie precalculation during flashblock building. + /// When enabled and disable_state_root is true, background trie calculations + /// are spawned after each flashblock to speed up final state root resolution. + #[arg( + long = "flashblocks.enable-async-trie-precalc", + default_value = "false", + env = "FLASHBLOCKS_ENABLE_ASYNC_TRIE_PRECALC" + )] + pub flashblocks_enable_async_trie_precalc: bool, + + /// Which flashblock index to start async trie precalculation from (0-indexed). + /// For example, with 5 flashblocks and start=2, precalculation begins after + /// flashblock 2 (skipping 0 and 1). + #[arg( + long = "flashblocks.async-trie-precalc-start-flashblock", + default_value = "0", + env = "FLASHBLOCKS_ASYNC_TRIE_PRECALC_START_FLASHBLOCK" + )] + pub flashblocks_async_trie_precalc_start_flashblock: u64, + /// Flashblocks number contract address /// /// This is the address of the contract that will be used to increment the flashblock number. diff --git a/crates/builder/src/payload/flashblocks/config.rs b/crates/builder/src/payload/flashblocks/config.rs index 932cb245..4eabab40 100644 --- a/crates/builder/src/payload/flashblocks/config.rs +++ b/crates/builder/src/payload/flashblocks/config.rs @@ -60,6 +60,14 @@ pub struct FlashblocksConfig { /// Maximum number of concurrent WebSocket subscribers pub ws_subscriber_limit: Option, + + /// Enable async trie precalculation during flashblock building. + /// When enabled and disable_state_root is true, background trie calculations + /// are spawned after each flashblock to speed up final state root resolution. + pub enable_async_trie_precalc: bool, + + /// Which flashblock index to start async trie precalculation from (0-indexed). + pub async_trie_precalc_start_flashblock: u64, } impl Default for FlashblocksConfig { @@ -80,6 +88,8 @@ impl Default for FlashblocksConfig { p2p_send_full_payload: false, p2p_process_full_payload: false, ws_subscriber_limit: None, + enable_async_trie_precalc: false, + async_trie_precalc_start_flashblock: 0, } } } @@ -118,6 +128,10 @@ impl TryFrom for FlashblocksConfig { p2p_send_full_payload: args.flashblocks.p2p.p2p_send_full_payload, p2p_process_full_payload: args.flashblocks.p2p.p2p_process_full_payload, ws_subscriber_limit: args.flashblocks.ws_subscriber_limit, + enable_async_trie_precalc: args.flashblocks.flashblocks_enable_async_trie_precalc, + async_trie_precalc_start_flashblock: args + .flashblocks + .flashblocks_async_trie_precalc_start_flashblock, }) } } diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index b923dd96..1d6f8c82 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -185,6 +185,33 @@ impl FlashblocksState { } } +/// Result of an async trie precalculation for a single flashblock. +struct TriePrecalcResult { + /// The flashblock index this result corresponds to. + flashblock_index: u64, + /// The computed trie updates that can seed the next incremental calculation. + trie_updates: Arc, +} + +/// Work item sent from the main flashblock loop to the background trie worker. +struct TriePrecalcWorkItem { + flashblock_index: u64, + bundle_state: BundleState, +} + +/// Manages the async trie precalculation pipeline. +/// +/// A background worker computes incremental trie updates sequentially. +/// Each computation uses the previous one's `TrieUpdates` to maintain +/// an incremental chain. Results are collected here and used during +/// final state root resolution. +struct AsyncTriePrecalcPipeline { + /// Receiver for completed precalculation results from the background worker. + result_rx: std::sync::mpsc::Receiver, + /// Sender for providing BundleState snapshots to the background worker. + work_tx: std::sync::mpsc::SyncSender, +} + /// Optimism's payload builder #[derive(Debug, Clone)] pub(super) struct OpPayloadBuilder { @@ -464,7 +491,7 @@ where ctx.metrics.payload_num_tx_gauge.set(info.executed_transactions.len() as f64); // return early since we don't need to build a block with transactions from the pool - self.resolve_best_payload(&ctx, best_payload, fallback_payload, &resolve_payload); + self.resolve_best_payload(&ctx, best_payload, fallback_payload, &resolve_payload, None); return Ok(()); } @@ -540,6 +567,38 @@ where fb_payload.payload_id, ))); + // Initialize async trie precalculation pipeline if enabled + let mut precalc_pipeline: Option = if disable_state_root + && self.config.specific.enable_async_trie_precalc + { + match self.client.state_by_block_hash(ctx.parent().hash()) { + Ok(worker_state_provider) => { + let (work_tx, work_rx) = std::sync::mpsc::sync_channel(2); + let (result_tx, result_rx) = + std::sync::mpsc::sync_channel((expected_flashblocks + 1) as usize); + let metrics = self.metrics.clone(); + self.task_executor.spawn_blocking(Box::pin(async move { + run_trie_precalc_worker(work_rx, result_tx, worker_state_provider, metrics); + })); + info!( + target: "payload_builder", + "Async trie precalculation pipeline started" + ); + Some(AsyncTriePrecalcPipeline { result_rx, work_tx }) + } + Err(err) => { + warn!( + target: "payload_builder", + error = %err, + "Failed to create state provider for async trie precalc, disabling" + ); + None + } + } + } else { + None + }; + // Process flashblocks - block on async channel receive loop { // Wait for signal before building flashblock. @@ -554,7 +613,13 @@ where ctx = ctx.with_cancel(new_fb_cancel); } else { // Channel closed - block building cancelled - self.resolve_best_payload(&ctx, best_payload, fallback_payload, &resolve_payload); + self.resolve_best_payload( + &ctx, + best_payload, + fallback_payload, + &resolve_payload, + precalc_pipeline.take(), + ); self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks); return Ok(()); } @@ -577,6 +642,7 @@ where best_payload, fallback_payload, &resolve_payload, + precalc_pipeline.take(), ); self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks); return Ok(()); @@ -595,11 +661,45 @@ where best_payload, fallback_payload, &resolve_payload, + precalc_pipeline.take(), ); return Err(PayloadBuilderError::Other(err.into())); } }; + // Feed work item to async trie precalc pipeline + if let Some(pipeline) = &precalc_pipeline { + let fb_index = fb_state.flashblock_index(); + if fb_index >= self.config.specific.async_trie_precalc_start_flashblock { + match pipeline.work_tx.try_send(TriePrecalcWorkItem { + flashblock_index: fb_index, + bundle_state: best_payload.1.clone(), + }) { + Ok(()) => { + debug!( + target: "payload_builder", + flashblock_index = fb_index, + "Sent work item to async trie precalc pipeline" + ); + } + Err(std::sync::mpsc::TrySendError::Full(_)) => { + warn!( + target: "payload_builder", + flashblock_index = fb_index, + "Async trie precalc pipeline full, skipping" + ); + } + Err(std::sync::mpsc::TrySendError::Disconnected(_)) => { + warn!( + target: "payload_builder", + flashblock_index = fb_index, + "Async trie precalc worker disconnected" + ); + } + } + } + } + fb_state = next_fb_state; } } @@ -813,6 +913,7 @@ where best_payload: (OpBuiltPayload, BundleState), fallback_payload: OpBuiltPayload, resolve_payload: &BlockCell, + precalc_pipeline: Option, ) { if resolve_payload.get().is_some() { return; @@ -820,6 +921,22 @@ where let payload = match best_payload.0.block().header().state_root { B256::ZERO => { + // Drain the async trie precalc pipeline for the latest result + let precalc_result = precalc_pipeline.and_then(|pipeline| { + let mut latest = None; + while let Ok(result) = pipeline.result_rx.try_recv() { + latest = Some(result); + } + if let Some(ref result) = latest { + info!( + target: "payload_builder", + flashblock_index = result.flashblock_index, + "Using async trie precalc result for resolve" + ); + } + latest + }); + // Get the fallback payload for payload resolution let fallback_payload_for_resolve = if self.config.specific.disable_async_calculate_state_root { @@ -841,7 +958,7 @@ where match self.client.state_by_block_hash(ctx.parent().hash()) { Ok(state_provider) => { if self.config.specific.disable_async_calculate_state_root { - resolve_zero_state_root(state_root_ctx, state_provider) + resolve_zero_state_root(state_root_ctx, state_provider, precalc_result) .unwrap_or_else(|err| { warn!( target: "payload_builder", @@ -852,7 +969,11 @@ where }) } else { self.task_executor.spawn_blocking(Box::pin(async move { - let _ = resolve_zero_state_root(state_root_ctx, state_provider); + let _ = resolve_zero_state_root( + state_root_ctx, + state_provider, + precalc_result, + ); })); fallback_payload_for_resolve } @@ -1228,6 +1349,79 @@ where )) } +/// Runs the async trie precalculation worker in a blocking context. +/// +/// Processes work items sequentially, maintaining an incremental trie update chain. +/// The first item does a full `state_root_with_updates`, subsequent items use +/// `state_root_from_nodes_with_updates` with the previous result's cached trie nodes. +fn run_trie_precalc_worker( + work_rx: std::sync::mpsc::Receiver, + result_tx: std::sync::mpsc::SyncSender, + state_provider: Box, + metrics: Arc, +) { + let mut prev_trie_updates: Option> = None; + + while let Ok(work_item) = work_rx.recv() { + let start_time = Instant::now(); + + let hashed_state = state_provider.hashed_post_state(&work_item.bundle_state); + + let result = if let Some(prev_trie) = &prev_trie_updates { + // Incremental path: reuse cached trie nodes from previous flashblock + let trie_input = TrieInput::new( + prev_trie.as_ref().clone(), + hashed_state.clone(), + hashed_state.construct_prefix_sets(), + ); + state_provider.state_root_from_nodes_with_updates(trie_input) + } else { + // First calculation: full trie computation + state_provider.state_root_with_updates(hashed_state) + }; + + match result { + Ok((state_root, trie_output)) => { + let trie_updates = Arc::new(trie_output); + prev_trie_updates = Some(trie_updates.clone()); + + let elapsed = start_time.elapsed(); + info!( + target: "payload_builder", + flashblock_index = work_item.flashblock_index, + state_root = %state_root, + duration_ms = elapsed.as_millis(), + "Async trie precalculation completed" + ); + metrics.state_root_calculation_duration.record(elapsed); + + if result_tx + .send(TriePrecalcResult { + flashblock_index: work_item.flashblock_index, + trie_updates, + }) + .is_err() + { + // Main loop dropped the receiver — stop worker + break; + } + } + Err(err) => { + warn!( + target: "payload_builder", + flashblock_index = work_item.flashblock_index, + error = %err, + "Async trie precalculation failed, resetting chain" + ); + // Reset chain: next item will do a full calculation + prev_trie_updates = None; + } + } + } + + debug!(target: "payload_builder", "Trie precalc worker exiting"); +} + struct CalculateStateRootContext { best_payload: (OpBuiltPayload, BundleState), parent_hash: BlockHash, @@ -1238,11 +1432,12 @@ struct CalculateStateRootContext { fn resolve_zero_state_root( ctx: CalculateStateRootContext, state_provider: Box, + precalc_result: Option, ) -> Result { let resolve_start_time = Instant::now(); let (state_root, trie_updates, hashed_state) = - calculate_state_root_on_resolve(&ctx, state_provider)?; + calculate_state_root_on_resolve(&ctx, state_provider, precalc_result)?; let payload_id = ctx.best_payload.0.id(); let fees = ctx.best_payload.0.fees(); @@ -1290,40 +1485,70 @@ fn resolve_zero_state_root( Ok(updated_payload) } -/// Calculates only the state root for an existing payload +/// Calculates only the state root for an existing payload. +/// +/// If `precalc_result` is available, uses incremental `state_root_from_nodes_with_updates` +/// seeded by the precalculated trie updates. Otherwise falls back to a cold full calculation. fn calculate_state_root_on_resolve( ctx: &CalculateStateRootContext, state_provider: Box, + precalc_result: Option, ) -> Result<(B256, TrieUpdates, HashedPostState), PayloadBuilderError> { let total_start_time = Instant::now(); + let used_precalc = precalc_result.is_some(); let hashed_state_start = Instant::now(); let hashed_state = state_provider.hashed_post_state(&ctx.best_payload.1); let hashed_state_time = hashed_state_start.elapsed(); let state_root_start = Instant::now(); - let state_root_updates = + let (state_root, trie_updates) = if let Some(precalc) = precalc_result { + // Incremental path: use precalculated trie from background worker + info!( + target: "payload_builder", + precalc_flashblock = precalc.flashblock_index, + "Using precalculated trie for resolve" + ); + + let trie_input = TrieInput::new( + precalc.trie_updates.as_ref().clone(), + hashed_state.clone(), + hashed_state.construct_prefix_sets(), + ); + + state_provider.state_root_from_nodes_with_updates(trie_input).inspect_err(|err| { + warn!(target: "payload_builder", + parent_header=%ctx.parent_hash, + %err, + "failed incremental state root on resolve" + ); + })? + } else { + // Cold path: full trie calculation state_provider.state_root_with_updates(hashed_state.clone()).inspect_err(|err| { warn!(target: "payload_builder", parent_header=%ctx.parent_hash, %err, "failed to calculate state root for payload" ); - })?; + })? + }; let state_root_time = state_root_start.elapsed(); let total_time = total_start_time.elapsed(); + let method = if used_precalc { "incremental" } else { "cold" }; info!( target: "payload_builder", hashed_state_ms = hashed_state_time.as_millis(), state_root_ms = state_root_time.as_millis(), total_ms = total_time.as_millis(), - state_root = %state_root_updates.0, - "calculate_state_root_on_resolve timing (cold)" + state_root = %state_root, + method, + "calculate_state_root_on_resolve timing" ); ctx.metrics.state_root_calculation_duration.record(total_time); ctx.metrics.state_root_calculation_gauge.set(total_time); - Ok((state_root_updates.0, state_root_updates.1, hashed_state)) + Ok((state_root, trie_updates, hashed_state)) } From e1c32b50e41f2410a746817171a3af2246763a10 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Fri, 27 Feb 2026 12:39:57 +0800 Subject: [PATCH 03/17] update --- crates/builder/src/args/op.rs | 16 + .../src/payload/flashblocks/payload.rs | 285 +++++++++++++----- deps/optimism | 1 + 3 files changed, 225 insertions(+), 77 deletions(-) create mode 160000 deps/optimism diff --git a/crates/builder/src/args/op.rs b/crates/builder/src/args/op.rs index 383546f6..6bb490b9 100644 --- a/crates/builder/src/args/op.rs +++ b/crates/builder/src/args/op.rs @@ -171,6 +171,22 @@ pub struct FlashblocksArgs { default_value = "256" )] pub ws_subscriber_limit: Option, + + /// Whether to enable async trie precalculation for flashblocks + #[arg( + long = "flashblocks.enable-async-trie-precalc", + default_value = "false", + env = "FLASHBLOCKS_ENABLE_ASYNC_TRIE_PRECALC" + )] + pub enable_async_trie_precalc: bool, + + /// The flashblock index at which to start async trie precalculation + #[arg( + long = "flashblocks.async-trie-precalc-start-flashblock", + default_value = "1", + env = "FLASHBLOCKS_ASYNC_TRIE_PRECALC_START_FLASHBLOCK" + )] + pub async_trie_precalc_start_flashblock: u64, } impl Default for FlashblocksArgs { diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index 1d6f8c82..f5992b77 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -52,7 +52,11 @@ use reth_revm::{ State, }; use reth_transaction_pool::TransactionPool; -use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; +use reth_trie::{ + prefix_set::{PrefixSetMut, TriePrefixSetsMut}, + updates::TrieUpdates, + HashedPostState, Nibbles, TrieInput, +}; use revm::Database; use std::{collections::BTreeMap, sync::Arc, time::Instant}; use tokio::sync::mpsc; @@ -191,6 +195,9 @@ struct TriePrecalcResult { flashblock_index: u64, /// The computed trie updates that can seed the next incremental calculation. trie_updates: Arc, + /// The hashed post state at the time of precalculation, used to compute + /// delta prefix sets on resolve (only the diff since this state needs recomputation). + hashed_state: HashedPostState, } /// Work item sent from the main flashblock loop to the background trie worker. @@ -695,6 +702,7 @@ where flashblock_index = fb_index, "Async trie precalc worker disconnected" ); + precalc_pipeline = None; } } } @@ -1349,79 +1357,6 @@ where )) } -/// Runs the async trie precalculation worker in a blocking context. -/// -/// Processes work items sequentially, maintaining an incremental trie update chain. -/// The first item does a full `state_root_with_updates`, subsequent items use -/// `state_root_from_nodes_with_updates` with the previous result's cached trie nodes. -fn run_trie_precalc_worker( - work_rx: std::sync::mpsc::Receiver, - result_tx: std::sync::mpsc::SyncSender, - state_provider: Box, - metrics: Arc, -) { - let mut prev_trie_updates: Option> = None; - - while let Ok(work_item) = work_rx.recv() { - let start_time = Instant::now(); - - let hashed_state = state_provider.hashed_post_state(&work_item.bundle_state); - - let result = if let Some(prev_trie) = &prev_trie_updates { - // Incremental path: reuse cached trie nodes from previous flashblock - let trie_input = TrieInput::new( - prev_trie.as_ref().clone(), - hashed_state.clone(), - hashed_state.construct_prefix_sets(), - ); - state_provider.state_root_from_nodes_with_updates(trie_input) - } else { - // First calculation: full trie computation - state_provider.state_root_with_updates(hashed_state) - }; - - match result { - Ok((state_root, trie_output)) => { - let trie_updates = Arc::new(trie_output); - prev_trie_updates = Some(trie_updates.clone()); - - let elapsed = start_time.elapsed(); - info!( - target: "payload_builder", - flashblock_index = work_item.flashblock_index, - state_root = %state_root, - duration_ms = elapsed.as_millis(), - "Async trie precalculation completed" - ); - metrics.state_root_calculation_duration.record(elapsed); - - if result_tx - .send(TriePrecalcResult { - flashblock_index: work_item.flashblock_index, - trie_updates, - }) - .is_err() - { - // Main loop dropped the receiver — stop worker - break; - } - } - Err(err) => { - warn!( - target: "payload_builder", - flashblock_index = work_item.flashblock_index, - error = %err, - "Async trie precalculation failed, resetting chain" - ); - // Reset chain: next item will do a full calculation - prev_trie_updates = None; - } - } - } - - debug!(target: "payload_builder", "Trie precalc worker exiting"); -} - struct CalculateStateRootContext { best_payload: (OpBuiltPayload, BundleState), parent_hash: BlockHash, @@ -1488,7 +1423,8 @@ fn resolve_zero_state_root( /// Calculates only the state root for an existing payload. /// /// If `precalc_result` is available, uses incremental `state_root_from_nodes_with_updates` -/// seeded by the precalculated trie updates. Otherwise falls back to a cold full calculation. +/// seeded by the precalculated trie updates with delta prefix sets (only paths that changed +/// since the precalc flashblock). Otherwise falls back to a cold full calculation. fn calculate_state_root_on_resolve( ctx: &CalculateStateRootContext, state_provider: Box, @@ -1504,16 +1440,23 @@ fn calculate_state_root_on_resolve( let state_root_start = Instant::now(); let (state_root, trie_updates) = if let Some(precalc) = precalc_result { // Incremental path: use precalculated trie from background worker + // with delta prefix sets — only recompute paths that changed since precalc + let delta_prefix_sets = compute_delta_prefix_sets(&precalc.hashed_state, &hashed_state); + info!( target: "payload_builder", precalc_flashblock = precalc.flashblock_index, - "Using precalculated trie for resolve" + full_account_count = hashed_state.accounts.len(), + delta_account_count = delta_prefix_sets.account_prefix_set.len(), + full_storage_count = hashed_state.storages.len(), + delta_storage_count = delta_prefix_sets.storage_prefix_sets.len(), + "Using delta prefix sets for resolve" ); let trie_input = TrieInput::new( precalc.trie_updates.as_ref().clone(), hashed_state.clone(), - hashed_state.construct_prefix_sets(), + delta_prefix_sets, ); state_provider.state_root_from_nodes_with_updates(trie_input).inspect_err(|err| { @@ -1552,3 +1495,191 @@ fn calculate_state_root_on_resolve( Ok((state_root, trie_updates, hashed_state)) } + +/// Runs the async trie precalculation worker in a blocking context. +/// +/// Processes work items sequentially, maintaining an incremental trie update chain. +/// The first item does a full `state_root_with_updates`, subsequent items use +/// `state_root_from_nodes_with_updates` with the previous result's cached trie nodes. +fn run_trie_precalc_worker( + work_rx: std::sync::mpsc::Receiver, + result_tx: std::sync::mpsc::SyncSender, + state_provider: Box, + metrics: Arc, +) { + let mut prev_trie_updates: Option> = None; + + while let Ok(work_item) = work_rx.recv() { + let start_time = Instant::now(); + + let hashed_state = state_provider.hashed_post_state(&work_item.bundle_state); + + let result = if let Some(prev_trie) = &prev_trie_updates { + // Incremental path: reuse cached trie nodes from previous flashblock + let trie_input = TrieInput::new( + prev_trie.as_ref().clone(), + hashed_state.clone(), + hashed_state.construct_prefix_sets(), + ); + state_provider.state_root_from_nodes_with_updates(trie_input) + } else { + // First calculation: full trie computation + state_provider.state_root_with_updates(hashed_state.clone()) + }; + + match result { + Ok((state_root, trie_output)) => { + let trie_updates = Arc::new(trie_output); + prev_trie_updates = Some(trie_updates.clone()); + + let elapsed = start_time.elapsed(); + info!( + target: "payload_builder", + flashblock_index = work_item.flashblock_index, + state_root = %state_root, + duration_ms = elapsed.as_millis(), + "Async trie precalculation completed" + ); + metrics.state_root_calculation_duration.record(elapsed); + + if result_tx + .send(TriePrecalcResult { + flashblock_index: work_item.flashblock_index, + trie_updates, + hashed_state, + }) + .is_err() + { + // Main loop dropped the receiver — stop worker + break; + } + } + Err(err) => { + warn!( + target: "payload_builder", + flashblock_index = work_item.flashblock_index, + error = %err, + "Async trie precalculation failed, resetting chain" + ); + // Reset chain: next item will do a full calculation + prev_trie_updates = None; + } + } + } + + debug!(target: "payload_builder", "Trie precalc worker exiting"); +} + +/// Computes delta prefix sets by diffing two `HashedPostState`s. +/// +/// Returns a `TriePrefixSetsMut` containing only the account/storage paths that +/// changed between `precalc_state` (from the background trie worker) and +/// `final_state` (the full cumulative state at resolve time). +/// +/// This allows the trie walker to skip paths already correctly computed by the +/// precalc worker, dramatically reducing the work needed on resolve. +fn compute_delta_prefix_sets( + precalc_state: &HashedPostState, + final_state: &HashedPostState, +) -> TriePrefixSetsMut { + let mut account_prefix_set = PrefixSetMut::default(); + let mut storage_prefix_sets = alloy_primitives::map::B256Map::::default(); + let mut destroyed_accounts = alloy_primitives::map::B256Set::default(); + + // 1. Forward pass: iterate final_state accounts, compare against precalc_state + for (hashed_address, final_account) in &final_state.accounts { + let changed = match precalc_state.accounts.get(hashed_address) { + Some(precalc_account) => precalc_account != final_account, + None => true, // New account not in precalc + }; + if changed { + account_prefix_set.insert(Nibbles::unpack(hashed_address)); + // If account was destroyed (None) in final but existed in precalc, mark it + if final_account.is_none() { + destroyed_accounts.insert(*hashed_address); + } + } + } + + // 2. Reverse pass: accounts in precalc_state but not in final_state + for hashed_address in precalc_state.accounts.keys() { + if !final_state.accounts.contains_key(hashed_address) { + account_prefix_set.insert(Nibbles::unpack(hashed_address)); + } + } + + // 3. Forward pass: iterate final_state storages, compare against precalc_state + for (hashed_address, final_storage) in &final_state.storages { + match precalc_state.storages.get(hashed_address) { + Some(precalc_storage) => { + // If wiped flag changed, include all slots for this account + if final_storage.wiped != precalc_storage.wiped { + // Include account and all its storage slots + account_prefix_set.insert(Nibbles::unpack(hashed_address)); + let storage_set = storage_prefix_sets.entry(*hashed_address).or_default(); + for slot_key in final_storage.storage.keys() { + storage_set.insert(Nibbles::unpack(slot_key)); + } + for slot_key in precalc_storage.storage.keys() { + storage_set.insert(Nibbles::unpack(slot_key)); + } + if final_storage.wiped { + destroyed_accounts.insert(*hashed_address); + } + } else { + // Diff slot-by-slot + let mut has_storage_diff = false; + let storage_set = storage_prefix_sets.entry(*hashed_address).or_default(); + + // Forward: slots in final that differ from precalc + for (slot_key, final_value) in &final_storage.storage { + let changed = match precalc_storage.storage.get(slot_key) { + Some(precalc_value) => precalc_value != final_value, + None => true, + }; + if changed { + storage_set.insert(Nibbles::unpack(slot_key)); + has_storage_diff = true; + } + } + + // Reverse: slots in precalc but not in final + for slot_key in precalc_storage.storage.keys() { + if !final_storage.storage.contains_key(slot_key) { + storage_set.insert(Nibbles::unpack(slot_key)); + has_storage_diff = true; + } + } + + if has_storage_diff { + account_prefix_set.insert(Nibbles::unpack(hashed_address)); + } + } + } + None => { + // Entirely new storage not in precalc — include all slots + account_prefix_set.insert(Nibbles::unpack(hashed_address)); + let storage_set = storage_prefix_sets.entry(*hashed_address).or_default(); + for slot_key in final_storage.storage.keys() { + storage_set.insert(Nibbles::unpack(slot_key)); + } + } + } + } + + // 4. Reverse pass: storages in precalc_state but not in final_state + for (hashed_address, precalc_storage) in &precalc_state.storages { + if !final_state.storages.contains_key(hashed_address) { + account_prefix_set.insert(Nibbles::unpack(hashed_address)); + let storage_set = storage_prefix_sets.entry(*hashed_address).or_default(); + for slot_key in precalc_storage.storage.keys() { + storage_set.insert(Nibbles::unpack(slot_key)); + } + } + } + + // Remove empty storage prefix sets + storage_prefix_sets.retain(|_, v| !v.is_empty()); + + TriePrefixSetsMut { account_prefix_set, storage_prefix_sets, destroyed_accounts } +} diff --git a/deps/optimism b/deps/optimism new file mode 160000 index 00000000..e9c20bc0 --- /dev/null +++ b/deps/optimism @@ -0,0 +1 @@ +Subproject commit e9c20bc0c77592945607fb85c8b3b3fde68d33a5 From 96329dec7a8fb38b953bfe7fa9286d250090077c Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Fri, 27 Feb 2026 13:21:00 +0800 Subject: [PATCH 04/17] fix: remove redundant code after merge --- crates/builder/src/args/op.rs | 17 +---------------- .../builder/src/payload/flashblocks/config.rs | 2 +- 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/crates/builder/src/args/op.rs b/crates/builder/src/args/op.rs index 6bb490b9..2806a8e0 100644 --- a/crates/builder/src/args/op.rs +++ b/crates/builder/src/args/op.rs @@ -124,7 +124,7 @@ pub struct FlashblocksArgs { /// flashblock 2 (skipping 0 and 1). #[arg( long = "flashblocks.async-trie-precalc-start-flashblock", - default_value = "0", + default_value = "1", env = "FLASHBLOCKS_ASYNC_TRIE_PRECALC_START_FLASHBLOCK" )] pub flashblocks_async_trie_precalc_start_flashblock: u64, @@ -172,21 +172,6 @@ pub struct FlashblocksArgs { )] pub ws_subscriber_limit: Option, - /// Whether to enable async trie precalculation for flashblocks - #[arg( - long = "flashblocks.enable-async-trie-precalc", - default_value = "false", - env = "FLASHBLOCKS_ENABLE_ASYNC_TRIE_PRECALC" - )] - pub enable_async_trie_precalc: bool, - - /// The flashblock index at which to start async trie precalculation - #[arg( - long = "flashblocks.async-trie-precalc-start-flashblock", - default_value = "1", - env = "FLASHBLOCKS_ASYNC_TRIE_PRECALC_START_FLASHBLOCK" - )] - pub async_trie_precalc_start_flashblock: u64, } impl Default for FlashblocksArgs { diff --git a/crates/builder/src/payload/flashblocks/config.rs b/crates/builder/src/payload/flashblocks/config.rs index 4eabab40..fdcb095c 100644 --- a/crates/builder/src/payload/flashblocks/config.rs +++ b/crates/builder/src/payload/flashblocks/config.rs @@ -89,7 +89,7 @@ impl Default for FlashblocksConfig { p2p_process_full_payload: false, ws_subscriber_limit: None, enable_async_trie_precalc: false, - async_trie_precalc_start_flashblock: 0, + async_trie_precalc_start_flashblock: 1, } } } From bbab82c059bc2ecef864651b653c8822d940a68d Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Mon, 2 Mar 2026 14:13:29 +0800 Subject: [PATCH 05/17] fix: use Arc for best_payload & code cleanup --- .../src/payload/flashblocks/payload.rs | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index f5992b77..e04aad8c 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -22,7 +22,10 @@ use alloy_consensus::{ }; use alloy_eips::{eip7685::EMPTY_REQUESTS_HASH, merge::BEACON_NONCE, Encodable2718}; use alloy_evm::block::BlockExecutionResult; -use alloy_primitives::{Address, BlockHash, B256, U256}; +use alloy_primitives::{ + map::{B256Map, B256Set}, + Address, BlockHash, B256, U256, +}; use eyre::WrapErr as _; use op_alloy_rpc_types_engine::{ OpFlashblockPayload, OpFlashblockPayloadBase, OpFlashblockPayloadDelta, @@ -190,6 +193,7 @@ impl FlashblocksState { } /// Result of an async trie precalculation for a single flashblock. +#[derive(Debug)] struct TriePrecalcResult { /// The flashblock index this result corresponds to. flashblock_index: u64, @@ -201,9 +205,10 @@ struct TriePrecalcResult { } /// Work item sent from the main flashblock loop to the background trie worker. +#[derive(Debug)] struct TriePrecalcWorkItem { flashblock_index: u64, - bundle_state: BundleState, + bundle_state: Arc, } /// Manages the async trie precalculation pipeline. @@ -462,7 +467,7 @@ where .try_send(fb_payload.clone()) .map_err(PayloadBuilderError::other)?; } - let mut best_payload = (fallback_payload.clone(), bundle_state); + let mut best_payload = (fallback_payload.clone(), Arc::new(bundle_state)); info!( target: "payload_builder", @@ -725,7 +730,7 @@ where state_provider: impl reth::providers::StateProvider + Clone, best_txs: &mut NextBestFlashblocksTxs, block_cancel: &CancellationToken, - best_payload: &mut (OpBuiltPayload, BundleState), + best_payload: &mut (OpBuiltPayload, Arc), ) -> eyre::Result> { let flashblock_index = fb_state.flashblock_index(); let mut target_gas_for_batch = fb_state.target_gas_for_batch; @@ -859,7 +864,7 @@ where self.built_fb_payload_tx .try_send(fb_payload) .wrap_err("failed to send built payload to handler")?; - *best_payload = (new_payload, bundle_state); + *best_payload = (new_payload, Arc::new(bundle_state)); // Record flashblock build duration ctx.metrics.flashblock_build_duration.record(flashblock_build_start_time.elapsed()); @@ -918,7 +923,7 @@ where fn resolve_best_payload( &self, ctx: &OpPayloadBuilderCtx, - best_payload: (OpBuiltPayload, BundleState), + best_payload: (OpBuiltPayload, Arc), fallback_payload: OpBuiltPayload, resolve_payload: &BlockCell, precalc_pipeline: Option, @@ -1358,7 +1363,7 @@ where } struct CalculateStateRootContext { - best_payload: (OpBuiltPayload, BundleState), + best_payload: (OpBuiltPayload, Arc), parent_hash: BlockHash, built_payload_tx: mpsc::Sender, metrics: Arc, @@ -1583,15 +1588,15 @@ fn compute_delta_prefix_sets( final_state: &HashedPostState, ) -> TriePrefixSetsMut { let mut account_prefix_set = PrefixSetMut::default(); - let mut storage_prefix_sets = alloy_primitives::map::B256Map::::default(); - let mut destroyed_accounts = alloy_primitives::map::B256Set::default(); + let mut storage_prefix_sets = B256Map::::default(); + let mut destroyed_accounts = B256Set::default(); // 1. Forward pass: iterate final_state accounts, compare against precalc_state for (hashed_address, final_account) in &final_state.accounts { - let changed = match precalc_state.accounts.get(hashed_address) { - Some(precalc_account) => precalc_account != final_account, - None => true, // New account not in precalc - }; + let changed = !precalc_state + .accounts + .get(hashed_address) + .is_some_and(|precalc| precalc == final_account); if changed { account_prefix_set.insert(Nibbles::unpack(hashed_address)); // If account was destroyed (None) in final but existed in precalc, mark it @@ -1633,10 +1638,10 @@ fn compute_delta_prefix_sets( // Forward: slots in final that differ from precalc for (slot_key, final_value) in &final_storage.storage { - let changed = match precalc_storage.storage.get(slot_key) { - Some(precalc_value) => precalc_value != final_value, - None => true, - }; + let changed = !precalc_storage + .storage + .get(slot_key) + .is_some_and(|precalc_value| precalc_value == final_value); if changed { storage_set.insert(Nibbles::unpack(slot_key)); has_storage_diff = true; From 9bf94c765045473d4b07b9c680006116ae342b71 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Mon, 2 Mar 2026 14:32:20 +0800 Subject: [PATCH 06/17] fix: using try_unwrap & logging incremental only if using previous flashblock's tries --- .../src/payload/flashblocks/payload.rs | 42 ++++++++++++++----- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index e04aad8c..d15d4a4c 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -503,7 +503,7 @@ where ctx.metrics.payload_num_tx_gauge.set(info.executed_transactions.len() as f64); // return early since we don't need to build a block with transactions from the pool - self.resolve_best_payload(&ctx, best_payload, fallback_payload, &resolve_payload, None); + self.resolve_best_payload(&ctx, best_payload, fallback_payload, &resolve_payload, None, 0); return Ok(()); } @@ -631,6 +631,7 @@ where fallback_payload, &resolve_payload, precalc_pipeline.take(), + fb_state.flashblock_index(), ); self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks); return Ok(()); @@ -655,6 +656,7 @@ where fallback_payload, &resolve_payload, precalc_pipeline.take(), + fb_state.flashblock_index(), ); self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks); return Ok(()); @@ -674,6 +676,7 @@ where fallback_payload, &resolve_payload, precalc_pipeline.take(), + fb_state.flashblock_index(), ); return Err(PayloadBuilderError::Other(err.into())); } @@ -927,6 +930,7 @@ where fallback_payload: OpBuiltPayload, resolve_payload: &BlockCell, precalc_pipeline: Option, + current_flashblock_index: u64, ) { if resolve_payload.get().is_some() { return; @@ -965,6 +969,7 @@ where parent_hash: ctx.parent().hash(), built_payload_tx: self.built_payload_tx.clone(), metrics: self.metrics.clone(), + current_flashblock_index, }; // Async calculate state root @@ -1367,6 +1372,7 @@ struct CalculateStateRootContext { parent_hash: BlockHash, built_payload_tx: mpsc::Sender, metrics: Arc, + current_flashblock_index: u64, } fn resolve_zero_state_root( @@ -1436,14 +1442,28 @@ fn calculate_state_root_on_resolve( precalc_result: Option, ) -> Result<(B256, TrieUpdates, HashedPostState), PayloadBuilderError> { let total_start_time = Instant::now(); - let used_precalc = precalc_result.is_some(); + + // Only use incremental path if precalc is from the immediately previous flashblock + let eligible_precalc = precalc_result.filter(|precalc| { + let eligible = precalc.flashblock_index + 1 == ctx.current_flashblock_index; + if !eligible { + info!( + target: "payload_builder", + precalc_flashblock = precalc.flashblock_index, + current_flashblock = ctx.current_flashblock_index, + "Precalc stale (not immediately previous flashblock), falling back to cold path" + ); + } + eligible + }); + let used_precalc = eligible_precalc.is_some(); let hashed_state_start = Instant::now(); let hashed_state = state_provider.hashed_post_state(&ctx.best_payload.1); let hashed_state_time = hashed_state_start.elapsed(); let state_root_start = Instant::now(); - let (state_root, trie_updates) = if let Some(precalc) = precalc_result { + let (state_root, trie_updates) = if let Some(precalc) = eligible_precalc { // Incremental path: use precalculated trie from background worker // with delta prefix sets — only recompute paths that changed since precalc let delta_prefix_sets = compute_delta_prefix_sets(&precalc.hashed_state, &hashed_state); @@ -1458,11 +1478,11 @@ fn calculate_state_root_on_resolve( "Using delta prefix sets for resolve" ); - let trie_input = TrieInput::new( - precalc.trie_updates.as_ref().clone(), - hashed_state.clone(), - delta_prefix_sets, - ); + let trie_updates_owned = Arc::try_unwrap(precalc.trie_updates) + .unwrap_or_else(|arc| arc.as_ref().clone()); + + let trie_input = + TrieInput::new(trie_updates_owned, hashed_state.clone(), delta_prefix_sets); state_provider.state_root_from_nodes_with_updates(trie_input).inspect_err(|err| { warn!(target: "payload_builder", @@ -1519,10 +1539,12 @@ fn run_trie_precalc_worker( let hashed_state = state_provider.hashed_post_state(&work_item.bundle_state); - let result = if let Some(prev_trie) = &prev_trie_updates { + let result = if let Some(prev_trie) = prev_trie_updates.take() { // Incremental path: reuse cached trie nodes from previous flashblock + let trie_updates_owned = + Arc::try_unwrap(prev_trie).unwrap_or_else(|arc| arc.as_ref().clone()); let trie_input = TrieInput::new( - prev_trie.as_ref().clone(), + trie_updates_owned, hashed_state.clone(), hashed_state.construct_prefix_sets(), ); From 09736afd98578a405b8069e953d438a7dfa360cb Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Mon, 2 Mar 2026 14:52:14 +0800 Subject: [PATCH 07/17] fix: change work channel size --- crates/builder/src/payload/flashblocks/payload.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index d15d4a4c..63be45d5 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -585,7 +585,8 @@ where { match self.client.state_by_block_hash(ctx.parent().hash()) { Ok(worker_state_provider) => { - let (work_tx, work_rx) = std::sync::mpsc::sync_channel(2); + let (work_tx, work_rx) = + std::sync::mpsc::sync_channel((expected_flashblocks + 1) as usize); let (result_tx, result_rx) = std::sync::mpsc::sync_channel((expected_flashblocks + 1) as usize); let metrics = self.metrics.clone(); @@ -1535,6 +1536,13 @@ fn run_trie_precalc_worker( let mut prev_trie_updates: Option> = None; while let Ok(work_item) = work_rx.recv() { + // Skip stale items, always compute the latest available flashblock + let mut latest_item = work_item; + while let Ok(newer_item) = work_rx.try_recv() { + latest_item = newer_item; + } + let work_item = latest_item; + let start_time = Instant::now(); let hashed_state = state_provider.hashed_post_state(&work_item.bundle_state); From 0ef18e0a297accd82412a3d3492c7e43ca117962 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Mon, 2 Mar 2026 15:25:29 +0800 Subject: [PATCH 08/17] feat: add incremental, incremental_stale or cold in logs --- .../src/payload/flashblocks/payload.rs | 64 ++++++++----------- 1 file changed, 27 insertions(+), 37 deletions(-) diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index 63be45d5..ba3ef341 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -1444,39 +1444,23 @@ fn calculate_state_root_on_resolve( ) -> Result<(B256, TrieUpdates, HashedPostState), PayloadBuilderError> { let total_start_time = Instant::now(); - // Only use incremental path if precalc is from the immediately previous flashblock - let eligible_precalc = precalc_result.filter(|precalc| { - let eligible = precalc.flashblock_index + 1 == ctx.current_flashblock_index; - if !eligible { - info!( - target: "payload_builder", - precalc_flashblock = precalc.flashblock_index, - current_flashblock = ctx.current_flashblock_index, - "Precalc stale (not immediately previous flashblock), falling back to cold path" - ); - } - eligible - }); - let used_precalc = eligible_precalc.is_some(); - let hashed_state_start = Instant::now(); let hashed_state = state_provider.hashed_post_state(&ctx.best_payload.1); let hashed_state_time = hashed_state_start.elapsed(); let state_root_start = Instant::now(); - let (state_root, trie_updates) = if let Some(precalc) = eligible_precalc { - // Incremental path: use precalculated trie from background worker - // with delta prefix sets — only recompute paths that changed since precalc + let (state_root, trie_updates, method) = if let Some(precalc) = precalc_result { let delta_prefix_sets = compute_delta_prefix_sets(&precalc.hashed_state, &hashed_state); + let is_immediate = precalc.flashblock_index + 1 == ctx.current_flashblock_index; info!( target: "payload_builder", precalc_flashblock = precalc.flashblock_index, - full_account_count = hashed_state.accounts.len(), + current_flashblock = ctx.current_flashblock_index, + is_immediate, delta_account_count = delta_prefix_sets.account_prefix_set.len(), - full_storage_count = hashed_state.storages.len(), delta_storage_count = delta_prefix_sets.storage_prefix_sets.len(), - "Using delta prefix sets for resolve" + "Using incremental resolve with delta prefix sets" ); let trie_updates_owned = Arc::try_unwrap(precalc.trie_updates) @@ -1485,27 +1469,33 @@ fn calculate_state_root_on_resolve( let trie_input = TrieInput::new(trie_updates_owned, hashed_state.clone(), delta_prefix_sets); - state_provider.state_root_from_nodes_with_updates(trie_input).inspect_err(|err| { - warn!(target: "payload_builder", - parent_header=%ctx.parent_hash, - %err, - "failed incremental state root on resolve" - ); - })? + let (root, updates) = state_provider + .state_root_from_nodes_with_updates(trie_input) + .inspect_err(|err| { + warn!(target: "payload_builder", + parent_header=%ctx.parent_hash, + %err, + "failed incremental state root on resolve" + ); + })?; + + let method = if is_immediate { "incremental" } else { "incremental_stale" }; + (root, updates, method) } else { - // Cold path: full trie calculation - state_provider.state_root_with_updates(hashed_state.clone()).inspect_err(|err| { - warn!(target: "payload_builder", - parent_header=%ctx.parent_hash, - %err, - "failed to calculate state root for payload" - ); - })? + // Cold path: no precalc available + let (root, updates) = + state_provider.state_root_with_updates(hashed_state.clone()).inspect_err(|err| { + warn!(target: "payload_builder", + parent_header=%ctx.parent_hash, + %err, + "failed to calculate state root for payload" + ); + })?; + (root, updates, "cold") }; let state_root_time = state_root_start.elapsed(); let total_time = total_start_time.elapsed(); - let method = if used_precalc { "incremental" } else { "cold" }; info!( target: "payload_builder", hashed_state_ms = hashed_state_time.as_millis(), From 0c4b7b4d7fabb8c348e0c34af4904524c689265e Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Mon, 2 Mar 2026 16:21:54 +0800 Subject: [PATCH 09/17] ensure strict incremental --- crates/builder/src/metrics/builder.rs | 2 + .../src/payload/flashblocks/payload.rs | 87 ++++++++++--------- 2 files changed, 48 insertions(+), 41 deletions(-) diff --git a/crates/builder/src/metrics/builder.rs b/crates/builder/src/metrics/builder.rs index 8332d60b..86fa09da 100644 --- a/crates/builder/src/metrics/builder.rs +++ b/crates/builder/src/metrics/builder.rs @@ -40,6 +40,8 @@ pub struct BuilderMetrics { pub state_root_calculation_duration: Histogram, /// Latest state root calculation duration pub state_root_calculation_gauge: Gauge, + /// Histogram of async trie precalculation duration (background worker) + pub trie_precalc_duration: Histogram, /// Histogram of sequencer transaction execution duration pub sequencer_tx_duration: Histogram, /// Latest sequencer transaction execution duration diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index ba3ef341..2c8f1ab8 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -939,16 +939,24 @@ where let payload = match best_payload.0.block().header().state_root { B256::ZERO => { - // Drain the async trie precalc pipeline for the latest result + // Block-wait for the worker to finish the immediately prior flashblock. + // Drop work_tx so the worker finishes remaining items and exits. + let target_index = current_flashblock_index.saturating_sub(1); let precalc_result = precalc_pipeline.and_then(|pipeline| { + drop(pipeline.work_tx); let mut latest = None; - while let Ok(result) = pipeline.result_rx.try_recv() { + while let Ok(result) = pipeline.result_rx.recv() { + let is_target = result.flashblock_index == target_index; latest = Some(result); + if is_target { + break; + } } if let Some(ref result) = latest { info!( target: "payload_builder", flashblock_index = result.flashblock_index, + target_index, "Using async trie precalc result for resolve" ); } @@ -1122,9 +1130,9 @@ where // calculate the state root let state_root_start_time = Instant::now(); let mut state_root = B256::ZERO; - let mut trie_output = TrieUpdates::default(); + let mut trie_output_arc = Arc::new(TrieUpdates::default()); let mut hashed_state = HashedPostState::default(); - let mut trie_updates_to_cache: Option = None; + let mut trie_updates_to_cache: Option> = None; if calculate_state_root { let state_provider = state.database.as_ref(); @@ -1135,7 +1143,7 @@ where hashed_state = state_provider.hashed_post_state(&state.bundle_state); - (state_root, trie_output) = if let Some(prev_trie) = prev_trie { + if let Some(prev_trie) = prev_trie { // Incremental path: Use cached trie from previous flashblock debug!( target: "payload_builder", @@ -1149,9 +1157,11 @@ where hashed_state.construct_prefix_sets(), ); - state_provider + let trie_output; + (state_root, trie_output) = state_provider .state_root_from_nodes_with_updates(trie_input) - .map_err(PayloadBuilderError::other)? + .map_err(PayloadBuilderError::other)?; + trie_output_arc = Arc::new(trie_output); } else { debug!( target: "payload_builder", @@ -1159,20 +1169,23 @@ where "Using full state root calculation" ); - state.database.as_ref().state_root_with_updates(hashed_state.clone()).inspect_err( - |err| { - warn!( - target: "payload_builder", - parent_header=%ctx.parent().hash(), - %err, - "failed to calculate state root for payload" - ); - }, - )? + let trie_output; + (state_root, trie_output) = + state.database.as_ref().state_root_with_updates(hashed_state.clone()).inspect_err( + |err| { + warn!( + target: "payload_builder", + parent_header=%ctx.parent().hash(), + %err, + "failed to calculate state root for payload" + ); + }, + )?; + trie_output_arc = Arc::new(trie_output); }; // Cache trie updates to apply in fb_state later (avoids mut on fb_state parameter). - trie_updates_to_cache = Some(trie_output.clone()); + trie_updates_to_cache = Some(trie_output_arc.clone()); let state_root_calculation_time = state_root_start_time.elapsed(); ctx.metrics.state_root_calculation_duration.record(state_root_calculation_time); @@ -1271,7 +1284,7 @@ where let executed = BuiltPayloadExecutedBlock { recovered_block: Arc::new(recovered_block), execution_output: Arc::new(execution_output), - trie_updates: either::Either::Left(Arc::new(trie_output)), + trie_updates: either::Either::Left(trie_output_arc), hashed_state: either::Either::Left(Arc::new(hashed_state)), }; debug!( @@ -1303,7 +1316,7 @@ where let new_receipts = info.receipts[last_idx..].to_vec(); if let Some(fb) = fb_state { if let Some(updates) = trie_updates_to_cache.take() { - fb.prev_trie_updates = Some(Arc::new(updates)); + fb.prev_trie_updates = Some(updates); } fb.set_last_flashblock_tx_index(info.executed_transactions.len()); } @@ -1448,16 +1461,18 @@ fn calculate_state_root_on_resolve( let hashed_state = state_provider.hashed_post_state(&ctx.best_payload.1); let hashed_state_time = hashed_state_start.elapsed(); + // Only use precalc from the immediately prior flashblock (strict incremental) + let eligible_precalc = + precalc_result.filter(|p| p.flashblock_index + 1 == ctx.current_flashblock_index); + let state_root_start = Instant::now(); - let (state_root, trie_updates, method) = if let Some(precalc) = precalc_result { + let (state_root, trie_updates, method) = if let Some(precalc) = eligible_precalc { let delta_prefix_sets = compute_delta_prefix_sets(&precalc.hashed_state, &hashed_state); - let is_immediate = precalc.flashblock_index + 1 == ctx.current_flashblock_index; info!( target: "payload_builder", precalc_flashblock = precalc.flashblock_index, current_flashblock = ctx.current_flashblock_index, - is_immediate, delta_account_count = delta_prefix_sets.account_prefix_set.len(), delta_storage_count = delta_prefix_sets.storage_prefix_sets.len(), "Using incremental resolve with delta prefix sets" @@ -1479,10 +1494,8 @@ fn calculate_state_root_on_resolve( ); })?; - let method = if is_immediate { "incremental" } else { "incremental_stale" }; - (root, updates, method) + (root, updates, "incremental") } else { - // Cold path: no precalc available let (root, updates) = state_provider.state_root_with_updates(hashed_state.clone()).inspect_err(|err| { warn!(target: "payload_builder", @@ -1523,15 +1536,9 @@ fn run_trie_precalc_worker( state_provider: Box, metrics: Arc, ) { - let mut prev_trie_updates: Option> = None; + let mut prev_trie_updates: Option = None; while let Ok(work_item) = work_rx.recv() { - // Skip stale items, always compute the latest available flashblock - let mut latest_item = work_item; - while let Ok(newer_item) = work_rx.try_recv() { - latest_item = newer_item; - } - let work_item = latest_item; let start_time = Instant::now(); @@ -1539,10 +1546,8 @@ fn run_trie_precalc_worker( let result = if let Some(prev_trie) = prev_trie_updates.take() { // Incremental path: reuse cached trie nodes from previous flashblock - let trie_updates_owned = - Arc::try_unwrap(prev_trie).unwrap_or_else(|arc| arc.as_ref().clone()); let trie_input = TrieInput::new( - trie_updates_owned, + prev_trie, hashed_state.clone(), hashed_state.construct_prefix_sets(), ); @@ -1554,9 +1559,6 @@ fn run_trie_precalc_worker( match result { Ok((state_root, trie_output)) => { - let trie_updates = Arc::new(trie_output); - prev_trie_updates = Some(trie_updates.clone()); - let elapsed = start_time.elapsed(); info!( target: "payload_builder", @@ -1565,12 +1567,15 @@ fn run_trie_precalc_worker( duration_ms = elapsed.as_millis(), "Async trie precalculation completed" ); - metrics.state_root_calculation_duration.record(elapsed); + metrics.trie_precalc_duration.record(elapsed); + + // Clone for our incremental chain, wrap in Arc for cross-thread transfer + prev_trie_updates = Some(trie_output.clone()); if result_tx .send(TriePrecalcResult { flashblock_index: work_item.flashblock_index, - trie_updates, + trie_updates: Arc::new(trie_output), hashed_state, }) .is_err() From 44b3d28c302389806577b0bf0fad90513d09b001 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Mon, 2 Mar 2026 20:36:48 +0800 Subject: [PATCH 10/17] fix: use worker's precomputed state root directly --- .../src/payload/flashblocks/payload.rs | 200 +++--------------- 1 file changed, 33 insertions(+), 167 deletions(-) diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index 2c8f1ab8..5f717902 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -22,10 +22,7 @@ use alloy_consensus::{ }; use alloy_eips::{eip7685::EMPTY_REQUESTS_HASH, merge::BEACON_NONCE, Encodable2718}; use alloy_evm::block::BlockExecutionResult; -use alloy_primitives::{ - map::{B256Map, B256Set}, - Address, BlockHash, B256, U256, -}; +use alloy_primitives::{Address, BlockHash, B256, U256}; use eyre::WrapErr as _; use op_alloy_rpc_types_engine::{ OpFlashblockPayload, OpFlashblockPayloadBase, OpFlashblockPayloadDelta, @@ -55,11 +52,7 @@ use reth_revm::{ State, }; use reth_transaction_pool::TransactionPool; -use reth_trie::{ - prefix_set::{PrefixSetMut, TriePrefixSetsMut}, - updates::TrieUpdates, - HashedPostState, Nibbles, TrieInput, -}; +use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; use revm::Database; use std::{collections::BTreeMap, sync::Arc, time::Instant}; use tokio::sync::mpsc; @@ -197,10 +190,11 @@ impl FlashblocksState { struct TriePrecalcResult { /// The flashblock index this result corresponds to. flashblock_index: u64, - /// The computed trie updates that can seed the next incremental calculation. + /// The computed state root for this flashblock's cumulative state. + state_root: B256, + /// The computed trie updates. trie_updates: Arc, - /// The hashed post state at the time of precalculation, used to compute - /// delta prefix sets on resolve (only the diff since this state needs recomputation). + /// The hashed post state at the time of precalculation. hashed_state: HashedPostState, } @@ -942,6 +936,7 @@ where // Block-wait for the worker to finish the immediately prior flashblock. // Drop work_tx so the worker finishes remaining items and exits. let target_index = current_flashblock_index.saturating_sub(1); + let wait_start = Instant::now(); let precalc_result = precalc_pipeline.and_then(|pipeline| { drop(pipeline.work_tx); let mut latest = None; @@ -952,16 +947,17 @@ where break; } } - if let Some(ref result) = latest { - info!( - target: "payload_builder", - flashblock_index = result.flashblock_index, - target_index, - "Using async trie precalc result for resolve" - ); - } latest }); + let wait_elapsed = wait_start.elapsed(); + info!( + target: "payload_builder", + wait_ms = wait_elapsed.as_millis(), + target_index, + got_result = precalc_result.is_some(), + got_flashblock_index = precalc_result.as_ref().map(|r| r.flashblock_index), + "resolve_best_payload: precalc wait completed" + ); // Get the fallback payload for payload resolution let fallback_payload_for_resolve = @@ -1447,9 +1443,10 @@ fn resolve_zero_state_root( /// Calculates only the state root for an existing payload. /// -/// If `precalc_result` is available, uses incremental `state_root_from_nodes_with_updates` -/// seeded by the precalculated trie updates with delta prefix sets (only paths that changed -/// since the precalc flashblock). Otherwise falls back to a cold full calculation. +/// If `precalc_result` is available and matches the immediately prior flashblock, +/// directly reuses the worker's already-computed state root, trie updates, and hashed +/// state. The worker operates on the same `Arc` so its results are correct. +/// Otherwise falls back to a cold full calculation via the provided state_provider. fn calculate_state_root_on_resolve( ctx: &CalculateStateRootContext, state_provider: Box, @@ -1457,45 +1454,29 @@ fn calculate_state_root_on_resolve( ) -> Result<(B256, TrieUpdates, HashedPostState), PayloadBuilderError> { let total_start_time = Instant::now(); - let hashed_state_start = Instant::now(); - let hashed_state = state_provider.hashed_post_state(&ctx.best_payload.1); - let hashed_state_time = hashed_state_start.elapsed(); - // Only use precalc from the immediately prior flashblock (strict incremental) let eligible_precalc = precalc_result.filter(|p| p.flashblock_index + 1 == ctx.current_flashblock_index); - let state_root_start = Instant::now(); - let (state_root, trie_updates, method) = if let Some(precalc) = eligible_precalc { - let delta_prefix_sets = compute_delta_prefix_sets(&precalc.hashed_state, &hashed_state); + let (state_root, trie_updates, hashed_state, method) = if let Some(precalc) = eligible_precalc + { + // The worker already computed the correct state root for this BundleState. + // Both worker and resolve share the same Arc, so the worker's + // state_root is exactly what we need. No cross-provider recomputation required. + let trie_updates = Arc::try_unwrap(precalc.trie_updates) + .unwrap_or_else(|arc| arc.as_ref().clone()); info!( target: "payload_builder", precalc_flashblock = precalc.flashblock_index, current_flashblock = ctx.current_flashblock_index, - delta_account_count = delta_prefix_sets.account_prefix_set.len(), - delta_storage_count = delta_prefix_sets.storage_prefix_sets.len(), - "Using incremental resolve with delta prefix sets" + state_root = %precalc.state_root, + "Using worker's precomputed state root directly" ); - let trie_updates_owned = Arc::try_unwrap(precalc.trie_updates) - .unwrap_or_else(|arc| arc.as_ref().clone()); - - let trie_input = - TrieInput::new(trie_updates_owned, hashed_state.clone(), delta_prefix_sets); - - let (root, updates) = state_provider - .state_root_from_nodes_with_updates(trie_input) - .inspect_err(|err| { - warn!(target: "payload_builder", - parent_header=%ctx.parent_hash, - %err, - "failed incremental state root on resolve" - ); - })?; - - (root, updates, "incremental") + (precalc.state_root, trie_updates, precalc.hashed_state, "incremental") } else { + let hashed_state = state_provider.hashed_post_state(&ctx.best_payload.1); let (root, updates) = state_provider.state_root_with_updates(hashed_state.clone()).inspect_err(|err| { warn!(target: "payload_builder", @@ -1504,15 +1485,12 @@ fn calculate_state_root_on_resolve( "failed to calculate state root for payload" ); })?; - (root, updates, "cold") + (root, updates, hashed_state, "cold") }; - let state_root_time = state_root_start.elapsed(); let total_time = total_start_time.elapsed(); info!( target: "payload_builder", - hashed_state_ms = hashed_state_time.as_millis(), - state_root_ms = state_root_time.as_millis(), total_ms = total_time.as_millis(), state_root = %state_root, method, @@ -1575,6 +1553,7 @@ fn run_trie_precalc_worker( if result_tx .send(TriePrecalcResult { flashblock_index: work_item.flashblock_index, + state_root, trie_updates: Arc::new(trie_output), hashed_state, }) @@ -1600,116 +1579,3 @@ fn run_trie_precalc_worker( debug!(target: "payload_builder", "Trie precalc worker exiting"); } -/// Computes delta prefix sets by diffing two `HashedPostState`s. -/// -/// Returns a `TriePrefixSetsMut` containing only the account/storage paths that -/// changed between `precalc_state` (from the background trie worker) and -/// `final_state` (the full cumulative state at resolve time). -/// -/// This allows the trie walker to skip paths already correctly computed by the -/// precalc worker, dramatically reducing the work needed on resolve. -fn compute_delta_prefix_sets( - precalc_state: &HashedPostState, - final_state: &HashedPostState, -) -> TriePrefixSetsMut { - let mut account_prefix_set = PrefixSetMut::default(); - let mut storage_prefix_sets = B256Map::::default(); - let mut destroyed_accounts = B256Set::default(); - - // 1. Forward pass: iterate final_state accounts, compare against precalc_state - for (hashed_address, final_account) in &final_state.accounts { - let changed = !precalc_state - .accounts - .get(hashed_address) - .is_some_and(|precalc| precalc == final_account); - if changed { - account_prefix_set.insert(Nibbles::unpack(hashed_address)); - // If account was destroyed (None) in final but existed in precalc, mark it - if final_account.is_none() { - destroyed_accounts.insert(*hashed_address); - } - } - } - - // 2. Reverse pass: accounts in precalc_state but not in final_state - for hashed_address in precalc_state.accounts.keys() { - if !final_state.accounts.contains_key(hashed_address) { - account_prefix_set.insert(Nibbles::unpack(hashed_address)); - } - } - - // 3. Forward pass: iterate final_state storages, compare against precalc_state - for (hashed_address, final_storage) in &final_state.storages { - match precalc_state.storages.get(hashed_address) { - Some(precalc_storage) => { - // If wiped flag changed, include all slots for this account - if final_storage.wiped != precalc_storage.wiped { - // Include account and all its storage slots - account_prefix_set.insert(Nibbles::unpack(hashed_address)); - let storage_set = storage_prefix_sets.entry(*hashed_address).or_default(); - for slot_key in final_storage.storage.keys() { - storage_set.insert(Nibbles::unpack(slot_key)); - } - for slot_key in precalc_storage.storage.keys() { - storage_set.insert(Nibbles::unpack(slot_key)); - } - if final_storage.wiped { - destroyed_accounts.insert(*hashed_address); - } - } else { - // Diff slot-by-slot - let mut has_storage_diff = false; - let storage_set = storage_prefix_sets.entry(*hashed_address).or_default(); - - // Forward: slots in final that differ from precalc - for (slot_key, final_value) in &final_storage.storage { - let changed = !precalc_storage - .storage - .get(slot_key) - .is_some_and(|precalc_value| precalc_value == final_value); - if changed { - storage_set.insert(Nibbles::unpack(slot_key)); - has_storage_diff = true; - } - } - - // Reverse: slots in precalc but not in final - for slot_key in precalc_storage.storage.keys() { - if !final_storage.storage.contains_key(slot_key) { - storage_set.insert(Nibbles::unpack(slot_key)); - has_storage_diff = true; - } - } - - if has_storage_diff { - account_prefix_set.insert(Nibbles::unpack(hashed_address)); - } - } - } - None => { - // Entirely new storage not in precalc — include all slots - account_prefix_set.insert(Nibbles::unpack(hashed_address)); - let storage_set = storage_prefix_sets.entry(*hashed_address).or_default(); - for slot_key in final_storage.storage.keys() { - storage_set.insert(Nibbles::unpack(slot_key)); - } - } - } - } - - // 4. Reverse pass: storages in precalc_state but not in final_state - for (hashed_address, precalc_storage) in &precalc_state.storages { - if !final_state.storages.contains_key(hashed_address) { - account_prefix_set.insert(Nibbles::unpack(hashed_address)); - let storage_set = storage_prefix_sets.entry(*hashed_address).or_default(); - for slot_key in precalc_storage.storage.keys() { - storage_set.insert(Nibbles::unpack(slot_key)); - } - } - } - - // Remove empty storage prefix sets - storage_prefix_sets.retain(|_, v| !v.is_empty()); - - TriePrefixSetsMut { account_prefix_set, storage_prefix_sets, destroyed_accounts } -} From c8913c42bffb6ef3f18ba69d8d2b7d8919daee11 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Mon, 2 Mar 2026 22:24:53 +0800 Subject: [PATCH 11/17] chore: add log --- .../src/payload/flashblocks/payload.rs | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index 5f717902..ee2fe597 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -981,7 +981,7 @@ where match self.client.state_by_block_hash(ctx.parent().hash()) { Ok(state_provider) => { if self.config.specific.disable_async_calculate_state_root { - resolve_zero_state_root(state_root_ctx, state_provider, precalc_result) + resolve_zero_state_root(state_root_ctx, state_provider, precalc_result, wait_elapsed) .unwrap_or_else(|err| { warn!( target: "payload_builder", @@ -996,6 +996,7 @@ where state_root_ctx, state_provider, precalc_result, + wait_elapsed, ); })); fallback_payload_for_resolve @@ -1389,11 +1390,12 @@ fn resolve_zero_state_root( ctx: CalculateStateRootContext, state_provider: Box, precalc_result: Option, + precalc_wait: std::time::Duration, ) -> Result { let resolve_start_time = Instant::now(); let (state_root, trie_updates, hashed_state) = - calculate_state_root_on_resolve(&ctx, state_provider, precalc_result)?; + calculate_state_root_on_resolve(&ctx, state_provider, precalc_result, precalc_wait)?; let payload_id = ctx.best_payload.0.id(); let fees = ctx.best_payload.0.fees(); @@ -1451,8 +1453,9 @@ fn calculate_state_root_on_resolve( ctx: &CalculateStateRootContext, state_provider: Box, precalc_result: Option, + precalc_wait: std::time::Duration, ) -> Result<(B256, TrieUpdates, HashedPostState), PayloadBuilderError> { - let total_start_time = Instant::now(); + let calc_start_time = Instant::now(); // Only use precalc from the immediately prior flashblock (strict incremental) let eligible_precalc = @@ -1466,14 +1469,6 @@ fn calculate_state_root_on_resolve( let trie_updates = Arc::try_unwrap(precalc.trie_updates) .unwrap_or_else(|arc| arc.as_ref().clone()); - info!( - target: "payload_builder", - precalc_flashblock = precalc.flashblock_index, - current_flashblock = ctx.current_flashblock_index, - state_root = %precalc.state_root, - "Using worker's precomputed state root directly" - ); - (precalc.state_root, trie_updates, precalc.hashed_state, "incremental") } else { let hashed_state = state_provider.hashed_post_state(&ctx.best_payload.1); @@ -1488,9 +1483,12 @@ fn calculate_state_root_on_resolve( (root, updates, hashed_state, "cold") }; - let total_time = total_start_time.elapsed(); + let calc_time = calc_start_time.elapsed(); + let total_time = precalc_wait + calc_time; info!( target: "payload_builder", + precalc_wait_ms = precalc_wait.as_millis(), + calc_ms = calc_time.as_millis(), total_ms = total_time.as_millis(), state_root = %state_root, method, From 4e9d2db285b26cdf8266a3d374fc9dc58816b5b0 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Mon, 2 Mar 2026 23:16:46 +0800 Subject: [PATCH 12/17] fix: add timeout --- crates/builder/src/payload/flashblocks/payload.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index ee2fe597..6565a769 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -940,7 +940,9 @@ where let precalc_result = precalc_pipeline.and_then(|pipeline| { drop(pipeline.work_tx); let mut latest = None; - while let Ok(result) = pipeline.result_rx.recv() { + let timeout = std::time::Duration::from_secs(30); + // First recv with timeout to avoid hanging if worker is stuck + while let Ok(result) = pipeline.result_rx.recv_timeout(timeout) { let is_target = result.flashblock_index == target_index; latest = Some(result); if is_target { From 541d871d5fdc91876c8dc4ab2693fd26ea81d922 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Mon, 2 Mar 2026 23:21:02 +0800 Subject: [PATCH 13/17] chore: fmt --- crates/builder/src/payload/flashblocks/payload.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index 6565a769..e7d7ed4a 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -1463,13 +1463,12 @@ fn calculate_state_root_on_resolve( let eligible_precalc = precalc_result.filter(|p| p.flashblock_index + 1 == ctx.current_flashblock_index); - let (state_root, trie_updates, hashed_state, method) = if let Some(precalc) = eligible_precalc - { + let (state_root, trie_updates, hashed_state, method) = if let Some(precalc) = eligible_precalc { // The worker already computed the correct state root for this BundleState. // Both worker and resolve share the same Arc, so the worker's // state_root is exactly what we need. No cross-provider recomputation required. - let trie_updates = Arc::try_unwrap(precalc.trie_updates) - .unwrap_or_else(|arc| arc.as_ref().clone()); + let trie_updates = + Arc::try_unwrap(precalc.trie_updates).unwrap_or_else(|arc| arc.as_ref().clone()); (precalc.state_root, trie_updates, precalc.hashed_state, "incremental") } else { @@ -1517,7 +1516,6 @@ fn run_trie_precalc_worker( let mut prev_trie_updates: Option = None; while let Ok(work_item) = work_rx.recv() { - let start_time = Instant::now(); let hashed_state = state_provider.hashed_post_state(&work_item.bundle_state); @@ -1578,4 +1576,3 @@ fn run_trie_precalc_worker( debug!(target: "payload_builder", "Trie precalc worker exiting"); } - From 9a10ba74426bcc48c2c281d34132341f45d86203 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Tue, 3 Mar 2026 10:17:20 +0800 Subject: [PATCH 14/17] fix: change log level --- .../src/payload/flashblocks/payload.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index e7d7ed4a..b80dfe20 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -497,7 +497,14 @@ where ctx.metrics.payload_num_tx_gauge.set(info.executed_transactions.len() as f64); // return early since we don't need to build a block with transactions from the pool - self.resolve_best_payload(&ctx, best_payload, fallback_payload, &resolve_payload, None, 0); + self.resolve_best_payload( + &ctx, + best_payload, + fallback_payload, + &resolve_payload, + None, + 0, + ); return Ok(()); } @@ -587,7 +594,7 @@ where self.task_executor.spawn_blocking(Box::pin(async move { run_trie_precalc_worker(work_rx, result_tx, worker_state_provider, metrics); })); - info!( + debug!( target: "payload_builder", "Async trie precalculation pipeline started" ); @@ -735,7 +742,7 @@ where let mut target_da_for_batch = fb_state.target_da_for_batch; let mut target_da_footprint_for_batch = fb_state.target_da_footprint_for_batch; - info!( + debug!( target: "payload_builder", block_number = ctx.block_number(), flashblock_index, @@ -952,7 +959,7 @@ where latest }); let wait_elapsed = wait_start.elapsed(); - info!( + debug!( target: "payload_builder", wait_ms = wait_elapsed.as_millis(), target_index, @@ -1486,7 +1493,7 @@ fn calculate_state_root_on_resolve( let calc_time = calc_start_time.elapsed(); let total_time = precalc_wait + calc_time; - info!( + debug!( target: "payload_builder", precalc_wait_ms = precalc_wait.as_millis(), calc_ms = calc_time.as_millis(), @@ -1536,7 +1543,7 @@ fn run_trie_precalc_worker( match result { Ok((state_root, trie_output)) => { let elapsed = start_time.elapsed(); - info!( + debug!( target: "payload_builder", flashblock_index = work_item.flashblock_index, state_root = %state_root, From 5094e099230257f6dc9d4cedcd40c3d8d19ce8da Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Tue, 3 Mar 2026 11:54:21 +0800 Subject: [PATCH 15/17] refactor: improve naming and comments for clarity Co-Authored-By: Claude Opus 4.6 --- crates/builder/src/payload/flashblocks/payload.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index b80dfe20..33f5b412 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -932,7 +932,7 @@ where fallback_payload: OpBuiltPayload, resolve_payload: &BlockCell, precalc_pipeline: Option, - current_flashblock_index: u64, + flashblock_count: u64, ) { if resolve_payload.get().is_some() { return; @@ -942,7 +942,7 @@ where B256::ZERO => { // Block-wait for the worker to finish the immediately prior flashblock. // Drop work_tx so the worker finishes remaining items and exits. - let target_index = current_flashblock_index.saturating_sub(1); + let target_index = flashblock_count.saturating_sub(1); let wait_start = Instant::now(); let precalc_result = precalc_pipeline.and_then(|pipeline| { drop(pipeline.work_tx); @@ -983,7 +983,7 @@ where parent_hash: ctx.parent().hash(), built_payload_tx: self.built_payload_tx.clone(), metrics: self.metrics.clone(), - current_flashblock_index, + flashblock_count, }; // Async calculate state root @@ -1392,7 +1392,7 @@ struct CalculateStateRootContext { parent_hash: BlockHash, built_payload_tx: mpsc::Sender, metrics: Arc, - current_flashblock_index: u64, + flashblock_count: u64, } fn resolve_zero_state_root( @@ -1466,9 +1466,9 @@ fn calculate_state_root_on_resolve( ) -> Result<(B256, TrieUpdates, HashedPostState), PayloadBuilderError> { let calc_start_time = Instant::now(); - // Only use precalc from the immediately prior flashblock (strict incremental) + // Only use precalc if the worker computed the state root for the last built flashblock let eligible_precalc = - precalc_result.filter(|p| p.flashblock_index + 1 == ctx.current_flashblock_index); + precalc_result.filter(|p| p.flashblock_index + 1 == ctx.flashblock_count); let (state_root, trie_updates, hashed_state, method) = if let Some(precalc) = eligible_precalc { // The worker already computed the correct state root for this BundleState. From 8c4a81393f6fe876a1555dca461e1ba23ef69d40 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Tue, 3 Mar 2026 14:52:09 +0800 Subject: [PATCH 16/17] chore: clean up --- crates/builder/src/payload/flashblocks/payload.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index 33f5b412..71f31a6a 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -28,7 +28,7 @@ use op_alloy_rpc_types_engine::{ OpFlashblockPayload, OpFlashblockPayloadBase, OpFlashblockPayloadDelta, OpFlashblockPayloadMetadata, }; -use reth::{payload::PayloadBuilderAttributes, tasks::TaskSpawner}; +use reth::{payload::PayloadBuilderAttributes, providers::StateProvider, tasks::TaskSpawner}; use reth_basic_payload_builder::BuildOutcome; use reth_chainspec::EthChainSpec; use reth_evm::{execute::BlockBuilder, ConfigureEvm}; @@ -732,7 +732,7 @@ where fb_state: &mut FlashblocksState, info: &mut ExecutionInfo, state: &mut State, - state_provider: impl reth::providers::StateProvider + Clone, + state_provider: impl StateProvider + Clone, best_txs: &mut NextBestFlashblocksTxs, block_cancel: &CancellationToken, best_payload: &mut (OpBuiltPayload, Arc), @@ -1397,7 +1397,7 @@ struct CalculateStateRootContext { fn resolve_zero_state_root( ctx: CalculateStateRootContext, - state_provider: Box, + state_provider: Box, precalc_result: Option, precalc_wait: std::time::Duration, ) -> Result { @@ -1442,7 +1442,7 @@ fn resolve_zero_state_root( } let resolve_total_time = resolve_start_time.elapsed(); - info!( + debug!( target: "payload_builder", state_root = %state_root, resolve_total_ms = resolve_total_time.as_millis(), @@ -1460,7 +1460,7 @@ fn resolve_zero_state_root( /// Otherwise falls back to a cold full calculation via the provided state_provider. fn calculate_state_root_on_resolve( ctx: &CalculateStateRootContext, - state_provider: Box, + state_provider: Box, precalc_result: Option, precalc_wait: std::time::Duration, ) -> Result<(B256, TrieUpdates, HashedPostState), PayloadBuilderError> { @@ -1517,7 +1517,7 @@ fn calculate_state_root_on_resolve( fn run_trie_precalc_worker( work_rx: std::sync::mpsc::Receiver, result_tx: std::sync::mpsc::SyncSender, - state_provider: Box, + state_provider: Box, metrics: Arc, ) { let mut prev_trie_updates: Option = None; From debf9a92e8e07fa07305a63ed2d19852e6b05fa6 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Tue, 3 Mar 2026 15:23:03 +0800 Subject: [PATCH 17/17] chore: clean up imports and comments Co-Authored-By: Claude Opus 4.6 --- crates/builder/src/args/op.rs | 1 - crates/builder/src/payload/flashblocks/payload.rs | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/builder/src/args/op.rs b/crates/builder/src/args/op.rs index 2806a8e0..a44cc56a 100644 --- a/crates/builder/src/args/op.rs +++ b/crates/builder/src/args/op.rs @@ -171,7 +171,6 @@ pub struct FlashblocksArgs { default_value = "256" )] pub ws_subscriber_limit: Option, - } impl Default for FlashblocksArgs { diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index 71f31a6a..f275d5bf 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -54,7 +54,7 @@ use reth_revm::{ use reth_transaction_pool::TransactionPool; use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; use revm::Database; -use std::{collections::BTreeMap, sync::Arc, time::Instant}; +use std::{collections::BTreeMap, sync::mpsc::TrySendError, sync::Arc, time::Instant}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; @@ -699,14 +699,14 @@ where "Sent work item to async trie precalc pipeline" ); } - Err(std::sync::mpsc::TrySendError::Full(_)) => { + Err(TrySendError::Full(_)) => { warn!( target: "payload_builder", flashblock_index = fb_index, "Async trie precalc pipeline full, skipping" ); } - Err(std::sync::mpsc::TrySendError::Disconnected(_)) => { + Err(TrySendError::Disconnected(_)) => { warn!( target: "payload_builder", flashblock_index = fb_index, @@ -1454,7 +1454,7 @@ fn resolve_zero_state_root( /// Calculates only the state root for an existing payload. /// -/// If `precalc_result` is available and matches the immediately prior flashblock, +/// If `precalc_result` is available and is for the last built flashblock, /// directly reuses the worker's already-computed state root, trie updates, and hashed /// state. The worker operates on the same `Arc` so its results are correct. /// Otherwise falls back to a cold full calculation via the provided state_provider.