From 0de2aaa2fcd430270cdad8f51c6d86115b524f4b Mon Sep 17 00:00:00 2001 From: Vara Rahul Rajana Date: Sat, 11 Apr 2026 21:14:20 +0530 Subject: [PATCH] refactor: implement connection cleanup by sending kill signals upon downstream task termination --- src/translator/downstream/downstream.rs | 4 ++++ src/translator/downstream/notify.rs | 12 ++++++++++-- .../downstream/receive_from_downstream.rs | 2 +- src/translator/downstream/send_to_downstream.rs | 14 +++++++++----- 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/src/translator/downstream/downstream.rs b/src/translator/downstream/downstream.rs index 938a6ed2..46643e22 100644 --- a/src/translator/downstream/downstream.rs +++ b/src/translator/downstream/downstream.rs @@ -267,9 +267,13 @@ impl Downstream { downstream.safe_lock(|s| (s.share_monitor.clone(), s.connection_id))?; // Create an abortable task for the shares monitor + let task_manager_clone = task_manager.clone(); let abortable = tokio::spawn(async move { info!("Starting shares monitor for downstream: {}", connection_id); share_monitor.clone().monitor().await; + if let Some(kill_signal) = task_manager_clone.safe_lock(|tm| tm.send_kill_signal.clone()).ok() { + let _ = kill_signal.send(connection_id).await; + } }); // Register the task with the task manager so it can be aborted when needed diff --git a/src/translator/downstream/notify.rs b/src/translator/downstream/notify.rs index c5414d97..a1ba4fb9 100644 --- a/src/translator/downstream/notify.rs +++ b/src/translator/downstream/notify.rs @@ -32,6 +32,7 @@ pub async fn start_notify( upstream_difficulty_config .safe_lock(|c| c.channel_nominal_hashrate += Configuration::downstream_hashrate())?; stats_sender.setup_stats(connection_id); + let task_manager_clone = task_manager.clone(); task::spawn(async move { let timeout_timer = std::time::Instant::now(); let mut authorized_in_time = true; @@ -89,7 +90,7 @@ pub async fn start_notify( } tokio::time::sleep(std::time::Duration::from_secs(1)).await; } - if let Err(e) = start_update(task_manager, downstream.clone(), connection_id).await { + if let Err(e) = start_update(task_manager_clone.clone(), downstream.clone(), connection_id).await { warn!("Translator impossible to start update task: {e}"); } else if authorized_in_time { // Get the mask after initialization since is set by configure message @@ -127,6 +128,9 @@ pub async fn start_notify( "Downstream: Shutting down sv1 downstream job notifier for {}", &host ); + if let Some(kill_signal) = task_manager_clone.safe_lock(|tm| tm.send_kill_signal.clone()).ok() { + let _ = kill_signal.send(connection_id).await; + } }) }; TaskManager::add_notify(task_manager, handle.into(), connection_id) @@ -139,6 +143,7 @@ async fn start_update( downstream: Arc>, connection_id: u32, ) -> Result<(), Error<'static>> { + let task_manager_clone = task_manager.clone(); let handle = task::spawn(async move { // Prevent difficulty adjustments until after delay elapses tokio::time::sleep(std::time::Duration::from_secs(crate::Configuration::delay())).await; @@ -159,9 +164,12 @@ async fn start_update( // mining.set_difficulty if let Err(e) = Downstream::try_update_difficulty_settings(&downstream).await { error!("{e}"); - return; + break; }; } + if let Some(kill_signal) = task_manager_clone.safe_lock(|tm| tm.send_kill_signal.clone()).ok() { + let _ = kill_signal.send(connection_id).await; + } }); TaskManager::add_update(task_manager, handle.into(), connection_id) .await diff --git a/src/translator/downstream/receive_from_downstream.rs b/src/translator/downstream/receive_from_downstream.rs index 59e31840..7845495a 100644 --- a/src/translator/downstream/receive_from_downstream.rs +++ b/src/translator/downstream/receive_from_downstream.rs @@ -47,7 +47,7 @@ pub async fn start_receive_downstream( sv1_api::error::Error::InvalidJsonRpcMessageKind )) ); - return; + break; } } if let Ok(stats_sender) = downstream.safe_lock(|d| d.stats_sender.clone()) { diff --git a/src/translator/downstream/send_to_downstream.rs b/src/translator/downstream/send_to_downstream.rs index 2c63476c..6b5f4d4e 100644 --- a/src/translator/downstream/send_to_downstream.rs +++ b/src/translator/downstream/send_to_downstream.rs @@ -14,6 +14,7 @@ pub async fn start_send_to_downstream( connection_id: u32, host: String, ) -> Result<(), Error<'static>> { + let task_manager_clone = task_manager.clone(); let handle = task::spawn(async move { while let Some(res) = receiver_outgoing.recv().await { let to_send = match serde_json::to_string(&res) { @@ -28,11 +29,14 @@ pub async fn start_send_to_downstream( break; } } - warn!( - "Downstream: Shutting down sv1 downstream writer: {}", - connection_id - ); - }); + warn!( + "Downstream: Shutting down sv1 downstream writer: {}", + connection_id + ); + if let Some(kill_signal) = task_manager_clone.safe_lock(|tm| tm.send_kill_signal.clone()).ok() { + let _ = kill_signal.send(connection_id).await; + } + }); TaskManager::add_send_downstream(task_manager, handle.into(), connection_id) .await .map_err(|_| Error::TranslatorTaskManagerFailed)