diff --git a/client/consensus/qpow/src/lib.rs b/client/consensus/qpow/src/lib.rs index 8fc459ee..4a8e36ac 100644 --- a/client/consensus/qpow/src/lib.rs +++ b/client/consensus/qpow/src/lib.rs @@ -10,9 +10,9 @@ use sp_consensus_qpow::QPoWApi; use sp_runtime::{generic::BlockId, traits::Block as BlockT, AccountId32}; use std::{sync::Arc, time::Duration}; -use crate::worker::UntilImportedOrTimeout; -pub use crate::worker::{MiningBuild, MiningHandle, MiningMetadata}; -use futures::{Future, StreamExt}; +use crate::worker::UntilImportedOrTransaction; +pub use crate::worker::{MiningBuild, MiningHandle, MiningMetadata, RebuildTrigger}; +use futures::{Future, Stream, StreamExt}; use log::*; use prometheus_endpoint::Registry; use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents}; @@ -342,6 +342,10 @@ where Ok(BasicQueue::new(verifier, block_import, justification_import, spawner, registry)) } +/// Maximum transaction-triggered rebuilds per second. +/// Hardcoded for now but could be made configurable later. +const MAX_REBUILDS_PER_SEC: u32 = 2; + /// Start the mining worker for QPoW. This function provides the necessary helper functions that can /// be used to implement a miner. However, it does not do the CPU-intensive mining itself. /// @@ -349,11 +353,17 @@ where /// mining metadata and submitting mined blocks, and a future, which must be polled to fill in /// information in the worker. /// -/// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted -/// for blocks being built. This can encode authorship information, or just be a graffiti. +/// The worker will rebuild blocks when: +/// - A new block is imported from the network +/// - New transactions arrive (rate limited to MAX_REBUILDS_PER_SEC) +/// +/// This allows transactions to be included faster since we don't wait for the next block import +/// to rebuild. Mining on a new block vs the old block has the same probability of success per +/// nonce, so the only cost is the overhead of rebuilding (which is minimal compared to mining +/// time). #[allow(clippy::too_many_arguments)] #[allow(clippy::type_complexity)] -pub fn start_mining_worker( +pub fn start_mining_worker( block_import: BoxBlockImport, client: Arc, select_chain: S, @@ -362,7 +372,7 @@ pub fn start_mining_worker( justification_sync_link: L, rewards_address: AccountId32, create_inherent_data_providers: CIDP, - timeout: Duration, + tx_notifications: TxStream, build_time: Duration, ) -> (MiningHandle>::Proof>, impl Future) where @@ -381,17 +391,22 @@ where SO: SyncOracle + Clone + Send + Sync + 'static, L: JustificationSyncLink, CIDP: CreateInherentDataProviders, + TxHash: Send + 'static, + TxStream: Stream + Send + Unpin + 'static, { - let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout); + let mut trigger_stream = UntilImportedOrTransaction::new( + client.import_notification_stream(), + tx_notifications, + MAX_REBUILDS_PER_SEC, + ); let worker = MiningHandle::new(client.clone(), block_import, justification_sync_link); let worker_ret = worker.clone(); let task = async move { - loop { - if timer.next().await.is_none() { - break; - } - + // Main block building loop - runs until trigger stream closes + // Wait for a trigger (Initial, BlockImported, or NewTransactions) + // continue skips to the next iteration to wait for another trigger + while let Some(trigger) = trigger_stream.next().await { if sync_oracle.is_major_syncing() { debug!(target: LOG_TARGET, "Skipping proposal due to sync."); worker.on_major_syncing(); @@ -412,7 +427,9 @@ where }; let best_hash = best_header.hash(); - if worker.best_hash() == Some(best_hash) { + // Skip redundant block import triggers if we're already building on this hash. + // Initial and NewTransactions triggers should proceed to rebuild. + if trigger == RebuildTrigger::BlockImported && worker.best_hash() == Some(best_hash) { continue; } diff --git a/client/consensus/qpow/src/worker.rs b/client/consensus/qpow/src/worker.rs index 7fd05f96..e819b8c4 100644 --- a/client/consensus/qpow/src/worker.rs +++ b/client/consensus/qpow/src/worker.rs @@ -198,51 +198,117 @@ where } } -/// A stream that waits for a block import or timeout. -pub struct UntilImportedOrTimeout { +/// Reason why the stream fired - either a block was imported or enough transactions arrived. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RebuildTrigger { + /// Initial trigger to bootstrap mining (fires once on first poll). + Initial, + /// A new block was imported from the network. + BlockImported, + /// Enough new transactions arrived to trigger a rebuild. + NewTransactions, +} + +/// A stream that waits for a block import or new transactions (with rate limiting). +/// +/// This enables block producers to include new transactions faster by rebuilding +/// the block being mined when transactions arrive, rather than waiting for the +/// next block import or timeout. +/// +/// Rate limiting prevents excessive rebuilds - we limit to `max_rebuilds_per_sec`. +pub struct UntilImportedOrTransaction { + /// Block import notifications stream. import_notifications: ImportNotifications, - timeout: Duration, - inner_delay: Option, + /// Transaction pool import notifications stream. + tx_notifications: Pin + Send>>, + /// Minimum interval between transaction-triggered rebuilds. + min_rebuild_interval: Duration, + /// Rate limit delay - if set, we're waiting before we can fire again. + rate_limit_delay: Option, + /// Whether we've fired the initial trigger yet. + initial_fired: bool, + /// Whether we have pending transactions waiting to trigger a rebuild. + has_pending_tx: bool, } -impl UntilImportedOrTimeout { - /// Create a new stream using the given import notification and timeout duration. - pub fn new(import_notifications: ImportNotifications, timeout: Duration) -> Self { - Self { import_notifications, timeout, inner_delay: None } +impl UntilImportedOrTransaction { + /// Create a new stream. + /// + /// # Arguments + /// * `import_notifications` - Stream of block import notifications + /// * `tx_notifications` - Stream of transaction import notifications + /// * `max_rebuilds_per_sec` - Maximum transaction-triggered rebuilds per second + pub fn new( + import_notifications: ImportNotifications, + tx_notifications: impl Stream + Send + 'static, + max_rebuilds_per_sec: u32, + ) -> Self { + let min_rebuild_interval = if max_rebuilds_per_sec > 0 { + Duration::from_millis(1000 / max_rebuilds_per_sec as u64) + } else { + Duration::from_secs(u64::MAX) // Effectively disable tx-triggered rebuilds + }; + + Self { + import_notifications, + tx_notifications: Box::pin(tx_notifications), + min_rebuild_interval, + rate_limit_delay: None, + initial_fired: false, + has_pending_tx: false, + } } } -impl Stream for UntilImportedOrTimeout { - type Item = (); +impl Stream for UntilImportedOrTransaction { + type Item = RebuildTrigger; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let mut fire = false; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + // Fire immediately on first poll to bootstrap mining at genesis + if !self.initial_fired { + self.initial_fired = true; + debug!(target: LOG_TARGET, "Initial trigger, bootstrapping block production"); + return Poll::Ready(Some(RebuildTrigger::Initial)); + } - loop { - match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) { - Poll::Pending => break, - Poll::Ready(Some(_)) => { - fire = true; + // Check for block imports first - these always trigger immediately + if let Poll::Ready(notification) = + Stream::poll_next(Pin::new(&mut self.import_notifications), cx) + { + match notification { + Some(_) => { + // Block import resets pending state since we'll build fresh + self.has_pending_tx = false; + self.rate_limit_delay = None; + debug!(target: LOG_TARGET, "Block imported, triggering rebuild"); + return Poll::Ready(Some(RebuildTrigger::BlockImported)); }, - Poll::Ready(None) => return Poll::Ready(None), + None => return Poll::Ready(None), } } - let timeout = self.timeout; - let inner_delay = self.inner_delay.get_or_insert_with(|| Delay::new(timeout)); - - match Future::poll(Pin::new(inner_delay), cx) { - Poll::Pending => (), - Poll::Ready(()) => { - fire = true; - }, + // Drain all pending transaction notifications + while let Poll::Ready(Some(_)) = Stream::poll_next(Pin::new(&mut self.tx_notifications), cx) + { + self.has_pending_tx = true; } - if fire { - self.inner_delay = None; - Poll::Ready(Some(())) - } else { - Poll::Pending + // If we have pending transactions, check rate limit + if self.has_pending_tx { + // Check if rate limit allows firing (no delay or delay expired) + let can_fire = match self.rate_limit_delay.as_mut() { + None => true, + Some(delay) => Future::poll(Pin::new(delay), cx).is_ready(), + }; + + if can_fire { + self.has_pending_tx = false; + self.rate_limit_delay = Some(Delay::new(self.min_rebuild_interval)); + debug!(target: LOG_TARGET, "New transaction(s), triggering rebuild"); + return Poll::Ready(Some(RebuildTrigger::NewTransactions)); + } } + + Poll::Pending } } diff --git a/node/src/service.rs b/node/src/service.rs index df6f15bd..27a9b662 100644 --- a/node/src/service.rs +++ b/node/src/service.rs @@ -166,7 +166,8 @@ pub fn new_full< other: (pow_block_import, mut telemetry), } = new_partial(&config)?; - let mut tx_stream = transaction_pool.clone().import_notification_stream(); + let tx_stream_for_worker = transaction_pool.clone().import_notification_stream(); + let tx_stream_for_logger = transaction_pool.clone().import_notification_stream(); let net_config = sc_network::config::FullNetworkConfiguration::< Block, @@ -276,7 +277,7 @@ pub fn new_full< sync_service.clone(), rewards_address, inherent_data_providers, - Duration::from_secs(10), + tx_stream_for_worker, Duration::from_secs(10), ); @@ -357,17 +358,22 @@ pub fn new_full< // If external miner URL is provided, use external mining if let Some(miner_url) = &external_miner_url { - // Cancel previous job if metadata has changed - if let Some(job_id) = ¤t_job_id { - if let Err(e) = external_miner_client::cancel_mining_job( - &http_client, - miner_url, - job_id, - ) - .await - { - log::warn!("⛏️Failed to cancel previous mining job: {}", e); - } + // Fire-and-forget cancellation of previous job - don't wait for confirmation + // This reduces latency when switching to a new block + if let Some(old_job_id) = current_job_id.take() { + let cancel_client = http_client.clone(); + let cancel_url = miner_url.clone(); + tokio::spawn(async move { + if let Err(e) = external_miner_client::cancel_mining_job( + &cancel_client, + &cancel_url, + &old_job_id, + ) + .await + { + log::debug!("⛏️ Failed to cancel previous mining job {}: {}", old_job_id, e); + } + }); } // Get current distance_threshold from runtime @@ -514,6 +520,7 @@ pub fn new_full< }); task_manager.spawn_handle().spawn("tx-logger", None, async move { + let mut tx_stream = tx_stream_for_logger; while let Some(tx_hash) = tx_stream.next().await { if let Some(tx) = transaction_pool.ready_transaction(&tx_hash) { log::trace!(target: "miner", "New transaction: Hash = {:?}", tx_hash);