From 9b972f5ed29c5cee1ca60ef40533745b23cb15d6 Mon Sep 17 00:00:00 2001 From: Divyansh Seth Date: Thu, 26 Jun 2025 14:55:27 +0530 Subject: [PATCH 1/2] fix/latency_update --- src/router/mod.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/router/mod.rs b/src/router/mod.rs index 95ca34bb..7acc2bf9 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -82,22 +82,23 @@ impl Router { info!("Selecting best Pool for connection"); if self.pool_addresses.is_empty() { error!("No pool addresses provided"); + self.latency_tx.send_replace(None); return None; } - if self.pool_addresses.len() == 1 { + let (pool, latency) = if self.pool_addresses.len() == 1 { + let pool = self.pool_addresses[0]; info!( "Only one pool address available, using: {:?}", self.pool_addresses[0] ); - return Some(self.pool_addresses[0]); - } - if let Some((pool, latency)) = self.select_pool().await { - info!("Latency for Pool {:?} is {:?}", pool, latency); - self.latency_tx.send_replace(Some(latency)); // update latency - Some(pool) + let latency = self.get_latency(pool).await.ok()?; + (pool, latency) } else { - None - } + self.select_pool().await? + }; + info!("Latency for Pool {:?} is {:?}", pool, latency); + self.latency_tx.send_replace(Some(latency)); + Some(pool) } /// Select the best pool for monitoring From 51b805d982a0d3e789ca98c50c1046c0a1d69e78 Mon Sep 17 00:00:00 2001 From: Divyansh Seth Date: Sun, 29 Jun 2025 16:06:40 +0530 Subject: [PATCH 2/2] Move latency calculation to get_pool_info API handler --- src/api/routes.rs | 14 ++++++++------ src/router/mod.rs | 24 +++++++----------------- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/src/api/routes.rs b/src/api/routes.rs index 250e06f6..2878b180 100644 --- a/src/api/routes.rs +++ b/src/api/routes.rs @@ -67,20 +67,22 @@ impl Api { // Retrieves the current pool information pub async fn get_pool_info(State(state): State) -> impl IntoResponse { let current_pool_address = state.router.current_pool; - let latency = *state.router.latency_rx.borrow(); - - match (current_pool_address, latency) { - (Some(address), Some(latency)) => { + match current_pool_address { + Some(address) => { + let latency = match state.router.get_latency(address).await { + Ok(latency) => Some(latency), + Err(_) => None, + }; let response_data = serde_json::json!({ "address": address.to_string(), - "latency": latency.as_millis().to_string() + "latency": latency.map(|d| d.as_millis().to_string()).unwrap_or_else(|| "N/A".to_string()) }); ( StatusCode::OK, Json(APIResponse::success(Some(response_data))), ) } - (_, _) => ( + None => ( StatusCode::NOT_FOUND, Json(APIResponse::error(Some( "Pool information unavailable".to_string(), diff --git a/src/router/mod.rs b/src/router/mod.rs index 7acc2bf9..849983b4 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -12,10 +12,7 @@ use noise_sv2::Initiator; use roles_logic_sv2::{common_messages_sv2::SetupConnection, parsers::Mining}; use tokio::{ net::TcpStream, - sync::{ - mpsc::{Receiver, Sender}, - watch, - }, + sync::mpsc::{Receiver, Sender}, }; use tracing::{error, info}; @@ -32,8 +29,6 @@ pub struct Router { auth_pub_k: Secp256k1PublicKey, setup_connection_msg: Option>, timer: Option, - latency_tx: watch::Sender>, - pub latency_rx: watch::Receiver>, } impl Router { @@ -48,15 +43,12 @@ impl Router { // If None, default time of 5s is used. timer: Option, ) -> Self { - let (latency_tx, latency_rx) = watch::channel(None); Self { pool_addresses, current_pool: None, auth_pub_k, setup_connection_msg, timer, - latency_tx, - latency_rx, } } @@ -82,22 +74,20 @@ impl Router { info!("Selecting best Pool for connection"); if self.pool_addresses.is_empty() { error!("No pool addresses provided"); - self.latency_tx.send_replace(None); return None; } - let (pool, latency) = if self.pool_addresses.len() == 1 { + let pool = if self.pool_addresses.len() == 1 { let pool = self.pool_addresses[0]; info!( "Only one pool address available, using: {:?}", self.pool_addresses[0] ); - let latency = self.get_latency(pool).await.ok()?; - (pool, latency) + pool } else { - self.select_pool().await? + let (pool, latency) = self.select_pool().await?; + info!("Latency for Pool {:?} is {:?}", pool, latency); + pool }; - info!("Latency for Pool {:?} is {:?}", pool, latency); - self.latency_tx.send_replace(Some(latency)); Some(pool) } @@ -190,7 +180,7 @@ impl Router { } /// Returns the sum all the latencies for a given upstream - async fn get_latency(&self, pool_address: SocketAddr) -> Result { + pub async fn get_latency(&self, pool_address: SocketAddr) -> Result { let mut pool = PoolLatency::new(pool_address); let setup_connection_msg = self.setup_connection_msg.as_ref(); let timer = self.timer.as_ref();