From f1198797e25f23512970a65caf66fa1c34e1f18a Mon Sep 17 00:00:00 2001 From: illuzen Date: Thu, 22 Jan 2026 17:08:19 +0800 Subject: [PATCH 1/6] new block trigger --- client/consensus/qpow/src/lib.rs | 54 ++++++--- client/consensus/qpow/src/worker.rs | 168 +++++++++++++++++++++++----- node/src/service.rs | 33 +++--- 3 files changed, 200 insertions(+), 55 deletions(-) diff --git a/client/consensus/qpow/src/lib.rs b/client/consensus/qpow/src/lib.rs index 8fc459ee..88f7d83c 100644 --- a/client/consensus/qpow/src/lib.rs +++ b/client/consensus/qpow/src/lib.rs @@ -10,9 +10,9 @@ use sp_consensus_qpow::QPoWApi; use sp_runtime::{generic::BlockId, traits::Block as BlockT, AccountId32}; use std::{sync::Arc, time::Duration}; -use crate::worker::UntilImportedOrTimeout; -pub use crate::worker::{MiningBuild, MiningHandle, MiningMetadata}; -use futures::{Future, StreamExt}; +use crate::worker::UntilImportedOrTransaction; +pub use crate::worker::{MiningBuild, MiningHandle, MiningMetadata, RebuildTrigger}; +use futures::{Future, Stream, StreamExt}; use log::*; use prometheus_endpoint::Registry; use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents}; @@ -296,12 +296,13 @@ where let header = &mut block.header; let block_hash = hash; let seal_item = match header.digest_mut().pop() { - Some(DigestItem::Seal(id, seal)) => + Some(DigestItem::Seal(id, seal)) => { if id == POW_ENGINE_ID { DigestItem::Seal(id, seal) } else { return Err(Error::::WrongEngine(id).into()); - }, + } + }, _ => return Err(Error::::HeaderUnsealed(block_hash).into()), }; @@ -342,6 +343,11 @@ where Ok(BasicQueue::new(verifier, block_import, justification_import, spawner, registry)) } +/// Configuration for transaction-triggered block rebuilds. +/// These are hardcoded for now but could be made configurable later. +const MAX_REBUILDS_PER_SEC: u32 = 2; +const MIN_TXS_FOR_REBUILD: usize = 1; + /// Start the mining worker for QPoW. This function provides the necessary helper functions that can /// be used to implement a miner. However, it does not do the CPU-intensive mining itself. /// @@ -349,11 +355,16 @@ where /// mining metadata and submitting mined blocks, and a future, which must be polled to fill in /// information in the worker. /// -/// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted -/// for blocks being built. This can encode authorship information, or just be a graffiti. +/// The worker will rebuild blocks when: +/// - A new block is imported from the network +/// - New transactions arrive (rate limited to MAX_REBUILDS_PER_SEC, requiring MIN_TXS_FOR_REBUILD txs) +/// +/// This allows transactions to be included faster since we don't wait for the next block import +/// to rebuild. Mining on a new block vs the old block has the same probability of success per nonce, +/// so the only cost is the overhead of rebuilding (which is minimal compared to mining time). #[allow(clippy::too_many_arguments)] #[allow(clippy::type_complexity)] -pub fn start_mining_worker( +pub fn start_mining_worker( block_import: BoxBlockImport, client: Arc, select_chain: S, @@ -362,7 +373,7 @@ pub fn start_mining_worker( justification_sync_link: L, rewards_address: AccountId32, create_inherent_data_providers: CIDP, - timeout: Duration, + tx_notifications: TxStream, build_time: Duration, ) -> (MiningHandle>::Proof>, impl Future) where @@ -381,16 +392,24 @@ where SO: SyncOracle + Clone + Send + Sync + 'static, L: JustificationSyncLink, CIDP: CreateInherentDataProviders, + TxHash: Send + 'static, + TxStream: Stream + Send + Unpin + 'static, { - let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout); + let mut trigger_stream = UntilImportedOrTransaction::new( + client.import_notification_stream(), + tx_notifications, + MAX_REBUILDS_PER_SEC, + MIN_TXS_FOR_REBUILD, + ); let worker = MiningHandle::new(client.clone(), block_import, justification_sync_link); let worker_ret = worker.clone(); let task = async move { loop { - if timer.next().await.is_none() { - break; - } + let trigger = match trigger_stream.next().await { + Some(t) => t, + None => break, + }; if sync_oracle.is_major_syncing() { debug!(target: LOG_TARGET, "Skipping proposal due to sync."); @@ -412,7 +431,9 @@ where }; let best_hash = best_header.hash(); - if worker.best_hash() == Some(best_hash) { + // For block imports (not initial), skip if we're already on the best hash. + // For transaction triggers and initial trigger, we always want to rebuild. + if trigger == RebuildTrigger::BlockImported && worker.best_hash() == Some(best_hash) { continue; } @@ -521,8 +542,9 @@ fn fetch_seal(digest: Option<&DigestItem>, hash: B::Hash) -> Result>(parent: &BlockId) -> Result> { match parent { BlockId::Hash(hash) => Ok(*hash), - BlockId::Number(_) => - Err(Error::Runtime("Expected BlockId::Hash, but got BlockId::Number".into())), + BlockId::Number(_) => { + Err(Error::Runtime("Expected BlockId::Hash, but got BlockId::Number".into())) + }, } } diff --git a/client/consensus/qpow/src/worker.rs b/client/consensus/qpow/src/worker.rs index 7fd05f96..6127a4a7 100644 --- a/client/consensus/qpow/src/worker.rs +++ b/client/consensus/qpow/src/worker.rs @@ -35,6 +35,7 @@ use sp_runtime::{ traits::{Block as BlockT, Header as HeaderT}, AccountId32, DigestItem, }; +use std::time::Instant; use std::{ pin::Pin, sync::{ @@ -198,51 +199,166 @@ where } } -/// A stream that waits for a block import or timeout. -pub struct UntilImportedOrTimeout { +/// Reason why the stream fired - either a block was imported or enough transactions arrived. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RebuildTrigger { + /// Initial trigger to bootstrap mining (fires once on first poll). + Initial, + /// A new block was imported from the network. + BlockImported, + /// Enough new transactions arrived to trigger a rebuild. + NewTransactions, +} + +/// A stream that waits for a block import or new transactions (with rate limiting). +/// +/// This enables block producers to include new transactions faster by rebuilding +/// the block being mined when transactions arrive, rather than waiting for the +/// next block import or timeout. +/// +/// Rate limiting prevents excessive rebuilds - we limit to `max_rebuilds_per_sec` +/// and require at least `min_txs_for_rebuild` transactions before triggering. +pub struct UntilImportedOrTransaction { + /// Block import notifications stream. import_notifications: ImportNotifications, - timeout: Duration, - inner_delay: Option, + /// Transaction pool import notifications stream. + tx_notifications: Pin + Send>>, + /// Minimum interval between transaction-triggered rebuilds. + min_rebuild_interval: Duration, + /// Last time we triggered a rebuild due to transactions. + last_tx_rebuild: Option, + /// Number of transactions accumulated since last rebuild. + pending_tx_count: usize, + /// Minimum number of transactions required to trigger a rebuild. + min_txs_for_rebuild: usize, + /// Rate limit delay - if set, we're waiting before we can fire again. + rate_limit_delay: Option, + /// Whether we've fired the initial trigger yet. + initial_fired: bool, } -impl UntilImportedOrTimeout { - /// Create a new stream using the given import notification and timeout duration. - pub fn new(import_notifications: ImportNotifications, timeout: Duration) -> Self { - Self { import_notifications, timeout, inner_delay: None } +impl UntilImportedOrTransaction { + /// Create a new stream. + /// + /// # Arguments + /// * `import_notifications` - Stream of block import notifications + /// * `tx_notifications` - Stream of transaction import notifications + /// * `max_rebuilds_per_sec` - Maximum transaction-triggered rebuilds per second + /// * `min_txs_for_rebuild` - Minimum transactions needed to trigger a rebuild + pub fn new( + import_notifications: ImportNotifications, + tx_notifications: impl Stream + Send + 'static, + max_rebuilds_per_sec: u32, + min_txs_for_rebuild: usize, + ) -> Self { + let min_rebuild_interval = if max_rebuilds_per_sec > 0 { + Duration::from_millis(1000 / max_rebuilds_per_sec as u64) + } else { + Duration::from_secs(u64::MAX) // Effectively disable tx-triggered rebuilds + }; + + Self { + import_notifications, + tx_notifications: Box::pin(tx_notifications), + min_rebuild_interval, + last_tx_rebuild: None, + pending_tx_count: 0, + min_txs_for_rebuild, + rate_limit_delay: None, + initial_fired: false, + } } } -impl Stream for UntilImportedOrTimeout { - type Item = (); +impl Stream for UntilImportedOrTransaction { + type Item = RebuildTrigger; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let mut fire = false; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + // Fire immediately on first poll to bootstrap mining at genesis + if !self.initial_fired { + self.initial_fired = true; + debug!(target: LOG_TARGET, "Initial trigger, bootstrapping block production"); + return Poll::Ready(Some(RebuildTrigger::Initial)); + } + // Check for block imports first - these always trigger immediately loop { match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) { Poll::Pending => break, Poll::Ready(Some(_)) => { - fire = true; + // Block import resets the transaction counter since we'll build fresh + self.pending_tx_count = 0; + self.rate_limit_delay = None; + debug!(target: LOG_TARGET, "Block imported, triggering rebuild"); + return Poll::Ready(Some(RebuildTrigger::BlockImported)); }, Poll::Ready(None) => return Poll::Ready(None), } } - let timeout = self.timeout; - let inner_delay = self.inner_delay.get_or_insert_with(|| Delay::new(timeout)); - - match Future::poll(Pin::new(inner_delay), cx) { - Poll::Pending => (), - Poll::Ready(()) => { - fire = true; - }, + // Drain all pending transaction notifications and count them + loop { + match Stream::poll_next(Pin::new(&mut self.tx_notifications), cx) { + Poll::Pending => break, + Poll::Ready(Some(_)) => { + self.pending_tx_count += 1; + }, + Poll::Ready(None) => { + // Transaction stream closed, but we can still listen for block imports + break; + }, + } } - if fire { - self.inner_delay = None; - Poll::Ready(Some(())) - } else { - Poll::Pending + // Check if we have enough transactions and rate limiting allows us to fire + if self.pending_tx_count >= self.min_txs_for_rebuild { + let now = Instant::now(); + let can_fire = match self.last_tx_rebuild { + None => true, + Some(last) => now.duration_since(last) >= self.min_rebuild_interval, + }; + + if can_fire { + self.last_tx_rebuild = Some(now); + let tx_count = self.pending_tx_count; + self.pending_tx_count = 0; + self.rate_limit_delay = None; + debug!( + target: LOG_TARGET, + "New transactions ({} txs), triggering rebuild", + tx_count + ); + return Poll::Ready(Some(RebuildTrigger::NewTransactions)); + } else { + // We have enough txs but need to wait for rate limit + // Set up a delay to wake us when we can fire + let time_since_last = now.duration_since(self.last_tx_rebuild.unwrap()); + let wait_time = self.min_rebuild_interval.saturating_sub(time_since_last); + + if self.rate_limit_delay.is_none() { + self.rate_limit_delay = Some(Delay::new(wait_time)); + } + + if let Some(ref mut delay) = self.rate_limit_delay { + match Future::poll(Pin::new(delay), cx) { + Poll::Ready(()) => { + self.last_tx_rebuild = Some(Instant::now()); + let tx_count = self.pending_tx_count; + self.pending_tx_count = 0; + self.rate_limit_delay = None; + debug!( + target: LOG_TARGET, + "Rate limit expired, triggering rebuild for {} txs", + tx_count + ); + return Poll::Ready(Some(RebuildTrigger::NewTransactions)); + }, + Poll::Pending => {}, + } + } + } } + + Poll::Pending } } diff --git a/node/src/service.rs b/node/src/service.rs index df6f15bd..27a9b662 100644 --- a/node/src/service.rs +++ b/node/src/service.rs @@ -166,7 +166,8 @@ pub fn new_full< other: (pow_block_import, mut telemetry), } = new_partial(&config)?; - let mut tx_stream = transaction_pool.clone().import_notification_stream(); + let tx_stream_for_worker = transaction_pool.clone().import_notification_stream(); + let tx_stream_for_logger = transaction_pool.clone().import_notification_stream(); let net_config = sc_network::config::FullNetworkConfiguration::< Block, @@ -276,7 +277,7 @@ pub fn new_full< sync_service.clone(), rewards_address, inherent_data_providers, - Duration::from_secs(10), + tx_stream_for_worker, Duration::from_secs(10), ); @@ -357,17 +358,22 @@ pub fn new_full< // If external miner URL is provided, use external mining if let Some(miner_url) = &external_miner_url { - // Cancel previous job if metadata has changed - if let Some(job_id) = ¤t_job_id { - if let Err(e) = external_miner_client::cancel_mining_job( - &http_client, - miner_url, - job_id, - ) - .await - { - log::warn!("⛏️Failed to cancel previous mining job: {}", e); - } + // Fire-and-forget cancellation of previous job - don't wait for confirmation + // This reduces latency when switching to a new block + if let Some(old_job_id) = current_job_id.take() { + let cancel_client = http_client.clone(); + let cancel_url = miner_url.clone(); + tokio::spawn(async move { + if let Err(e) = external_miner_client::cancel_mining_job( + &cancel_client, + &cancel_url, + &old_job_id, + ) + .await + { + log::debug!("⛏️ Failed to cancel previous mining job {}: {}", old_job_id, e); + } + }); } // Get current distance_threshold from runtime @@ -514,6 +520,7 @@ pub fn new_full< }); task_manager.spawn_handle().spawn("tx-logger", None, async move { + let mut tx_stream = tx_stream_for_logger; while let Some(tx_hash) = tx_stream.next().await { if let Some(tx) = transaction_pool.ready_transaction(&tx_hash) { log::trace!(target: "miner", "New transaction: Hash = {:?}", tx_hash); From 637a609f265e786deb1e60539a2b863b134b8b22 Mon Sep 17 00:00:00 2001 From: illuzen Date: Thu, 22 Jan 2026 19:18:03 +0800 Subject: [PATCH 2/6] dedupe logic, remove unnecessary field --- client/consensus/qpow/src/worker.rs | 66 ++++++----------------------- 1 file changed, 14 insertions(+), 52 deletions(-) diff --git a/client/consensus/qpow/src/worker.rs b/client/consensus/qpow/src/worker.rs index 6127a4a7..d5358828 100644 --- a/client/consensus/qpow/src/worker.rs +++ b/client/consensus/qpow/src/worker.rs @@ -225,16 +225,14 @@ pub struct UntilImportedOrTransaction { tx_notifications: Pin + Send>>, /// Minimum interval between transaction-triggered rebuilds. min_rebuild_interval: Duration, - /// Last time we triggered a rebuild due to transactions. - last_tx_rebuild: Option, - /// Number of transactions accumulated since last rebuild. - pending_tx_count: usize, - /// Minimum number of transactions required to trigger a rebuild. - min_txs_for_rebuild: usize, /// Rate limit delay - if set, we're waiting before we can fire again. rate_limit_delay: Option, /// Whether we've fired the initial trigger yet. initial_fired: bool, + /// Number of transactions accumulated since last rebuild. + pending_tx_count: usize, + /// Minimum number of transactions required to trigger a rebuild. + min_txs_for_rebuild: usize, } impl UntilImportedOrTransaction { @@ -261,11 +259,10 @@ impl UntilImportedOrTransaction { import_notifications, tx_notifications: Box::pin(tx_notifications), min_rebuild_interval, - last_tx_rebuild: None, - pending_tx_count: 0, - min_txs_for_rebuild, rate_limit_delay: None, initial_fired: false, + pending_tx_count: 0, + min_txs_for_rebuild, } } } @@ -286,7 +283,7 @@ impl Stream for UntilImportedOrTransaction match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) { Poll::Pending => break, Poll::Ready(Some(_)) => { - // Block import resets the transaction counter since we'll build fresh + // Block import resets pending state since we'll build fresh self.pending_tx_count = 0; self.rate_limit_delay = None; debug!(target: LOG_TARGET, "Block imported, triggering rebuild"); @@ -303,59 +300,24 @@ impl Stream for UntilImportedOrTransaction Poll::Ready(Some(_)) => { self.pending_tx_count += 1; }, - Poll::Ready(None) => { - // Transaction stream closed, but we can still listen for block imports - break; - }, + Poll::Ready(None) => break, } } - // Check if we have enough transactions and rate limiting allows us to fire + // If we have enough pending transactions, check rate limit if self.pending_tx_count >= self.min_txs_for_rebuild { - let now = Instant::now(); - let can_fire = match self.last_tx_rebuild { + // Check if rate limit allows firing (no delay or delay expired) + let can_fire = match self.rate_limit_delay.as_mut() { None => true, - Some(last) => now.duration_since(last) >= self.min_rebuild_interval, + Some(delay) => Future::poll(Pin::new(delay), cx).is_ready(), }; if can_fire { - self.last_tx_rebuild = Some(now); let tx_count = self.pending_tx_count; self.pending_tx_count = 0; - self.rate_limit_delay = None; - debug!( - target: LOG_TARGET, - "New transactions ({} txs), triggering rebuild", - tx_count - ); + self.rate_limit_delay = Some(Delay::new(self.min_rebuild_interval)); + debug!(target: LOG_TARGET, "New transactions ({} txs), triggering rebuild", tx_count); return Poll::Ready(Some(RebuildTrigger::NewTransactions)); - } else { - // We have enough txs but need to wait for rate limit - // Set up a delay to wake us when we can fire - let time_since_last = now.duration_since(self.last_tx_rebuild.unwrap()); - let wait_time = self.min_rebuild_interval.saturating_sub(time_since_last); - - if self.rate_limit_delay.is_none() { - self.rate_limit_delay = Some(Delay::new(wait_time)); - } - - if let Some(ref mut delay) = self.rate_limit_delay { - match Future::poll(Pin::new(delay), cx) { - Poll::Ready(()) => { - self.last_tx_rebuild = Some(Instant::now()); - let tx_count = self.pending_tx_count; - self.pending_tx_count = 0; - self.rate_limit_delay = None; - debug!( - target: LOG_TARGET, - "Rate limit expired, triggering rebuild for {} txs", - tx_count - ); - return Poll::Ready(Some(RebuildTrigger::NewTransactions)); - }, - Poll::Pending => {}, - } - } } } From 9ac117e7b160d2b7786c3a4580a6e51d7fc72f06 Mon Sep 17 00:00:00 2001 From: illuzen Date: Thu, 22 Jan 2026 19:33:44 +0800 Subject: [PATCH 3/6] simplify again --- client/consensus/qpow/src/lib.rs | 18 +++++++++-------- client/consensus/qpow/src/worker.rs | 30 +++++++++++------------------ 2 files changed, 21 insertions(+), 27 deletions(-) diff --git a/client/consensus/qpow/src/lib.rs b/client/consensus/qpow/src/lib.rs index 88f7d83c..bc511bb5 100644 --- a/client/consensus/qpow/src/lib.rs +++ b/client/consensus/qpow/src/lib.rs @@ -343,10 +343,9 @@ where Ok(BasicQueue::new(verifier, block_import, justification_import, spawner, registry)) } -/// Configuration for transaction-triggered block rebuilds. -/// These are hardcoded for now but could be made configurable later. +/// Maximum transaction-triggered rebuilds per second. +/// Hardcoded for now but could be made configurable later. const MAX_REBUILDS_PER_SEC: u32 = 2; -const MIN_TXS_FOR_REBUILD: usize = 1; /// Start the mining worker for QPoW. This function provides the necessary helper functions that can /// be used to implement a miner. However, it does not do the CPU-intensive mining itself. @@ -357,7 +356,7 @@ const MIN_TXS_FOR_REBUILD: usize = 1; /// /// The worker will rebuild blocks when: /// - A new block is imported from the network -/// - New transactions arrive (rate limited to MAX_REBUILDS_PER_SEC, requiring MIN_TXS_FOR_REBUILD txs) +/// - New transactions arrive (rate limited to MAX_REBUILDS_PER_SEC) /// /// This allows transactions to be included faster since we don't wait for the next block import /// to rebuild. Mining on a new block vs the old block has the same probability of success per nonce, @@ -399,16 +398,19 @@ where client.import_notification_stream(), tx_notifications, MAX_REBUILDS_PER_SEC, - MIN_TXS_FOR_REBUILD, ); let worker = MiningHandle::new(client.clone(), block_import, justification_sync_link); let worker_ret = worker.clone(); let task = async move { + // Main block building loop - runs until trigger stream closes loop { + // Wait for a trigger (Initial, BlockImported, or NewTransactions) + // break exits the loop entirely, ending this task + // continue skips to the next iteration to wait for another trigger let trigger = match trigger_stream.next().await { Some(t) => t, - None => break, + None => break, // Stream closed, shut down the worker }; if sync_oracle.is_major_syncing() { @@ -431,8 +433,8 @@ where }; let best_hash = best_header.hash(); - // For block imports (not initial), skip if we're already on the best hash. - // For transaction triggers and initial trigger, we always want to rebuild. + // Skip redundant block import triggers if we're already building on this hash. + // Initial and NewTransactions triggers should proceed to rebuild. if trigger == RebuildTrigger::BlockImported && worker.best_hash() == Some(best_hash) { continue; } diff --git a/client/consensus/qpow/src/worker.rs b/client/consensus/qpow/src/worker.rs index d5358828..4e172469 100644 --- a/client/consensus/qpow/src/worker.rs +++ b/client/consensus/qpow/src/worker.rs @@ -35,7 +35,6 @@ use sp_runtime::{ traits::{Block as BlockT, Header as HeaderT}, AccountId32, DigestItem, }; -use std::time::Instant; use std::{ pin::Pin, sync::{ @@ -216,8 +215,7 @@ pub enum RebuildTrigger { /// the block being mined when transactions arrive, rather than waiting for the /// next block import or timeout. /// -/// Rate limiting prevents excessive rebuilds - we limit to `max_rebuilds_per_sec` -/// and require at least `min_txs_for_rebuild` transactions before triggering. +/// Rate limiting prevents excessive rebuilds - we limit to `max_rebuilds_per_sec`. pub struct UntilImportedOrTransaction { /// Block import notifications stream. import_notifications: ImportNotifications, @@ -229,10 +227,8 @@ pub struct UntilImportedOrTransaction { rate_limit_delay: Option, /// Whether we've fired the initial trigger yet. initial_fired: bool, - /// Number of transactions accumulated since last rebuild. - pending_tx_count: usize, - /// Minimum number of transactions required to trigger a rebuild. - min_txs_for_rebuild: usize, + /// Whether we have pending transactions waiting to trigger a rebuild. + has_pending_tx: bool, } impl UntilImportedOrTransaction { @@ -242,12 +238,10 @@ impl UntilImportedOrTransaction { /// * `import_notifications` - Stream of block import notifications /// * `tx_notifications` - Stream of transaction import notifications /// * `max_rebuilds_per_sec` - Maximum transaction-triggered rebuilds per second - /// * `min_txs_for_rebuild` - Minimum transactions needed to trigger a rebuild pub fn new( import_notifications: ImportNotifications, tx_notifications: impl Stream + Send + 'static, max_rebuilds_per_sec: u32, - min_txs_for_rebuild: usize, ) -> Self { let min_rebuild_interval = if max_rebuilds_per_sec > 0 { Duration::from_millis(1000 / max_rebuilds_per_sec as u64) @@ -261,8 +255,7 @@ impl UntilImportedOrTransaction { min_rebuild_interval, rate_limit_delay: None, initial_fired: false, - pending_tx_count: 0, - min_txs_for_rebuild, + has_pending_tx: false, } } } @@ -284,7 +277,7 @@ impl Stream for UntilImportedOrTransaction Poll::Pending => break, Poll::Ready(Some(_)) => { // Block import resets pending state since we'll build fresh - self.pending_tx_count = 0; + self.has_pending_tx = false; self.rate_limit_delay = None; debug!(target: LOG_TARGET, "Block imported, triggering rebuild"); return Poll::Ready(Some(RebuildTrigger::BlockImported)); @@ -293,19 +286,19 @@ impl Stream for UntilImportedOrTransaction } } - // Drain all pending transaction notifications and count them + // Drain all pending transaction notifications loop { match Stream::poll_next(Pin::new(&mut self.tx_notifications), cx) { Poll::Pending => break, Poll::Ready(Some(_)) => { - self.pending_tx_count += 1; + self.has_pending_tx = true; }, Poll::Ready(None) => break, } } - // If we have enough pending transactions, check rate limit - if self.pending_tx_count >= self.min_txs_for_rebuild { + // If we have pending transactions, check rate limit + if self.has_pending_tx { // Check if rate limit allows firing (no delay or delay expired) let can_fire = match self.rate_limit_delay.as_mut() { None => true, @@ -313,10 +306,9 @@ impl Stream for UntilImportedOrTransaction }; if can_fire { - let tx_count = self.pending_tx_count; - self.pending_tx_count = 0; + self.has_pending_tx = false; self.rate_limit_delay = Some(Delay::new(self.min_rebuild_interval)); - debug!(target: LOG_TARGET, "New transactions ({} txs), triggering rebuild", tx_count); + debug!(target: LOG_TARGET, "New transaction(s), triggering rebuild"); return Poll::Ready(Some(RebuildTrigger::NewTransactions)); } } From 9f30e7989263798a7caa3913068db3401b0df866 Mon Sep 17 00:00:00 2001 From: illuzen Date: Thu, 22 Jan 2026 19:34:32 +0800 Subject: [PATCH 4/6] fmt --- client/consensus/qpow/src/lib.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/client/consensus/qpow/src/lib.rs b/client/consensus/qpow/src/lib.rs index bc511bb5..ccbe7014 100644 --- a/client/consensus/qpow/src/lib.rs +++ b/client/consensus/qpow/src/lib.rs @@ -296,13 +296,12 @@ where let header = &mut block.header; let block_hash = hash; let seal_item = match header.digest_mut().pop() { - Some(DigestItem::Seal(id, seal)) => { + Some(DigestItem::Seal(id, seal)) => if id == POW_ENGINE_ID { DigestItem::Seal(id, seal) } else { return Err(Error::::WrongEngine(id).into()); - } - }, + }, _ => return Err(Error::::HeaderUnsealed(block_hash).into()), }; @@ -359,8 +358,9 @@ const MAX_REBUILDS_PER_SEC: u32 = 2; /// - New transactions arrive (rate limited to MAX_REBUILDS_PER_SEC) /// /// This allows transactions to be included faster since we don't wait for the next block import -/// to rebuild. Mining on a new block vs the old block has the same probability of success per nonce, -/// so the only cost is the overhead of rebuilding (which is minimal compared to mining time). +/// to rebuild. Mining on a new block vs the old block has the same probability of success per +/// nonce, so the only cost is the overhead of rebuilding (which is minimal compared to mining +/// time). #[allow(clippy::too_many_arguments)] #[allow(clippy::type_complexity)] pub fn start_mining_worker( @@ -544,9 +544,8 @@ fn fetch_seal(digest: Option<&DigestItem>, hash: B::Hash) -> Result>(parent: &BlockId) -> Result> { match parent { BlockId::Hash(hash) => Ok(*hash), - BlockId::Number(_) => { - Err(Error::Runtime("Expected BlockId::Hash, but got BlockId::Number".into())) - }, + BlockId::Number(_) => + Err(Error::Runtime("Expected BlockId::Hash, but got BlockId::Number".into())), } } From c290f287ed7b77514463b4367ff2328e4dc7aa9c Mon Sep 17 00:00:00 2001 From: illuzen Date: Thu, 22 Jan 2026 22:34:16 +0800 Subject: [PATCH 5/6] clippy --- client/consensus/qpow/src/lib.rs | 22 +++++++++------------- client/consensus/qpow/src/worker.rs | 22 +++++++++------------- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/client/consensus/qpow/src/lib.rs b/client/consensus/qpow/src/lib.rs index ccbe7014..f556f052 100644 --- a/client/consensus/qpow/src/lib.rs +++ b/client/consensus/qpow/src/lib.rs @@ -296,12 +296,13 @@ where let header = &mut block.header; let block_hash = hash; let seal_item = match header.digest_mut().pop() { - Some(DigestItem::Seal(id, seal)) => + Some(DigestItem::Seal(id, seal)) => { if id == POW_ENGINE_ID { DigestItem::Seal(id, seal) } else { return Err(Error::::WrongEngine(id).into()); - }, + } + }, _ => return Err(Error::::HeaderUnsealed(block_hash).into()), }; @@ -404,15 +405,9 @@ where let task = async move { // Main block building loop - runs until trigger stream closes - loop { - // Wait for a trigger (Initial, BlockImported, or NewTransactions) - // break exits the loop entirely, ending this task - // continue skips to the next iteration to wait for another trigger - let trigger = match trigger_stream.next().await { - Some(t) => t, - None => break, // Stream closed, shut down the worker - }; - + // Wait for a trigger (Initial, BlockImported, or NewTransactions) + // continue skips to the next iteration to wait for another trigger + while let Some(trigger) = trigger_stream.next().await { if sync_oracle.is_major_syncing() { debug!(target: LOG_TARGET, "Skipping proposal due to sync."); worker.on_major_syncing(); @@ -544,8 +539,9 @@ fn fetch_seal(digest: Option<&DigestItem>, hash: B::Hash) -> Result>(parent: &BlockId) -> Result> { match parent { BlockId::Hash(hash) => Ok(*hash), - BlockId::Number(_) => - Err(Error::Runtime("Expected BlockId::Hash, but got BlockId::Number".into())), + BlockId::Number(_) => { + Err(Error::Runtime("Expected BlockId::Hash, but got BlockId::Number".into())) + }, } } diff --git a/client/consensus/qpow/src/worker.rs b/client/consensus/qpow/src/worker.rs index 4e172469..e819b8c4 100644 --- a/client/consensus/qpow/src/worker.rs +++ b/client/consensus/qpow/src/worker.rs @@ -272,29 +272,25 @@ impl Stream for UntilImportedOrTransaction } // Check for block imports first - these always trigger immediately - loop { - match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) { - Poll::Pending => break, - Poll::Ready(Some(_)) => { + if let Poll::Ready(notification) = + Stream::poll_next(Pin::new(&mut self.import_notifications), cx) + { + match notification { + Some(_) => { // Block import resets pending state since we'll build fresh self.has_pending_tx = false; self.rate_limit_delay = None; debug!(target: LOG_TARGET, "Block imported, triggering rebuild"); return Poll::Ready(Some(RebuildTrigger::BlockImported)); }, - Poll::Ready(None) => return Poll::Ready(None), + None => return Poll::Ready(None), } } // Drain all pending transaction notifications - loop { - match Stream::poll_next(Pin::new(&mut self.tx_notifications), cx) { - Poll::Pending => break, - Poll::Ready(Some(_)) => { - self.has_pending_tx = true; - }, - Poll::Ready(None) => break, - } + while let Poll::Ready(Some(_)) = Stream::poll_next(Pin::new(&mut self.tx_notifications), cx) + { + self.has_pending_tx = true; } // If we have pending transactions, check rate limit From 2bdfd9db63ea04cd342931862d70af2a440bd86b Mon Sep 17 00:00:00 2001 From: illuzen Date: Thu, 22 Jan 2026 22:42:04 +0800 Subject: [PATCH 6/6] fmt --- client/consensus/qpow/src/lib.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/client/consensus/qpow/src/lib.rs b/client/consensus/qpow/src/lib.rs index f556f052..4a8e36ac 100644 --- a/client/consensus/qpow/src/lib.rs +++ b/client/consensus/qpow/src/lib.rs @@ -296,13 +296,12 @@ where let header = &mut block.header; let block_hash = hash; let seal_item = match header.digest_mut().pop() { - Some(DigestItem::Seal(id, seal)) => { + Some(DigestItem::Seal(id, seal)) => if id == POW_ENGINE_ID { DigestItem::Seal(id, seal) } else { return Err(Error::::WrongEngine(id).into()); - } - }, + }, _ => return Err(Error::::HeaderUnsealed(block_hash).into()), }; @@ -539,9 +538,8 @@ fn fetch_seal(digest: Option<&DigestItem>, hash: B::Hash) -> Result>(parent: &BlockId) -> Result> { match parent { BlockId::Hash(hash) => Ok(*hash), - BlockId::Number(_) => { - Err(Error::Runtime("Expected BlockId::Hash, but got BlockId::Number".into())) - }, + BlockId::Number(_) => + Err(Error::Runtime("Expected BlockId::Hash, but got BlockId::Number".into())), } }