diff --git a/crates/builder/src/args/op.rs b/crates/builder/src/args/op.rs index fe0b61c8..a44cc56a 100644 --- a/crates/builder/src/args/op.rs +++ b/crates/builder/src/args/op.rs @@ -109,6 +109,26 @@ pub struct FlashblocksArgs { )] pub flashblocks_disable_async_calculate_state_root: bool, + /// Enable async trie precalculation during flashblock building. + /// When enabled and disable_state_root is true, background trie calculations + /// are spawned after each flashblock to speed up final state root resolution. + #[arg( + long = "flashblocks.enable-async-trie-precalc", + default_value = "false", + env = "FLASHBLOCKS_ENABLE_ASYNC_TRIE_PRECALC" + )] + pub flashblocks_enable_async_trie_precalc: bool, + + /// Which flashblock index to start async trie precalculation from (0-indexed). + /// For example, with 5 flashblocks and start=2, precalculation begins after + /// flashblock 2 (skipping 0 and 1). + #[arg( + long = "flashblocks.async-trie-precalc-start-flashblock", + default_value = "1", + env = "FLASHBLOCKS_ASYNC_TRIE_PRECALC_START_FLASHBLOCK" + )] + pub flashblocks_async_trie_precalc_start_flashblock: u64, + /// Flashblocks number contract address /// /// This is the address of the contract that will be used to increment the flashblock number. diff --git a/crates/builder/src/metrics/builder.rs b/crates/builder/src/metrics/builder.rs index 8332d60b..86fa09da 100644 --- a/crates/builder/src/metrics/builder.rs +++ b/crates/builder/src/metrics/builder.rs @@ -40,6 +40,8 @@ pub struct BuilderMetrics { pub state_root_calculation_duration: Histogram, /// Latest state root calculation duration pub state_root_calculation_gauge: Gauge, + /// Histogram of async trie precalculation duration (background worker) + pub trie_precalc_duration: Histogram, /// Histogram of sequencer transaction execution duration pub sequencer_tx_duration: Histogram, /// Latest sequencer transaction execution duration diff --git a/crates/builder/src/payload/flashblocks/config.rs b/crates/builder/src/payload/flashblocks/config.rs index 932cb245..fdcb095c 100644 --- a/crates/builder/src/payload/flashblocks/config.rs +++ b/crates/builder/src/payload/flashblocks/config.rs @@ -60,6 +60,14 @@ pub struct FlashblocksConfig { /// Maximum number of concurrent WebSocket subscribers pub ws_subscriber_limit: Option, + + /// Enable async trie precalculation during flashblock building. + /// When enabled and disable_state_root is true, background trie calculations + /// are spawned after each flashblock to speed up final state root resolution. + pub enable_async_trie_precalc: bool, + + /// Which flashblock index to start async trie precalculation from (0-indexed). + pub async_trie_precalc_start_flashblock: u64, } impl Default for FlashblocksConfig { @@ -80,6 +88,8 @@ impl Default for FlashblocksConfig { p2p_send_full_payload: false, p2p_process_full_payload: false, ws_subscriber_limit: None, + enable_async_trie_precalc: false, + async_trie_precalc_start_flashblock: 1, } } } @@ -118,6 +128,10 @@ impl TryFrom for FlashblocksConfig { p2p_send_full_payload: args.flashblocks.p2p.p2p_send_full_payload, p2p_process_full_payload: args.flashblocks.p2p.p2p_process_full_payload, ws_subscriber_limit: args.flashblocks.ws_subscriber_limit, + enable_async_trie_precalc: args.flashblocks.flashblocks_enable_async_trie_precalc, + async_trie_precalc_start_flashblock: args + .flashblocks + .flashblocks_async_trie_precalc_start_flashblock, }) } } diff --git a/crates/builder/src/payload/flashblocks/payload.rs b/crates/builder/src/payload/flashblocks/payload.rs index b41185e2..f275d5bf 100644 --- a/crates/builder/src/payload/flashblocks/payload.rs +++ b/crates/builder/src/payload/flashblocks/payload.rs @@ -28,7 +28,7 @@ use op_alloy_rpc_types_engine::{ OpFlashblockPayload, OpFlashblockPayloadBase, OpFlashblockPayloadDelta, OpFlashblockPayloadMetadata, }; -use reth::{payload::PayloadBuilderAttributes, tasks::TaskSpawner}; +use reth::{payload::PayloadBuilderAttributes, providers::StateProvider, tasks::TaskSpawner}; use reth_basic_payload_builder::BuildOutcome; use reth_chainspec::EthChainSpec; use reth_evm::{execute::BlockBuilder, ConfigureEvm}; @@ -54,7 +54,7 @@ use reth_revm::{ use reth_transaction_pool::TransactionPool; use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; use revm::Database; -use std::{collections::BTreeMap, sync::Arc, time::Instant}; +use std::{collections::BTreeMap, sync::mpsc::TrySendError, sync::Arc, time::Instant}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; @@ -185,6 +185,39 @@ impl FlashblocksState { } } +/// Result of an async trie precalculation for a single flashblock. +#[derive(Debug)] +struct TriePrecalcResult { + /// The flashblock index this result corresponds to. + flashblock_index: u64, + /// The computed state root for this flashblock's cumulative state. + state_root: B256, + /// The computed trie updates. + trie_updates: Arc, + /// The hashed post state at the time of precalculation. + hashed_state: HashedPostState, +} + +/// Work item sent from the main flashblock loop to the background trie worker. +#[derive(Debug)] +struct TriePrecalcWorkItem { + flashblock_index: u64, + bundle_state: Arc, +} + +/// Manages the async trie precalculation pipeline. +/// +/// A background worker computes incremental trie updates sequentially. +/// Each computation uses the previous one's `TrieUpdates` to maintain +/// an incremental chain. Results are collected here and used during +/// final state root resolution. +struct AsyncTriePrecalcPipeline { + /// Receiver for completed precalculation results from the background worker. + result_rx: std::sync::mpsc::Receiver, + /// Sender for providing BundleState snapshots to the background worker. + work_tx: std::sync::mpsc::SyncSender, +} + /// Optimism's payload builder #[derive(Debug, Clone)] pub(super) struct OpPayloadBuilder { @@ -428,7 +461,7 @@ where .try_send(fb_payload.clone()) .map_err(PayloadBuilderError::other)?; } - let mut best_payload = (fallback_payload.clone(), bundle_state); + let mut best_payload = (fallback_payload.clone(), Arc::new(bundle_state)); info!( target: "payload_builder", @@ -464,7 +497,14 @@ where ctx.metrics.payload_num_tx_gauge.set(info.executed_transactions.len() as f64); // return early since we don't need to build a block with transactions from the pool - self.resolve_best_payload(&ctx, best_payload, fallback_payload, &resolve_payload); + self.resolve_best_payload( + &ctx, + best_payload, + fallback_payload, + &resolve_payload, + None, + 0, + ); return Ok(()); } @@ -540,6 +580,39 @@ where fb_payload.payload_id, ))); + // Initialize async trie precalculation pipeline if enabled + let mut precalc_pipeline: Option = if disable_state_root + && self.config.specific.enable_async_trie_precalc + { + match self.client.state_by_block_hash(ctx.parent().hash()) { + Ok(worker_state_provider) => { + let (work_tx, work_rx) = + std::sync::mpsc::sync_channel((expected_flashblocks + 1) as usize); + let (result_tx, result_rx) = + std::sync::mpsc::sync_channel((expected_flashblocks + 1) as usize); + let metrics = self.metrics.clone(); + self.task_executor.spawn_blocking(Box::pin(async move { + run_trie_precalc_worker(work_rx, result_tx, worker_state_provider, metrics); + })); + debug!( + target: "payload_builder", + "Async trie precalculation pipeline started" + ); + Some(AsyncTriePrecalcPipeline { result_rx, work_tx }) + } + Err(err) => { + warn!( + target: "payload_builder", + error = %err, + "Failed to create state provider for async trie precalc, disabling" + ); + None + } + } + } else { + None + }; + // Process flashblocks - block on async channel receive loop { // Wait for signal before building flashblock. @@ -554,7 +627,14 @@ where ctx = ctx.with_cancel(new_fb_cancel); } else { // Channel closed - block building cancelled - self.resolve_best_payload(&ctx, best_payload, fallback_payload, &resolve_payload); + self.resolve_best_payload( + &ctx, + best_payload, + fallback_payload, + &resolve_payload, + precalc_pipeline.take(), + fb_state.flashblock_index(), + ); self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks); return Ok(()); } @@ -577,6 +657,8 @@ where best_payload, fallback_payload, &resolve_payload, + precalc_pipeline.take(), + fb_state.flashblock_index(), ); self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks); return Ok(()); @@ -595,11 +677,47 @@ where best_payload, fallback_payload, &resolve_payload, + precalc_pipeline.take(), + fb_state.flashblock_index(), ); return Err(PayloadBuilderError::Other(err.into())); } }; + // Feed work item to async trie precalc pipeline + if let Some(pipeline) = &precalc_pipeline { + let fb_index = fb_state.flashblock_index(); + if fb_index >= self.config.specific.async_trie_precalc_start_flashblock { + match pipeline.work_tx.try_send(TriePrecalcWorkItem { + flashblock_index: fb_index, + bundle_state: best_payload.1.clone(), + }) { + Ok(()) => { + debug!( + target: "payload_builder", + flashblock_index = fb_index, + "Sent work item to async trie precalc pipeline" + ); + } + Err(TrySendError::Full(_)) => { + warn!( + target: "payload_builder", + flashblock_index = fb_index, + "Async trie precalc pipeline full, skipping" + ); + } + Err(TrySendError::Disconnected(_)) => { + warn!( + target: "payload_builder", + flashblock_index = fb_index, + "Async trie precalc worker disconnected" + ); + precalc_pipeline = None; + } + } + } + } + fb_state = next_fb_state; } } @@ -614,17 +732,17 @@ where fb_state: &mut FlashblocksState, info: &mut ExecutionInfo, state: &mut State, - state_provider: impl reth::providers::StateProvider + Clone, + state_provider: impl StateProvider + Clone, best_txs: &mut NextBestFlashblocksTxs, block_cancel: &CancellationToken, - best_payload: &mut (OpBuiltPayload, BundleState), + best_payload: &mut (OpBuiltPayload, Arc), ) -> eyre::Result> { let flashblock_index = fb_state.flashblock_index(); let mut target_gas_for_batch = fb_state.target_gas_for_batch; let mut target_da_for_batch = fb_state.target_da_for_batch; let mut target_da_footprint_for_batch = fb_state.target_da_footprint_for_batch; - info!( + debug!( target: "payload_builder", block_number = ctx.block_number(), flashblock_index, @@ -751,7 +869,7 @@ where self.built_fb_payload_tx .try_send(fb_payload) .wrap_err("failed to send built payload to handler")?; - *best_payload = (new_payload, bundle_state); + *best_payload = (new_payload, Arc::new(bundle_state)); // Record flashblock build duration ctx.metrics.flashblock_build_duration.record(flashblock_build_start_time.elapsed()); @@ -810,9 +928,11 @@ where fn resolve_best_payload( &self, ctx: &OpPayloadBuilderCtx, - best_payload: (OpBuiltPayload, BundleState), + best_payload: (OpBuiltPayload, Arc), fallback_payload: OpBuiltPayload, resolve_payload: &BlockCell, + precalc_pipeline: Option, + flashblock_count: u64, ) { if resolve_payload.get().is_some() { return; @@ -820,6 +940,34 @@ where let payload = match best_payload.0.block().header().state_root { B256::ZERO => { + // Block-wait for the worker to finish the immediately prior flashblock. + // Drop work_tx so the worker finishes remaining items and exits. + let target_index = flashblock_count.saturating_sub(1); + let wait_start = Instant::now(); + let precalc_result = precalc_pipeline.and_then(|pipeline| { + drop(pipeline.work_tx); + let mut latest = None; + let timeout = std::time::Duration::from_secs(30); + // First recv with timeout to avoid hanging if worker is stuck + while let Ok(result) = pipeline.result_rx.recv_timeout(timeout) { + let is_target = result.flashblock_index == target_index; + latest = Some(result); + if is_target { + break; + } + } + latest + }); + let wait_elapsed = wait_start.elapsed(); + debug!( + target: "payload_builder", + wait_ms = wait_elapsed.as_millis(), + target_index, + got_result = precalc_result.is_some(), + got_flashblock_index = precalc_result.as_ref().map(|r| r.flashblock_index), + "resolve_best_payload: precalc wait completed" + ); + // Get the fallback payload for payload resolution let fallback_payload_for_resolve = if self.config.specific.disable_async_calculate_state_root { @@ -835,13 +983,14 @@ where parent_hash: ctx.parent().hash(), built_payload_tx: self.built_payload_tx.clone(), metrics: self.metrics.clone(), + flashblock_count, }; // Async calculate state root match self.client.state_by_block_hash(ctx.parent().hash()) { Ok(state_provider) => { if self.config.specific.disable_async_calculate_state_root { - resolve_zero_state_root(state_root_ctx, state_provider) + resolve_zero_state_root(state_root_ctx, state_provider, precalc_result, wait_elapsed) .unwrap_or_else(|err| { warn!( target: "payload_builder", @@ -852,7 +1001,12 @@ where }) } else { self.task_executor.spawn_blocking(Box::pin(async move { - let _ = resolve_zero_state_root(state_root_ctx, state_provider); + let _ = resolve_zero_state_root( + state_root_ctx, + state_provider, + precalc_result, + wait_elapsed, + ); })); fallback_payload_for_resolve } @@ -982,9 +1136,9 @@ where // calculate the state root let state_root_start_time = Instant::now(); let mut state_root = B256::ZERO; - let mut trie_output = TrieUpdates::default(); + let mut trie_output_arc = Arc::new(TrieUpdates::default()); let mut hashed_state = HashedPostState::default(); - let mut trie_updates_to_cache: Option = None; + let mut trie_updates_to_cache: Option> = None; if calculate_state_root { let state_provider = state.database.as_ref(); @@ -995,7 +1149,7 @@ where hashed_state = state_provider.hashed_post_state(&state.bundle_state); - (state_root, trie_output) = if let Some(prev_trie) = prev_trie { + if let Some(prev_trie) = prev_trie { // Incremental path: Use cached trie from previous flashblock debug!( target: "payload_builder", @@ -1009,9 +1163,11 @@ where hashed_state.construct_prefix_sets(), ); - state_provider + let trie_output; + (state_root, trie_output) = state_provider .state_root_from_nodes_with_updates(trie_input) - .map_err(PayloadBuilderError::other)? + .map_err(PayloadBuilderError::other)?; + trie_output_arc = Arc::new(trie_output); } else { debug!( target: "payload_builder", @@ -1019,20 +1175,23 @@ where "Using full state root calculation" ); - state.database.as_ref().state_root_with_updates(hashed_state.clone()).inspect_err( - |err| { - warn!( - target: "payload_builder", - parent_header=%ctx.parent().hash(), - %err, - "failed to calculate state root for payload" - ); - }, - )? + let trie_output; + (state_root, trie_output) = + state.database.as_ref().state_root_with_updates(hashed_state.clone()).inspect_err( + |err| { + warn!( + target: "payload_builder", + parent_header=%ctx.parent().hash(), + %err, + "failed to calculate state root for payload" + ); + }, + )?; + trie_output_arc = Arc::new(trie_output); }; // Cache trie updates to apply in fb_state later (avoids mut on fb_state parameter). - trie_updates_to_cache = Some(trie_output.clone()); + trie_updates_to_cache = Some(trie_output_arc.clone()); let state_root_calculation_time = state_root_start_time.elapsed(); ctx.metrics.state_root_calculation_duration.record(state_root_calculation_time); @@ -1131,7 +1290,7 @@ where let executed = BuiltPayloadExecutedBlock { recovered_block: Arc::new(recovered_block), execution_output: Arc::new(execution_output), - trie_updates: either::Either::Left(Arc::new(trie_output)), + trie_updates: either::Either::Left(trie_output_arc), hashed_state: either::Either::Left(Arc::new(hashed_state)), }; debug!( @@ -1163,7 +1322,7 @@ where let new_receipts = info.receipts[last_idx..].to_vec(); if let Some(fb) = fb_state { if let Some(updates) = trie_updates_to_cache.take() { - fb.prev_trie_updates = Some(Arc::new(updates)); + fb.prev_trie_updates = Some(updates); } fb.set_last_flashblock_tx_index(info.executed_transactions.len()); } @@ -1229,18 +1388,23 @@ where } struct CalculateStateRootContext { - best_payload: (OpBuiltPayload, BundleState), + best_payload: (OpBuiltPayload, Arc), parent_hash: BlockHash, built_payload_tx: mpsc::Sender, metrics: Arc, + flashblock_count: u64, } fn resolve_zero_state_root( ctx: CalculateStateRootContext, - state_provider: Box, + state_provider: Box, + precalc_result: Option, + precalc_wait: std::time::Duration, ) -> Result { + let resolve_start_time = Instant::now(); + let (state_root, trie_updates, hashed_state) = - calculate_state_root_on_resolve(&ctx, state_provider)?; + calculate_state_root_on_resolve(&ctx, state_provider, precalc_result, precalc_wait)?; let payload_id = ctx.best_payload.0.id(); let fees = ctx.best_payload.0.fees(); @@ -1276,34 +1440,146 @@ fn resolve_zero_state_root( "Failed to send updated payload" ); } + + let resolve_total_time = resolve_start_time.elapsed(); debug!( target: "payload_builder", state_root = %state_root, - "Updated payload with calculated state root" + resolve_total_ms = resolve_total_time.as_millis(), + "resolve_zero_state_root completed" ); Ok(updated_payload) } -/// Calculates only the state root for an existing payload +/// Calculates only the state root for an existing payload. +/// +/// If `precalc_result` is available and is for the last built flashblock, +/// directly reuses the worker's already-computed state root, trie updates, and hashed +/// state. The worker operates on the same `Arc` so its results are correct. +/// Otherwise falls back to a cold full calculation via the provided state_provider. fn calculate_state_root_on_resolve( ctx: &CalculateStateRootContext, - state_provider: Box, + state_provider: Box, + precalc_result: Option, + precalc_wait: std::time::Duration, ) -> Result<(B256, TrieUpdates, HashedPostState), PayloadBuilderError> { - let state_root_start_time = Instant::now(); - let hashed_state = state_provider.hashed_post_state(&ctx.best_payload.1); - let state_root_updates = - state_provider.state_root_with_updates(hashed_state.clone()).inspect_err(|err| { - warn!(target: "payload_builder", - parent_header=%ctx.parent_hash, - %err, - "failed to calculate state root for payload" + let calc_start_time = Instant::now(); + + // Only use precalc if the worker computed the state root for the last built flashblock + let eligible_precalc = + precalc_result.filter(|p| p.flashblock_index + 1 == ctx.flashblock_count); + + let (state_root, trie_updates, hashed_state, method) = if let Some(precalc) = eligible_precalc { + // The worker already computed the correct state root for this BundleState. + // Both worker and resolve share the same Arc, so the worker's + // state_root is exactly what we need. No cross-provider recomputation required. + let trie_updates = + Arc::try_unwrap(precalc.trie_updates).unwrap_or_else(|arc| arc.as_ref().clone()); + + (precalc.state_root, trie_updates, precalc.hashed_state, "incremental") + } else { + let hashed_state = state_provider.hashed_post_state(&ctx.best_payload.1); + let (root, updates) = + state_provider.state_root_with_updates(hashed_state.clone()).inspect_err(|err| { + warn!(target: "payload_builder", + parent_header=%ctx.parent_hash, + %err, + "failed to calculate state root for payload" + ); + })?; + (root, updates, hashed_state, "cold") + }; + + let calc_time = calc_start_time.elapsed(); + let total_time = precalc_wait + calc_time; + debug!( + target: "payload_builder", + precalc_wait_ms = precalc_wait.as_millis(), + calc_ms = calc_time.as_millis(), + total_ms = total_time.as_millis(), + state_root = %state_root, + method, + "calculate_state_root_on_resolve timing" + ); + + ctx.metrics.state_root_calculation_duration.record(total_time); + ctx.metrics.state_root_calculation_gauge.set(total_time); + + Ok((state_root, trie_updates, hashed_state)) +} + +/// Runs the async trie precalculation worker in a blocking context. +/// +/// Processes work items sequentially, maintaining an incremental trie update chain. +/// The first item does a full `state_root_with_updates`, subsequent items use +/// `state_root_from_nodes_with_updates` with the previous result's cached trie nodes. +fn run_trie_precalc_worker( + work_rx: std::sync::mpsc::Receiver, + result_tx: std::sync::mpsc::SyncSender, + state_provider: Box, + metrics: Arc, +) { + let mut prev_trie_updates: Option = None; + + while let Ok(work_item) = work_rx.recv() { + let start_time = Instant::now(); + + let hashed_state = state_provider.hashed_post_state(&work_item.bundle_state); + + let result = if let Some(prev_trie) = prev_trie_updates.take() { + // Incremental path: reuse cached trie nodes from previous flashblock + let trie_input = TrieInput::new( + prev_trie, + hashed_state.clone(), + hashed_state.construct_prefix_sets(), ); - })?; + state_provider.state_root_from_nodes_with_updates(trie_input) + } else { + // First calculation: full trie computation + state_provider.state_root_with_updates(hashed_state.clone()) + }; - let state_root_calculation_time = state_root_start_time.elapsed(); - ctx.metrics.state_root_calculation_duration.record(state_root_calculation_time); - ctx.metrics.state_root_calculation_gauge.set(state_root_calculation_time); + match result { + Ok((state_root, trie_output)) => { + let elapsed = start_time.elapsed(); + debug!( + target: "payload_builder", + flashblock_index = work_item.flashblock_index, + state_root = %state_root, + duration_ms = elapsed.as_millis(), + "Async trie precalculation completed" + ); + metrics.trie_precalc_duration.record(elapsed); + + // Clone for our incremental chain, wrap in Arc for cross-thread transfer + prev_trie_updates = Some(trie_output.clone()); + + if result_tx + .send(TriePrecalcResult { + flashblock_index: work_item.flashblock_index, + state_root, + trie_updates: Arc::new(trie_output), + hashed_state, + }) + .is_err() + { + // Main loop dropped the receiver — stop worker + break; + } + } + Err(err) => { + warn!( + target: "payload_builder", + flashblock_index = work_item.flashblock_index, + error = %err, + "Async trie precalculation failed, resetting chain" + ); + // Reset chain: next item will do a full calculation + prev_trie_updates = None; + } + } + } - Ok((state_root_updates.0, state_root_updates.1, hashed_state)) + debug!(target: "payload_builder", "Trie precalc worker exiting"); } diff --git a/deps/optimism b/deps/optimism new file mode 160000 index 00000000..e9c20bc0 --- /dev/null +++ b/deps/optimism @@ -0,0 +1 @@ +Subproject commit e9c20bc0c77592945607fb85c8b3b3fde68d33a5