diff --git a/Cargo.lock b/Cargo.lock index ccb010d..0a53112 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1112,7 +1112,7 @@ dependencies = [ [[package]] name = "dmnd-client" -version = "0.3.14" +version = "0.3.15" dependencies = [ "async-recursion", "axum", diff --git a/Cargo.toml b/Cargo.toml index 2850380..4778a0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dmnd-client" -version = "0.3.14" +version = "0.3.15" edition = "2021" [lib] diff --git a/src/translator/downstream/diff_management.rs b/src/translator/downstream/diff_management.rs index 6284082..61dcbd8 100644 --- a/src/translator/downstream/diff_management.rs +++ b/src/translator/downstream/diff_management.rs @@ -90,6 +90,7 @@ impl Downstream { u.channel_nominal_hashrate -= // Make sure that upstream channel hasrate never goes below 0 f32::min(estimated_downstream_hash_rate, u.channel_nominal_hashrate); + u.request_immediate_update(); })?; Ok(()) } @@ -318,6 +319,7 @@ impl Downstream { } else { c.channel_nominal_hashrate = 0.0; } + c.request_immediate_update(); })?; Ok(()) } @@ -560,10 +562,8 @@ mod test { initial_difficulty: latest_difficulty, hard_minimum_difficulty: None, }; - let upstream_config = Arc::new(Mutex::new(UpstreamDifficultyConfig { - channel_diff_update_interval: 60, - channel_nominal_hashrate, - })); + let (upstream_config, _rx) = UpstreamDifficultyConfig::new(60, channel_nominal_hashrate); + let upstream_config = Arc::new(Mutex::new(upstream_config)); let (tx_sv1_submit, _rx_sv1_submit) = tokio::sync::mpsc::channel(10); let (tx_outgoing, _rx_outgoing) = channel(10); let (tx_update_token, _rx_update_token) = channel(10); @@ -620,10 +620,8 @@ mod test { initial_difficulty: latest_difficulty, hard_minimum_difficulty: None, }; - let upstream_config = Arc::new(Mutex::new(UpstreamDifficultyConfig { - channel_diff_update_interval: 60, - channel_nominal_hashrate, - })); + let (upstream_config, _rx) = UpstreamDifficultyConfig::new(60, channel_nominal_hashrate); + let upstream_config = Arc::new(Mutex::new(upstream_config)); let (tx_sv1_submit, _rx_sv1_submit) = tokio::sync::mpsc::channel(10); let (tx_outgoing, rx_outgoing) = channel(10); let (tx_update_token, _rx_update_token) = channel(10); @@ -683,8 +681,15 @@ mod test { target_rate_submits(), RAW_BOOTSTRAP_HASHRATE, ); + let mut update_rx = upstream_config + .safe_lock(|u| u.subscribe_updates()) + .unwrap(); Downstream::update_difficulty_and_hashrate(&downstream).unwrap(); + tokio::time::timeout(Duration::from_secs(1), update_rx.changed()) + .await + .unwrap() + .unwrap(); let estimated_downstream_hash_rate = downstream .safe_lock(|d| d.difficulty_mgmt.estimated_downstream_hash_rate) @@ -697,6 +702,37 @@ mod test { assert_hashrate_close(channel_nominal_hashrate, quantized_hashrate); } + #[tokio::test] + async fn removing_downstream_hashrate_requests_immediate_upstream_retarget() { + let latest_difficulty = 1_024.0; + let estimated_hashrate = hashrate_for_diff(latest_difficulty); + let pid = Pid::new(*crate::SHARE_PER_MIN, latest_difficulty * 10.0); + let (downstream, upstream_config) = seeded_downstream( + estimated_hashrate, + latest_difficulty, + pid, + VecDeque::new(), + estimated_hashrate, + ); + let mut update_rx = upstream_config + .safe_lock(|u| u.subscribe_updates()) + .unwrap(); + + downstream + .safe_lock(|d| d.mark_channel_hashrate_registered()) + .unwrap(); + Downstream::remove_downstream_hashrate_from_channel(&downstream).unwrap(); + tokio::time::timeout(Duration::from_secs(1), update_rx.changed()) + .await + .unwrap() + .unwrap(); + + let channel_nominal_hashrate = upstream_config + .safe_lock(|u| u.channel_nominal_hashrate) + .unwrap(); + assert_eq!(channel_nominal_hashrate, 0.0); + } + #[tokio::test] #[ignore = "non-zero delay is blocked until the stale bootstrap difficulty replay bug is fixed"] async fn positive_delay_does_not_replay_stale_bootstrap_difficulty_after_retarget() { @@ -840,10 +876,7 @@ mod test { initial_difficulty: 10_000.0, hard_minimum_difficulty: Some(NON_LOCAL_DOWNSTREAM_MIN_DIFFICULTY), }; - let upstream_config = UpstreamDifficultyConfig { - channel_diff_update_interval: 60, - channel_nominal_hashrate: 0.0, - }; + let (upstream_config, _rx) = UpstreamDifficultyConfig::new(60, 0.0); let (tx_sv1_submit, _rx_sv1_submit) = tokio::sync::mpsc::channel(10); let (tx_outgoing, _rx_outgoing) = channel(10); let (tx_update_token, _rx_update_token) = channel(10); @@ -902,10 +935,7 @@ mod test { initial_difficulty: 10_000_000_000.0, hard_minimum_difficulty: None, }; - let upstream_config = UpstreamDifficultyConfig { - channel_diff_update_interval: 60, - channel_nominal_hashrate: 0.0, - }; + let (upstream_config, _rx) = UpstreamDifficultyConfig::new(60, 0.0); let (tx_sv1_submit, _rx_sv1_submit) = tokio::sync::mpsc::channel(10); let (tx_outgoing, _rx_outgoing) = channel(10); let (tx_update_token, _rx_update_token) = channel(10); diff --git a/src/translator/downstream/downstream.rs b/src/translator/downstream/downstream.rs index bd4e396..d00dfec 100644 --- a/src/translator/downstream/downstream.rs +++ b/src/translator/downstream/downstream.rs @@ -1236,10 +1236,8 @@ mod tests { initial_difficulty: 1.0, hard_minimum_difficulty: None, }; - let upstream_config = UpstreamDifficultyConfig { - channel_diff_update_interval: crate::CHANNEL_DIFF_UPDTATE_INTERVAL, - channel_nominal_hashrate: 0.0, - }; + let (upstream_config, _rx) = + UpstreamDifficultyConfig::new(crate::CHANNEL_DIFF_UPDTATE_INTERVAL, 0.0); let (tx_sv1_submit, _rx_sv1_submit) = channel::(8); let (tx_outgoing, rx_outgoing) = channel(8); let (tx_update_token, _rx_update_token) = channel(8); diff --git a/src/translator/downstream/notify.rs b/src/translator/downstream/notify.rs index 331c03a..55e98c1 100644 --- a/src/translator/downstream/notify.rs +++ b/src/translator/downstream/notify.rs @@ -49,8 +49,10 @@ pub async fn start_notify( d.difficulty_mgmt.estimated_downstream_hash_rate, ) })?; - upstream_difficulty_config - .safe_lock(|c| c.channel_nominal_hashrate += registered_hashrate)?; + upstream_difficulty_config.safe_lock(|c| { + c.channel_nominal_hashrate += registered_hashrate; + c.request_immediate_update(); + })?; downstream.safe_lock(|d| d.mark_channel_hashrate_registered())?; if let Err(e) = stats_sender.setup_stats_reliable(connection_id).await { error!("Failed to register downstream stats {connection_id}: {e}"); @@ -66,6 +68,7 @@ pub async fn start_notify( upstream_difficulty_config.safe_lock(|u| { u.channel_nominal_hashrate -= f32::min(registered_hashrate, u.channel_nominal_hashrate); + u.request_immediate_update(); })?; } return Ok(()); @@ -275,10 +278,8 @@ mod tests { initial_difficulty: 1.0, hard_minimum_difficulty: None, }; - let upstream_config = UpstreamDifficultyConfig { - channel_diff_update_interval: crate::CHANNEL_DIFF_UPDTATE_INTERVAL, - channel_nominal_hashrate: 0.0, - }; + let (upstream_config, _rx) = + UpstreamDifficultyConfig::new(crate::CHANNEL_DIFF_UPDTATE_INTERVAL, 0.0); let (tx_sv1_submit, _rx_sv1_submit) = channel::(8); let (tx_outgoing, rx_outgoing) = channel(8); let (tx_update_token, _rx_update_token) = channel(8); @@ -342,10 +343,11 @@ mod tests { initial_difficulty: latest_difficulty, hard_minimum_difficulty: None, }; - let upstream_config = Arc::new(Mutex::new(UpstreamDifficultyConfig { - channel_diff_update_interval: crate::CHANNEL_DIFF_UPDTATE_INTERVAL, + let (upstream_config, _rx) = UpstreamDifficultyConfig::new( + crate::CHANNEL_DIFF_UPDTATE_INTERVAL, channel_nominal_hashrate, - })); + ); + let upstream_config = Arc::new(Mutex::new(upstream_config)); let (tx_sv1_submit, _rx_sv1_submit) = channel::(8); let (tx_outgoing, _rx_outgoing) = channel(8); let (tx_update_token, _rx_update_token) = channel(8); @@ -421,6 +423,51 @@ mod tests { } } + #[tokio::test] + async fn start_notify_requests_immediate_upstream_retarget_on_register() { + let first_job = first_job("91"); + let (downstream, _rx_outgoing) = + downstream_with_first_job(first_job, vec!["worker".to_string()]); + let downstream = Arc::new(Mutex::new(downstream)); + let mut update_rx = downstream + .safe_lock(|d| { + d.upstream_difficulty_config + .safe_lock(|c| c.subscribe_updates()) + .unwrap() + }) + .unwrap(); + let task_manager = TaskManager::initialize(); + let (_tx_notify, rx_notify) = broadcast::channel(8); + + start_notify( + task_manager.clone(), + downstream.clone(), + rx_notify, + "127.0.0.1".to_string(), + 1, + ) + .await + .unwrap(); + + tokio::time::timeout(Duration::from_secs(1), update_rx.changed()) + .await + .unwrap() + .unwrap(); + + let channel_nominal_hashrate = downstream + .safe_lock(|d| { + d.upstream_difficulty_config + .safe_lock(|c| c.channel_nominal_hashrate) + .unwrap() + }) + .unwrap(); + assert_eq!(channel_nominal_hashrate, 1.0); + + if let Some(aborter) = task_manager.safe_lock(|t| t.get_aborter()).unwrap() { + drop(aborter); + } + } + #[tokio::test] async fn first_retarget_waits_full_adjustment_interval_before_fixing_upstream_hashrate() { let quantized_difficulty = quantized_difficulty_for_hashrate(RAW_BOOTSTRAP_HASHRATE); diff --git a/src/translator/mod.rs b/src/translator/mod.rs index 27dbff2..d70c008 100644 --- a/src/translator/mod.rs +++ b/src/translator/mod.rs @@ -89,10 +89,10 @@ pub async fn start( let channel_nominal_hashrate = 0.0; - let upstream_diff = UpstreamDifficultyConfig { - channel_diff_update_interval: crate::CHANNEL_DIFF_UPDTATE_INTERVAL, + let (upstream_diff, diff_update_rx) = UpstreamDifficultyConfig::new( + crate::CHANNEL_DIFF_UPDTATE_INTERVAL, channel_nominal_hashrate, - }; + ); let diff_config = Arc::new(Mutex::new(upstream_diff)); // Instantiate a new `Upstream` (SV2 Pool) @@ -113,6 +113,7 @@ pub async fn start( recv_from_up, rx_sv2_submit_shares_ext, rx_update_token, + diff_update_rx, ) .await?; TaskManager::add_upstream(task_manager.clone(), upstream_abortable) diff --git a/src/translator/proxy/bridge.rs b/src/translator/proxy/bridge.rs index 5b35df1..6724f39 100644 --- a/src/translator/proxy/bridge.rs +++ b/src/translator/proxy/bridge.rs @@ -128,11 +128,6 @@ impl Bridge { debug!("New extended channel opened with id {}", success.channel_id); let extranonce = success.extranonce_prefix.to_vec(); let extranonce2_len = success.extranonce_size; - self.target - .safe_lock(|t| *t = success.target.to_vec()) - .map_err(|e| { - Error::TargetError(roles_logic_sv2::Error::PoisonLock(e.to_string())) - })?; Ok(OpenSv1Downstream { channel_id: success.channel_id, last_notify: self.last_notify.clone(), @@ -730,24 +725,38 @@ pub struct OpenSv1Downstream { #[cfg(test)] mod test { use super::*; + use crate::translator::downstream::Downstream; + use bitcoin::{blockdata::witness::Witness, hashes::Hash}; use tokio::sync::mpsc; + const TEST_JOB_ID: u32 = 0; + const TEST_NTIME: u32 = 1_700_000_000; + pub mod test_utils { use super::*; pub fn create_bridge(extranonces: ExtendedExtranonce) -> Result>, ()> { + create_bridge_with_upstream_target( + extranonces, + [ + 0, 0, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, + ], + ) + } + + pub fn create_bridge_with_upstream_target( + extranonces: ExtendedExtranonce, + upstream_target: [u8; 32], + ) -> Result>, ()> { let (tx_sv2_submit_shares_ext, _rx_sv2_submit_shares_ext) = mpsc::channel(1); let (tx_sv1_notify, _rx_sv1_notify) = broadcast::channel(1); - let upstream_target = vec![ - 0, 0, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, - ]; let b = Bridge::new( tx_sv2_submit_shares_ext.clone(), tx_sv1_notify, extranonces, - Arc::new(Mutex::new(upstream_target)), + Arc::new(Mutex::new(upstream_target.to_vec())), 1, ) .map_err(|_| ())?; @@ -765,12 +774,103 @@ mod test { id: 0, } } + + pub fn create_sv1_submit_with_fields( + job_id: u32, + extranonce2_len: usize, + ntime: u32, + nonce: u32, + ) -> Submit<'static> { + Submit { + user_name: "test_user".to_string(), + job_id: job_id.to_string(), + extra_nonce2: sv1_api::utils::Extranonce::try_from(vec![0; extranonce2_len]) + .unwrap(), + time: sv1_api::utils::HexU32Be(ntime), + nonce: sv1_api::utils::HexU32Be(nonce), + version_bits: None, + id: 0, + } + } + } + + fn seed_bridge_job(bridge: &mut Bridge) { + let out_id = bitcoin::hashes::sha256d::Hash::from_slice(&[0_u8; 32]).unwrap(); + let previous_output = bitcoin::OutPoint { + txid: bitcoin::Txid::from_raw_hash(out_id), + vout: 0xffff_ffff, + }; + let input = bitcoin::TxIn { + previous_output, + script_sig: vec![89_u8; 16].into(), + sequence: bitcoin::Sequence(0), + witness: Witness::new(), + }; + let tx = bitcoin::Transaction { + version: bitcoin::transaction::Version(1), + lock_time: bitcoin::locktime::absolute::LockTime::from_time(TEST_NTIME).unwrap(), + input: vec![input], + output: vec![], + }; + let tx = bitcoin::consensus::serialize(&tx); + let prev_hash = SetNewPrevHash { + channel_id: 1, + job_id: TEST_JOB_ID, + prev_hash: [3; 32].into(), + min_ntime: TEST_NTIME, + nbits: 0x1d00ffff, + }; + bridge.channel_factory.on_new_prev_hash(prev_hash).unwrap(); + let new_mining_job = NewExtendedMiningJob { + channel_id: 1, + job_id: TEST_JOB_ID, + min_ntime: binary_sv2::Sv2Option::new(Some(TEST_NTIME)), + version: 0, + version_rolling_allowed: false, + merkle_path: vec![].into(), + coinbase_tx_prefix: tx[0..42].to_vec().try_into().unwrap(), + coinbase_tx_suffix: tx[58..].to_vec().try_into().unwrap(), + }; + bridge + .channel_factory + .on_new_extended_mining_job(new_mining_job) + .unwrap(); + } + + fn classify_submit( + bridge: &mut Bridge, + channel_id: u32, + extranonce2_len: usize, + nonce: u32, + ) -> OnNewShare { + let upstream_target: [u8; 32] = bridge + .target + .safe_lock(|t| t.clone()) + .unwrap() + .try_into() + .unwrap(); + let mut upstream_target: Target = upstream_target.into(); + bridge.channel_factory.set_target(&mut upstream_target); + let translated = bridge + .translate_submit( + channel_id, + test_utils::create_sv1_submit_with_fields( + TEST_JOB_ID, + extranonce2_len, + TEST_NTIME, + nonce, + ), + None, + ) + .unwrap(); + bridge + .channel_factory + .on_submit_shares_extended(translated) + .unwrap() } #[test] fn test_version_bits_insert() { - use bitcoin::{blockdata::witness::Witness, hashes::Hash}; - let extranonces = ExtendedExtranonce::new(0..6, 6..8, 8..16); let bridge = match test_utils::create_bridge(extranonces) { Ok(bridge) => bridge, @@ -854,4 +954,113 @@ mod test { }) .unwrap(); } + + #[test] + fn opening_second_downstream_does_not_clobber_shared_upstream_target() { + let extranonces = ExtendedExtranonce::new(0..6, 6..8, 8..16); + let upstream_target = Downstream::difficulty_to_target(1_024.0); + let bridge = + test_utils::create_bridge_with_upstream_target(extranonces, upstream_target).unwrap(); + + bridge + .safe_lock(|bridge| { + bridge.on_new_sv1_connection(1_000_000_000_000.0).unwrap(); + bridge.on_new_sv1_connection(10_000_000_000_000.0).unwrap(); + }) + .unwrap(); + + let shared_target = bridge + .safe_lock(|bridge| bridge.target.safe_lock(|t| t.clone()).unwrap()) + .unwrap(); + assert_eq!(shared_target, upstream_target.to_vec()); + } + + #[test] + fn repeated_downstream_opens_leave_shared_upstream_target_unchanged() { + let extranonces = ExtendedExtranonce::new(0..6, 6..8, 8..16); + let upstream_target = Downstream::difficulty_to_target(2_048.0); + let bridge = + test_utils::create_bridge_with_upstream_target(extranonces, upstream_target).unwrap(); + + bridge + .safe_lock(|bridge| { + for index in 0..32 { + let hash_rate = 1_000_000_000_000.0 + (index as f32 * 250_000_000_000.0); + bridge.on_new_sv1_connection(hash_rate).unwrap(); + } + }) + .unwrap(); + + let shared_target = bridge + .safe_lock(|bridge| bridge.target.safe_lock(|t| t.clone()).unwrap()) + .unwrap(); + assert_eq!(shared_target, upstream_target.to_vec()); + } + + #[test] + fn share_that_only_meets_downstream_target_stays_local_with_two_channels() { + let extranonces = ExtendedExtranonce::new(0..6, 6..8, 8..16); + let upstream_target = [0; 32]; + let bridge = + test_utils::create_bridge_with_upstream_target(extranonces, upstream_target).unwrap(); + + let (channel_two_id, channel_two_extranonce2_len) = bridge + .safe_lock(|bridge| { + seed_bridge_job(bridge); + let channel_one = bridge.on_new_sv1_connection(1_000_000_000_000.0).unwrap(); + let channel_two = bridge.on_new_sv1_connection(2_000_000_000_000.0).unwrap(); + bridge.channel_factory.update_target_for_channel( + channel_one.channel_id, + Downstream::difficulty_to_target(16_384.0).into(), + ); + bridge + .channel_factory + .update_target_for_channel(channel_two.channel_id, [255; 32].into()); + (channel_two.channel_id, channel_two.extranonce2_len as usize) + }) + .unwrap(); + + let (found_local_only_share, downstream_hits, upstream_hits, bitcoin_hits, errors) = bridge + .safe_lock(|bridge| { + let mut downstream_hits = 0; + let mut upstream_hits = 0; + let mut bitcoin_hits = 0; + let mut errors = 0; + + let found = (0..4_096).any(|nonce| { + match classify_submit( + bridge, + channel_two_id, + channel_two_extranonce2_len, + nonce, + ) { + OnNewShare::ShareMeetDownstreamTarget => { + downstream_hits += 1; + true + } + OnNewShare::SendSubmitShareUpstream(_) => { + upstream_hits += 1; + false + } + OnNewShare::ShareMeetBitcoinTarget(_) => { + bitcoin_hits += 1; + false + } + OnNewShare::SendErrorDownstream(_) => { + errors += 1; + false + } + OnNewShare::RelaySubmitShareUpstream => false, + } + }); + + (found, downstream_hits, upstream_hits, bitcoin_hits, errors) + }) + .unwrap(); + + assert!( + found_local_only_share, + "expected at least one share to stay local; downstream={downstream_hits}, upstream={upstream_hits}, bitcoin={bitcoin_hits}, errors={errors}" + ); + } } diff --git a/src/translator/upstream/diff_management.rs b/src/translator/upstream/diff_management.rs index 350f146..a8592f4 100644 --- a/src/translator/upstream/diff_management.rs +++ b/src/translator/upstream/diff_management.rs @@ -1,34 +1,60 @@ use crate::translator::error::Error; use super::Upstream; +use tokio::sync::watch; #[derive(Debug, Clone)] pub struct UpstreamDifficultyConfig { pub channel_diff_update_interval: u32, pub channel_nominal_hashrate: f32, + update_revision_tx: watch::Sender, +} + +impl UpstreamDifficultyConfig { + pub fn new( + channel_diff_update_interval: u32, + channel_nominal_hashrate: f32, + ) -> (Self, watch::Receiver) { + let (update_revision_tx, update_revision_rx) = watch::channel(0_u64); + ( + Self { + channel_diff_update_interval, + channel_nominal_hashrate, + update_revision_tx, + }, + update_revision_rx, + ) + } + + pub fn request_immediate_update(&self) { + let next_revision = (*self.update_revision_tx.borrow()).wrapping_add(1); + let _ = self.update_revision_tx.send(next_revision); + } + + #[cfg(test)] + pub fn subscribe_updates(&self) -> watch::Receiver { + self.update_revision_tx.subscribe() + } } use super::super::error::ProxyResult; use binary_sv2::u256_from_int; -use roles_logic_sv2::{ - mining_sv2::UpdateChannel, parsers::Mining, utils::Mutex, Error as RolesLogicError, -}; +use roles_logic_sv2::{mining_sv2::UpdateChannel, parsers::Mining, utils::Mutex}; use std::{sync::Arc, time::Duration}; use tracing::error; impl Upstream { - /// this function checks if the elapsed time since the last update has surpassed the config + /// Emit an `UpdateChannel` using the current aggregate nominal hashrate. pub(super) async fn try_update_hashrate(self_: Arc>) -> ProxyResult<'static, ()> { let (channel_id_option, diff_mgmt, tx_message) = self_ .safe_lock(|u| (u.channel_id, u.difficulty_config.clone(), u.sender.clone())) .map_err(|_e| Error::TranslatorDiffConfigMutexPoisoned)?; - let channel_id = channel_id_option.ok_or(super::super::error::Error::RolesSv2Logic( - RolesLogicError::NotFoundChannelId, - ))?; - let (timeout, new_hashrate) = diff_mgmt - .safe_lock(|d| (d.channel_diff_update_interval, d.channel_nominal_hashrate)) + let Some(channel_id) = channel_id_option else { + return Ok(()); + }; + let new_hashrate = diff_mgmt + .safe_lock(|d| d.channel_nominal_hashrate) .map_err(|_| Error::TranslatorDiffConfigMutexPoisoned)?; - // UPDATE CHANNEL let update_channel = UpdateChannel { channel_id, nominal_hash_rate: new_hashrate, @@ -40,7 +66,44 @@ impl Upstream { error!("Failed to send message"); return Err(Error::AsyncChannelError); } - tokio::time::sleep(Duration::from_secs(timeout as u64)).await; Ok(()) } + + pub(super) async fn run_diff_management( + self_: Arc>, + mut update_rx: watch::Receiver, + ) { + loop { + let timeout = match self_.safe_lock(|u| { + u.difficulty_config + .safe_lock(|d| d.channel_diff_update_interval) + .map_err(|_| Error::TranslatorDiffConfigMutexPoisoned) + }) { + Ok(Ok(timeout)) => timeout, + Ok(Err(e)) => { + error!("Failed to read upstream diff interval: {e:?}"); + return; + } + Err(e) => { + error!("Failed to read upstream diff interval: {e:?}"); + return; + } + }; + + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(timeout as u64)) => {} + changed = update_rx.changed() => { + if changed.is_err() { + error!("Upstream diff update channel closed"); + return; + } + } + } + + if let Err(e) = Self::try_update_hashrate(self_.clone()).await { + error!("Failed to update hashrate: {e:?}"); + return; + } + } + } } diff --git a/src/translator/upstream/upstream.rs b/src/translator/upstream/upstream.rs index cfe494b..af4d021 100644 --- a/src/translator/upstream/upstream.rs +++ b/src/translator/upstream/upstream.rs @@ -24,10 +24,12 @@ use roles_logic_sv2::{ use std::{ collections::BTreeMap, sync::{atomic::AtomicBool, Arc}, - time::Duration, }; use tokio::{ - sync::mpsc::{Receiver as TReceiver, Sender as TSender}, + sync::{ + mpsc::{Receiver as TReceiver, Sender as TSender}, + watch, + }, task, }; use tracing::{error, info, warn}; @@ -146,6 +148,7 @@ impl Upstream { incoming_receiver: TReceiver>, rx_sv2_submit_shares_ext: TReceiver, rx_update_token: TReceiver, + diff_update_rx: watch::Receiver, ) -> Result> { let task_manager = TaskManager::initialize(); let abortable = task_manager @@ -156,7 +159,7 @@ impl Upstream { Self::connect(self_.clone()).await?; // Propagate error, it will be handled in the caller let (diff_manager_abortable, main_loop_abortable) = - Self::parse_incoming(self_.clone(), incoming_receiver)?; + Self::parse_incoming(self_.clone(), incoming_receiver, diff_update_rx)?; let handle_submit_abortable = Self::handle_submit(self_.clone(), rx_sv2_submit_shares_ext)?; let handle_token_update_abortable = @@ -278,6 +281,7 @@ impl Upstream { pub fn parse_incoming( self_: Arc>, mut receiver: TReceiver>, + diff_update_rx: watch::Receiver, ) -> ProxyResult<'static, (AbortOnDrop, AbortOnDrop)> { let clone = self_.clone(); let (tx_frame, tx_sv2_extranonce, tx_sv2_new_ext_mining_job, tx_sv2_set_new_prev_hash) = @@ -293,16 +297,7 @@ impl Upstream { .map_err(|_| Error::TranslatorUpstreamMutexPoisoned)?; let diff_manager_handle = { let self_ = self_.clone(); - task::spawn(async move { - // No need to start diff management immediatly - tokio::time::sleep(Duration::from_secs(5)).await; - loop { - if let Err(e) = Self::try_update_hashrate(self_.clone()).await { - error!("Failed to update hashrate: {e:?}"); - return; - }; - } - }) + task::spawn(async move { Self::run_diff_management(self_, diff_update_rx).await }) }; let main_loop_handle = { @@ -933,6 +928,7 @@ mod tests { use super::*; use crate::monitor::shares::RejectionReason; use tokio::sync::{mpsc, oneshot}; + use tokio::time::{timeout, Duration}; async fn test_upstream() -> Arc> { let (tx_sv2_set_new_prev_hash, _rx_sv2_set_new_prev_hash) = mpsc::channel(1); @@ -946,10 +942,9 @@ mod tests { crate::MIN_EXTRANONCE_SIZE - 1, tx_sv2_extranonce, Arc::new(Mutex::new(vec![0; 32])), - Arc::new(Mutex::new(UpstreamDifficultyConfig { - channel_diff_update_interval: crate::CHANNEL_DIFF_UPDTATE_INTERVAL, - channel_nominal_hashrate: 0.0, - })), + Arc::new(Mutex::new( + UpstreamDifficultyConfig::new(crate::CHANNEL_DIFF_UPDTATE_INTERVAL, 0.0).0, + )), sender, "sig".to_string(), ) @@ -1022,4 +1017,54 @@ mod tests { ); assert_eq!(second_result_rx.await.unwrap(), Ok(())); } + + #[tokio::test] + async fn immediate_signal_emits_update_channel_before_periodic_interval() { + let (tx_sv2_set_new_prev_hash, _rx_sv2_set_new_prev_hash) = mpsc::channel(1); + let (tx_sv2_new_ext_mining_job, _rx_sv2_new_ext_mining_job) = mpsc::channel(1); + let (tx_sv2_extranonce, _rx_sv2_extranonce) = mpsc::channel(1); + let (sender, mut receiver) = mpsc::channel(4); + let (difficulty_config, diff_update_rx) = + UpstreamDifficultyConfig::new(crate::CHANNEL_DIFF_UPDTATE_INTERVAL, 0.0); + let difficulty_config = Arc::new(Mutex::new(difficulty_config)); + let upstream = Upstream::new( + tx_sv2_set_new_prev_hash, + tx_sv2_new_ext_mining_job, + crate::MIN_EXTRANONCE_SIZE - 1, + tx_sv2_extranonce, + Arc::new(Mutex::new(vec![0; 32])), + difficulty_config.clone(), + sender, + "sig".to_string(), + ) + .await + .unwrap(); + upstream.safe_lock(|u| u.channel_id = Some(42)).unwrap(); + + let (_incoming_tx, incoming_rx) = mpsc::channel(1); + let (diff_manager_abortable, main_loop_abortable) = + Upstream::parse_incoming(upstream.clone(), incoming_rx, diff_update_rx).unwrap(); + + difficulty_config + .safe_lock(|c| { + c.channel_nominal_hashrate = 12_345.0; + c.request_immediate_update(); + }) + .unwrap(); + + let message = timeout(Duration::from_millis(250), receiver.recv()) + .await + .unwrap() + .unwrap(); + match message { + Mining::UpdateChannel(update) => { + assert_eq!(update.channel_id, 42); + assert_eq!(update.nominal_hash_rate, 12_345.0); + } + other => panic!("expected UpdateChannel, got {other:?}"), + } + + drop(diff_manager_abortable); + drop(main_loop_abortable); + } }