Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/translator/downstream/downstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,13 @@ impl Downstream {
downstream.safe_lock(|s| (s.share_monitor.clone(), s.connection_id))?;

// Create an abortable task for the shares monitor
let task_manager_clone = task_manager.clone();
let abortable = tokio::spawn(async move {
info!("Starting shares monitor for downstream: {}", connection_id);
share_monitor.clone().monitor().await;
if let Some(kill_signal) = task_manager_clone.safe_lock(|tm| tm.send_kill_signal.clone()).ok() {
let _ = kill_signal.send(connection_id).await;
}
});

// Register the task with the task manager so it can be aborted when needed
Expand Down
12 changes: 10 additions & 2 deletions src/translator/downstream/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub async fn start_notify(
upstream_difficulty_config
.safe_lock(|c| c.channel_nominal_hashrate += Configuration::downstream_hashrate())?;
stats_sender.setup_stats(connection_id);
let task_manager_clone = task_manager.clone();
task::spawn(async move {
let timeout_timer = std::time::Instant::now();
let mut authorized_in_time = true;
Expand Down Expand Up @@ -89,7 +90,7 @@ pub async fn start_notify(
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
if let Err(e) = start_update(task_manager, downstream.clone(), connection_id).await {
if let Err(e) = start_update(task_manager_clone.clone(), downstream.clone(), connection_id).await {
warn!("Translator impossible to start update task: {e}");
} else if authorized_in_time {
// Get the mask after initialization since is set by configure message
Expand Down Expand Up @@ -127,6 +128,9 @@ pub async fn start_notify(
"Downstream: Shutting down sv1 downstream job notifier for {}",
&host
);
if let Some(kill_signal) = task_manager_clone.safe_lock(|tm| tm.send_kill_signal.clone()).ok() {
let _ = kill_signal.send(connection_id).await;
}
})
};
TaskManager::add_notify(task_manager, handle.into(), connection_id)
Expand All @@ -139,6 +143,7 @@ async fn start_update(
downstream: Arc<Mutex<Downstream>>,
connection_id: u32,
) -> Result<(), Error<'static>> {
let task_manager_clone = task_manager.clone();
let handle = task::spawn(async move {
// Prevent difficulty adjustments until after delay elapses
tokio::time::sleep(std::time::Duration::from_secs(crate::Configuration::delay())).await;
Expand All @@ -159,9 +164,12 @@ async fn start_update(
// mining.set_difficulty
if let Err(e) = Downstream::try_update_difficulty_settings(&downstream).await {
error!("{e}");
return;
break;
};
}
if let Some(kill_signal) = task_manager_clone.safe_lock(|tm| tm.send_kill_signal.clone()).ok() {
let _ = kill_signal.send(connection_id).await;
}
});
TaskManager::add_update(task_manager, handle.into(), connection_id)
.await
Expand Down
2 changes: 1 addition & 1 deletion src/translator/downstream/receive_from_downstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub async fn start_receive_downstream(
sv1_api::error::Error::InvalidJsonRpcMessageKind
))
);
return;
break;
}
}
if let Ok(stats_sender) = downstream.safe_lock(|d| d.stats_sender.clone()) {
Expand Down
14 changes: 9 additions & 5 deletions src/translator/downstream/send_to_downstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub async fn start_send_to_downstream(
connection_id: u32,
host: String,
) -> Result<(), Error<'static>> {
let task_manager_clone = task_manager.clone();
let handle = task::spawn(async move {
while let Some(res) = receiver_outgoing.recv().await {
let to_send = match serde_json::to_string(&res) {
Expand All @@ -28,11 +29,14 @@ pub async fn start_send_to_downstream(
break;
}
}
warn!(
"Downstream: Shutting down sv1 downstream writer: {}",
connection_id
);
});
warn!(
"Downstream: Shutting down sv1 downstream writer: {}",
connection_id
);
if let Some(kill_signal) = task_manager_clone.safe_lock(|tm| tm.send_kill_signal.clone()).ok() {
let _ = kill_signal.send(connection_id).await;
}
});
TaskManager::add_send_downstream(task_manager, handle.into(), connection_id)
.await
.map_err(|_| Error::TranslatorTaskManagerFailed)
Expand Down