Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/api/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,22 @@ impl Api {
// Retrieves the current pool information
pub async fn get_pool_info(State(state): State<AppState>) -> impl IntoResponse {
let current_pool_address = state.router.current_pool;
let latency = *state.router.latency_rx.borrow();

match (current_pool_address, latency) {
(Some(address), Some(latency)) => {
match current_pool_address {
Some(address) => {
let latency = match state.router.get_latency(address).await {
Ok(latency) => Some(latency),
Err(_) => None,
};
let response_data = serde_json::json!({
"address": address.to_string(),
"latency": latency.as_millis().to_string()
"latency": latency.map(|d| d.as_millis().to_string()).unwrap_or_else(|| "N/A".to_string())
});
(
StatusCode::OK,
Json(APIResponse::success(Some(response_data))),
)
}
(_, _) => (
None => (
StatusCode::NOT_FOUND,
Json(APIResponse::error(Some(
"Pool information unavailable".to_string(),
Expand Down
29 changes: 10 additions & 19 deletions src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ use noise_sv2::Initiator;
use roles_logic_sv2::{common_messages_sv2::SetupConnection, parsers::Mining};
use tokio::{
net::TcpStream,
sync::{
mpsc::{Receiver, Sender},
watch,
},
sync::mpsc::{Receiver, Sender},
};
use tracing::{error, info};

Expand All @@ -32,8 +29,6 @@ pub struct Router {
auth_pub_k: Secp256k1PublicKey,
setup_connection_msg: Option<SetupConnection<'static>>,
timer: Option<Duration>,
latency_tx: watch::Sender<Option<Duration>>,
pub latency_rx: watch::Receiver<Option<Duration>>,
}

impl Router {
Expand All @@ -48,15 +43,12 @@ impl Router {
// If None, default time of 5s is used.
timer: Option<Duration>,
) -> Self {
let (latency_tx, latency_rx) = watch::channel(None);
Self {
pool_addresses,
current_pool: None,
auth_pub_k,
setup_connection_msg,
timer,
latency_tx,
latency_rx,
}
}

Expand Down Expand Up @@ -84,20 +76,19 @@ impl Router {
error!("No pool addresses provided");
return None;
}
if self.pool_addresses.len() == 1 {
let pool = if self.pool_addresses.len() == 1 {
let pool = self.pool_addresses[0];
info!(
"Only one pool address available, using: {:?}",
self.pool_addresses[0]
);
return Some(self.pool_addresses[0]);
}
if let Some((pool, latency)) = self.select_pool().await {
info!("Latency for Pool {:?} is {:?}", pool, latency);
self.latency_tx.send_replace(Some(latency)); // update latency
Some(pool)
pool
} else {
None
}
let (pool, latency) = self.select_pool().await?;
info!("Latency for Pool {:?} is {:?}", pool, latency);
pool
};
Some(pool)
}

/// Select the best pool for monitoring
Expand Down Expand Up @@ -189,7 +180,7 @@ impl Router {
}

/// Returns the sum all the latencies for a given upstream
async fn get_latency(&self, pool_address: SocketAddr) -> Result<Duration, ()> {
pub async fn get_latency(&self, pool_address: SocketAddr) -> Result<Duration, ()> {
let mut pool = PoolLatency::new(pool_address);
let setup_connection_msg = self.setup_connection_msg.as_ref();
let timer = self.timer.as_ref();
Expand Down