diff --git a/.gitignore b/.gitignore
index 54466f5b..399c9a20 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,14 @@
-/target
+/target/
+# Local environment and config files
+.env
+config.toml
+
+# Mining software
+cpuminer-opt/
+cpuminer-opt
+
+# Other local files that shouldn't be tracked
+*.local
+.DS_Store
+
diff --git a/src/config.rs b/src/config.rs
index 425ae3a0..4d9533a2 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -50,12 +50,21 @@ struct Args {
     monitor: bool,
     #[clap(long, short = 'u')]
     auto_update: bool,
+    #[clap(long = "hashrate-dist", value_delimiter = ',')]
+    hashrate_distribution: Option<Vec<f32>>,
+}
+
+#[derive(Debug, Deserialize, Clone)]
+pub struct PoolConfig {
+    pub address: SocketAddr,
+    pub weight: f32,
 }
 
 #[derive(Serialize, Deserialize)]
 struct ConfigFile {
     token: Option<String>,
     tp_address: Option<String>,
+    hashrate_distribution: Option<Vec<f32>>,
     interval: Option<u64>,
     delay: Option<u64>,
     downstream_hashrate: Option<f32>,
@@ -76,6 +85,7 @@ impl ConfigFile {
         ConfigFile {
             token: None,
             tp_address: None,
+            hashrate_distribution: None,
             interval: None,
             delay: None,
             downstream_hashrate: None,
@@ -96,6 +106,7 @@ impl ConfigFile {
 pub struct Configuration {
     token: Option<String>,
     tp_address: Option<String>,
+    hashrate_distribution: Option<Vec<f32>>,
     interval: u64,
     delay: u64,
     downstream_hashrate: f32,
@@ -213,6 +224,76 @@ impl Configuration {
         CONFIG.auto_update
     }
 
+    pub fn hashrate_distribution() -> Option<Vec<f32>> {
+        CONFIG.hashrate_distribution.clone()
+    }
+
+    pub async fn pool_configs() -> Option<Vec<PoolConfig>> {
+        let hashrate_dist = Self::hashrate_distribution();
+        if let Some(distribution) = hashrate_dist {
+            // Get pool addresses dynamically
+            let addresses = Self::pool_address().await.unwrap_or_default();
+
+            if addresses.is_empty() {
+                warn!("No pool addresses provided for hashrate distribution");
+                return None;
+            }
+
+            let mut pools = Vec::new();
+            let total_dist = distribution.iter().sum::<f32>();
+
+            if addresses.len() != distribution.len() {
+                warn!(
+                    "Hashrate distribution length ({}) doesn't match pools ({}). Normalizing.",
+                    distribution.len(),
+                    addresses.len()
+                );
+            }
+
+            let min_len = addresses.len().min(distribution.len());
+            for i in 0..min_len {
+                let weight = if total_dist > 0.0 {
+                    distribution[i] / total_dist
+                } else {
+                    1.0 / addresses.len() as f32
+                };
+                pools.push(PoolConfig {
+                    address: addresses[i],
+                    weight,
+                });
+            }
+
+            // Assign 0.0 weight to extra addresses (if any)
+            for addr in addresses.into_iter().skip(min_len) {
+                pools.push(PoolConfig {
+                    address: addr,
+                    weight: 0.0,
+                });
+            }
+
+            Some(Self::normalize_pool_weights(pools))
+        } else {
+            None
+        }
+    }
+
+    /// Normalize weights to sum to 1.0
+    fn normalize_pool_weights(mut pools: Vec<PoolConfig>) -> Vec<PoolConfig> {
+        let total_weight: f32 = pools.iter().map(|p| p.weight).sum();
+        if total_weight <= 0.0 {
+            warn!("Total weight is zero or negative. Assigning equal weights.");
+            let equal_weight = 1.0 / pools.len() as f32;
+            for pool in &mut pools {
+                pool.weight = equal_weight;
+            }
+        } else {
+            for pool in &mut pools {
+                pool.weight /= total_weight;
+            }
+        }
+        pools
+    }
+
     // Loads config from CLI, file, or env vars with precedence: CLI > file > env.
     fn load_config() -> Self {
         let args = Args::parse();
@@ -316,9 +397,21 @@ impl Configuration {
             || config.auto_update.unwrap_or(true)
             || std::env::var("AUTO_UPDATE").is_ok();
 
+        let hashrate_distribution = args
+            .hashrate_distribution
+            .or(config.hashrate_distribution)
+            .or_else(|| {
+                std::env::var("HASHRATE_DISTRIBUTION").ok().and_then(|s| {
+                    s.split(',')
+                        .map(|x| x.trim().parse::<f32>().ok())
+                        .collect::<Option<Vec<f32>>>()
+                })
+            });
+
         Configuration {
             token,
             tp_address,
+            hashrate_distribution,
             interval,
             delay,
             downstream_hashrate,
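The block above resolves `hashrate_distribution` with the same CLI > file > env precedence used for the other options. A standalone sketch of the parse-and-normalize path follows; `parse_distribution` and `normalize` are hypothetical helpers for illustration only (the patch does this inline in `load_config` and `normalize_pool_weights`):

```rust
// Sketch: how a "HASHRATE_DISTRIBUTION" string becomes normalized weights.
fn parse_distribution(s: &str) -> Option<Vec<f32>> {
    s.split(',')
        .map(|x| x.trim().parse::<f32>().ok())
        .collect::<Option<Vec<f32>>>()
}

fn normalize(mut weights: Vec<f32>) -> Vec<f32> {
    let total: f32 = weights.iter().sum();
    if total <= 0.0 {
        // Degenerate input: fall back to equal weights.
        let equal = 1.0 / weights.len() as f32;
        weights.iter_mut().for_each(|w| *w = equal);
    } else {
        weights.iter_mut().for_each(|w| *w /= total);
    }
    weights
}

fn main() {
    // "60,40" parses to [60.0, 40.0] and normalizes to [0.6, 0.4].
    let dist = parse_distribution("60,40").expect("valid distribution");
    println!("{:?}", normalize(dist)); // [0.6, 0.4]
    // A malformed entry makes the whole distribution None, so resolution
    // falls through to the next config source in the precedence chain.
    assert!(parse_distribution("60,abc").is_none());
}
```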
diff --git a/src/main.rs b/src/main.rs
index 9a8068bb..0beb91f1 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -12,6 +12,7 @@ use key_utils::Secp256k1PublicKey;
 use lazy_static::lazy_static;
 use proxy_state::{PoolState, ProxyState, TpState, TranslatorState};
 use self_update::{backends, cargo_crate_version, update::UpdateStatus, TempDir};
+use std::sync::Arc;
 use std::{net::SocketAddr, time::Duration};
 use tokio::sync::mpsc::channel;
 use tracing::{debug, error, info, warn};
@@ -121,9 +122,14 @@ async fn main() {
         _ => unreachable!(),
     });
 
-    let mut router = router::Router::new(pool_addresses, auth_pub_k, None, None);
+    let mut router = router::Router::new(pool_addresses, auth_pub_k, None, None).await;
     let epsilon = Duration::from_millis(30_000);
-    let best_upstream = router.select_pool_connect().await;
+    let best_upstream = if router.is_multi_upstream() {
+        None
+    } else {
+        router.select_pool_connect().await
+    };
+
     initialize_proxy(&mut router, best_upstream, epsilon).await;
     info!("exiting");
     tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
@@ -136,121 +142,353 @@ async fn initialize_proxy(
 ) {
     loop {
         let stats_sender = api::stats::StatsSender::new();
-        let (send_to_pool, recv_from_pool, pool_connection_abortable) =
-            match router.connect_pool(pool_addr).await {
-                Ok(connection) => connection,
-                Err(_) => {
-                    error!("No upstream available. Retrying in 5 seconds...");
-                    warn!(
-                        "Please make sure the your token {} is correct",
-                        Configuration::token().expect("Token is not set")
-                    );
-                    let secs = 5;
-                    tokio::time::sleep(Duration::from_secs(secs)).await;
-                    continue;
-                }
-            };
+        let is_multi_upstream = router.is_multi_upstream();
+
+        if is_multi_upstream {
+            // Multi-upstream: connect to all pools and start components for each
+            let pool_connections = router.connect_all_pools().await;
+
+            // Handle connection results
+            let mut any_success = false;
+            let mut abort_handles = Vec::new();
+
+            // Create per-pool translator stacks
+            let mut pool_translators = Vec::new();
+
+            // For each pool, create a complete translator stack
+            for (pool_id, pool_addr, result) in pool_connections {
+                match result {
+                    Ok((send_to_pool, recv_from_pool, pool_connection_abortable)) => {
+                        info!(
+                            "Successfully connected to pool {} (ID: {})",
+                            pool_addr, pool_id
+                        );
+                        any_success = true;
+
+                        // Create per-pool downstream channels
+                        let (pool_downs_tx, pool_downs_rx) = channel(10);
+
+                        // Create per-pool translator channels
+                        let (translator_up_tx, mut translator_up_rx) = channel(10);
+
+                        // Start translator for this specific pool
+                        let translator_abortable = match translator::start(
+                            pool_downs_rx,
+                            translator_up_tx,
+                            stats_sender.clone(),
+                            Arc::new(router.clone()),
+                            pool_addr,
+                        )
+                        .await
+                        {
+                            Ok(abortable) => abortable,
+                            Err(e) => {
+                                error!(
+                                    "Impossible to initialize translator for pool {}: {e}",
+                                    pool_addr
+                                );
+                                continue;
+                            }
+                        };
+
+                        // Get translator channels for this pool
+                        let (jdc_to_translator_sender, jdc_from_translator_receiver, _) =
+                            match translator_up_rx.recv().await {
+                                Some(val) => val,
+                                None => {
+                                    error!(
+                                        "Translator failed before initialization for pool {}",
+                                        pool_addr
+                                    );
+                                    continue;
+                                }
+                            };
+
+                        // Set up JDC and share accounter for this pool
+                        let (from_jdc_to_share_accounter_send, from_jdc_to_share_accounter_recv) =
+                            channel(10);
+                        let (from_share_accounter_to_jdc_send, from_share_accounter_to_jdc_recv) =
+                            channel(10);
+
+                        let tp = match TP_ADDRESS.safe_lock(|tp| tp.clone()) {
+                            Ok(tp) => tp,
+                            Err(e) => {
+                                error!("TP_ADDRESS Mutex Corrupted: {e}");
+                                continue;
+                            }
+                        };
+
+                        let jdc_abortable: Option<AbortOnDrop>;
+                        let share_accounter_abortable;
+                        if let Some(_tp_addr) = tp {
+                            jdc_abortable = jd_client::start(
+                                jdc_from_translator_receiver,
+                                jdc_to_translator_sender,
+                                from_share_accounter_to_jdc_recv,
+                                from_jdc_to_share_accounter_send,
+                            )
+                            .await;
+                            if jdc_abortable.is_none() {
+                                ProxyState::update_tp_state(TpState::Down);
+                            };
+                            share_accounter_abortable = match share_accounter::start(
+                                from_jdc_to_share_accounter_recv,
+                                from_share_accounter_to_jdc_send,
+                                recv_from_pool,
+                                send_to_pool.clone(),
+                            )
+                            .await
+                            {
+                                Ok(abortable) => abortable,
+                                Err(_) => {
+                                    error!(
+                                        "Failed to start share_accounter for pool {}",
+                                        pool_addr
+                                    );
+                                    continue;
+                                }
+                            }
+                        } else {
+                            jdc_abortable = None;
+                            share_accounter_abortable = match share_accounter::start(
+                                jdc_from_translator_receiver,
+                                jdc_to_translator_sender,
+                                recv_from_pool,
+                                send_to_pool.clone(),
+                            )
+                            .await
+                            {
+                                Ok(abortable) => abortable,
+                                Err(_) => {
+                                    error!(
+                                        "Failed to start share_accounter for pool {}",
+                                        pool_addr
+                                    );
+                                    continue;
+                                }
+                            };
+                        }
+
+                        // Store pool translator info for downstream distribution
+                        pool_translators.push((pool_id, pool_addr, pool_downs_tx));
+
+                        // Collect abort handles for this pool
+                        abort_handles.push((
+                            pool_connection_abortable,
+                            format!("pool_connection_{}", pool_id),
+                        ));
+                        abort_handles
+                            .push((translator_abortable, format!("translator_{}", pool_id)));
format!("translator_{}", pool_id))); + abort_handles.push(( + share_accounter_abortable, + format!("share_accounter_{}", pool_id), + )); + if let Some(jdc_handle) = jdc_abortable { + abort_handles.push((jdc_handle, format!("jdc_{}", pool_id))); + } + } + Err(e) => { + error!( + "Failed to connect to pool {} (ID: {}): {:?}", + pool_addr, pool_id, e + ); + } } - }; + } - let (downs_sv1_tx, downs_sv1_rx) = channel(10); - let sv1_ingress_abortable = ingress::sv1_ingress::start_listen_for_downstream(downs_sv1_tx); + // Start SV1 ingress with custom downstream distribution + let (downs_sv1_tx, downs_sv1_rx) = channel(10); + let sv1_ingress_abortable = + ingress::sv1_ingress::start_listen_for_downstream(downs_sv1_tx); + abort_handles.push((sv1_ingress_abortable, "sv1_ingress".to_string())); - let (translator_up_tx, mut translator_up_rx) = channel(10); - let translator_abortable = - match translator::start(downs_sv1_rx, translator_up_tx, stats_sender.clone()).await { - Ok(abortable) => abortable, - Err(e) => { - error!("Impossible to initialize translator: {e}"); - // Impossible to start the proxy so we restart proxy - ProxyState::update_translator_state(TranslatorState::Down); - ProxyState::update_tp_state(TpState::Down); - return; + // Start downstream distribution task that routes miners to appropriate pools + let router_for_distribution = router.clone(); + let pool_translators_for_distribution = pool_translators.clone(); + let distribution_task = tokio::spawn(async move { + let mut recv = downs_sv1_rx; + + while let Some((send_to_downstream, recv_from_downstream, ip_addr)) = + recv.recv().await + { + info!("New downstream connection from {}", ip_addr); + + // Calling router to assign pool + if let Some(assigned_pool) = + router_for_distribution.assign_miner_to_pool().await + { + // Send to the appropriate translator + if let Some((_, _, pool_downs_tx)) = pool_translators_for_distribution + .iter() + .find(|(_, addr, _)| *addr == assigned_pool) + { + if let Err(e) = pool_downs_tx + .send((send_to_downstream, recv_from_downstream, ip_addr)) + .await + { + error!( + "Failed to send downstream connection to pool {}: {}", + assigned_pool, e + ); + } + } + } else { + error!("Could not assign miner from {} to any pool", ip_addr); + } } - }; + }); + abort_handles.push(( + distribution_task.into(), + "downstream_distribution".to_string(), + )); + + if !any_success { + error!("No upstream available. 
+                warn!(
+                    "Please make sure your token {} is correct",
+                    Configuration::token().expect("Token is not set")
+                );
+                let secs = 5;
+                tokio::time::sleep(Duration::from_secs(secs)).await;
+                // Restart loop, essentially restarting the proxy
+                continue;
+            }
 
-        let (from_jdc_to_share_accounter_send, from_jdc_to_share_accounter_recv) = channel(10);
-        let (from_share_accounter_to_jdc_send, from_share_accounter_to_jdc_recv) = channel(10);
-        let (jdc_to_translator_sender, jdc_from_translator_receiver, _) = translator_up_rx
-            .recv()
-            .await
-            .expect("Translator failed before initialization");
-
-        let jdc_abortable: Option<AbortOnDrop>;
-        let share_accounter_abortable;
-        let tp = match TP_ADDRESS.safe_lock(|tp| tp.clone()) {
-            Ok(tp) => tp,
-            Err(e) => {
-                error!("TP_ADDRESS Mutex Corrupted: {e}");
-                return;
-            }
-        };
+            let server_handle = tokio::spawn(api::start(router.clone(), stats_sender));
+            match monitor(router, abort_handles, epsilon, server_handle).await {
+                Reconnect::NewUpstream(_) | Reconnect::NoUpstream => {
+                    ProxyState::update_proxy_state_up();
+                    continue;
+                }
+            };
+        } else {
+            // Single-upstream: use the best pool passed in (or re-select if needed)
+            let (send_to_pool, recv_from_pool, pool_connection_abortable) =
+                match router.connect_pool(pool_addr).await {
+                    Ok(connection) => connection,
+                    Err(_) => {
+                        error!("No upstream available. Retrying in 5 seconds...");
+                        warn!(
+                            "Please make sure your token {} is correct",
+                            Configuration::token().expect("Token is not set")
+                        );
+                        let secs = 5;
+                        tokio::time::sleep(Duration::from_secs(secs)).await;
+                        // Restart loop, essentially restarting the proxy
+                        continue;
+                    }
+                };
+
+            let (downs_sv1_tx, downs_sv1_rx) = channel(10);
+            let sv1_ingress_abortable =
+                ingress::sv1_ingress::start_listen_for_downstream(downs_sv1_tx);
+
+            let (translator_up_tx, mut translator_up_rx) = channel(10);
+            let translator_abortable = match translator::start(
+                downs_sv1_rx,
+                translator_up_tx,
+                stats_sender.clone(),
+                Arc::new(router.clone()),
+                pool_addr.expect("Best latency pool address should be available"),
+            )
+            .await
+            {
+                Ok(abortable) => abortable,
+                Err(e) => {
+                    error!("Impossible to initialize translator: {e}");
+                    // Impossible to start the proxy so we restart proxy
+                    ProxyState::update_translator_state(TranslatorState::Down);
+                    ProxyState::update_tp_state(TpState::Down);
+                    return;
+                }
+            };
+
+            let (from_jdc_to_share_accounter_send, from_jdc_to_share_accounter_recv) = channel(10);
+            let (from_share_accounter_to_jdc_send, from_share_accounter_to_jdc_recv) = channel(10);
+            let (jdc_to_translator_sender, jdc_from_translator_receiver, _) = translator_up_rx
+                .recv()
+                .await
+                .expect("Translator failed before initialization");
+
+            let jdc_abortable: Option<AbortOnDrop>;
+            let share_accounter_abortable;
+            let tp = match TP_ADDRESS.safe_lock(|tp| tp.clone()) {
+                Ok(tp) => tp,
+                Err(e) => {
+                    error!("TP_ADDRESS Mutex Corrupted: {e}");
+                    return;
+                }
+            };
-        if let Some(_tp_addr) = tp {
-            jdc_abortable = jd_client::start(
-                jdc_from_translator_receiver,
-                jdc_to_translator_sender,
-                from_share_accounter_to_jdc_recv,
-                from_jdc_to_share_accounter_send,
-            )
-            .await;
-            if jdc_abortable.is_none() {
-                ProxyState::update_tp_state(TpState::Down);
-            };
-            share_accounter_abortable = match share_accounter::start(
-                from_jdc_to_share_accounter_recv,
-                from_share_accounter_to_jdc_send,
-                recv_from_pool,
-                send_to_pool,
-            )
-            .await
-            {
-                Ok(abortable) => abortable,
-                Err(_) => {
-                    error!("Failed to start share_accounter");
-                    return;
-                }
-            }
-        } else {
-            jdc_abortable = None;
-
-            share_accounter_abortable = match share_accounter::start(
-                jdc_from_translator_receiver,
-                jdc_to_translator_sender,
-                recv_from_pool,
-                send_to_pool,
-            )
-            .await
-            {
-                Ok(abortable) => abortable,
-                Err(_) => {
-                    error!("Failed to start share_accounter");
-                    return;
-                }
-            };
-        };
+            if let Some(_tp_addr) = tp {
+                jdc_abortable = jd_client::start(
+                    jdc_from_translator_receiver,
+                    jdc_to_translator_sender,
+                    from_share_accounter_to_jdc_recv,
+                    from_jdc_to_share_accounter_send,
+                )
+                .await;
+                if jdc_abortable.is_none() {
+                    ProxyState::update_tp_state(TpState::Down);
+                };
+                share_accounter_abortable = match share_accounter::start(
+                    from_jdc_to_share_accounter_recv,
+                    from_share_accounter_to_jdc_send,
+                    recv_from_pool,
+                    send_to_pool,
+                )
+                .await
+                {
+                    Ok(abortable) => abortable,
+                    Err(_) => {
+                        error!("Failed to start share_accounter for pool {:?}", pool_addr);
+                        return;
+                    }
+                }
+            } else {
+                jdc_abortable = None;
+
+                share_accounter_abortable = match share_accounter::start(
+                    jdc_from_translator_receiver,
+                    jdc_to_translator_sender,
+                    recv_from_pool,
+                    send_to_pool,
+                )
+                .await
+                {
+                    Ok(abortable) => abortable,
+                    Err(_) => {
+                        error!("Failed to start share_accounter for pool {:?}", pool_addr);
+                        return;
+                    }
+                };
+            };
+
+            // Collecting all abort handles
+            let mut abort_handles = vec![
+                (pool_connection_abortable, "pool_connection".to_string()),
+                (sv1_ingress_abortable, "sv1_ingress".to_string()),
+                (translator_abortable, "translator".to_string()),
+                (share_accounter_abortable, "share_accounter".to_string()),
+            ];
+            if let Some(jdc_handle) = jdc_abortable {
+                abort_handles.push((jdc_handle, "jdc".to_string()));
+            }
-        let server_handle = tokio::spawn(api::start(router.clone(), stats_sender));
-        match monitor(router, abort_handles, epsilon, server_handle).await {
-            Reconnect::NewUpstream(new_pool_addr) => {
-                ProxyState::update_proxy_state_up();
-                pool_addr = Some(new_pool_addr);
-                continue;
-            }
-            Reconnect::NoUpstream => {
-                ProxyState::update_proxy_state_up();
-                pool_addr = None;
-                continue;
-            }
-        };
+            let server_handle = tokio::spawn(api::start(router.clone(), stats_sender));
+            match monitor(router, abort_handles, epsilon, server_handle).await {
+                Reconnect::NewUpstream(new_pool_addr) => {
+                    ProxyState::update_proxy_state_up();
+                    pool_addr = Some(new_pool_addr);
+                    continue;
+                }
+                Reconnect::NoUpstream => {
+                    ProxyState::update_proxy_state_up();
+                    pool_addr = None;
+                    continue;
+                }
+            }
+        }
     }
 }
diff --git a/src/minin_pool_connection/mod.rs b/src/minin_pool_connection/mod.rs
index 3565a94d..5bf01894 100644
--- a/src/minin_pool_connection/mod.rs
+++ b/src/minin_pool_connection/mod.rs
@@ -90,12 +90,12 @@ pub async fn connect_pool(
     let (send_to_down, recv_from_down) = tokio::sync::mpsc::channel(10);
     let (send_from_down, recv_to_up) = tokio::sync::mpsc::channel(10);
 
-    let relay_up_task = relay_up(recv_to_up, sender);
+    let relay_up_task = relay_up(recv_to_up, sender, address);
     TaskManager::add_sv2_relay_up(task_manager.clone(), relay_up_task)
         .await
         .map_err(|_| Error::MiningPoolTaskManagerFailed)?;
 
-    let relay_down_task = relay_down(receiver, send_to_down);
+    let relay_down_task = relay_down(receiver, send_to_down, address);
     TaskManager::add_sv2_relay_down(task_manager.clone(), relay_down_task)
         .await
         .map_err(|_| Error::MiningPoolTaskManagerFailed)?;
@@ -108,6 +108,7 @@ pub async fn connect_pool(
 pub fn relay_up(
     mut recv: Receiver<Mining<'static>>,
     send: Sender<EitherFrame>,
+    pool_address: SocketAddr,
 ) -> AbortOnDrop {
     let task = tokio::spawn(async move {
         while let Some(msg) = recv.recv().await {
@@ -115,12 +116,15 @@ pub fn relay_up(
             if let Ok(std_frame) = std_frame {
                 let either_frame: EitherFrame = std_frame.into();
                 if send.send(either_frame).await.is_err() {
-                    error!("Mining upstream failed");
+                    error!("Failed to send message to pool {}", pool_address);
                     ProxyState::update_pool_state(PoolState::Down);
                     break;
                 };
             } else {
-                panic!("Internal Mining downstream try to send invalid message");
+                panic!(
+                    "Internal Mining downstream tried to send invalid message to pool {}",
+                    pool_address
+                );
             }
         }
     });
@@ -130,6 +134,7 @@ pub fn relay_up(
 pub fn relay_down(
     mut recv: Receiver<EitherFrame>,
     send: Sender<Mining<'static>>,
+    pool_address: SocketAddr,
 ) -> AbortOnDrop {
     let task = tokio::spawn(async move {
         while let Some(msg) = recv.recv().await {
@@ -144,25 +149,34 @@ pub fn relay_down(
                 if let Ok(msg) = msg {
                     let msg = msg.into_static();
                     if send.send(msg).await.is_err() {
-                        error!("Internal Mining downstream not available");
+                        error!(
+                            "Internal Mining downstream not available for pool {}",
+                            pool_address
+                        );
                         // Update Proxy state to reflect Internal inconsistency
                         ProxyState::update_inconsistency(Some(1));
                     }
                 } else {
-                    error!("Mining Upstream send non Mining message. Disconnecting");
+                    error!(
+                        "Pool {} sent non-Mining message. Disconnecting",
+                        pool_address
+                    );
                     break;
                 }
             } else {
-                error!("Mining Upstream send invalid message no header. Disconnecting");
+                error!(
+                    "Pool {} sent invalid message with no header. Disconnecting",
Disconnecting", + pool_address + ); break; } } else { - error!("Mining Upstream down."); + error!("Pool {} connection down", pool_address); break; } } - error!("Failed to receive msg from Pool"); + error!("Failed to receive msg from Pool {}", pool_address); ProxyState::update_pool_state(PoolState::Down); }); task.into() diff --git a/src/router/mod.rs b/src/router/mod.rs index 35925364..495e0143 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -10,6 +10,8 @@ use demand_sv2_connection::noise_connection_tokio::Connection; use key_utils::Secp256k1PublicKey; use noise_sv2::Initiator; use roles_logic_sv2::{common_messages_sv2::SetupConnection, parsers::Mining}; +use std::sync::atomic::{AtomicU32, Ordering}; + use tokio::{ net::TcpStream, sync::{ @@ -20,11 +22,14 @@ use tokio::{ use tracing::{error, info}; use crate::{ + config::{Configuration, PoolConfig}, minin_pool_connection::{self, get_mining_setup_connection_msg, mining_setup_connection}, shared::utils::AbortOnDrop, }; /// Router handles connection to Multiple upstreams. +use std::sync::Arc; + #[derive(Clone)] pub struct Router { pool_addresses: Vec, @@ -34,11 +39,13 @@ pub struct Router { timer: Option, latency_tx: watch::Sender>, pub latency_rx: watch::Receiver>, + multi_pool_configs: Option>, + pub weighted_dist: Option>>, } impl Router { /// Creates a new `Router` instance with the specified upstream addresses. - pub fn new( + pub async fn new( pool_addresses: Vec, auth_pub_k: Secp256k1PublicKey, // Configuration msg used to setup connection between client and pool @@ -49,6 +56,12 @@ impl Router { timer: Option, ) -> Self { let (latency_tx, latency_rx) = watch::channel(None); + + let multi_pool_configs = Configuration::pool_configs().await; + let weighted_dist = multi_pool_configs + .as_ref() + .map(|configs| Arc::new((0..configs.len()).map(|_| AtomicU32::new(0)).collect())); + Self { pool_addresses, current_pool: None, @@ -57,6 +70,62 @@ impl Router { timer, latency_tx, latency_rx, + multi_pool_configs, + weighted_dist, + } + } + + pub fn is_multi_upstream(&self) -> bool { + self.multi_pool_configs + .as_ref() + .is_some_and(|configs| configs.len() > 1) + } + + pub async fn assign_miner_to_pool(&self) -> Option { + if self.is_multi_upstream() { + if let Some(configs) = &self.multi_pool_configs { + let assignments = self.weighted_dist.as_ref().unwrap(); + let total_miners: u32 = assignments.iter().map(|a| a.load(Ordering::Relaxed)).sum(); + + let mut best_pool_id = 0; + let mut best_difference = f32::NEG_INFINITY; + + for (pool_id, config) in configs.iter().enumerate() { + let current_count = assignments[pool_id].load(Ordering::Relaxed); + let current_ratio = if total_miners == 0 { + 0.0 + } else { + current_count as f32 / total_miners as f32 + }; + let target_ratio = config.weight; + let difference = target_ratio - current_ratio; + + if difference > best_difference { + best_difference = difference; + best_pool_id = pool_id; + } + } + + // Assign to most under-represented pool + let new_count = assignments[best_pool_id].fetch_add(1, Ordering::Relaxed) + 1; + let total_after = total_miners + 1; + let selected_pool = configs[best_pool_id].address; + + info!( + "✅ Miner {} → Pool {} ({}) [Pool:{}, Total:{}]", + total_after, best_pool_id, selected_pool, new_count, total_after + ); + + Some(selected_pool) + } else { + // Fallback to existing logic + self.current_pool + .or_else(|| self.pool_addresses.first().copied()) + } + } else { + // Single pool fallback + self.current_pool + .or_else(|| self.pool_addresses.first().copied()) } } 
@@ -232,6 +301,10 @@ impl Router {
 
     /// Checks for a faster upstream and switches to it if found
     pub async fn monitor_upstream(&mut self, epsilon: Duration) -> Option<SocketAddr> {
+        if self.is_multi_upstream() {
+            info!("Multi-upstream mode: no monitoring required");
+            return None;
+        }
         if let Some(best_pool) = self.select_pool_monitor(epsilon).await {
             if Some(best_pool) != self.current_pool {
                 info!("Switching to faster upstream {:?}", best_pool);
@@ -242,6 +315,55 @@ impl Router {
         }
         None
     }
+
+    /// Connect to all configured pools
+    pub async fn connect_all_pools(
+        &self,
+    ) -> Vec<(
+        usize,
+        SocketAddr,
+        Result<
+            (
+                tokio::sync::mpsc::Sender<Mining<'static>>,
+                tokio::sync::mpsc::Receiver<Mining<'static>>,
+                crate::shared::utils::AbortOnDrop,
+            ),
+            crate::minin_pool_connection::errors::Error,
+        >,
+    )> {
+        if let Some(configs) = &self.multi_pool_configs {
+            let mut connections = Vec::new();
+            for (id, config) in configs.iter().enumerate() {
+                info!(
+                    "Connecting to pool {} with weight {}",
+                    config.address, config.weight
+                );
+                let result = self.clone().connect_pool(Some(config.address)).await;
+                connections.push((id, config.address, result));
+            }
+            connections
+        } else {
+            Vec::new()
+        }
+    }
+
+    /// Removes a miner from the assigned pool, allowing for rebalancing
+    pub async fn remove_miner_from_pool(&self, pool_address: SocketAddr) {
+        if let Some(configs) = &self.multi_pool_configs {
+            if let Some(pool_id) = configs.iter().position(|c| c.address == pool_address) {
+                let assignments = self.weighted_dist.as_ref().unwrap();
+                let old = assignments[pool_id].load(Ordering::Relaxed);
+                if old > 0 {
+                    let new = assignments[pool_id].fetch_sub(1, Ordering::Relaxed) - 1;
+                    let total: u32 = assignments.iter().map(|a| a.load(Ordering::Relaxed)).sum();
+                    info!(
+                        "Miner disconnected from Pool {}. New: {}/{} total",
+                        pool_id, new, total
+                    );
+                }
+            }
+        }
+    }
 }
 
 /// Track latencies for various stages of pool connection setup.
@@ -316,9 +438,10 @@ impl PoolLatency {
                     return Err(());
                 }
 
-                let relay_up_task = minin_pool_connection::relay_up(recv_to_up, sender);
+                let relay_up_task =
+                    minin_pool_connection::relay_up(recv_to_up, sender, self.pool);
                 let relay_down_task =
-                    minin_pool_connection::relay_down(receiver, send_to_down);
+                    minin_pool_connection::relay_down(receiver, send_to_down, self.pool);
 
                 let timer = Instant::now();
                 let mut received_new_job = false;
diff --git a/src/translator/downstream/accept_connection.rs b/src/translator/downstream/accept_connection.rs
index 7aa179c3..ca61f99b 100644
--- a/src/translator/downstream/accept_connection.rs
+++ b/src/translator/downstream/accept_connection.rs
@@ -24,6 +24,9 @@ pub async fn start_accept_connection(
     upstream_difficulty_config: Arc<Mutex<UpstreamDifficultyConfig>>,
     mut downstreams: Receiver<(Sender<String>, Receiver<String>, IpAddr)>,
     stats_sender: crate::api::stats::StatsSender,
+    router: Arc<crate::router::Router>,
+    // For logging purposes
+    pool_address: std::net::SocketAddr,
 ) -> Result<(), Error<'static>> {
     let handle = {
         let task_manager = task_manager.clone();
@@ -36,13 +39,13 @@ pub async fn start_accept_connection(
             // The initial difficulty is derived from the formula: difficulty = hash_rate / (shares_per_second * 2^32)
             let initial_hash_rate = *crate::EXPECTED_SV1_HASHPOWER;
             info!(
-                "Translator initial hash rate for ip {} is {} H/s",
-                addr, initial_hash_rate
+                "Pool {}: Translator initial hash rate for ip {} is {} H/s",
+                pool_address, addr, initial_hash_rate
             );
             let share_per_second = crate::SHARE_PER_MIN / 60.0;
             info!(
-                "Translator share per second for ip {} is {} shares/s",
-                addr, share_per_second
+                "Pool {}: Translator share per second for ip {} is {} shares/s",
+                pool_address, addr, share_per_second
             );
             let initial_difficulty = initial_hash_rate / (share_per_second * 2f32.powf(32.0));
             let initial_difficulty =
@@ -50,15 +53,15 @@ pub async fn start_accept_connection(
                     initial_difficulty,
                 );
             info!(
-                "Translator initial difficulty for ip {} is {}",
-                addr, initial_difficulty
+                "Pool {}: Translator initial difficulty for ip {} is {}",
+                pool_address, addr, initial_difficulty
             );
             // Formula: expected_hash_rate = (shares_per_second) * initial_difficulty * 2^32, where shares_per_second = SHARE_PER_MIN / 60
             let expected_hash_rate =
                 (crate::SHARE_PER_MIN / 60.0) * initial_difficulty * 2f32.powf(32.0);
             info!(
-                "Translator expected hash rate for ip {} is {} H/s",
-                addr, expected_hash_rate
+                "Pool {}: Translator expected hash rate for ip {} is {} H/s",
+                pool_address, addr, expected_hash_rate
             );
 
             match Bridge::ready(&bridge).await {
@@ -70,6 +73,7 @@ pub async fn start_accept_connection(
                     break;
                 }
             };
+
             let open_sv1_downstream =
                 match bridge.safe_lock(|s| s.on_new_sv1_connection(expected_hash_rate)) {
                     Ok(sv1_downstream) => sv1_downstream,
@@ -82,8 +86,8 @@ pub async fn start_accept_connection(
             match open_sv1_downstream {
                 Ok(opened) => {
                     info!(
-                        "Translator opening connection for ip {} with id {}",
-                        addr, opened.channel_id
+                        "Pool {}: Translator opening connection for ip {} with id {}",
+                        pool_address, addr, opened.channel_id
                     );
                     Downstream::new_downstream(
                         opened.channel_id,
@@ -99,6 +103,8 @@ pub async fn start_accept_connection(
                         task_manager.clone(),
                         initial_difficulty,
                         stats_sender.clone(),
+                        router.clone(),
+                        pool_address,
                     )
                     .await
                 }
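The hunk above logs each step of the initial-difficulty derivation. A worked example of the formula with illustrative numbers (10 TH/s and 6 shares/min are placeholders, not the crate's actual `EXPECTED_SV1_HASHPOWER` / `SHARE_PER_MIN` defaults):

```rust
// Worked example: difficulty = hash_rate / (shares_per_second * 2^32).
fn main() {
    let hash_rate = 10e12_f32; // 10 TH/s (illustrative)
    let share_per_min = 6.0_f32; // target shares per minute (illustrative)
    let shares_per_second = share_per_min / 60.0;

    let difficulty = hash_rate / (shares_per_second * 2f32.powf(32.0));
    println!("initial difficulty ≈ {difficulty:.0}"); // ≈ 23283

    // Inverting the formula recovers the hashrate that difficulty implies,
    // which is exactly the expected_hash_rate computation in the hunk.
    let expected_hash_rate = shares_per_second * difficulty * 2f32.powf(32.0);
    assert!((expected_hash_rate - hash_rate).abs() / hash_rate < 1e-3);
}
```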
diff --git a/src/translator/downstream/diff_management.rs b/src/translator/downstream/diff_management.rs
index 80a48d17..4ce1e12f 100644
--- a/src/translator/downstream/diff_management.rs
+++ b/src/translator/downstream/diff_management.rs
@@ -61,20 +61,44 @@ impl Downstream {
     /// aggregated channel hashrate.
     pub fn remove_downstream_hashrate_from_channel(
         self_: &Arc<Mutex<Self>>,
+        router: Option<Arc<crate::router::Router>>,
     ) -> ProxyResult<'static, ()> {
-        let (upstream_diff, estimated_downstream_hash_rate) = self_.safe_lock(|d| {
+        let (
+            upstream_diff,
+            estimated_downstream_hash_rate,
+            assigned_pool,
+            connection_id,
+            pool_address,
+        ) = self_.safe_lock(|d| {
             (
                 d.upstream_difficulty_config.clone(),
                 d.difficulty_mgmt.estimated_downstream_hash_rate,
+                d.assigned_pool,
+                d.connection_id,
+                d.pool_address,
             )
         })?;
         info!(
-            "Removing downstream hashrate from channel upstream_diff: {:?}, downstream_diff: {:?}",
-            upstream_diff, estimated_downstream_hash_rate
+            "Pool {}: Removing downstream hashrate from channel upstream_diff: {:?}, downstream_diff: {:?}",
+            pool_address, upstream_diff, estimated_downstream_hash_rate
         );
+
+        // Remove the miner from its pool assignment when it disconnects
+        if let Some(router) = router {
+            if let Some(pool_addr) = assigned_pool {
+                tokio::spawn(async move {
+                    router.remove_miner_from_pool(pool_addr).await;
+                });
+                info!(
+                    "Pool {}: REMOVED: Miner {} disconnected",
+                    pool_address, connection_id
+                );
+            }
+        }
+
         upstream_diff.safe_lock(|u| {
             u.channel_nominal_hashrate -=
-                // Make sure that upstream channel hasrate never goes below 0
+                // Make sure that upstream channel hashrate never goes below 0
                 f32::min(estimated_downstream_hash_rate, u.channel_nominal_hashrate);
         })?;
         Ok(())
@@ -456,6 +480,7 @@ mod test {
             time: HexU32Be(5609),
             clean_jobs: true,
         };
+        let pool_address = "127.0.0.1:4444".parse().unwrap();
         let mut downstream = Downstream::new(
             1,
             vec![],
@@ -469,6 +494,8 @@ mod test {
             Arc::new(Mutex::new(upstream_config)),
             crate::api::stats::StatsSender::new(),
             first_job,
+            None,
+            pool_address,
         );
         downstream.difficulty_mgmt.estimated_downstream_hash_rate = start_hashrate as f32;
diff --git a/src/translator/downstream/downstream.rs b/src/translator/downstream/downstream.rs
index dfc72877..65099fcf 100644
--- a/src/translator/downstream/downstream.rs
+++ b/src/translator/downstream/downstream.rs
@@ -30,7 +30,7 @@ use rand::Rng;
 use server_to_client::Notify;
 use std::{
     collections::{hash_map::Entry, HashMap, VecDeque},
-    net::IpAddr,
+    net::{IpAddr, SocketAddr},
     sync::Arc,
 };
 use sv1_api::{
@@ -116,6 +116,9 @@ pub struct Downstream {
     pub(super) stats_sender: StatsSender,
     pub recent_jobs: RecentJobs,
     pub first_job: Notify<'static>,
+    pub assigned_pool: Option<SocketAddr>,
+    /// Pool address for logging purposes
+    pub pool_address: SocketAddr,
     pub share_monitor: SharesMonitor,
 }
 
@@ -136,6 +139,8 @@ impl Downstream {
         task_manager: Arc<Mutex<TaskManager>>,
         initial_difficulty: f32,
         stats_sender: StatsSender,
+        router: Arc<crate::router::Router>,
+        pool_address: SocketAddr,
     ) {
         assert!(last_notify.is_some());
 
@@ -181,6 +186,25 @@ impl Downstream {
             initial_difficulty,
         };
 
+        let mut recent_notifies = VecDeque::with_capacity(2);
+        if let Some(notify) = last_notify.clone() {
+            recent_notifies.push_back(notify);
+        }
+        // In multi-upstream mode the pool is already fixed by the distribution
+        // task, so skip the router call entirely
+        let assigned_pool = if !router.is_multi_upstream() {
+            // Only assign a pool in single-upstream mode
+            router.assign_miner_to_pool().await
+        } else {
+            None
+        };
+        if let Some(pool_addr) = assigned_pool {
+            info!(
+                "New miner (ID: {}) assigned to pool {}",
+                connection_id, pool_addr
+            );
+        }
+
         let downstream = Arc::new(Mutex::new(Downstream {
             connection_id,
             authorized_names: vec![],
@@ -196,6 +220,8 @@ impl Downstream {
             stats_sender,
             recent_jobs: RecentJobs::new(),
             first_job: last_notify.expect("we have an assertion at the beginning of this function"),
+            assigned_pool,
+            pool_address,
             share_monitor: SharesMonitor::new(),
         }));
 
@@ -204,6 +230,7 @@ impl Downstream {
             downstream.clone(),
             recv_from_down,
             connection_id,
+            router.clone(),
         )
         .await
         {
@@ -230,6 +257,7 @@ impl Downstream {
             rx_sv1_notify,
             host.clone(),
             connection_id,
+            router.clone(),
         )
         .await
         {
@@ -272,6 +300,8 @@ impl Downstream {
         upstream_difficulty_config: Arc<Mutex<UpstreamDifficultyConfig>>,
         downstreams: Receiver<(Sender<String>, Receiver<String>, IpAddr)>,
         stats_sender: StatsSender,
+        router: Arc<crate::router::Router>,
+        pool_address: SocketAddr,
     ) -> Result<AbortOnDrop, Error<'static>> {
         let task_manager = TaskManager::initialize();
         let abortable = task_manager
@@ -286,6 +316,8 @@ impl Downstream {
             upstream_difficulty_config,
             downstreams,
             stats_sender,
+            router,
+            pool_address,
         )
         .await
         {
@@ -380,6 +412,8 @@ impl Downstream {
         upstream_difficulty_config: Arc<Mutex<UpstreamDifficultyConfig>>,
         stats_sender: StatsSender,
         first_job: Notify<'static>,
+        assigned_pool: Option<SocketAddr>,
+        pool_address: SocketAddr,
     ) -> Self {
         use crate::monitor::shares::SharesMonitor;
 
@@ -398,6 +432,8 @@ impl Downstream {
             first_job,
             stats_sender,
             recent_jobs: RecentJobs::new(),
+            assigned_pool,
+            pool_address,
             share_monitor: SharesMonitor::new(),
         }
     }
@@ -465,8 +501,8 @@ impl IsServer<'static> for Downstream {
     /// Only [Submit](client_to_server::Submit) requests for authorized user names can be submitted.
     fn handle_submit(&self, request: &client_to_server::Submit<'static>) -> bool {
         info!(
-            "Handling mining.submit request {} from {} with job_id {}, nonce: {:?}",
-            request.id, request.user_name, request.job_id, request.nonce
+            "Pool {}: Handling mining.submit request {} from {} with job_id {}, nonce: {:?}",
+            self.pool_address, request.id, request.user_name, request.job_id, request.nonce
         );
 
         let mut request = request.clone();
@@ -488,8 +524,10 @@ impl IsServer<'static> for Downstream {
             self.stats_sender.update_rejected_shares(self.connection_id);
             return false;
         }
+        let job_id = job_id_as_number.clone().expect("checked above") as i64;
 
         crate::translator::utils::update_share_count(self.connection_id); // update share count
+
         if let Some(job) = self
             .recent_jobs
             .get_matching_job(job_id_as_number.expect("checked above"))
@@ -502,6 +540,7 @@ impl IsServer<'static> for Downstream {
                 &self.difficulty_mgmt.current_difficulties,
                 self.extranonce1.clone(),
                 self.version_rolling_mask.clone(),
+                self.pool_address,
             ) {
                 // Only forward upstream if the share meets the latest difficulty
                 if let Some(latest_difficulty) = self.difficulty_mgmt.current_difficulties.back() {
diff --git a/src/translator/downstream/notify.rs b/src/translator/downstream/notify.rs
index 2250e274..32b04206 100644
--- a/src/translator/downstream/notify.rs
+++ b/src/translator/downstream/notify.rs
@@ -17,6 +17,7 @@ pub async fn start_notify(
     mut rx_sv1_notify: broadcast::Receiver<server_to_client::Notify<'static>>,
     host: String,
     connection_id: u32,
+    router: Arc<crate::router::Router>,
 ) -> Result<(), Error<'static>> {
     let handle = {
         let task_manager = task_manager.clone();
@@ -123,6 +124,9 @@ pub async fn start_notify(
                     Downstream::send_message_downstream(downstream.clone(), message).await;
                 }
             }
+            // TODO: make sure this is also called when the task is dropped
+            let _ = Downstream::remove_downstream_hashrate_from_channel(&downstream, Some(router));
+            // TODO: kill the remaining tasks for this connection
             warn!(
                 "Downstream: Shutting down sv1 downstream job notifier for {}",
                 &host
diff --git a/src/translator/downstream/receive_from_downstream.rs b/src/translator/downstream/receive_from_downstream.rs
index 97265d2f..48b07268 100644
--- a/src/translator/downstream/receive_from_downstream.rs
+++ b/src/translator/downstream/receive_from_downstream.rs
@@ -15,6 +15,7 @@ pub async fn start_receive_downstream(
     downstream: Arc<Mutex<Downstream>>,
     mut recv_from_down: mpsc::Receiver<String>,
     connection_id: u32,
+    router: Arc<crate::router::Router>,
 ) -> Result<(), Error<'static>> {
     let task_manager_clone = task_manager.clone();
     let handle = task::spawn(async move {
@@ -55,9 +56,13 @@ pub async fn start_receive_downstream(
             connection_id
         );
 
-        if let Err(e) = Downstream::remove_downstream_hashrate_from_channel(&downstream) {
+        // Call the disconnect handler with the router so the pool assignment is released too
+        if let Err(e) =
+            Downstream::remove_downstream_hashrate_from_channel(&downstream, Some(router))
+        {
             error!("Failed to remove downstream hashrate from channel: {}", e)
         };
+
         if task_manager_clone
             .safe_lock(|tm| tm.abort_tasks_for_connection_id(connection_id))
             .is_err()
diff --git a/src/translator/mod.rs b/src/translator/mod.rs
index b861b46f..84f088ea 100644
--- a/src/translator/mod.rs
+++ b/src/translator/mod.rs
@@ -35,6 +35,8 @@ pub async fn start(
         Option<AbortOnDrop>,
     )>,
     stats_sender: crate::api::stats::StatsSender,
+    router: Arc<crate::router::Router>,
+    pool_address: std::net::SocketAddr,
 ) -> Result<AbortOnDrop, Error<'static>> {
     let task_manager = TaskManager::initialize(pool_connection.clone());
     let abortable = task_manager
@@ -102,6 +104,7 @@ pub async fn start(
         target.clone(),
         diff_config.clone(),
         send_to_up,
+        pool_address,
     )
     .await?;
 
@@ -146,6 +149,7 @@ pub async fn start(
         extended_extranonce,
         target,
         up_id,
+        pool_address,
     ) {
         Ok(b) => b,
         Err(e) => {
@@ -176,6 +180,8 @@ pub async fn start(
         diff_config,
         downstreams,
         stats_sender,
+        router,
+        pool_address,
     )
     .await
     {
diff --git a/src/translator/proxy/bridge.rs b/src/translator/proxy/bridge.rs
index ba8f49c4..0fc17dc5 100644
--- a/src/translator/proxy/bridge.rs
+++ b/src/translator/proxy/bridge.rs
@@ -63,6 +63,8 @@ pub struct Bridge {
     future_jobs: Vec<NewExtendedMiningJob<'static>>,
     last_p_hash: Option<SetNewPrevHash<'static>>,
     target: Arc<Mutex<Vec<u8>>>,
+    /// Pool address for logging purposes
+    pub pool_address: std::net::SocketAddr,
 }
 
 impl Bridge {
@@ -84,6 +86,7 @@ impl Bridge {
         extranonces: ExtendedExtranonce,
         target: Arc<Mutex<Vec<u8>>>,
         channel_id: u32,
+        pool_address: std::net::SocketAddr,
     ) -> Result<Arc<Mutex<Self>>, Error<'static>> {
         info!("Creating new bridge for channel_id {}:", channel_id);
         let ids = Arc::new(Mutex::new(GroupId::new()));
@@ -107,6 +110,7 @@ impl Bridge {
             future_jobs: vec![],
             last_p_hash: None,
             target,
+            pool_address,
         })))
     }
 
@@ -250,9 +254,14 @@ impl Bridge {
         let channel_id = share.channel_id;
         let job_id = share.share.job_id.clone();
         let share_id = share.share.id;
+
+        let pool_address = self_
+            .safe_lock(|s| s.pool_address)
+            .map_err(|_| Error::BridgeMutexPoisoned)?;
+
         info!(
-            "Bridge received share {:?} for channel {:?} and job {:?}",
-            &share_id, &channel_id, &job_id
+            "Pool {}: Bridge received share {:?} for channel {:?} and job {:?}",
+            pool_address, &share_id, &channel_id, &job_id
         );
         let (tx_sv2_submit_shares_ext, target_mutex) = self_
             .safe_lock(|s| (s.tx_sv2_submit_shares_ext.clone(), s.target.clone()))
@@ -297,8 +306,8 @@ impl Bridge {
                     .unwrap_or("unparsable error code")
                     .to_string();
                 error!(
-                    "Submit share {} from channel {} and job {} error {}",
-                    &share_id, &channel_id, &job_id, error_code
+                    "Pool {}: Submit share {} from channel {} and job {} error {}",
+                    pool_address, &share_id, &channel_id, &job_id, error_code
                 );
             }
             Ok(OnNewShare::SendSubmitShareUpstream((s, _))) => {
@@ -308,9 +317,9 @@ impl Bridge {
                     return Ok(());
                 }
                 info!(
-                    "Share with id {} meets upstream target from channel {} and job {}",
-                    &share_id, &channel_id, &job_id
-                );
+                    "Pool {}: Share with id {} meets upstream target from channel {} and job {}",
+                    pool_address, &share_id, &channel_id, &job_id
+                );
                 match s {
                     Share::Extended(share) => {
                         if tx_sv2_submit_shares_ext.send(share).await.is_err() {
@@ -331,8 +340,8 @@ impl Bridge {
             Ok(OnNewShare::RelaySubmitShareUpstream) => unreachable!(),
             Ok(OnNewShare::ShareMeetDownstreamTarget) => {
                 info!(
-                    "Share with id {} meets downstream target from channel {} and job {}",
-                    &share_id, &channel_id, &job_id
+                    "Pool {}: Share with id {} meets downstream target from channel {} and job {}",
+                    pool_address, &share_id, &channel_id, &job_id
                 );
             }
             // Proxy do not have JD capabilities
@@ -663,12 +672,16 @@ mod test {
             0, 0, 0, 0, 0, 0, 0,
         ];
 
+        // Pool address used by the test bridge (for logging only)
+        let pool_address = "127.0.0.1:4444".parse().unwrap();
+
         let b = Bridge::new(
             tx_sv2_submit_shares_ext.clone(),
             tx_sv1_notify,
             extranonces,
             Arc::new(Mutex::new(upstream_target)),
             1,
+            pool_address,
         )
         .map_err(|_| ())?;
         Ok(b)
diff --git a/src/translator/upstream/upstream.rs b/src/translator/upstream/upstream.rs
index f5b62d5a..cf60b5a6 100644
--- a/src/translator/upstream/upstream.rs
+++ b/src/translator/upstream/upstream.rs
@@ -86,6 +86,8 @@ pub struct Upstream {
     // than the configured percentage
     pub(super) difficulty_config: Arc<Mutex<UpstreamDifficultyConfig>>,
     pub sender: TSender<EitherFrame>,
+    /// The address of the pool, used for logging and debugging purposes.
+    pub pool_address: std::net::SocketAddr,
 }
 
 impl PartialEq for Upstream {
@@ -109,6 +111,7 @@ impl Upstream {
         target: Arc<Mutex<Vec<u8>>>,
         difficulty_config: Arc<Mutex<UpstreamDifficultyConfig>>,
         sender: TSender<EitherFrame>,
+        pool_address: std::net::SocketAddr,
     ) -> ProxyResult<'static, Arc<Mutex<Self>>> {
         Ok(Arc::new(Mutex::new(Self {
             extranonce_prefix: None,
@@ -123,6 +126,7 @@ impl Upstream {
             target,
             difficulty_config,
             sender,
+            pool_address,
         })))
     }
 
@@ -300,7 +304,14 @@ impl Upstream {
                     };
                 }
                 Mining::NewExtendedMiningJob(m) => {
-                    info!("Parsing incoming NewExtendedMiningJob message from Pool for Channel Id: {}", m.channel_id);
+                    let pool_address = match self_.safe_lock(|s| s.pool_address) {
+                        Ok(addr) => addr,
+                        Err(e) => {
+                            error!("Translator upstream mutex poisoned: {e}");
+                            return;
+                        }
+                    };
+                    info!("Parsing incoming NewExtendedMiningJob message from Pool {} for Channel Id: {}", pool_address, m.channel_id);
                     let job_id = m.job_id;
 
                     if let Err(e) = self_.safe_lock(|s| {
@@ -509,8 +520,8 @@ impl ParseUpstreamMiningMessages
     ) -> Result<SendTo<Downstream>, RolesLogicError> {
         info!(
-            "Handling OpenExtendedMiningChannelSuccess message from Pool for Channel Id: {}",
-            m.channel_id
+            "Pool {}: Handling OpenExtendedMiningChannelSuccess message for Channel Id: {}",
+            self.pool_address, m.channel_id
         );
         let tproxy_e1_len =
             proxy_extranonce1_len(m.extranonce_size as usize, self.min_extranonce_size.into())
@@ -528,8 +539,8 @@ impl ParseUpstreamMiningMessages
diff --git a/src/translator/utils.rs b/src/translator/utils.rs
--- a/src/translator/utils.rs
+++ b/src/translator/utils.rs
@@ ... @@ pub fn validate_share(
     difficulties: &VecDeque<f32>,
     extranonce1: Vec<u8>,
     version_rolling_mask: Option<HexU32Be>,
+    pool_address: std::net::SocketAddr,
 ) -> Option<f32> {
     info!(
-        "Validating share from request {} and job {}",
-        request.id, request.job_id
+        "Pool {}: Validating share from request {} and job {}",
+        pool_address, request.id, request.job_id
     );
 
     let prev_hash_vec: Vec<u8> = job.prev_hash.clone().into();
@@ -149,7 +150,11 @@ pub fn validate_share(
     );
     hash.reverse(); // convert to little-endian
 
-    info!("Share Hash: {:?}", hash.to_vec().as_hex());
+    info!(
+        "Pool {}: Share Hash: {:?}",
+        pool_address,
+        hash.to_vec().as_hex()
+    );
     // Check against difficulties from latest to earliest
     // TODO: This is not a sound check - We should check against the difficulty of the specific job
     for &difficulty in difficulties.iter().rev() {
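For reference, a minimal sketch of the latest-to-earliest acceptance rule that the loop above implements. `share_difficulty` stands in for the value derived from the share hash; the real code compares the hash itself against each difficulty's target:

```rust
use std::collections::VecDeque;

// Return the first (most recent) difficulty the share satisfies, scanning
// from the back of the deque where the newest difficulty is pushed.
fn meets_recent_difficulty(difficulties: &VecDeque<f32>, share_difficulty: f32) -> Option<f32> {
    for &difficulty in difficulties.iter().rev() {
        if share_difficulty >= difficulty {
            return Some(difficulty);
        }
    }
    None
}

fn main() {
    // Difficulty was recently retargeted from 512 up to 1024.
    let difficulties: VecDeque<f32> = VecDeque::from([512.0, 1024.0]);
    // A share computed against the older, easier target is still accepted,
    // but it only meets 512 — not the latest difficulty, which is the extra
    // condition handle_submit checks before forwarding a share upstream.
    assert_eq!(meets_recent_difficulty(&difficulties, 600.0), Some(512.0));
    assert_eq!(meets_recent_difficulty(&difficulties, 2000.0), Some(1024.0));
    assert_eq!(meets_recent_difficulty(&difficulties, 100.0), None);
}
```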