From 1ab8e736fd0431b2241ccb3ad05e79d0e7b25f1e Mon Sep 17 00:00:00 2001 From: fi3 Date: Wed, 6 May 2026 13:57:12 +0200 Subject: [PATCH] FIX upstream share classification by preserving the aggregate pool target Keep the shared upstream target sourced only from real upstream channel state instead of replacing it when individual downstream channels open. Downstream-specific targets remain per-channel, so share classification continues to compare bitcoin, aggregate upstream, and downstream targets in the intended order. Emit immediate channel retarget updates whenever aggregate nominal hashrate changes during downstream registration, downstream removal, or hashrate reconciliation. This keeps the pool target aligned during connection churn, prevents local-only shares from being escalated upstream, and leaves the submit limiter as a safety net instead of the steady-state control path. Add regression coverage for shared target stability across multiple downstream opens, local classification with distinct downstream targets, and immediate upstream retarget propagation. --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/translator/downstream/diff_management.rs | 62 +++-- src/translator/downstream/downstream.rs | 6 +- src/translator/downstream/notify.rs | 65 +++++- src/translator/mod.rs | 7 +- src/translator/proxy/bridge.rs | 233 ++++++++++++++++++- src/translator/upstream/diff_management.rs | 85 ++++++- src/translator/upstream/upstream.rs | 79 +++++-- 9 files changed, 467 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ccb010d6..0a53112d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1112,7 +1112,7 @@ dependencies = [ [[package]] name = "dmnd-client" -version = "0.3.14" +version = "0.3.15" dependencies = [ "async-recursion", "axum", diff --git a/Cargo.toml b/Cargo.toml index 28503802..4778a0a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dmnd-client" -version = "0.3.14" +version = "0.3.15" edition = "2021" [lib] diff --git a/src/translator/downstream/diff_management.rs b/src/translator/downstream/diff_management.rs index 62840825..61dcbd82 100644 --- a/src/translator/downstream/diff_management.rs +++ b/src/translator/downstream/diff_management.rs @@ -90,6 +90,7 @@ impl Downstream { u.channel_nominal_hashrate -= // Make sure that upstream channel hasrate never goes below 0 f32::min(estimated_downstream_hash_rate, u.channel_nominal_hashrate); + u.request_immediate_update(); })?; Ok(()) } @@ -318,6 +319,7 @@ impl Downstream { } else { c.channel_nominal_hashrate = 0.0; } + c.request_immediate_update(); })?; Ok(()) } @@ -560,10 +562,8 @@ mod test { initial_difficulty: latest_difficulty, hard_minimum_difficulty: None, }; - let upstream_config = Arc::new(Mutex::new(UpstreamDifficultyConfig { - channel_diff_update_interval: 60, - channel_nominal_hashrate, - })); + let (upstream_config, _rx) = UpstreamDifficultyConfig::new(60, channel_nominal_hashrate); + let upstream_config = Arc::new(Mutex::new(upstream_config)); let (tx_sv1_submit, _rx_sv1_submit) = tokio::sync::mpsc::channel(10); let (tx_outgoing, _rx_outgoing) = channel(10); let (tx_update_token, _rx_update_token) = channel(10); @@ -620,10 +620,8 @@ mod test { initial_difficulty: latest_difficulty, hard_minimum_difficulty: None, }; - let upstream_config = Arc::new(Mutex::new(UpstreamDifficultyConfig { - channel_diff_update_interval: 60, - channel_nominal_hashrate, - })); + let (upstream_config, _rx) = UpstreamDifficultyConfig::new(60, channel_nominal_hashrate); + let upstream_config = Arc::new(Mutex::new(upstream_config)); let (tx_sv1_submit, _rx_sv1_submit) = tokio::sync::mpsc::channel(10); let (tx_outgoing, rx_outgoing) = channel(10); let (tx_update_token, _rx_update_token) = channel(10); @@ -683,8 +681,15 @@ mod test { target_rate_submits(), RAW_BOOTSTRAP_HASHRATE, ); + let mut update_rx = upstream_config + .safe_lock(|u| u.subscribe_updates()) + .unwrap(); Downstream::update_difficulty_and_hashrate(&downstream).unwrap(); + tokio::time::timeout(Duration::from_secs(1), update_rx.changed()) + .await + .unwrap() + .unwrap(); let estimated_downstream_hash_rate = downstream .safe_lock(|d| d.difficulty_mgmt.estimated_downstream_hash_rate) @@ -697,6 +702,37 @@ mod test { assert_hashrate_close(channel_nominal_hashrate, quantized_hashrate); } + #[tokio::test] + async fn removing_downstream_hashrate_requests_immediate_upstream_retarget() { + let latest_difficulty = 1_024.0; + let estimated_hashrate = hashrate_for_diff(latest_difficulty); + let pid = Pid::new(*crate::SHARE_PER_MIN, latest_difficulty * 10.0); + let (downstream, upstream_config) = seeded_downstream( + estimated_hashrate, + latest_difficulty, + pid, + VecDeque::new(), + estimated_hashrate, + ); + let mut update_rx = upstream_config + .safe_lock(|u| u.subscribe_updates()) + .unwrap(); + + downstream + .safe_lock(|d| d.mark_channel_hashrate_registered()) + .unwrap(); + Downstream::remove_downstream_hashrate_from_channel(&downstream).unwrap(); + tokio::time::timeout(Duration::from_secs(1), update_rx.changed()) + .await + .unwrap() + .unwrap(); + + let channel_nominal_hashrate = upstream_config + .safe_lock(|u| u.channel_nominal_hashrate) + .unwrap(); + assert_eq!(channel_nominal_hashrate, 0.0); + } + #[tokio::test] #[ignore = "non-zero delay is blocked until the stale bootstrap difficulty replay bug is fixed"] async fn positive_delay_does_not_replay_stale_bootstrap_difficulty_after_retarget() { @@ -840,10 +876,7 @@ mod test { initial_difficulty: 10_000.0, hard_minimum_difficulty: Some(NON_LOCAL_DOWNSTREAM_MIN_DIFFICULTY), }; - let upstream_config = UpstreamDifficultyConfig { - channel_diff_update_interval: 60, - channel_nominal_hashrate: 0.0, - }; + let (upstream_config, _rx) = UpstreamDifficultyConfig::new(60, 0.0); let (tx_sv1_submit, _rx_sv1_submit) = tokio::sync::mpsc::channel(10); let (tx_outgoing, _rx_outgoing) = channel(10); let (tx_update_token, _rx_update_token) = channel(10); @@ -902,10 +935,7 @@ mod test { initial_difficulty: 10_000_000_000.0, hard_minimum_difficulty: None, }; - let upstream_config = UpstreamDifficultyConfig { - channel_diff_update_interval: 60, - channel_nominal_hashrate: 0.0, - }; + let (upstream_config, _rx) = UpstreamDifficultyConfig::new(60, 0.0); let (tx_sv1_submit, _rx_sv1_submit) = tokio::sync::mpsc::channel(10); let (tx_outgoing, _rx_outgoing) = channel(10); let (tx_update_token, _rx_update_token) = channel(10); diff --git a/src/translator/downstream/downstream.rs b/src/translator/downstream/downstream.rs index bd4e3963..d00dfeca 100644 --- a/src/translator/downstream/downstream.rs +++ b/src/translator/downstream/downstream.rs @@ -1236,10 +1236,8 @@ mod tests { initial_difficulty: 1.0, hard_minimum_difficulty: None, }; - let upstream_config = UpstreamDifficultyConfig { - channel_diff_update_interval: crate::CHANNEL_DIFF_UPDTATE_INTERVAL, - channel_nominal_hashrate: 0.0, - }; + let (upstream_config, _rx) = + UpstreamDifficultyConfig::new(crate::CHANNEL_DIFF_UPDTATE_INTERVAL, 0.0); let (tx_sv1_submit, _rx_sv1_submit) = channel::(8); let (tx_outgoing, rx_outgoing) = channel(8); let (tx_update_token, _rx_update_token) = channel(8); diff --git a/src/translator/downstream/notify.rs b/src/translator/downstream/notify.rs index 331c03af..55e98c14 100644 --- a/src/translator/downstream/notify.rs +++ b/src/translator/downstream/notify.rs @@ -49,8 +49,10 @@ pub async fn start_notify( d.difficulty_mgmt.estimated_downstream_hash_rate, ) })?; - upstream_difficulty_config - .safe_lock(|c| c.channel_nominal_hashrate += registered_hashrate)?; + upstream_difficulty_config.safe_lock(|c| { + c.channel_nominal_hashrate += registered_hashrate; + c.request_immediate_update(); + })?; downstream.safe_lock(|d| d.mark_channel_hashrate_registered())?; if let Err(e) = stats_sender.setup_stats_reliable(connection_id).await { error!("Failed to register downstream stats {connection_id}: {e}"); @@ -66,6 +68,7 @@ pub async fn start_notify( upstream_difficulty_config.safe_lock(|u| { u.channel_nominal_hashrate -= f32::min(registered_hashrate, u.channel_nominal_hashrate); + u.request_immediate_update(); })?; } return Ok(()); @@ -275,10 +278,8 @@ mod tests { initial_difficulty: 1.0, hard_minimum_difficulty: None, }; - let upstream_config = UpstreamDifficultyConfig { - channel_diff_update_interval: crate::CHANNEL_DIFF_UPDTATE_INTERVAL, - channel_nominal_hashrate: 0.0, - }; + let (upstream_config, _rx) = + UpstreamDifficultyConfig::new(crate::CHANNEL_DIFF_UPDTATE_INTERVAL, 0.0); let (tx_sv1_submit, _rx_sv1_submit) = channel::(8); let (tx_outgoing, rx_outgoing) = channel(8); let (tx_update_token, _rx_update_token) = channel(8); @@ -342,10 +343,11 @@ mod tests { initial_difficulty: latest_difficulty, hard_minimum_difficulty: None, }; - let upstream_config = Arc::new(Mutex::new(UpstreamDifficultyConfig { - channel_diff_update_interval: crate::CHANNEL_DIFF_UPDTATE_INTERVAL, + let (upstream_config, _rx) = UpstreamDifficultyConfig::new( + crate::CHANNEL_DIFF_UPDTATE_INTERVAL, channel_nominal_hashrate, - })); + ); + let upstream_config = Arc::new(Mutex::new(upstream_config)); let (tx_sv1_submit, _rx_sv1_submit) = channel::(8); let (tx_outgoing, _rx_outgoing) = channel(8); let (tx_update_token, _rx_update_token) = channel(8); @@ -421,6 +423,51 @@ mod tests { } } + #[tokio::test] + async fn start_notify_requests_immediate_upstream_retarget_on_register() { + let first_job = first_job("91"); + let (downstream, _rx_outgoing) = + downstream_with_first_job(first_job, vec!["worker".to_string()]); + let downstream = Arc::new(Mutex::new(downstream)); + let mut update_rx = downstream + .safe_lock(|d| { + d.upstream_difficulty_config + .safe_lock(|c| c.subscribe_updates()) + .unwrap() + }) + .unwrap(); + let task_manager = TaskManager::initialize(); + let (_tx_notify, rx_notify) = broadcast::channel(8); + + start_notify( + task_manager.clone(), + downstream.clone(), + rx_notify, + "127.0.0.1".to_string(), + 1, + ) + .await + .unwrap(); + + tokio::time::timeout(Duration::from_secs(1), update_rx.changed()) + .await + .unwrap() + .unwrap(); + + let channel_nominal_hashrate = downstream + .safe_lock(|d| { + d.upstream_difficulty_config + .safe_lock(|c| c.channel_nominal_hashrate) + .unwrap() + }) + .unwrap(); + assert_eq!(channel_nominal_hashrate, 1.0); + + if let Some(aborter) = task_manager.safe_lock(|t| t.get_aborter()).unwrap() { + drop(aborter); + } + } + #[tokio::test] async fn first_retarget_waits_full_adjustment_interval_before_fixing_upstream_hashrate() { let quantized_difficulty = quantized_difficulty_for_hashrate(RAW_BOOTSTRAP_HASHRATE); diff --git a/src/translator/mod.rs b/src/translator/mod.rs index 27dbff22..d70c0088 100644 --- a/src/translator/mod.rs +++ b/src/translator/mod.rs @@ -89,10 +89,10 @@ pub async fn start( let channel_nominal_hashrate = 0.0; - let upstream_diff = UpstreamDifficultyConfig { - channel_diff_update_interval: crate::CHANNEL_DIFF_UPDTATE_INTERVAL, + let (upstream_diff, diff_update_rx) = UpstreamDifficultyConfig::new( + crate::CHANNEL_DIFF_UPDTATE_INTERVAL, channel_nominal_hashrate, - }; + ); let diff_config = Arc::new(Mutex::new(upstream_diff)); // Instantiate a new `Upstream` (SV2 Pool) @@ -113,6 +113,7 @@ pub async fn start( recv_from_up, rx_sv2_submit_shares_ext, rx_update_token, + diff_update_rx, ) .await?; TaskManager::add_upstream(task_manager.clone(), upstream_abortable) diff --git a/src/translator/proxy/bridge.rs b/src/translator/proxy/bridge.rs index 5b35df18..6724f39e 100644 --- a/src/translator/proxy/bridge.rs +++ b/src/translator/proxy/bridge.rs @@ -128,11 +128,6 @@ impl Bridge { debug!("New extended channel opened with id {}", success.channel_id); let extranonce = success.extranonce_prefix.to_vec(); let extranonce2_len = success.extranonce_size; - self.target - .safe_lock(|t| *t = success.target.to_vec()) - .map_err(|e| { - Error::TargetError(roles_logic_sv2::Error::PoisonLock(e.to_string())) - })?; Ok(OpenSv1Downstream { channel_id: success.channel_id, last_notify: self.last_notify.clone(), @@ -730,24 +725,38 @@ pub struct OpenSv1Downstream { #[cfg(test)] mod test { use super::*; + use crate::translator::downstream::Downstream; + use bitcoin::{blockdata::witness::Witness, hashes::Hash}; use tokio::sync::mpsc; + const TEST_JOB_ID: u32 = 0; + const TEST_NTIME: u32 = 1_700_000_000; + pub mod test_utils { use super::*; pub fn create_bridge(extranonces: ExtendedExtranonce) -> Result>, ()> { + create_bridge_with_upstream_target( + extranonces, + [ + 0, 0, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + ], + ) + } + + pub fn create_bridge_with_upstream_target( + extranonces: ExtendedExtranonce, + upstream_target: [u8; 32], + ) -> Result>, ()> { let (tx_sv2_submit_shares_ext, _rx_sv2_submit_shares_ext) = mpsc::channel(1); let (tx_sv1_notify, _rx_sv1_notify) = broadcast::channel(1); - let upstream_target = vec![ - 0, 0, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, - ]; let b = Bridge::new( tx_sv2_submit_shares_ext.clone(), tx_sv1_notify, extranonces, - Arc::new(Mutex::new(upstream_target)), + Arc::new(Mutex::new(upstream_target.to_vec())), 1, ) .map_err(|_| ())?; @@ -765,12 +774,103 @@ mod test { id: 0, } } + + pub fn create_sv1_submit_with_fields( + job_id: u32, + extranonce2_len: usize, + ntime: u32, + nonce: u32, + ) -> Submit<'static> { + Submit { + user_name: "test_user".to_string(), + job_id: job_id.to_string(), + extra_nonce2: sv1_api::utils::Extranonce::try_from(vec![0; extranonce2_len]) + .unwrap(), + time: sv1_api::utils::HexU32Be(ntime), + nonce: sv1_api::utils::HexU32Be(nonce), + version_bits: None, + id: 0, + } + } + } + + fn seed_bridge_job(bridge: &mut Bridge) { + let out_id = bitcoin::hashes::sha256d::Hash::from_slice(&[0_u8; 32]).unwrap(); + let previous_output = bitcoin::OutPoint { + txid: bitcoin::Txid::from_raw_hash(out_id), + vout: 0xffff_ffff, + }; + let input = bitcoin::TxIn { + previous_output, + script_sig: vec![89_u8; 16].into(), + sequence: bitcoin::Sequence(0), + witness: Witness::new(), + }; + let tx = bitcoin::Transaction { + version: bitcoin::transaction::Version(1), + lock_time: bitcoin::locktime::absolute::LockTime::from_time(TEST_NTIME).unwrap(), + input: vec![input], + output: vec![], + }; + let tx = bitcoin::consensus::serialize(&tx); + let prev_hash = SetNewPrevHash { + channel_id: 1, + job_id: TEST_JOB_ID, + prev_hash: [3; 32].into(), + min_ntime: TEST_NTIME, + nbits: 0x1d00ffff, + }; + bridge.channel_factory.on_new_prev_hash(prev_hash).unwrap(); + let new_mining_job = NewExtendedMiningJob { + channel_id: 1, + job_id: TEST_JOB_ID, + min_ntime: binary_sv2::Sv2Option::new(Some(TEST_NTIME)), + version: 0, + version_rolling_allowed: false, + merkle_path: vec![].into(), + coinbase_tx_prefix: tx[0..42].to_vec().try_into().unwrap(), + coinbase_tx_suffix: tx[58..].to_vec().try_into().unwrap(), + }; + bridge + .channel_factory + .on_new_extended_mining_job(new_mining_job) + .unwrap(); + } + + fn classify_submit( + bridge: &mut Bridge, + channel_id: u32, + extranonce2_len: usize, + nonce: u32, + ) -> OnNewShare { + let upstream_target: [u8; 32] = bridge + .target + .safe_lock(|t| t.clone()) + .unwrap() + .try_into() + .unwrap(); + let mut upstream_target: Target = upstream_target.into(); + bridge.channel_factory.set_target(&mut upstream_target); + let translated = bridge + .translate_submit( + channel_id, + test_utils::create_sv1_submit_with_fields( + TEST_JOB_ID, + extranonce2_len, + TEST_NTIME, + nonce, + ), + None, + ) + .unwrap(); + bridge + .channel_factory + .on_submit_shares_extended(translated) + .unwrap() } #[test] fn test_version_bits_insert() { - use bitcoin::{blockdata::witness::Witness, hashes::Hash}; - let extranonces = ExtendedExtranonce::new(0..6, 6..8, 8..16); let bridge = match test_utils::create_bridge(extranonces) { Ok(bridge) => bridge, @@ -854,4 +954,113 @@ mod test { }) .unwrap(); } + + #[test] + fn opening_second_downstream_does_not_clobber_shared_upstream_target() { + let extranonces = ExtendedExtranonce::new(0..6, 6..8, 8..16); + let upstream_target = Downstream::difficulty_to_target(1_024.0); + let bridge = + test_utils::create_bridge_with_upstream_target(extranonces, upstream_target).unwrap(); + + bridge + .safe_lock(|bridge| { + bridge.on_new_sv1_connection(1_000_000_000_000.0).unwrap(); + bridge.on_new_sv1_connection(10_000_000_000_000.0).unwrap(); + }) + .unwrap(); + + let shared_target = bridge + .safe_lock(|bridge| bridge.target.safe_lock(|t| t.clone()).unwrap()) + .unwrap(); + assert_eq!(shared_target, upstream_target.to_vec()); + } + + #[test] + fn repeated_downstream_opens_leave_shared_upstream_target_unchanged() { + let extranonces = ExtendedExtranonce::new(0..6, 6..8, 8..16); + let upstream_target = Downstream::difficulty_to_target(2_048.0); + let bridge = + test_utils::create_bridge_with_upstream_target(extranonces, upstream_target).unwrap(); + + bridge + .safe_lock(|bridge| { + for index in 0..32 { + let hash_rate = 1_000_000_000_000.0 + (index as f32 * 250_000_000_000.0); + bridge.on_new_sv1_connection(hash_rate).unwrap(); + } + }) + .unwrap(); + + let shared_target = bridge + .safe_lock(|bridge| bridge.target.safe_lock(|t| t.clone()).unwrap()) + .unwrap(); + assert_eq!(shared_target, upstream_target.to_vec()); + } + + #[test] + fn share_that_only_meets_downstream_target_stays_local_with_two_channels() { + let extranonces = ExtendedExtranonce::new(0..6, 6..8, 8..16); + let upstream_target = [0; 32]; + let bridge = + test_utils::create_bridge_with_upstream_target(extranonces, upstream_target).unwrap(); + + let (channel_two_id, channel_two_extranonce2_len) = bridge + .safe_lock(|bridge| { + seed_bridge_job(bridge); + let channel_one = bridge.on_new_sv1_connection(1_000_000_000_000.0).unwrap(); + let channel_two = bridge.on_new_sv1_connection(2_000_000_000_000.0).unwrap(); + bridge.channel_factory.update_target_for_channel( + channel_one.channel_id, + Downstream::difficulty_to_target(16_384.0).into(), + ); + bridge + .channel_factory + .update_target_for_channel(channel_two.channel_id, [255; 32].into()); + (channel_two.channel_id, channel_two.extranonce2_len as usize) + }) + .unwrap(); + + let (found_local_only_share, downstream_hits, upstream_hits, bitcoin_hits, errors) = bridge + .safe_lock(|bridge| { + let mut downstream_hits = 0; + let mut upstream_hits = 0; + let mut bitcoin_hits = 0; + let mut errors = 0; + + let found = (0..4_096).any(|nonce| { + match classify_submit( + bridge, + channel_two_id, + channel_two_extranonce2_len, + nonce, + ) { + OnNewShare::ShareMeetDownstreamTarget => { + downstream_hits += 1; + true + } + OnNewShare::SendSubmitShareUpstream(_) => { + upstream_hits += 1; + false + } + OnNewShare::ShareMeetBitcoinTarget(_) => { + bitcoin_hits += 1; + false + } + OnNewShare::SendErrorDownstream(_) => { + errors += 1; + false + } + OnNewShare::RelaySubmitShareUpstream => false, + } + }); + + (found, downstream_hits, upstream_hits, bitcoin_hits, errors) + }) + .unwrap(); + + assert!( + found_local_only_share, + "expected at least one share to stay local; downstream={downstream_hits}, upstream={upstream_hits}, bitcoin={bitcoin_hits}, errors={errors}" + ); + } } diff --git a/src/translator/upstream/diff_management.rs b/src/translator/upstream/diff_management.rs index 350f1460..a8592f46 100644 --- a/src/translator/upstream/diff_management.rs +++ b/src/translator/upstream/diff_management.rs @@ -1,34 +1,60 @@ use crate::translator::error::Error; use super::Upstream; +use tokio::sync::watch; #[derive(Debug, Clone)] pub struct UpstreamDifficultyConfig { pub channel_diff_update_interval: u32, pub channel_nominal_hashrate: f32, + update_revision_tx: watch::Sender, +} + +impl UpstreamDifficultyConfig { + pub fn new( + channel_diff_update_interval: u32, + channel_nominal_hashrate: f32, + ) -> (Self, watch::Receiver) { + let (update_revision_tx, update_revision_rx) = watch::channel(0_u64); + ( + Self { + channel_diff_update_interval, + channel_nominal_hashrate, + update_revision_tx, + }, + update_revision_rx, + ) + } + + pub fn request_immediate_update(&self) { + let next_revision = (*self.update_revision_tx.borrow()).wrapping_add(1); + let _ = self.update_revision_tx.send(next_revision); + } + + #[cfg(test)] + pub fn subscribe_updates(&self) -> watch::Receiver { + self.update_revision_tx.subscribe() + } } use super::super::error::ProxyResult; use binary_sv2::u256_from_int; -use roles_logic_sv2::{ - mining_sv2::UpdateChannel, parsers::Mining, utils::Mutex, Error as RolesLogicError, -}; +use roles_logic_sv2::{mining_sv2::UpdateChannel, parsers::Mining, utils::Mutex}; use std::{sync::Arc, time::Duration}; use tracing::error; impl Upstream { - /// this function checks if the elapsed time since the last update has surpassed the config + /// Emit an `UpdateChannel` using the current aggregate nominal hashrate. pub(super) async fn try_update_hashrate(self_: Arc>) -> ProxyResult<'static, ()> { let (channel_id_option, diff_mgmt, tx_message) = self_ .safe_lock(|u| (u.channel_id, u.difficulty_config.clone(), u.sender.clone())) .map_err(|_e| Error::TranslatorDiffConfigMutexPoisoned)?; - let channel_id = channel_id_option.ok_or(super::super::error::Error::RolesSv2Logic( - RolesLogicError::NotFoundChannelId, - ))?; - let (timeout, new_hashrate) = diff_mgmt - .safe_lock(|d| (d.channel_diff_update_interval, d.channel_nominal_hashrate)) + let Some(channel_id) = channel_id_option else { + return Ok(()); + }; + let new_hashrate = diff_mgmt + .safe_lock(|d| d.channel_nominal_hashrate) .map_err(|_| Error::TranslatorDiffConfigMutexPoisoned)?; - // UPDATE CHANNEL let update_channel = UpdateChannel { channel_id, nominal_hash_rate: new_hashrate, @@ -40,7 +66,44 @@ impl Upstream { error!("Failed to send message"); return Err(Error::AsyncChannelError); } - tokio::time::sleep(Duration::from_secs(timeout as u64)).await; Ok(()) } + + pub(super) async fn run_diff_management( + self_: Arc>, + mut update_rx: watch::Receiver, + ) { + loop { + let timeout = match self_.safe_lock(|u| { + u.difficulty_config + .safe_lock(|d| d.channel_diff_update_interval) + .map_err(|_| Error::TranslatorDiffConfigMutexPoisoned) + }) { + Ok(Ok(timeout)) => timeout, + Ok(Err(e)) => { + error!("Failed to read upstream diff interval: {e:?}"); + return; + } + Err(e) => { + error!("Failed to read upstream diff interval: {e:?}"); + return; + } + }; + + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(timeout as u64)) => {} + changed = update_rx.changed() => { + if changed.is_err() { + error!("Upstream diff update channel closed"); + return; + } + } + } + + if let Err(e) = Self::try_update_hashrate(self_.clone()).await { + error!("Failed to update hashrate: {e:?}"); + return; + } + } + } } diff --git a/src/translator/upstream/upstream.rs b/src/translator/upstream/upstream.rs index cfe494b5..af4d0216 100644 --- a/src/translator/upstream/upstream.rs +++ b/src/translator/upstream/upstream.rs @@ -24,10 +24,12 @@ use roles_logic_sv2::{ use std::{ collections::BTreeMap, sync::{atomic::AtomicBool, Arc}, - time::Duration, }; use tokio::{ - sync::mpsc::{Receiver as TReceiver, Sender as TSender}, + sync::{ + mpsc::{Receiver as TReceiver, Sender as TSender}, + watch, + }, task, }; use tracing::{error, info, warn}; @@ -146,6 +148,7 @@ impl Upstream { incoming_receiver: TReceiver>, rx_sv2_submit_shares_ext: TReceiver, rx_update_token: TReceiver, + diff_update_rx: watch::Receiver, ) -> Result> { let task_manager = TaskManager::initialize(); let abortable = task_manager @@ -156,7 +159,7 @@ impl Upstream { Self::connect(self_.clone()).await?; // Propagate error, it will be handled in the caller let (diff_manager_abortable, main_loop_abortable) = - Self::parse_incoming(self_.clone(), incoming_receiver)?; + Self::parse_incoming(self_.clone(), incoming_receiver, diff_update_rx)?; let handle_submit_abortable = Self::handle_submit(self_.clone(), rx_sv2_submit_shares_ext)?; let handle_token_update_abortable = @@ -278,6 +281,7 @@ impl Upstream { pub fn parse_incoming( self_: Arc>, mut receiver: TReceiver>, + diff_update_rx: watch::Receiver, ) -> ProxyResult<'static, (AbortOnDrop, AbortOnDrop)> { let clone = self_.clone(); let (tx_frame, tx_sv2_extranonce, tx_sv2_new_ext_mining_job, tx_sv2_set_new_prev_hash) = @@ -293,16 +297,7 @@ impl Upstream { .map_err(|_| Error::TranslatorUpstreamMutexPoisoned)?; let diff_manager_handle = { let self_ = self_.clone(); - task::spawn(async move { - // No need to start diff management immediatly - tokio::time::sleep(Duration::from_secs(5)).await; - loop { - if let Err(e) = Self::try_update_hashrate(self_.clone()).await { - error!("Failed to update hashrate: {e:?}"); - return; - }; - } - }) + task::spawn(async move { Self::run_diff_management(self_, diff_update_rx).await }) }; let main_loop_handle = { @@ -933,6 +928,7 @@ mod tests { use super::*; use crate::monitor::shares::RejectionReason; use tokio::sync::{mpsc, oneshot}; + use tokio::time::{timeout, Duration}; async fn test_upstream() -> Arc> { let (tx_sv2_set_new_prev_hash, _rx_sv2_set_new_prev_hash) = mpsc::channel(1); @@ -946,10 +942,9 @@ mod tests { crate::MIN_EXTRANONCE_SIZE - 1, tx_sv2_extranonce, Arc::new(Mutex::new(vec![0; 32])), - Arc::new(Mutex::new(UpstreamDifficultyConfig { - channel_diff_update_interval: crate::CHANNEL_DIFF_UPDTATE_INTERVAL, - channel_nominal_hashrate: 0.0, - })), + Arc::new(Mutex::new( + UpstreamDifficultyConfig::new(crate::CHANNEL_DIFF_UPDTATE_INTERVAL, 0.0).0, + )), sender, "sig".to_string(), ) @@ -1022,4 +1017,54 @@ mod tests { ); assert_eq!(second_result_rx.await.unwrap(), Ok(())); } + + #[tokio::test] + async fn immediate_signal_emits_update_channel_before_periodic_interval() { + let (tx_sv2_set_new_prev_hash, _rx_sv2_set_new_prev_hash) = mpsc::channel(1); + let (tx_sv2_new_ext_mining_job, _rx_sv2_new_ext_mining_job) = mpsc::channel(1); + let (tx_sv2_extranonce, _rx_sv2_extranonce) = mpsc::channel(1); + let (sender, mut receiver) = mpsc::channel(4); + let (difficulty_config, diff_update_rx) = + UpstreamDifficultyConfig::new(crate::CHANNEL_DIFF_UPDTATE_INTERVAL, 0.0); + let difficulty_config = Arc::new(Mutex::new(difficulty_config)); + let upstream = Upstream::new( + tx_sv2_set_new_prev_hash, + tx_sv2_new_ext_mining_job, + crate::MIN_EXTRANONCE_SIZE - 1, + tx_sv2_extranonce, + Arc::new(Mutex::new(vec![0; 32])), + difficulty_config.clone(), + sender, + "sig".to_string(), + ) + .await + .unwrap(); + upstream.safe_lock(|u| u.channel_id = Some(42)).unwrap(); + + let (_incoming_tx, incoming_rx) = mpsc::channel(1); + let (diff_manager_abortable, main_loop_abortable) = + Upstream::parse_incoming(upstream.clone(), incoming_rx, diff_update_rx).unwrap(); + + difficulty_config + .safe_lock(|c| { + c.channel_nominal_hashrate = 12_345.0; + c.request_immediate_update(); + }) + .unwrap(); + + let message = timeout(Duration::from_millis(250), receiver.recv()) + .await + .unwrap() + .unwrap(); + match message { + Mining::UpdateChannel(update) => { + assert_eq!(update.channel_id, 42); + assert_eq!(update.nominal_hash_rate, 12_345.0); + } + other => panic!("expected UpdateChannel, got {other:?}"), + } + + drop(diff_manager_abortable); + drop(main_loop_abortable); + } }