diff --git a/.gitignore b/.gitignore
index ea8c4bf7..1e6eccae 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
 /target
+/cpuminer-opt
\ No newline at end of file
diff --git a/src/config.rs b/src/config.rs
index ce0c4533..882f6823 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -8,9 +8,11 @@ use std::{
 use tracing::{error, info, warn};
 
 use crate::{HashUnit, DEFAULT_SV1_HASHPOWER};
+
 lazy_static! {
     pub static ref CONFIG: Configuration = Configuration::load_config();
 }
+
 #[derive(Parser)]
 struct Args {
     #[clap(long)]
@@ -39,12 +41,14 @@ struct Args {
     config_file: Option<String>,
     #[clap(long = "api-server-port", short = 's')]
     api_server_port: Option<String>,
+    #[clap(long = "hashrate-distribution", value_delimiter = ',')]
+    hashrate_distribution: Option<Vec<f32>>,
 }
 
 #[derive(Serialize, Deserialize)]
 struct ConfigFile {
-    token: Option<String>,
     tp_address: Option<String>,
+    token: Option<String>,
     pool_addresses: Option<Vec<SocketAddr>>,
     test_pool_addresses: Option<Vec<SocketAddr>>,
     interval: Option<u64>,
@@ -55,6 +59,7 @@ struct ConfigFile {
     test: Option<bool>,
     listening_addr: Option<SocketAddr>,
     api_server_port: Option<String>,
+    hashrate_distribution: Option<Vec<f32>>,
 }
 
 pub struct Configuration {
@@ -70,7 +75,9 @@ pub struct Configuration {
     test: bool,
     listening_addr: Option<SocketAddr>,
     api_server_port: String,
+    hashrate_distribution: Option<Vec<f32>>,
 }
+
 impl Configuration {
     pub fn token() -> Option<String> {
         CONFIG.token.clone()
@@ -137,6 +144,14 @@ impl Configuration {
         CONFIG.test
     }
 
+    pub fn hashrate_distribution() -> Option<Vec<f32>> {
+        CONFIG.hashrate_distribution.clone()
+    }
+
+    pub fn wants_hashrate_distribution() -> bool {
+        CONFIG.hashrate_distribution.is_some()
+    }
+
     // Loads config from CLI, file, or env vars with precedence: CLI > file > env.
     fn load_config() -> Self {
         let args = Args::parse();
@@ -157,6 +172,7 @@ impl Configuration {
             test: None,
             listening_addr: None,
             api_server_port: None,
+            hashrate_distribution: None,
         });
 
         let token = args
@@ -285,6 +301,16 @@ impl Configuration {
             .unwrap_or("off".to_string());
         let test = args.test || config.test.unwrap_or(false) || std::env::var("TEST").is_ok();
 
+        let hashrate_distribution = args
+            .hashrate_distribution
+            .or(config.hashrate_distribution)
+            .or_else(|| {
+                std::env::var("HASHRATE_DISTRIBUTION").ok().and_then(|s| {
+                    s.split(',')
+                        .map(|s| s.trim().parse::<f32>().ok())
+                        .collect::<Option<Vec<f32>>>()
+                })
+            });
 
         Configuration {
             token,
@@ -299,6 +325,7 @@ impl Configuration {
             test,
             listening_addr,
             api_server_port,
+            hashrate_distribution,
        }
    }
}
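All three sources accept the same comma-separated format, e.g. `HASHRATE_DISTRIBUTION="50,30,20"`. A standalone sketch of the env-var parsing path above (the `parse_distribution` helper name is illustrative, not part of the patch):

```rust
// Mirrors the closure chain in load_config(): any entry that fails to parse
// as f32 rejects the whole value, because collecting into Option<Vec<_>>
// short-circuits to None on the first failed item.
fn parse_distribution(raw: &str) -> Option<Vec<f32>> {
    raw.split(',')
        .map(|v| v.trim().parse::<f32>().ok())
        .collect::<Option<Vec<f32>>>()
}

fn main() {
    assert_eq!(parse_distribution("50, 30,20"), Some(vec![50.0, 30.0, 20.0]));
    assert_eq!(parse_distribution("50,abc"), None); // one bad entry rejects all
}
```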
diff --git a/src/main.rs b/src/main.rs
index 10a8c265..fb34d9fc 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -6,11 +6,12 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
 #[global_allocator]
 static GLOBAL: Jemalloc = Jemalloc;
 
+use crate::router::connection_manager::ConnectionManager;
 use crate::shared::utils::AbortOnDrop;
 use config::Configuration;
 use key_utils::Secp256k1PublicKey;
 use lazy_static::lazy_static;
 use proxy_state::{PoolState, ProxyState, TpState, TranslatorState};
 use std::{net::SocketAddr, time::Duration};
 use tokio::sync::mpsc::channel;
 use tracing::{error, info, warn};
@@ -83,20 +84,86 @@ async fn main() {
     let pool_addresses = Configuration::pool_address()
         .filter(|p| !p.is_empty())
-        .unwrap_or_else(|| {
-            if Configuration::test() {
-                panic!("Test pool address is missing");
-            } else {
-                panic!("Pool address is missing");
-            }
-        });
+        .unwrap_or_else(|| panic!("Pool address is missing"));
+
+    // Register the configured downstream hashrate with the connection manager
+    ConnectionManager::set_downstream_hashrate(Configuration::downstream_hashrate());
+
+    // Pair every pool address with its auth key
+    let pool_address_keys: Vec<(SocketAddr, Secp256k1PublicKey)> = pool_addresses
+        .iter()
+        .map(|&addr| (addr, auth_pub_k))
+        .collect();
+
+    // Was a custom hashrate distribution configured?
+    let wants_distribution = Configuration::wants_hashrate_distribution();
+
+    // Create the router that matches the configuration
+    let mut router = if wants_distribution {
+        // Multi-upstream with a custom distribution
+        match Router::new_multi(pool_address_keys.clone(), None, None, true).await {
+            Ok(router) => router,
+            Err(e) => {
+                error!("Failed to create multi-upstream router: {}", e);
+                std::process::exit(1);
+            }
+        }
+    } else if pool_address_keys.len() > 1 {
+        // Multiple pools, latency-based selection
+        Router::new_with_keys(pool_address_keys.clone(), None, None)
+    } else {
+        // Single upstream (backward compatible)
+        Router::new(pool_addresses, auth_pub_k, None, None)
+    };
 
-    let mut router = router::Router::new(pool_addresses, auth_pub_k, None, None);
     let epsilon = Duration::from_millis(30_000);
-    let best_upstream = router.select_pool_connect().await;
-    initialize_proxy(&mut router, best_upstream, epsilon).await;
-    info!("exiting");
-    tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
+
+    if wants_distribution {
+        // Use the configured distribution, or fall back to an equal split
+        let distribution = Configuration::hashrate_distribution().unwrap_or_else(|| {
+            let count = pool_address_keys.len();
+            let equal_percentage = 100.0 / count as f32;
+            vec![equal_percentage; count]
+        });
+
+        info!("Using hashrate distribution: {:?}", distribution);
+
+        // Initialize upstream connections
+        if let Err(e) = router.initialize_upstream_connections().await {
+            error!("Failed to initialize upstream connections: {}", e);
+            std::process::exit(1);
+        }
+
+        // Apply the distribution
+        if let Err(e) = router.set_hashrate_distribution(distribution.clone()).await {
+            error!("Failed to set hashrate distribution: {}", e);
+            std::process::exit(1);
+        }
+
+        // Give the connections a moment to establish
+        tokio::time::sleep(Duration::from_secs(3)).await;
+        initialize_proxy(&mut router, None, epsilon).await;
+    } else {
+        // One or more pools without a distribution: pick the lowest-latency upstream
+        let best_upstream = router.select_pool_connect().await;
+        if best_upstream.is_none() {
+            error!("Failed to connect to any upstream pool");
+            std::process::exit(1);
+        }
+        initialize_proxy(&mut router, best_upstream, epsilon).await;
+    }
 }
 
 async fn initialize_proxy(
@@ -104,6 +171,111 @@ async fn initialize_proxy(
     mut pool_addr: Option<SocketAddr>,
     epsilon: Duration,
 ) {
+    // Multi-upstream mode: wire everything up once; there is no reconnect loop
+    if router.is_multi_upstream_enabled() {
+        // Initialize the same components as single-upstream mode
+        let stats_sender = api::stats::StatsSender::new();
+
+        // Start SV1 ingress to handle downstream (mining device) connections
+        let (downs_sv1_tx, downs_sv1_rx) = channel(10);
+        let sv1_ingress_abortable = ingress::sv1_ingress::start_listen_for_downstream(downs_sv1_tx);
+
+        // Start the translator that handles SV1 <-> SV2 translation
+        let (translator_up_tx, mut translator_up_rx) = channel(10);
+        let translator_abortable =
+            match translator::start(downs_sv1_rx, translator_up_tx, stats_sender.clone()).await {
+                Ok(abortable) => abortable,
+                Err(e) => {
+                    error!("Impossible to initialize translator: {e}");
+                    ProxyState::update_translator_state(TranslatorState::Down);
+                    ProxyState::update_tp_state(TpState::Down);
+                    return;
+                }
+            };
+
+        // Get the translator's channel endpoints
+        let (jdc_to_translator_sender, jdc_from_translator_receiver, _) = translator_up_rx
+            .recv()
+            .await
+            .expect("Translator failed before initialization");
+
+        // Set up the JDC channels with explicit message types
+        let (from_jdc_to_share_accounter_send, from_jdc_to_share_accounter_recv) =
+            channel::<roles_logic_sv2::parsers::Mining<'static>>(10);
+        let (from_share_accounter_to_jdc_send, from_share_accounter_to_jdc_recv) =
+            channel::<roles_logic_sv2::parsers::Mining<'static>>(10);
+
+        // Check whether a Template Provider is configured
+        let tp_available = TP_ADDRESS.safe_lock(|tp| tp.clone()).ok().flatten();
+
+        let (share_accounter_abortable, jdc_abortable) = if let Some(_tp_addr) = tp_available {
+            // WITH TP: start JDC first, then share accounting on the JDC channels
+            let jdc_handle = jd_client::start(
+                jdc_from_translator_receiver,
+                jdc_to_translator_sender,
+                from_share_accounter_to_jdc_recv,
+                from_jdc_to_share_accounter_send,
+            )
+            .await;
+
+            if jdc_handle.is_some() {
+                // JDC started successfully, use the JDC channels
+                match router
+                    .start_multi_upstream_share_accounting_with_jdc(
+                        from_jdc_to_share_accounter_recv,
+                        from_share_accounter_to_jdc_send,
+                    )
+                    .await
+                {
+                    Ok(handle) => (handle, jdc_handle),
+                    Err(e) => {
+                        error!(
+                            "Failed to start multi-upstream share accounting with JDC: {}",
+                            e
+                        );
+                        return;
+                    }
+                }
+            } else {
+                error!("Failed to start JDC with TP");
+                return;
+            }
+        } else {
+            // WITHOUT TP: use the translator channels directly
+            match router
+                .start_multi_upstream_share_accounting(
+                    jdc_from_translator_receiver,
+                    jdc_to_translator_sender,
+                )
+                .await
+            {
+                Ok(handle) => (handle, None),
+                Err(e) => {
+                    error!("Failed to start multi-upstream share accounting: {}", e);
+                    return;
+                }
+            }
+        };
+
+        // Start the API server
+        let server_handle = tokio::spawn(api::start(router.clone(), stats_sender));
+
+        // Collect abort handles for monitoring
+        let mut abort_handles = vec![
+            (sv1_ingress_abortable, "sv1_ingress".to_string()),
+            (translator_abortable, "translator".to_string()),
+            (share_accounter_abortable, "share_accounter".to_string()),
+        ];
+        if let Some(jdc_handle) = jdc_abortable {
+            abort_handles.push((jdc_handle, "jdc".to_string()));
+        }
+
+        // Hand off to the combined multi-upstream monitor
+        monitor_multi_upstream(router.clone(), abort_handles, server_handle, epsilon).await;
+        return;
+    }
+
+    // Single-upstream mode only
     loop {
         // Initial setup for the proxy
         let stats_sender = api::stats::StatsSender::new();
@@ -141,13 +313,14 @@ async fn initialize_proxy(
             }
         };
 
-        let (from_jdc_to_share_accounter_send, from_jdc_to_share_accounter_recv) = channel(10);
-        let (from_share_accounter_to_jdc_send, from_share_accounter_to_jdc_recv) = channel(10);
+        // Receive the translator's endpoints before wiring up JDC and the share accounter
         let (jdc_to_translator_sender, jdc_from_translator_receiver, _) = translator_up_rx
             .recv()
             .await
             .expect("Translator failed before initialization");
 
+        let (from_jdc_to_share_accounter_send, from_jdc_to_share_accounter_recv) = channel(10);
+        let (from_share_accounter_to_jdc_send, from_share_accounter_to_jdc_recv) = channel(10);
         let jdc_abortable: Option<AbortOnDrop>;
         let share_accounter_abortable;
         let tp = match TP_ADDRESS.safe_lock(|tp| tp.clone()) {
@@ -224,7 +397,7 @@ async fn initialize_proxy(
                 pool_addr = None;
                 continue;
             }
-        };
+        }
    }
}
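Both wirings (with and without JDC) reduce to the same shape: a spawned task that pumps messages from one mpsc endpoint to the next hop. A minimal, self-contained sketch of that bridging pattern — the `Msg` type stands in for `roles_logic_sv2::parsers::Mining<'static>`:

```rust
use tokio::sync::mpsc::{channel, Receiver, Sender};

// Stand-in for the SV2 Mining message type used in the patch.
#[derive(Debug)]
struct Msg(u32);

// One hop of the pipeline: everything received on `rx` is forwarded on `tx`;
// the task ends when either side of the pipe is closed.
fn bridge(mut rx: Receiver<Msg>, tx: Sender<Msg>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            if tx.send(msg).await.is_err() {
                break; // downstream hop closed
            }
        }
    })
}

#[tokio::main]
async fn main() {
    // translator -> share accounter -> upstream, as in the wiring above
    let (translator_tx, accounter_rx) = channel::<Msg>(10);
    let (accounter_tx, mut upstream_rx) = channel::<Msg>(10);
    let _hop = bridge(accounter_rx, accounter_tx);

    translator_tx.send(Msg(1)).await.unwrap();
    assert_eq!(upstream_rx.recv().await.unwrap().0, 1);
}
```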
@@ -292,6 +465,69 @@ async fn monitor(
     }
 }
 
+// Combined monitoring function for multi-upstream mode
+async fn monitor_multi_upstream(
+    router: Router,
+    abort_handles: Vec<(AbortOnDrop, String)>,
+    server_handle: tokio::task::JoinHandle<()>,
+    _epsilon: Duration,
+) {
+    let mut report_counter = 0;
+
+    loop {
+        tokio::time::sleep(Duration::from_secs(30)).await;
+
+        // Look for a finished critical task. The name is cloned out first so the
+        // borrow of `abort_handles` ends before the vector is dropped below.
+        let failed_task = abort_handles
+            .iter()
+            .find(|(handle, _name)| handle.is_finished())
+            .map(|(_handle, name)| name.clone());
+        if let Some(name) = failed_task {
+            if name != "jdc" {
+                error!("Critical task {:?} failed, restarting", name);
+                drop(abort_handles);
+                server_handle.abort();
+                return;
+            }
+        }
+
+        // Check the proxy state, but be more lenient in multi-upstream mode
+        let is_proxy_down = ProxyState::is_proxy_down();
+        if is_proxy_down.0 {
+            if let Some(ref status) = is_proxy_down.1 {
+                let status_str = format!("{:?}", status);
+                if !status_str.contains("Tp(Down)") && !status_str.contains("InternalInconsistency")
+                {
+                    error!("{:?} is DOWN, restarting", status);
+                    drop(abort_handles);
+                    server_handle.abort();
+                    return;
+                }
+            }
+        }
+
+        // Report connection stats roughly every five minutes
+        if report_counter >= 10 {
+            let detailed_stats = router.get_detailed_connection_stats().await;
+            let active_count = detailed_stats
+                .iter()
+                .filter(|(_, active, _)| *active)
+                .count();
+
+            if active_count > 0 {
+                info!("Active pools: {}", active_count);
+                for (upstream_id, is_active, percentage) in &detailed_stats {
+                    if *is_active && *percentage > 0.0 {
+                        info!("  {} {:.1}%", upstream_id, percentage);
+                    }
+                }
+            }
+            report_counter = 0;
+        } else {
+            report_counter += 1;
+        }
+    }
+}
+
 pub enum Reconnect {
     NewUpstream(std::net::SocketAddr), // Reconnecting with a new upstream
     NoUpstream,                        // Reconnecting without upstream
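The monitor relies on two properties of the handles it holds: `JoinHandle::is_finished` for detection, and abort-on-drop semantics for teardown, so "drop every handle" doubles as "stop the whole task group". A compact illustration with a hypothetical `AbortOnDrop` wrapper (the real type lives in `crate::shared::utils` and may differ):

```rust
use tokio::task::JoinHandle;

// Hypothetical stand-in: aborting the inner task when the wrapper is dropped.
struct AbortOnDrop(JoinHandle<()>);

impl AbortOnDrop {
    fn new(handle: JoinHandle<()>) -> Self { Self(handle) }
    fn is_finished(&self) -> bool { self.0.is_finished() }
}

impl Drop for AbortOnDrop {
    fn drop(&mut self) { self.0.abort(); }
}

#[tokio::main]
async fn main() {
    let tasks = vec![
        (AbortOnDrop::new(tokio::spawn(async { /* exits immediately */ })), "translator"),
        (AbortOnDrop::new(tokio::spawn(std::future::pending::<()>())), "sv1_ingress"),
    ];
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    // One finished critical task is enough to tear the group down.
    if let Some((_, name)) = tasks.iter().find(|(h, _)| h.is_finished()) {
        println!("critical task {name} finished; dropping the group aborts the rest");
        drop(tasks);
    }
}
```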
diff --git a/src/router/connection_manager.rs b/src/router/connection_manager.rs
new file mode 100644
index 00000000..aafe7457
--- /dev/null
+++ b/src/router/connection_manager.rs
@@ -0,0 +1,98 @@
+use lazy_static::lazy_static;
+use roles_logic_sv2::utils::Mutex;
+use std::collections::HashMap;
+use tracing::{error, info};
+
+lazy_static! {
+    static ref DOWNSTREAM_HASHRATE: Mutex<f32> = Mutex::new(0.0);
+    static ref UPSTREAM_CONNECTIONS: Mutex<HashMap<String, ConnectionInfo>> =
+        Mutex::new(HashMap::new());
+}
+
+#[derive(Debug, Clone)]
+pub struct ConnectionInfo {
+    pub connected: bool,
+    pub address: std::net::SocketAddr,
+    #[allow(dead_code)]
+    pub allocated_percentage: f32,
+    #[allow(dead_code)]
+    pub last_seen: std::time::Instant,
+}
+
+pub struct ConnectionManager;
+
+impl ConnectionManager {
+    pub fn set_downstream_hashrate(hashrate: f32) {
+        info!("Setting downstream hashrate to: {} h/s", hashrate);
+        if DOWNSTREAM_HASHRATE.safe_lock(|h| *h = hashrate).is_err() {
+            error!("Failed to set downstream hashrate");
+        }
+    }
+
+    pub fn get_downstream_hashrate() -> f32 {
+        let mut hashrate = 0.0;
+        if DOWNSTREAM_HASHRATE.safe_lock(|h| hashrate = *h).is_err() {
+            error!("Failed to get downstream hashrate");
+        }
+        hashrate
+    }
+
+    pub fn set_upstream_connection(id: &str, info: ConnectionInfo) {
+        if UPSTREAM_CONNECTIONS
+            .safe_lock(|conns| {
+                conns.insert(id.to_string(), info.clone());
+            })
+            .is_err()
+        {
+            error!("Failed to update upstream connection");
+        }
+
+        if info.connected {
+            info!("Upstream {} connected to {}", id, info.address);
+        } else {
+            info!("Upstream {} disconnected", id);
+        }
+    }
+
+    #[allow(dead_code)]
+    pub fn get_upstream_connection(id: &str) -> Option<ConnectionInfo> {
+        let mut result = None;
+        if UPSTREAM_CONNECTIONS
+            .safe_lock(|conns| {
+                result = conns.get(id).cloned();
+            })
+            .is_err()
+        {
+            error!("Failed to get upstream connection");
+        }
+        result
+    }
+
+    #[allow(dead_code)]
+    pub fn get_all_upstream_connections() -> HashMap<String, ConnectionInfo> {
+        let mut result = HashMap::new();
+        if UPSTREAM_CONNECTIONS
+            .safe_lock(|conns| {
+                result = conns.clone();
+            })
+            .is_err()
+        {
+            error!("Failed to get all upstream connections");
+        }
+        result
+    }
+
+    #[allow(dead_code)]
+    pub fn update_upstream_percentage(id: &str, percentage: f32) {
+        if UPSTREAM_CONNECTIONS
+            .safe_lock(|conns| {
+                if let Some(info) = conns.get_mut(id) {
+                    info.allocated_percentage = percentage;
+                }
+            })
+            .is_err()
+        {
+            error!("Failed to update upstream percentage");
+        }
+    }
+}
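A usage sketch, assuming the module above is in scope as `connection_manager`. Because the state lives in `lazy_static` globals behind `safe_lock`, any component can publish or query connection state without holding a reference to a manager instance:

```rust
use connection_manager::{ConnectionInfo, ConnectionManager};

fn example() {
    ConnectionManager::set_downstream_hashrate(10_000_000.0); // 10 Mh/s

    ConnectionManager::set_upstream_connection(
        "upstream-0",
        ConnectionInfo {
            connected: true,
            address: "127.0.0.1:34254".parse().expect("valid socket address"),
            allocated_percentage: 100.0,
            last_seen: std::time::Instant::now(),
        },
    );

    assert_eq!(ConnectionManager::get_downstream_hashrate(), 10_000_000.0);
    assert!(ConnectionManager::get_upstream_connection("upstream-0").is_some());
}
```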
diff --git a/src/router/mod.rs b/src/router/mod.rs
index e6ad0127..ea513066 100644
--- a/src/router/mod.rs
+++ b/src/router/mod.rs
@@ -17,47 +17,177 @@ use tokio::{
         watch,
     },
 };
-use tracing::{error, info};
+use tracing::{debug, error, info};
 
 use crate::{
     minin_pool_connection::{self, get_mining_setup_connection_msg, mining_setup_connection},
+    proxy_state::ProxyState,
     shared::utils::AbortOnDrop,
 };
 
+// New modules for multi-upstream support
+pub mod connection_manager;
+pub mod multi_upstream_manager;
+
+pub use connection_manager::ConnectionManager;
+pub use multi_upstream_manager::MultiUpstreamManager;
+
 /// Router handles connection to Multiple upstreams.
-#[derive(Clone)]
 pub struct Router {
     pool_addresses: Vec<SocketAddr>,
+    keys: Vec<Secp256k1PublicKey>,
     pub current_pool: Option<SocketAddr>,
+    upstream_manager: Option<MultiUpstreamManager>,
+
+    // Kept for backward compatibility with single-upstream mode
     auth_pub_k: Secp256k1PublicKey,
     setup_connection_msg: Option<SetupConnection<'static>>,
     timer: Option<Duration>,
-    latency_tx: watch::Sender<Option<Duration>>,
-    pub latency_rx: watch::Receiver<Option<Duration>>,
+    latency_tx: tokio::sync::watch::Sender<Option<Duration>>,
+    pub latency_rx: tokio::sync::watch::Receiver<Option<Duration>>,
+}
+
+impl Clone for Router {
+    fn clone(&self) -> Self {
+        Self {
+            pool_addresses: self.pool_addresses.clone(),
+            keys: self.keys.clone(),
+            current_pool: self.current_pool,
+            upstream_manager: self.upstream_manager.clone(),
+            auth_pub_k: self.auth_pub_k,
+            setup_connection_msg: self.setup_connection_msg.clone(),
+            timer: self.timer,
+            latency_tx: self.latency_tx.clone(),
+            latency_rx: self.latency_rx.clone(),
+        }
+    }
 }
 
 impl Router {
     /// Creates a new `Router` instance with the specified upstream addresses.
+    /// All pools share the same auth key; latency-based selection picks among them.
     pub fn new(
         pool_addresses: Vec<SocketAddr>,
         auth_pub_k: Secp256k1PublicKey,
-        // Configuration msg used to setup connection between client and pool
-        // If not, present `get_mining_setup_connection_msg()` is called to generated default values
         setup_connection_msg: Option<SetupConnection<'static>>,
-        // Max duration for pool setup after which it times out.
-        // If None, default time of 5s is used.
         timer: Option<Duration>,
     ) -> Self {
         let (latency_tx, latency_rx) = watch::channel(None);
+        let auth_pub_keys = vec![auth_pub_k; pool_addresses.len()];
+
+        Self {
+            pool_addresses,
+            keys: auth_pub_keys,
+            current_pool: None,
+            upstream_manager: None,
+            auth_pub_k,
+            setup_connection_msg,
+            timer,
+            latency_tx,
+            latency_rx,
+        }
+    }
+
+    /// Creates a new `Router` from (address, auth key) pairs.
+    /// Supports latency-based selection over pools with different auth keys.
+    pub fn new_with_keys(
+        pool_address_keys: Vec<(SocketAddr, Secp256k1PublicKey)>,
+        setup_connection_msg: Option<SetupConnection<'static>>,
+        timer: Option<Duration>,
+    ) -> Self {
+        let pool_addresses: Vec<SocketAddr> =
+            pool_address_keys.iter().map(|(addr, _)| *addr).collect();
+        let keys: Vec<Secp256k1PublicKey> =
+            pool_address_keys.iter().map(|(_, key)| *key).collect();
+        // At least one pool is required; `main` guarantees a non-empty list
+        let auth_pub_k = keys[0];
+
+        let (latency_tx, latency_rx) = watch::channel(None);
+
+        Self {
+            pool_addresses,
+            keys,
+            current_pool: None,
+            upstream_manager: None,
+            auth_pub_k,
+            setup_connection_msg,
+            timer,
+            latency_tx,
+            latency_rx,
+        }
+    }
+
+    /// Creates a new `Router` for multi-upstream mode.
+    /// With `use_distribution`, shares are spread across all upstreams instead of
+    /// selecting a single one by latency.
+    pub async fn new_multi(
+        pool_address_keys: Vec<(SocketAddr, Secp256k1PublicKey)>,
+        setup_connection_msg: Option<SetupConnection<'static>>,
+        timer: Option<Duration>,
+        use_distribution: bool,
+    ) -> Result<Self, String> {
+        let pool_addresses: Vec<SocketAddr> =
+            pool_address_keys.iter().map(|(addr, _)| *addr).collect();
+        let keys: Vec<Secp256k1PublicKey> =
+            pool_address_keys.iter().map(|(_, key)| *key).collect();
+
+        // Validate the key list before indexing into it
+        let auth_pub_k = keys
+            .first()
+            .copied()
+            .ok_or("No authentication keys provided")?;
+
+        // Create the upstream manager only if a custom distribution is wanted
+        let upstream_manager = if use_distribution {
+            Some(MultiUpstreamManager::new(
+                pool_addresses.clone(),
+                auth_pub_k,
+                setup_connection_msg.clone(),
+                timer,
+            ))
+        } else {
+            None
+        };
+
+        let (latency_tx, latency_rx) = watch::channel(None);
+
+        Ok(Self {
+            pool_addresses,
+            keys,
+            current_pool: None,
+            upstream_manager,
+            auth_pub_k,
+            setup_connection_msg,
+            timer,
+            latency_tx,
+            latency_rx,
+        })
+    }
+
+    /// Get detailed connection statistics
+    pub async fn get_detailed_connection_stats(&self) -> Vec<(String, bool, f32)> {
+        if let Some(ref manager) = self.upstream_manager {
+            manager.get_detailed_connection_stats().await
+        } else {
+            vec![]
+        }
+    }
+
+    /// Check if multi-upstream is enabled
+    pub fn is_multi_upstream_enabled(&self) -> bool {
+        self.upstream_manager.is_some()
+    }
+
+    /// Checks for a faster upstream and switches to it if found
+    pub async fn monitor_upstream(&mut self, epsilon: Duration) -> Option<SocketAddr> {
+        // In multi-upstream mode there is nothing to switch: all upstreams are used simultaneously
+        if self.is_multi_upstream_enabled() {
+            return None;
+        }
+
+        // In single-upstream mode, check for better latency
+        if let Some(best_pool) = self.select_pool_monitor(epsilon).await {
+            if Some(best_pool) != self.current_pool {
+                info!("Switching to faster upstream {:?}", best_pool);
+                return Some(best_pool);
+            }
+        }
+        None
+    }
 
     /// Internal function to select pool with the least latency.
@@ -77,19 +207,6 @@ impl Router {
         best_pool.map(|pool| (pool, least_latency))
     }
 
-    /// Select the best pool for connection
-    pub async fn select_pool_connect(&self) -> Option<SocketAddr> {
-        info!("Selecting the best upstream ");
-        if let Some((pool, latency)) = self.select_pool().await {
-            info!("Latency for upstream {:?} is {:?}", pool, latency);
-            self.latency_tx.send_replace(Some(latency)); // update latency
-            Some(pool)
-        } else {
-            //info!("No available pool");
-            None
-        }
-    }
-
     /// Select the best pool for monitoring
     async fn select_pool_monitor(&self, epsilon: Duration) -> Option<SocketAddr> {
         if let Some((best_pool, best_pool_latency)) = self.select_pool().await {
@@ -101,10 +218,9 @@ impl Router {
                 Ok(latency) => latency,
                 Err(e) => {
                     error!("Failed to get latency: {:?}", e);
-                    return None;
+                    return Some(best_pool);
                 }
             };
-            // saturating_sub is used to avoid panic on negative duration result
             if best_pool_latency < current_latency.saturating_sub(epsilon) {
                 info!(
                     "Found faster pool: {:?} with latency {:?}",
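The `epsilon` in `select_pool_monitor` acts as a hysteresis band: a candidate must beat the current pool's latency by more than `epsilon` before switching is worthwhile, so the router does not flap between pools whose latencies differ only by noise. A self-contained sketch of that comparison:

```rust
use std::time::Duration;

// Switch only when the candidate beats the current pool by more than `epsilon`.
// `saturating_sub` keeps the subtraction safe when `current < epsilon`.
fn should_switch(current: Duration, candidate: Duration, epsilon: Duration) -> bool {
    candidate < current.saturating_sub(epsilon)
}

fn main() {
    let epsilon = Duration::from_millis(30_000);
    let current = Duration::from_millis(45_000);

    assert!(should_switch(current, Duration::from_millis(10_000), epsilon));
    assert!(!should_switch(current, Duration::from_millis(20_000), epsilon));
    // When current latency is below epsilon, saturating_sub yields zero and
    // no candidate can trigger a switch.
    assert!(!should_switch(Duration::from_millis(5_000), Duration::ZERO, epsilon));
}
```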
@@ -121,7 +237,20 @@ impl Router {
         None
     }
 
-    /// Selects the best upstream and connects to.
+    /// Select the best pool for connection
+    pub async fn select_pool_connect(&mut self) -> Option<SocketAddr> {
+        info!("Selecting the best upstream");
+
+        if let Some((pool, latency)) = self.select_pool().await {
+            info!("Latency for upstream {:?} is {:?}", pool, latency);
+            self.latency_tx.send_replace(Some(latency));
+            Some(pool)
+        } else {
+            None
+        }
+    }
+
+    /// Selects the best upstream and connects to it.
     /// Uses minin_pool_connection::connect_pool
     pub async fn connect_pool(
         &mut self,
@@ -138,8 +267,6 @@ impl Router {
             Some(addr) => addr,
             None => match self.select_pool_connect().await {
                 Some(addr) => addr,
-                // Called when we initialize the proxy, without a valid pool we can not start mine and we
-                // return Err
                 None => {
                     return Err(minin_pool_connection::errors::Error::Unrecoverable);
                 }
@@ -149,37 +276,122 @@ impl Router {
 
         info!("Upstream {:?} selected", pool);
 
+        // Find the matching auth key for this address
+        let auth_pub_key = if let Some(index) = self.pool_addresses.iter().position(|&a| a == pool)
+        {
+            self.keys[index]
+        } else {
+            self.auth_pub_k
+        };
+
         match minin_pool_connection::connect_pool(
             pool,
-            self.auth_pub_k,
+            auth_pub_key,
             self.setup_connection_msg.clone(),
             self.timer,
         )
         .await
         {
             Ok((send_to_pool, recv_from_pool, pool_connection_abortable)) => {
+                // Record the successful connection in the ConnectionManager
+                let upstream_id = format!(
+                    "upstream-{}",
+                    self.pool_addresses
+                        .iter()
+                        .position(|&a| a == pool)
+                        .unwrap_or(0)
+                );
+
+                let connection_info = connection_manager::ConnectionInfo {
+                    connected: true,
+                    address: pool,
+                    allocated_percentage: 100.0, // A single upstream gets 100%
+                    last_seen: std::time::Instant::now(),
+                };
+                ConnectionManager::set_upstream_connection(&upstream_id, connection_info);
+
+                // Update the current pool address
                 crate::POOL_ADDRESS
                     .safe_lock(|pool_address| {
                         *pool_address = Some(pool);
                     })
                     .unwrap_or_else(|_| {
                         error!("Pool address Mutex corrupt");
-                        crate::proxy_state::ProxyState::update_inconsistency(Some(1));
+                        ProxyState::update_inconsistency(Some(1));
                     });
                 Ok((send_to_pool, recv_from_pool, pool_connection_abortable))
             }
-            Err(e) => Err(e),
+            Err(e) => {
+                // Record the failed connection in the ConnectionManager
+                let upstream_id = format!(
+                    "upstream-{}",
+                    self.pool_addresses
+                        .iter()
+                        .position(|&a| a == pool)
+                        .unwrap_or(0)
+                );
+
+                let connection_info = connection_manager::ConnectionInfo {
+                    connected: false,
+                    address: pool,
+                    allocated_percentage: 0.0,
+                    last_seen: std::time::Instant::now(),
+                };
+                ConnectionManager::set_upstream_connection(&upstream_id, connection_info);
+
+                Err(e)
+            }
+        }
+    }
+
+    /// Start multi-upstream share accounting
+    pub async fn start_multi_upstream_share_accounting(
+        &self,
+        from_translator_recv: tokio::sync::mpsc::Receiver<
+            roles_logic_sv2::parsers::Mining<'static>,
+        >,
+        to_translator_send: tokio::sync::mpsc::Sender<roles_logic_sv2::parsers::Mining<'static>>,
+    ) -> Result<AbortOnDrop, String> {
+        if let Some(ref manager) = self.upstream_manager {
+            manager
+                .start_multi_upstream_share_accounting(from_translator_recv, to_translator_send)
+                .await
+        } else {
+            Err("Multi-upstream manager not initialized".into())
+        }
+    }
+
+    /// Start multi-upstream share accounting with JDC
+    pub async fn start_multi_upstream_share_accounting_with_jdc(
+        &self,
+        from_jdc_recv: Receiver<roles_logic_sv2::parsers::Mining<'static>>,
+        to_jdc_send: Sender<roles_logic_sv2::parsers::Mining<'static>>,
+    ) -> Result<AbortOnDrop, String> {
+        if let Some(ref manager) = self.upstream_manager {
+            let manager_handle = manager
+                .start_multi_upstream_share_accounting(from_jdc_recv, to_jdc_send)
+                .await?;
+            Ok(manager_handle)
+        } else {
+            Err("Multi-upstream manager not initialized".into())
         }
     }
 
     /// Returns the sum all the latencies for a given upstream
     async fn get_latency(&self, pool_address: SocketAddr) -> Result<Duration, ()> {
+        // Find the auth key for this address
+        let auth_pub_key =
+            if let Some(index) = self.pool_addresses.iter().position(|&a| a == pool_address) {
+                self.keys[index]
+            } else {
+                self.auth_pub_k
+            };
+
         let mut pool = PoolLatency::new(pool_address);
         let setup_connection_msg = self.setup_connection_msg.as_ref();
         let timer = self.timer.as_ref();
-        let auth_pub_key = self.auth_pub_k;
 
         tokio::time::timeout(
             Duration::from_secs(15),
@@ -226,22 +438,54 @@ impl Router {
             pool.open_sv2_jd_connection,
             pool.get_a_mining_token,
         ];
-        // Get sum of all latencies for pool
         let sum_of_latencies: Duration = latencies.iter().flatten().sum();
         Ok(sum_of_latencies)
     }
 
-    /// Checks for faster upstream switch to it if found
-    pub async fn monitor_upstream(&mut self, epsilon: Duration) -> Option<SocketAddr> {
-        if let Some(best_pool) = self.select_pool_monitor(epsilon).await {
-            if Some(best_pool) != self.current_pool {
-                info!("Switching to faster upstreamn {:?}", best_pool);
-                return Some(best_pool);
-            } else {
-                return None;
-            }
-        }
-        None
-    }
+    /// Initialize upstream connections for the manager
+    pub async fn initialize_upstream_connections(&mut self) -> Result<(), String> {
+        if let Some(ref manager) = self.upstream_manager {
+            debug!(
+                "Initializing {} upstream connections",
+                self.pool_addresses.len()
+            );
+
+            for (idx, (addr, key)) in self.pool_addresses.iter().zip(self.keys.iter()).enumerate() {
+                let id = format!("upstream-{}", idx);
+                debug!("Adding upstream {}: {} ({})", id, addr, key);
+
+                if let Err(e) = manager
+                    .add_upstream(
+                        id.clone(),
+                        *addr,
+                        *key,
+                        self.setup_connection_msg.clone(),
+                        self.timer,
+                    )
+                    .await
+                {
+                    error!("Failed to add upstream {}: {:?}", id, e);
+                }
+            }
+
+            manager.initialize_connections().await;
+            Ok(())
+        } else {
+            Err("No upstream manager available".to_string())
+        }
+    }
+
+    /// Sets the hashrate distribution for the upstream manager.
+    pub async fn set_hashrate_distribution(
+        &mut self,
+        distribution: Vec<f32>,
+    ) -> Result<(), &'static str> {
+        if let Some(ref mut manager) = self.upstream_manager {
+            let _ = manager.set_hashrate_distribution(distribution).await;
+            Ok(())
+        } else {
+            Err("No upstream manager available")
+        }
+    }
 }
 
@@ -258,7 +502,6 @@ struct PoolLatency {
 }
 
 impl PoolLatency {
-    // Create new `PoolLatency` given an upstream address
     fn new(pool: SocketAddr) -> PoolLatency {
         Self {
             pool,
@@ -271,15 +514,12 @@ impl PoolLatency {
             open_sv2_mining_connection: None,
             setup_channel: None,
             receive_first_job: None,
             receive_first_set_new_prev_hash: None,
             open_sv2_jd_connection: None,
             get_a_mining_token: None,
         }
     }
 
-    /// Sets the `PoolLatency`'s `open_sv2_mining_connection`, `setup_channel_timer`, `receive_first_job`,
-    /// and `receive_first_set_new_prev_hash`
     async fn get_mining_setup_latencies(
         &mut self,
         setup_connection_msg: Option<SetupConnection<'static>>,
         timer: Option<Duration>,
         authority_public_key: Secp256k1PublicKey,
     ) -> Result<(), ()> {
-        // Set open_sv2_mining_connection latency
         let open_sv2_mining_connection_timer = Instant::now();
         match TcpStream::connect(self.pool).await {
             Ok(stream) => {
@@ -293,7 +533,6 @@ impl PoolLatency {
                 )
                 .await?;
 
-                // Set setup_channel latency
                 let setup_channel_timer = Instant::now();
                 let result = mining_setup_connection(
                     &mut receiver,
@@ -330,18 +569,15 @@ impl PoolLatency {
                         _new_ext_job,
                     )) = message.clone()
                     {
-                        // Set receive_first_job latency
                         self.receive_first_job = Some(timer.elapsed());
                         received_new_job = true;
                     }
                     if let PoolExtMessages::Mining(Mining::SetNewPrevHash(_new_prev_hash)) =
                         message.clone()
                     {
-                        // Set receive_first_set_new_prev_hash latency
                         self.receive_first_set_new_prev_hash = Some(timer.elapsed());
                         received_prev_hash = true;
                     }
-                    // Both latencies have been set so we break the loop
                    if received_new_job && received_prev_hash {
                         break;
                     }
@@ -367,14 +603,11 @@ impl PoolLatency {
         }
     }
 
-    /// Sets the `PoolLatency`'s `open_sv2_jd_connection` and `get_a_mining_token`
     async fn get_jd_latencies(
         &mut self,
         authority_public_key: Secp256k1PublicKey,
     ) -> Result<(), ()> {
         let address = self.pool;
-
-        // Set open_sv2_jd_connection latency
         let open_sv2_jd_connection_timer = Instant::now();
 
         match tokio::time::timeout(Duration::from_secs(2), TcpStream::connect(address)).await {
             Ok(Ok(stream)) => {
                 let tp = TP_ADDRESS
                     .safe_lock(|tp| tp.clone())
                     .map_err(|_| error!(" TP_ADDRESS Mutex Corrupted"))?;
                 if let Some(_tp_addr) = tp {
                     let initiator = Initiator::from_raw_k(authority_public_key.into_bytes())
-                        // Safe expect Key is a constant and must be right
                         .expect("Unable to create initialtor");
                     let (mut receiver, mut sender, _, _) =
                         match Connection::new(stream, HandshakeRole::Initiator(initiator)).await {
@@ -428,7 +660,6 @@ impl PoolLatency {
                         }
                     };
 
-                    // Set get_a_mining_token latency
                     let get_a_mining_token_timer = Instant::now();
                     let _token = JobDeclarator::get_last_token(&job_declarator).await;
                     self.get_a_mining_token = Some(get_a_mining_token_timer.elapsed());
@@ -453,7 +684,6 @@ fn open_channel() -> Mining<'static> {
             user_identity: "ABC"
                 .to_string()
                 .try_into()
-                // This can never fail
                 .expect("Failed to convert user identity to string"),
             nominal_hash_rate: 0.0,
         },
@@ -473,7 +703,6 @@ async fn initialize_mining_connections(
     (),
 > {
     let initiator =
-        // Safe expect Key is a constant and must be right
         Initiator::from_raw_k(authority_public_key.into_bytes()).expect("Invalid authority key");
     let (receiver, sender, _, _) =
         match Connection::new(stream, HandshakeRole::Initiator(initiator)).await {
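For orientation before the new manager module: `main()` drives this API in a fixed order. A condensed sketch of that sequence (crate types assumed in scope; error handling elided, so this is an illustration rather than the shipped flow):

```rust
// Condensed lifecycle of the distribution path, as driven from main():
async fn run_distribution(pairs: Vec<(std::net::SocketAddr, key_utils::Secp256k1PublicKey)>) {
    // 1. Build a router whose upstream_manager is Some(...)
    let mut router = Router::new_multi(pairs, None, None, true)
        .await
        .expect("at least one pool");

    // 2. Register every pool as "upstream-{index}" and spawn its connect loop
    router
        .initialize_upstream_connections()
        .await
        .expect("manager present");

    // 3. Rank pools by latency; the largest share goes to the fastest pool
    router
        .set_hashrate_distribution(vec![70.0, 30.0])
        .await
        .expect("manager present");
}
```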
diff --git a/src/router/multi_upstream_manager.rs b/src/router/multi_upstream_manager.rs
new file mode 100644
index 00000000..4f927947
--- /dev/null
+++ b/src/router/multi_upstream_manager.rs
@@ -0,0 +1,560 @@
+use crate::{minin_pool_connection, router::connection_manager::ConnectionManager, HashUnit};
+use key_utils::Secp256k1PublicKey;
+use rand;
+use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
+use tokio::sync::Mutex;
+use tracing::{debug, error, info, warn};
+
+#[derive(Clone)]
+pub struct MultiUpstreamManager {
+    upstreams: Arc<Mutex<HashMap<String, UpstreamConnection>>>,
+    auth_pub_k: Secp256k1PublicKey,
+    setup_connection_msg: Option<roles_logic_sv2::common_messages_sv2::SetupConnection<'static>>,
+    timer: Option<Duration>,
+}
+
+#[derive(Debug)]
+pub struct UpstreamConnection {
+    pub address: SocketAddr,
+    pub is_active: bool,
+    pub allocated_percentage: f32,
+    // Connection channels, populated once the upstream is connected
+    pub sender: Option<
+        tokio::sync::mpsc::Sender<demand_share_accounting_ext::parser::PoolExtMessages<'static>>,
+    >,
+    pub receiver: Option<
+        tokio::sync::mpsc::Receiver<demand_share_accounting_ext::parser::PoolExtMessages<'static>>,
+    >,
+}
+
+impl UpstreamConnection {
+    pub fn new(address: SocketAddr) -> Self {
+        Self {
+            address,
+            is_active: false,
+            allocated_percentage: 0.0,
+            sender: None,
+            receiver: None,
+        }
+    }
+}
+
+impl MultiUpstreamManager {
+    pub fn new(
+        upstreams: Vec<SocketAddr>,
+        auth_pub_k: Secp256k1PublicKey,
+        setup_connection_msg: Option<
+            roles_logic_sv2::common_messages_sv2::SetupConnection<'static>,
+        >,
+        timer: Option<Duration>,
+    ) -> Self {
+        let upstream_map = upstreams
+            .into_iter()
+            .enumerate()
+            .map(|(i, addr)| (format!("upstream-{}", i), UpstreamConnection::new(addr)))
+            .collect();
+
+        Self {
+            upstreams: Arc::new(Mutex::new(upstream_map)),
+            auth_pub_k,
+            setup_connection_msg,
+            timer,
+        }
+    }
+
+    /// Start and maintain all upstream connections according to the hashrate distribution
+    pub async fn maintain_connections(&self) {
+        let upstreams = self.upstreams.clone();
+        let auth_pub_k = self.auth_pub_k;
+        let setup_connection_msg = self.setup_connection_msg.clone();
+        let timer = self.timer;
+
+        let upstreams_guard = upstreams.lock().await;
+        for (id, conn) in upstreams_guard.iter() {
+            let id = id.clone();
+            let address = conn.address;
+            let upstreams = upstreams.clone();
+            let setup_connection_msg = setup_connection_msg.clone();
+
+            tokio::spawn(async move {
+                loop {
+                    info!("Attempting to connect to upstream: {} ({})", id, address);
+                    match minin_pool_connection::connect_pool(
+                        address,
+                        auth_pub_k,
+                        setup_connection_msg.clone(),
+                        timer,
+                    )
+                    .await
+                    {
+                        Ok((send, recv, _abortable)) => {
+                            info!("Successfully connected to upstream: {} ({})", id, address);
+
+                            // Store both sender and receiver
+                            {
+                                let mut upstreams_guard = upstreams.lock().await;
+                                if let Some(upstream) = upstreams_guard.get_mut(&id) {
+                                    upstream.sender = Some(send);
+                                    upstream.receiver = Some(recv);
+                                    upstream.is_active = true;
+                                    info!("Stored sender and receiver for upstream: {}", id);
+                                }
+                            }
+
+                            // Keep the connection task alive
+                            loop {
+                                tokio::time::sleep(Duration::from_secs(30)).await;
+                            }
+                        }
+                        Err(e) => {
+                            error!("Failed to connect to upstream {}: {:?}", id, e);
+                            Self::update_upstream_status(&upstreams, &id, false).await;
+                            tokio::time::sleep(Duration::from_secs(5)).await;
+                        }
+                    }
+                }
+            });
+        }
+        drop(upstreams_guard);
+    }
+
+    /// Helper to update an upstream's connection status
+    async fn update_upstream_status(
+        upstreams: &Arc<Mutex<HashMap<String, UpstreamConnection>>>,
+        id: &str,
+        is_active: bool,
+    ) {
+        let mut upstreams = upstreams.lock().await;
+        if let Some(upstream) = upstreams.get_mut(id) {
+            upstream.is_active = is_active;
+        }
+    }
+
+    pub async fn set_hashrate_distribution(
+        &mut self,
+        mut distribution: Vec<f32>,
+    ) -> Result<(), String> {
+        let upstream_count = {
+            let upstreams = self.upstreams.lock().await;
+            upstreams.len()
+        };
+
+        // Pad with zeros if needed
+        if distribution.len() < upstream_count {
+            let zeros_needed = upstream_count - distribution.len();
+            distribution.extend(vec![0.0; zeros_needed]);
+            debug!("Padded distribution with {} zeros", zeros_needed);
+        } else if distribution.len() > upstream_count {
+            return Err(format!(
+                "Distribution array has {} values but only {} pools available",
+                distribution.len(),
+                upstream_count
+            ));
+        }
+
+        // Sort the distribution from highest to lowest share
+        distribution.sort_by(|a, b| b.partial_cmp(a).unwrap_or(std::cmp::Ordering::Equal));
+
+        // Rank pools by latency (best first)
+        let ranked_pools = self.rank_by_latency().await;
+
+        // Normalize the percentages so they sum to 100
+        let total: f32 = distribution.iter().sum();
+        if (total - 100.0).abs() > 0.1 {
+            warn!("Distribution sum: {:.1}%, normalizing to 100%", total);
+            if total > 0.0 {
+                for percentage in &mut distribution {
+                    *percentage = (*percentage / total) * 100.0;
+                }
+            } else {
+                return Err("All distribution percentages are zero or negative".to_string());
+            }
+        }
+
+        // Apply the distribution to the latency-ranked pools
+        let mut upstreams = self.upstreams.lock().await;
+
+        // Reset all to 0%
+        for upstream in upstreams.values_mut() {
+            upstream.allocated_percentage = 0.0;
+        }
+
+        // Assign the largest percentages to the lowest-latency pools
+        let downstream_hashrate = ConnectionManager::get_downstream_hashrate();
+
+        for (i, (pool_id, _address, latency)) in ranked_pools.iter().enumerate() {
+            if let Some(&percentage) = distribution.get(i) {
+                if let Some(upstream) = upstreams.get_mut(pool_id) {
+                    upstream.allocated_percentage = percentage;
+
+                    if percentage > 0.0 {
+                        let allocated_hashrate = (percentage / 100.0) * downstream_hashrate;
+                        info!(
+                            "{}: {} ({:.1}%) - {}ms",
+                            pool_id,
+                            HashUnit::format_value(allocated_hashrate),
+                            percentage,
+                            latency.as_millis()
+                        );
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
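A standalone sketch of the pad → sort → normalize steps above. Note one difference: the shipped code only rescales when the sum strays from 100% by more than 0.1, while this sketch always rescales:

```rust
// Pad a user-supplied distribution to the pool count, order it highest-first,
// and rescale so the shares sum to 100%.
fn prepare_distribution(mut d: Vec<f32>, pools: usize) -> Result<Vec<f32>, String> {
    if d.len() < pools {
        d.extend(vec![0.0; pools - d.len()]); // missing entries get 0%
    } else if d.len() > pools {
        return Err(format!("{} values for {} pools", d.len(), pools));
    }
    d.sort_by(|a, b| b.partial_cmp(a).unwrap_or(std::cmp::Ordering::Equal));
    let total: f32 = d.iter().sum();
    if total <= 0.0 {
        return Err("all percentages are zero or negative".into());
    }
    Ok(d.into_iter().map(|p| p / total * 100.0).collect())
}

fn main() {
    // "60,20" over three pools: padded to [60, 20, 0], already sorted,
    // then scaled so the sum is 100 -> [75, 25, 0].
    assert_eq!(
        prepare_distribution(vec![60.0, 20.0], 3).unwrap(),
        vec![75.0, 25.0, 0.0]
    );
}
```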
+
+    /// Measure TCP connect latency to a pool
+    async fn test_pool_latency(address: &SocketAddr) -> Result<Duration, String> {
+        let start = std::time::Instant::now();
+
+        match tokio::time::timeout(
+            Duration::from_secs(2),
+            tokio::net::TcpStream::connect(address),
+        )
+        .await
+        {
+            Ok(Ok(_)) => Ok(start.elapsed()),
+            Ok(Err(e)) => Err(format!("Connection failed: {}", e)),
+            Err(_) => Err("Timeout".to_string()),
+        }
+    }
+
+    /// Rank pools by measured latency, lowest first
+    pub async fn rank_by_latency(&self) -> Vec<(String, SocketAddr, Duration)> {
+        let upstreams = self.upstreams.lock().await;
+        let mut pool_latencies = Vec::new();
+
+        info!("Testing latency for {} pools", upstreams.len());
+
+        for (id, conn) in upstreams.iter() {
+            match Self::test_pool_latency(&conn.address).await {
+                Ok(latency) => {
+                    debug!("{}: {}ms", id, latency.as_millis());
+                    pool_latencies.push((id.clone(), conn.address, latency));
+                }
+                Err(e) => {
+                    warn!("{}: latency test failed - {}", id, e);
+                    // Failed pools get a sentinel latency so they rank last
+                    pool_latencies.push((id.clone(), conn.address, Duration::from_secs(999)));
+                }
+            }
+        }
+
+        // Sort by latency (lowest first)
+        pool_latencies.sort_by_key(|(_, _, latency)| *latency);
+
+        pool_latencies
+    }
+
+    #[allow(dead_code)] // Kept for future use
+    pub async fn get_upstreams(&self) -> HashMap<String, UpstreamConnection> {
+        let upstreams = self.upstreams.lock().await;
+        upstreams
+            .iter()
+            .map(|(k, v)| {
+                (
+                    k.clone(),
+                    UpstreamConnection {
+                        address: v.address,
+                        is_active: v.is_active,
+                        allocated_percentage: v.allocated_percentage,
+                        sender: v.sender.clone(),
+                        receiver: None, // a receiver cannot be cloned
+                    },
+                )
+            })
+            .collect()
+    }
+
+    pub async fn get_detailed_connection_stats(&self) -> Vec<(String, bool, f32)> {
+        let upstreams = self.upstreams.lock().await;
+
+        upstreams
+            .iter()
+            .map(|(id, conn)| (id.clone(), conn.is_active, conn.allocated_percentage))
+            .collect()
+    }
+
+    /// Add an upstream (called from Router)
+    pub async fn add_upstream(
+        &self,
+        id: String,
+        address: SocketAddr,
+        _key: Secp256k1PublicKey,
+        _setup_connection_msg: Option<
+            roles_logic_sv2::common_messages_sv2::SetupConnection<'static>,
+        >,
+        _timer: Option<Duration>,
+    ) -> Result<(), String> {
+        let mut upstreams = self.upstreams.lock().await;
+
+        // If the upstream already exists, don't overwrite it
+        if upstreams.contains_key(&id) {
+            return Ok(());
+        }
+
+        upstreams.insert(id.clone(), UpstreamConnection::new(address));
+        Ok(())
+    }
+
+    /// Start maintaining connections (called from Router)
+    pub async fn initialize_connections(&self) {
+        self.maintain_connections().await;
+    }
+
+    #[allow(dead_code)] // Kept for future use
+    pub async fn handle_upstream_failure(&mut self, upstream_id: &str) {
+        {
+            let mut upstreams = self.upstreams.lock().await;
+
+            if let Some(upstream) = upstreams.get_mut(upstream_id) {
+                upstream.is_active = false;
+                warn!(
+                    "Upstream {} disconnected, attempting reconnection",
+                    upstream_id
+                );
+            }
+        } // The guard is dropped here
+
+        // Redistribute hashrate among the remaining active upstreams
+        self.rebalance_on_failure().await;
+    }
+
+    #[allow(dead_code)] // Kept for future use
+    async fn rebalance_on_failure(&mut self) {
+        let upstreams = self.upstreams.lock().await;
+        let active_count = upstreams.values().filter(|u| u.is_active).count();
+
+        if active_count == 0 {
+            error!("All upstreams disconnected - proxy will retry connections");
+            return;
+        }
+
+        info!(
+            "Rebalancing hashrate across {} active upstreams",
+            active_count
+        );
+        // Rebalancing logic still to be implemented
+    }
+
+    pub async fn start_multi_upstream_share_accounting(
+        &self,
+        mut from_translator_recv: tokio::sync::mpsc::Receiver<
+            roles_logic_sv2::parsers::Mining<'static>,
+        >,
+        to_translator_send: tokio::sync::mpsc::Sender<roles_logic_sv2::parsers::Mining<'static>>,
+    ) -> Result<crate::shared::utils::AbortOnDrop, String> {
+        info!("Starting multi-upstream share accounting");
+
+        // Clone the manager for use in the async task
+        let manager = self.clone();
+
+        let share_accounting_task = tokio::spawn({
+            async move {
+                info!("Multi-upstream share accounting task started");
+
+                let mut message_count = 0;
+
+                // Start a response handler for each upstream
+                manager
+                    .start_upstream_response_handlers(to_translator_send.clone())
+                    .await;
+
+                loop {
+                    tokio::select! {
+                        // Outbound messages FROM the translator TO the upstreams
+                        Some(message) = from_translator_recv.recv() => {
+                            message_count += 1;
+                            info!("Received message #{} from translator", message_count);
+
+                            // Route by the configured hashrate distribution
+                            manager.route_message_to_upstream(message).await;
+                        }
+
+                        // Keep the task alive
+                        _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
+                            debug!("Multi-upstream share accounting keepalive (out: {} messages)", message_count);
+                        }
+
+                        else => {
+                            warn!("All channels closed, stopping share accounting");
+                            break;
+                        }
+                    }
+                }
+
+                info!(
+                    "Multi-upstream share accounting task finished (out: {} messages)",
+                    message_count
+                );
+            }
+        });
+
+        // Wrap the task so it is aborted when the handle is dropped
+        Ok(crate::shared::utils::AbortOnDrop::new(
+            share_accounting_task,
+        ))
+    }
+
+    pub async fn route_message_to_upstream(
+        &self,
+        message: roles_logic_sv2::parsers::Mining<'static>,
+    ) {
+        let upstreams_guard = self.upstreams.lock().await;
+        let active_upstreams: Vec<(String, f32)> = upstreams_guard
+            .iter()
+            .filter_map(|(id, conn)| {
+                if conn.is_active {
+                    Some((id.clone(), conn.allocated_percentage))
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        if active_upstreams.is_empty() {
+            warn!("No active upstreams available for message routing");
+            return;
+        }
+
+        let total_percentage: f32 = active_upstreams.iter().map(|(_, pct)| pct).sum();
+        if total_percentage <= 0.0 {
+            warn!("Total upstream percentage is 0, cannot distribute");
+            return;
+        }
+
+        // Weighted random selection over the active upstreams
+        let random_value = rand::random::<f32>() * total_percentage;
+        let mut cumulative = 0.0;
+        let mut selected_upstream: Option<String> = None;
+
+        for (upstream_id, percentage) in &active_upstreams {
+            cumulative += percentage;
+            if random_value <= cumulative {
+                selected_upstream = Some(upstream_id.clone());
+                break;
+            }
+        }
+
+        if let Some(upstream_id) = selected_upstream {
+            let percentage = active_upstreams
+                .iter()
+                .find(|(id, _)| id == &upstream_id)
+                .map(|(_, pct)| *pct)
+                .unwrap_or(0.0);
+
+            let message_type = match &message {
+                roles_logic_sv2::parsers::Mining::OpenStandardMiningChannel(_) => {
+                    "OpenStandardMiningChannel"
+                }
+                roles_logic_sv2::parsers::Mining::OpenExtendedMiningChannel(_) => {
+                    "OpenExtendedMiningChannel"
+                }
+                roles_logic_sv2::parsers::Mining::SubmitSharesStandard(_) => "SubmitSharesStandard",
+                roles_logic_sv2::parsers::Mining::SubmitSharesExtended(_) => "SubmitSharesExtended",
+                _ => "Other",
+            };
+
+            debug!(
+                "Routing {} message to upstream: {} ({:.1}%)",
+                message_type, upstream_id, percentage
+            );
+
+            if let Some(upstream) = upstreams_guard.get(&upstream_id) {
+                if let Some(ref sender) = upstream.sender {
+                    let pool_message =
+                        demand_share_accounting_ext::parser::PoolExtMessages::Mining(message);
+                    if let Err(e) = sender.send(pool_message).await {
+                        warn!("Failed to send message to upstream {}: {}", upstream_id, e);
+                    } else {
+                        info!(
+                            "Successfully routed {} to {} ({:.1}%)",
+                            message_type, upstream_id, percentage
+                        );
+                    }
+                } else {
+                    warn!("No sender available for upstream {}", upstream_id);
+                }
+            }
+        }
+    }
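The cumulative-sum walk in `route_message_to_upstream` is ordinary roulette-wheel selection; because only active upstreams participate, a dead pool's share is implicitly re-spread over the survivors. A standalone rendering (using the `rand` crate, as the patch does):

```rust
use rand::Rng;

// Roulette-wheel selection: each candidate's chance is weight / total.
fn pick_weighted<'a>(candidates: &'a [(&'a str, f32)]) -> Option<&'a str> {
    let total: f32 = candidates.iter().map(|(_, w)| w).sum();
    if total <= 0.0 {
        return None;
    }
    let mut point = rand::thread_rng().gen::<f32>() * total;
    for (id, weight) in candidates {
        point -= weight;
        if point <= 0.0 {
            return Some(*id);
        }
    }
    candidates.last().map(|(id, _)| *id) // float round-off fallback
}

fn main() {
    // Over many draws, roughly 70% of messages should land on pool-a.
    let pools = [("pool-a", 70.0), ("pool-b", 30.0)];
    let hits = (0..10_000)
        .filter(|_| pick_weighted(&pools) == Some("pool-a"))
        .count();
    println!("pool-a got {hits} of 10000 (expected ~7000)");
}
```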
+
+    /// Start response handlers for all upstream connections
+    async fn start_upstream_response_handlers(
+        &self,
+        to_translator_send: tokio::sync::mpsc::Sender<roles_logic_sv2::parsers::Mining<'static>>,
+    ) {
+        let upstreams = self.upstreams.lock().await;
+
+        for (upstream_id, _conn) in upstreams.iter() {
+            let upstream_id = upstream_id.clone();
+            let upstreams = self.upstreams.clone();
+            let to_translator_send = to_translator_send.clone();
+
+            tokio::spawn(async move {
+                loop {
+                    // Try to take exclusive ownership of this upstream's receiver
+                    let mut receiver_opt = None;
+                    {
+                        let mut upstreams_guard = upstreams.lock().await;
+                        if let Some(upstream) = upstreams_guard.get_mut(&upstream_id) {
+                            if upstream.is_active && upstream.receiver.is_some() {
+                                receiver_opt = upstream.receiver.take();
+                            }
+                        }
+                    }
+
+                    if let Some(mut receiver) = receiver_opt {
+                        info!("Starting response handler for upstream: {}", upstream_id);
+
+                        // Listen for responses from this upstream
+                        loop {
+                            tokio::select! {
+                                Some(response) = receiver.recv() => {
+                                    debug!("Received response from upstream: {}", upstream_id);
+
+                                    // Convert PoolExtMessages back into a Mining message
+                                    if let demand_share_accounting_ext::parser::PoolExtMessages::Mining(mining_response) = response {
+                                        // Forward the response to the translator (and on to the miner)
+                                        if let Err(e) = to_translator_send.send(mining_response).await {
+                                            warn!("Failed to send response to translator from {}: {}", upstream_id, e);
+                                            break;
+                                        } else {
+                                            debug!("Successfully forwarded response from {} to translator", upstream_id);
+                                        }
+                                    }
+                                }
+
+                                _ = tokio::time::sleep(tokio::time::Duration::from_secs(30)) => {
+                                    // Periodically check whether the upstream is still active
+                                    let upstreams_guard = upstreams.lock().await;
+                                    if let Some(upstream) = upstreams_guard.get(&upstream_id) {
+                                        if !upstream.is_active {
+                                            warn!("Upstream {} became inactive, stopping response handler", upstream_id);
+                                            break;
+                                        }
+                                    }
+                                }
+
+                                else => {
+                                    warn!("Response channel closed for upstream: {}", upstream_id);
+                                    break;
+                                }
+                            }
+                        }
+
+                        // Put the receiver back if the upstream is still active
+                        {
+                            let mut upstreams_guard = upstreams.lock().await;
+                            if let Some(upstream) = upstreams_guard.get_mut(&upstream_id) {
+                                if upstream.is_active {
+                                    upstream.receiver = Some(receiver);
+                                }
+                            }
+                        }
+                    } else {
+                        // No receiver available yet, wait and retry
+                        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
+                    }
+                }
+            });
+        }
+    }
+}
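The take-then-put-back dance exists because tokio's `mpsc::Receiver` is not `Clone`: only one task may poll it at a time. Storing it as an `Option` and using `Option::take` hands a task temporary exclusive ownership. A minimal illustration of the pattern:

```rust
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, Mutex};

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u32>(8);
    let slots: Arc<Mutex<HashMap<String, Option<mpsc::Receiver<u32>>>>> =
        Arc::new(Mutex::new(HashMap::from([("upstream-0".into(), Some(rx))])));

    // A handler takes the receiver out of the shared map...
    let mut taken = slots
        .lock()
        .await
        .get_mut("upstream-0")
        .and_then(Option::take)
        .expect("receiver present");

    tx.send(7).await.unwrap();
    assert_eq!(taken.recv().await, Some(7));

    // ...and puts it back when done, so another task can take over later.
    *slots.lock().await.get_mut("upstream-0").unwrap() = Some(taken);
}
```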