Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 31 additions & 14 deletions client/consensus/qpow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use sp_consensus_qpow::QPoWApi;
use sp_runtime::{generic::BlockId, traits::Block as BlockT, AccountId32};
use std::{sync::Arc, time::Duration};

use crate::worker::UntilImportedOrTimeout;
pub use crate::worker::{MiningBuild, MiningHandle, MiningMetadata};
use futures::{Future, StreamExt};
use crate::worker::UntilImportedOrTransaction;
pub use crate::worker::{MiningBuild, MiningHandle, MiningMetadata, RebuildTrigger};
use futures::{Future, Stream, StreamExt};
use log::*;
use prometheus_endpoint::Registry;
use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents};
Expand Down Expand Up @@ -342,18 +342,28 @@ where
Ok(BasicQueue::new(verifier, block_import, justification_import, spawner, registry))
}

/// Maximum transaction-triggered rebuilds per second.
/// Hardcoded for now but could be made configurable later.
const MAX_REBUILDS_PER_SEC: u32 = 2;

/// Start the mining worker for QPoW. This function provides the necessary helper functions that can
/// be used to implement a miner. However, it does not do the CPU-intensive mining itself.
///
/// Two values are returned -- a worker, which contains functions that allows querying the current
/// mining metadata and submitting mined blocks, and a future, which must be polled to fill in
/// information in the worker.
///
/// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted
/// for blocks being built. This can encode authorship information, or just be a graffiti.
/// The worker will rebuild blocks when:
/// - A new block is imported from the network
/// - New transactions arrive (rate-limited to at most MAX_REBUILDS_PER_SEC rebuilds per second)
///
/// This allows transactions to be included faster since we don't wait for the next block import
/// to rebuild. Mining on a new block vs the old block has the same probability of success per
/// nonce, so the only cost is the overhead of rebuilding (which is minimal compared to mining
/// time).
#[allow(clippy::too_many_arguments)]
#[allow(clippy::type_complexity)]
pub fn start_mining_worker<Block, C, S, E, SO, L, CIDP>(
pub fn start_mining_worker<Block, C, S, E, SO, L, CIDP, TxHash, TxStream>(
block_import: BoxBlockImport<Block>,
client: Arc<C>,
select_chain: S,
Expand All @@ -362,7 +372,7 @@ pub fn start_mining_worker<Block, C, S, E, SO, L, CIDP>(
justification_sync_link: L,
rewards_address: AccountId32,
create_inherent_data_providers: CIDP,
timeout: Duration,
tx_notifications: TxStream,
build_time: Duration,
) -> (MiningHandle<Block, C, L, <E::Proposer as Proposer<Block>>::Proof>, impl Future<Output = ()>)
where
Expand All @@ -381,17 +391,22 @@ where
SO: SyncOracle + Clone + Send + Sync + 'static,
L: JustificationSyncLink<Block>,
CIDP: CreateInherentDataProviders<Block, ()>,
TxHash: Send + 'static,
TxStream: Stream<Item = TxHash> + Send + Unpin + 'static,
{
let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
let mut trigger_stream = UntilImportedOrTransaction::new(
client.import_notification_stream(),
tx_notifications,
MAX_REBUILDS_PER_SEC,
);
let worker = MiningHandle::new(client.clone(), block_import, justification_sync_link);
let worker_ret = worker.clone();

let task = async move {
loop {
if timer.next().await.is_none() {
break;
}

// Main block building loop - runs until trigger stream closes
// Wait for a trigger (Initial, BlockImported, or NewTransactions)
// continue skips to the next iteration to wait for another trigger
while let Some(trigger) = trigger_stream.next().await {
if sync_oracle.is_major_syncing() {
debug!(target: LOG_TARGET, "Skipping proposal due to sync.");
worker.on_major_syncing();
Expand All @@ -412,7 +427,9 @@ where
};
let best_hash = best_header.hash();

if worker.best_hash() == Some(best_hash) {
// Skip redundant block import triggers if we're already building on this hash.
// Initial and NewTransactions triggers should proceed to rebuild.
if trigger == RebuildTrigger::BlockImported && worker.best_hash() == Some(best_hash) {
continue;
}

Expand Down
128 changes: 97 additions & 31 deletions client/consensus/qpow/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,51 +198,117 @@ where
}
}

/// A stream that waits for a block import or timeout.
pub struct UntilImportedOrTimeout<Block: BlockT> {
/// Reason why the stream fired: the initial bootstrap trigger, a newly imported block, or newly arrived transactions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RebuildTrigger {
/// Initial trigger to bootstrap mining (fires once on first poll).
Initial,
/// A new block was imported from the network.
BlockImported,
/// One or more new transactions arrived (subject to rate limiting) to trigger a rebuild.
NewTransactions,
}

/// A stream that waits for a block import or new transactions (with rate limiting).
///
/// This enables block producers to include new transactions faster by rebuilding
/// the block being mined when transactions arrive, rather than waiting for the
/// next block import or timeout.
///
/// Rate limiting prevents excessive rebuilds — transaction-triggered rebuilds are capped at `max_rebuilds_per_sec` per second.
pub struct UntilImportedOrTransaction<Block: BlockT, TxHash> {
/// Block import notifications stream.
import_notifications: ImportNotifications<Block>,
timeout: Duration,
inner_delay: Option<Delay>,
/// Transaction pool import notifications stream.
tx_notifications: Pin<Box<dyn Stream<Item = TxHash> + Send>>,
/// Minimum interval between transaction-triggered rebuilds.
min_rebuild_interval: Duration,
/// Rate limit delay - if set, we're waiting before we can fire again.
rate_limit_delay: Option<Delay>,
/// Whether we've fired the initial trigger yet.
initial_fired: bool,
/// Whether we have pending transactions waiting to trigger a rebuild.
has_pending_tx: bool,
}

impl<Block: BlockT> UntilImportedOrTimeout<Block> {
/// Create a new stream using the given import notification and timeout duration.
pub fn new(import_notifications: ImportNotifications<Block>, timeout: Duration) -> Self {
Self { import_notifications, timeout, inner_delay: None }
impl<Block: BlockT, TxHash> UntilImportedOrTransaction<Block, TxHash> {
/// Create a new stream.
///
/// # Arguments
/// * `import_notifications` - Stream of block import notifications
/// * `tx_notifications` - Stream of transaction import notifications
/// * `max_rebuilds_per_sec` - Maximum transaction-triggered rebuilds per second
pub fn new(
import_notifications: ImportNotifications<Block>,
tx_notifications: impl Stream<Item = TxHash> + Send + 'static,
max_rebuilds_per_sec: u32,
) -> Self {
let min_rebuild_interval = if max_rebuilds_per_sec > 0 {
Duration::from_millis(1000 / max_rebuilds_per_sec as u64)
} else {
Duration::from_secs(u64::MAX) // Effectively disable tx-triggered rebuilds
};

Self {
import_notifications,
tx_notifications: Box::pin(tx_notifications),
min_rebuild_interval,
rate_limit_delay: None,
initial_fired: false,
has_pending_tx: false,
}
}
}

impl<Block: BlockT> Stream for UntilImportedOrTimeout<Block> {
type Item = ();
impl<Block: BlockT, TxHash> Stream for UntilImportedOrTransaction<Block, TxHash> {
type Item = RebuildTrigger;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<()>> {
let mut fire = false;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<RebuildTrigger>> {
// Fire immediately on first poll to bootstrap mining at genesis
if !self.initial_fired {
self.initial_fired = true;
debug!(target: LOG_TARGET, "Initial trigger, bootstrapping block production");
return Poll::Ready(Some(RebuildTrigger::Initial));
}

loop {
match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) {
Poll::Pending => break,
Poll::Ready(Some(_)) => {
fire = true;
// Check for block imports first - these always trigger immediately
if let Poll::Ready(notification) =
Stream::poll_next(Pin::new(&mut self.import_notifications), cx)
{
match notification {
Some(_) => {
// Block import resets pending state since we'll build fresh
self.has_pending_tx = false;
self.rate_limit_delay = None;
debug!(target: LOG_TARGET, "Block imported, triggering rebuild");
return Poll::Ready(Some(RebuildTrigger::BlockImported));
},
Poll::Ready(None) => return Poll::Ready(None),
None => return Poll::Ready(None),
}
}

let timeout = self.timeout;
let inner_delay = self.inner_delay.get_or_insert_with(|| Delay::new(timeout));

match Future::poll(Pin::new(inner_delay), cx) {
Poll::Pending => (),
Poll::Ready(()) => {
fire = true;
},
// Drain all pending transaction notifications
while let Poll::Ready(Some(_)) = Stream::poll_next(Pin::new(&mut self.tx_notifications), cx)
{
self.has_pending_tx = true;
}

if fire {
self.inner_delay = None;
Poll::Ready(Some(()))
} else {
Poll::Pending
// If we have pending transactions, check rate limit
if self.has_pending_tx {
// Check if rate limit allows firing (no delay or delay expired)
let can_fire = match self.rate_limit_delay.as_mut() {
None => true,
Some(delay) => Future::poll(Pin::new(delay), cx).is_ready(),
};

if can_fire {
self.has_pending_tx = false;
self.rate_limit_delay = Some(Delay::new(self.min_rebuild_interval));
debug!(target: LOG_TARGET, "New transaction(s), triggering rebuild");
return Poll::Ready(Some(RebuildTrigger::NewTransactions));
}
}

Poll::Pending
}
}
33 changes: 20 additions & 13 deletions node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ pub fn new_full<
other: (pow_block_import, mut telemetry),
} = new_partial(&config)?;

let mut tx_stream = transaction_pool.clone().import_notification_stream();
let tx_stream_for_worker = transaction_pool.clone().import_notification_stream();
let tx_stream_for_logger = transaction_pool.clone().import_notification_stream();

let net_config = sc_network::config::FullNetworkConfiguration::<
Block,
Expand Down Expand Up @@ -276,7 +277,7 @@ pub fn new_full<
sync_service.clone(),
rewards_address,
inherent_data_providers,
Duration::from_secs(10),
tx_stream_for_worker,
Duration::from_secs(10),
);

Expand Down Expand Up @@ -357,17 +358,22 @@ pub fn new_full<

// If external miner URL is provided, use external mining
if let Some(miner_url) = &external_miner_url {
// Cancel previous job if metadata has changed
if let Some(job_id) = &current_job_id {
if let Err(e) = external_miner_client::cancel_mining_job(
&http_client,
miner_url,
job_id,
)
.await
{
log::warn!("⛏️Failed to cancel previous mining job: {}", e);
}
// Fire-and-forget cancellation of previous job - don't wait for confirmation
// This reduces latency when switching to a new block
if let Some(old_job_id) = current_job_id.take() {
let cancel_client = http_client.clone();
let cancel_url = miner_url.clone();
tokio::spawn(async move {
if let Err(e) = external_miner_client::cancel_mining_job(
&cancel_client,
&cancel_url,
&old_job_id,
)
.await
{
log::debug!("⛏️ Failed to cancel previous mining job {}: {}", old_job_id, e);
}
});
}

// Get current distance_threshold from runtime
Expand Down Expand Up @@ -514,6 +520,7 @@ pub fn new_full<
});

task_manager.spawn_handle().spawn("tx-logger", None, async move {
let mut tx_stream = tx_stream_for_logger;
while let Some(tx_hash) = tx_stream.next().await {
if let Some(tx) = transaction_pool.ready_transaction(&tx_hash) {
log::trace!(target: "miner", "New transaction: Hash = {:?}", tx_hash);
Expand Down
Loading