From 98f6fe36797b9425b852bd3d788a17cd21319e31 Mon Sep 17 00:00:00 2001 From: Gary Krause Date: Fri, 13 Feb 2026 13:13:06 -0500 Subject: [PATCH 1/8] translator: fix extension negotiation race condition Wait for RequestExtensionsSuccess/Error response before starting the SV1 server. This ensures the ChannelManager knows which extensions are active BEFORE downstream miners can connect. Previously, RequestExtensions was sent asynchronously and the SV1 server started immediately without waiting for the response. This caused a race condition where extension-dependent behavior (like UserIdentity TLV for worker hashrate tracking) could not be reliably determined. Changes: - setup_connection() now returns Vec of negotiated extensions - negotiate_extensions() handles the extension negotiation flow synchronously - handle_extension_response() parses RequestExtensionsSuccess/Error - ChannelManager::new() accepts negotiated_extensions parameter - ChannelManager::set_negotiated_extensions() for upstream fallback scenarios - Added ExtensionNegotiationTimeout error variant (30 second timeout) Fixes: stratum-mining/sv2-apps#264 --- miner-apps/translator/src/lib/error.rs | 5 + miner-apps/translator/src/lib/mod.rs | 92 ++++--- .../sv2/channel_manager/channel_manager.rs | 16 +- .../src/lib/sv2/upstream/upstream.rs | 231 +++++++++++++++--- 4 files changed, 284 insertions(+), 60 deletions(-) diff --git a/miner-apps/translator/src/lib/error.rs b/miner-apps/translator/src/lib/error.rs index 8854b2a44..4f7653bac 100644 --- a/miner-apps/translator/src/lib/error.rs +++ b/miner-apps/translator/src/lib/error.rs @@ -176,6 +176,8 @@ pub enum TproxyErrorKind { RequiredExtensionsNotSupported(Vec), /// Server requires extensions that the translator doesn't support ServerRequiresUnsupportedExtensions(Vec), + /// Extension negotiation timed out waiting for response + ExtensionNegotiationTimeout, /// Represents a generic channel send failure, described by a string. General(String), /// Error bubbling up from translator-core library @@ -256,6 +258,9 @@ impl fmt::Display for TproxyErrorKind { extensions ) } + ExtensionNegotiationTimeout => { + write!(f, "Extension negotiation timed out waiting for response") + } SV1Error => write!(f, "Sv1 error"), TranslatorCore(ref e) => write!(f, "Translator core error: {e:?}"), NetworkHelpersError(ref e) => write!(f, "Network helpers error: {e:?}"), diff --git a/miner-apps/translator/src/lib/mod.rs b/miner-apps/translator/src/lib/mod.rs index aa65bf50d..c44c9b42f 100644 --- a/miner-apps/translator/src/lib/mod.rs +++ b/miner-apps/translator/src/lib/mod.rs @@ -134,7 +134,7 @@ impl TranslatorSv2 { info!("Initializing upstream connection..."); - if let Err(e) = self + let init_result = match self .initialize_upstream( &mut upstream_addresses, channel_manager_to_upstream_receiver.clone(), @@ -148,12 +148,18 @@ impl TranslatorSv2 { ) .await { - error!("Failed to initialize any upstream connection: {e:?}"); - self.shutdown_notify.notify_waiters(); - self.is_alive.store(false, Ordering::Relaxed); - return; - } + Ok(result) => result, + Err(e) => { + error!("Failed to initialize any upstream connection: {e:?}"); + self.shutdown_notify.notify_waiters(); + self.is_alive.store(false, Ordering::Relaxed); + return; + } + }; + // Create ChannelManager with the negotiated extensions from upstream + // This ensures the ChannelManager knows which extensions are active + // BEFORE the SV1 server starts accepting connections let mut channel_manager: Arc = Arc::new(ChannelManager::new( channel_manager_to_upstream_sender, upstream_to_channel_manager_receiver, @@ -162,6 +168,7 @@ impl TranslatorSv2 { status_sender.clone(), self.config.supported_extensions.clone(), self.config.required_extensions.clone(), + init_result.negotiated_extensions, )); info!("Launching ChannelManager tasks..."); @@ -285,7 +292,7 @@ impl TranslatorSv2 { self.config.clone(), )); - if let Err(e) = self.initialize_upstream( + match self.initialize_upstream( &mut upstream_addresses, channel_manager_to_upstream_receiver, upstream_to_channel_manager_sender, @@ -296,21 +303,30 @@ impl TranslatorSv2 { sv1_server.clone(), self.config.required_extensions.clone(), ).await { - error!("Couldn't perform fallback, shutting system down: {e:?}"); - cancellation_token.cancel(); - break; + Ok(fallback_result) => { + info!( + "Upstream restarted successfully with extensions: {:?}", + fallback_result.negotiated_extensions + ); + + channel_manager = Arc::new(ChannelManager::new( + channel_manager_to_upstream_sender, + upstream_to_channel_manager_receiver, + channel_manager_to_sv1_server_sender, + sv1_server_to_channel_manager_receiver, + status_sender.clone(), + self.config.supported_extensions.clone(), + self.config.required_extensions.clone(), + fallback_result.negotiated_extensions, + )); + } + Err(e) => { + error!("Couldn't perform fallback, shutting system down: {e:?}"); + cancellation_token.cancel(); + break; + } } - channel_manager = Arc::new(ChannelManager::new( - channel_manager_to_upstream_sender, - upstream_to_channel_manager_receiver, - channel_manager_to_sv1_server_sender, - sv1_server_to_channel_manager_receiver, - status_sender.clone(), - self.config.supported_extensions.clone(), - self.config.required_extensions.clone(), - )); - info!("Launching ChannelManager tasks..."); ChannelManager::run_channel_manager_tasks( channel_manager.clone(), @@ -424,6 +440,10 @@ impl TranslatorSv2 { /// `false` means "never tried", while `true` means "already connected or marked as /// malicious". Once an upstream is flagged we skip it on future loops /// to avoid hammering known-bad endpoints during failover. + /// + /// # Returns + /// * `Ok(UpstreamInitResult)` - Contains the negotiated extensions to be passed to ChannelManager + /// * `Err(TproxyErrorKind)` - All upstreams failed #[allow(clippy::too_many_arguments)] pub async fn initialize_upstream( &self, @@ -436,11 +456,11 @@ impl TranslatorSv2 { task_manager: Arc, sv1_server_instance: Arc, required_extensions: Vec, - ) -> Result<(), TproxyErrorKind> { + ) -> Result { const MAX_RETRIES: usize = 3; let upstream_len = upstreams.len(); for (i, upstream_entry) in upstreams.iter_mut().enumerate() { - // Skip upstreams already marked as malicious. We’ve previously failed or + // Skip upstreams already marked as malicious. We've previously failed or // blacklisted them, so no need to warn or attempt reconnecting again. if upstream_entry.tried_or_flagged { debug!( @@ -472,8 +492,13 @@ impl TranslatorSv2 { ) .await { - Ok(()) => { - // starting sv1 server instance + Ok(init_result) => { + info!( + "Extension negotiation complete. Negotiated extensions: {:?}", + init_result.negotiated_extensions + ); + + // Now that extensions are negotiated, start the SV1 server if let Err(e) = sv1_server_instance .clone() .start( @@ -489,7 +514,7 @@ impl TranslatorSv2 { } upstream_entry.tried_or_flagged = true; - return Ok(()); + return Ok(init_result); } Err(e) => { warn!( @@ -513,6 +538,14 @@ impl TranslatorSv2 { } } +/// Result of successful upstream initialization. +/// Contains the negotiated extensions that should be passed to the ChannelManager. +pub struct UpstreamInitResult { + /// Extensions that were successfully negotiated with the upstream server. + /// This should be stored in the ChannelManager before starting the SV1 server. + pub negotiated_extensions: Vec, +} + // Attempts to initialize a single upstream. #[allow(clippy::too_many_arguments)] #[cfg_attr(not(test), hotpath::measure)] @@ -525,7 +558,7 @@ async fn try_initialize_upstream( status_sender: Sender, task_manager: Arc, required_extensions: Vec, -) -> Result<(), TproxyErrorKind> { +) -> Result { let upstream = Upstream::new( upstream_addr, upstream_to_channel_manager_sender, @@ -537,7 +570,7 @@ async fn try_initialize_upstream( ) .await?; - upstream + let negotiated_extensions = upstream .start( cancellation_token, fallback_coordinator, @@ -545,7 +578,10 @@ async fn try_initialize_upstream( task_manager, ) .await?; - Ok(()) + + Ok(UpstreamInitResult { + negotiated_extensions, + }) } /// Defines the operational mode for Translator Proxy. diff --git a/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs b/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs index 72e951aeb..85088dacb 100644 --- a/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs +++ b/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs @@ -108,6 +108,8 @@ impl ChannelManager { /// by server) /// * `required_extensions` - Extensions that the translator requires (must be supported by /// server) + /// * `negotiated_extensions` - Extensions that were successfully negotiated with the upstream + /// server during setup_connection /// /// # Returns /// A new ChannelManager instance ready to handle message routing @@ -120,6 +122,7 @@ impl ChannelManager { status_sender: Sender, supported_extensions: Vec, required_extensions: Vec, + negotiated_extensions: Vec, ) -> Self { let channel_state = ChannelState::new( upstream_sender, @@ -137,12 +140,22 @@ impl ChannelManager { extended_channels: Arc::new(DashMap::new()), group_channels: Arc::new(DashMap::new()), share_sequence_counters: Arc::new(DashMap::new()), - negotiated_extensions: Arc::new(Mutex::new(Vec::new())), + negotiated_extensions: Arc::new(Mutex::new(negotiated_extensions)), extranonce_factories: Arc::new(DashMap::new()), aggregated_channel_state: AtomicAggregatedState::new(AggregatedState::NoChannel), } } + /// Sets the negotiated extensions. + /// + /// This is used after upstream fallback to update the negotiated extensions + /// with the new upstream server. + pub fn set_negotiated_extensions(&self, extensions: Vec) { + self.negotiated_extensions.super_safe_lock(|data| { + *data = extensions; + }); + } + /// Spawns and runs the main channel manager task loop. /// /// This method creates an async task that handles all message routing for the @@ -807,6 +820,7 @@ mod tests { status_sender, vec![], vec![], + vec![], // negotiated_extensions ) } diff --git a/miner-apps/translator/src/lib/sv2/upstream/upstream.rs b/miner-apps/translator/src/lib/sv2/upstream/upstream.rs index b91ee9da4..aa913eab1 100644 --- a/miner-apps/translator/src/lib/sv2/upstream/upstream.rs +++ b/miner-apps/translator/src/lib/sv2/upstream/upstream.rs @@ -6,14 +6,15 @@ use crate::{ utils::UpstreamEntry, }; use async_channel::{unbounded, Receiver, Sender}; -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use stratum_apps::{ fallback_coordinator::FallbackCoordinator, network_helpers::{self, connect_with_noise, resolve_host}, stratum_core::{ - binary_sv2::Seq064K, + binary_sv2::{self, Seq064K}, + codec_sv2::HandshakeRole, common_messages_sv2::{Protocol, SetupConnection}, - extensions_sv2::RequestExtensions, + extensions_sv2::{RequestExtensions, RequestExtensionsError, RequestExtensionsSuccess}, handlers_sv2::HandleCommonMessagesFromServerAsync, parsers_sv2::{AnyMessage, Mining}, }, @@ -28,6 +29,9 @@ use tokio::net::TcpStream; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; +/// Timeout for extension negotiation response (30 seconds) +const EXTENSION_NEGOTIATION_TIMEOUT_SECS: u64 = 30; + /// Manages the upstream SV2 connection to a mining pool or proxy. /// /// This struct handles the SV2 protocol communication with upstream servers, @@ -175,36 +179,47 @@ impl Upstream { /// /// This method: /// - Completes the SV2 handshake with the upstream server + /// - Negotiates extensions synchronously (waits for response) /// - Spawns the main message processing task /// - Handles graceful shutdown coordination /// /// The method will first attempt to complete the SV2 setup connection /// handshake. If successful, it spawns a task to handle bidirectional /// message flow between the channel manager and upstream server. + /// + /// # Returns + /// * `Ok(Vec)` - Successfully started, returns negotiated extensions + /// * `Err(TproxyError)` - Failed to start or negotiate extensions pub async fn start( mut self, cancellation_token: CancellationToken, fallback_coordinator: FallbackCoordinator, status_sender: Sender, task_manager: Arc, - ) -> TproxyResult<(), error::Upstream> { + ) -> TproxyResult, error::Upstream> { let fallback_token: CancellationToken = fallback_coordinator.token(); + let negotiated_extensions; - // wait for connection setup or cancellation signal + // Wait for connection setup or cancellation signal tokio::select! { result = self.setup_connection() => { - if let Err(e) = result { - error!("Upstream: failed to set up SV2 connection: {e:?}"); - return Err(e); + match result { + Ok(extensions) => { + negotiated_extensions = extensions; + } + Err(e) => { + error!("Upstream: failed to set up SV2 connection: {e:?}"); + return Err(e); + } } } _ = cancellation_token.cancelled() => { info!("Upstream: shutdown signal received during connection setup."); - return Ok(()); + return Ok(vec![]); } _ = fallback_token.cancelled() => { info!("Upstream: fallback signal received during connection setup."); - return Ok(()); + return Ok(vec![]); } } @@ -218,7 +233,7 @@ impl Upstream { task_manager, )?; - Ok(()) + Ok(negotiated_extensions) } /// Performs the SV2 handshake setup with the upstream server. @@ -227,10 +242,16 @@ impl Upstream { /// - Creating and sending a SetupConnection message /// - Waiting for the handshake response /// - Validating and processing the response + /// - Sending RequestExtensions if required extensions are configured + /// - **Waiting for RequestExtensionsSuccess/Error response** before returning /// /// The handshake establishes the protocol version, capabilities, and /// other connection parameters needed for SV2 communication. - pub async fn setup_connection(&mut self) -> TproxyResult<(), error::Upstream> { + /// + /// # Returns + /// * `Ok(Vec)` - The list of negotiated extensions (empty if none were requested) + /// * `Err(TproxyError)` - Error during handshake or extension negotiation + pub async fn setup_connection(&mut self) -> TproxyResult, error::Upstream> { debug!("Upstream: initiating SV2 handshake..."); // Build SetupConnection message let setup_conn_msg = Self::get_setup_connection_message(2, 2, &self.address, false) @@ -277,32 +298,180 @@ impl Upstream { debug!("Upstream: handshake completed successfully."); // Send RequestExtensions message if there are any required extensions + // and wait for the response before returning if !self.required_extensions.is_empty() { - let require_extensions = RequestExtensions { - request_id: 1, - requested_extensions: Seq064K::new(self.required_extensions.clone()).unwrap(), - }; + let negotiated = self.negotiate_extensions().await?; + return Ok(negotiated); + } - let sv2_frame: Sv2Frame = - AnyMessage::Extensions(require_extensions.into_static().into()) - .try_into() - .map_err(TproxyError::shutdown)?; + Ok(vec![]) + } + + /// Sends RequestExtensions and waits for the response. + /// + /// This method handles the extension negotiation flow: + /// 1. Sends RequestExtensions with required extensions + /// 2. Waits for RequestExtensionsSuccess or RequestExtensionsError + /// 3. Validates that all required extensions are supported + /// 4. Handles retry if server requires additional extensions we support + /// + /// # Returns + /// * `Ok(Vec)` - The list of successfully negotiated extensions + /// * `Err(TproxyError)` - Extension negotiation failed + async fn negotiate_extensions(&mut self) -> TproxyResult, error::Upstream> { + let request_extensions = RequestExtensions { + request_id: 1, + requested_extensions: Seq064K::new(self.required_extensions.clone()).unwrap(), + }; + + let sv2_frame: Sv2Frame = AnyMessage::Extensions(request_extensions.into_static().into()) + .try_into() + .map_err(TproxyError::shutdown)?; + + info!( + "Sending RequestExtensions to upstream with required extensions: {:?}", + self.required_extensions + ); + + self.upstream_channel_state + .upstream_sender + .send(sv2_frame) + .await + .map_err(|e| { + error!("Failed to send RequestExtensions to upstream: {:?}", e); + TproxyError::fallback(TproxyErrorKind::ChannelErrorSender) + })?; - info!( - "Sending RequestExtensions message to upstream: {:?}", - sv2_frame + // Wait for extension negotiation response with timeout + let response = tokio::time::timeout( + Duration::from_secs(EXTENSION_NEGOTIATION_TIMEOUT_SECS), + self.upstream_channel_state.upstream_receiver.recv(), + ) + .await + .map_err(|_| { + error!( + "Extension negotiation timed out after {} seconds", + EXTENSION_NEGOTIATION_TIMEOUT_SECS ); + TproxyError::fallback(TproxyErrorKind::ExtensionNegotiationTimeout) + })? + .map_err(|e| { + error!("Failed to receive extension negotiation response: {}", e); + TproxyError::fallback(e) + })?; + + self.handle_extension_response(response).await + } + + /// Handles the extension negotiation response (Success or Error). + async fn handle_extension_response( + &mut self, + mut response: Sv2Frame, + ) -> TproxyResult, error::Upstream> { + let header = response.get_header().ok_or_else(|| { + error!("Extension response frame missing header"); + TproxyError::fallback(TproxyErrorKind::UnexpectedMessage(0, 0)) + })?; - self.upstream_channel_state - .upstream_sender - .send(sv2_frame) - .await - .map_err(|e| { - error!("Failed to send message to upstream: {:?}", e); - TproxyError::fallback(TproxyErrorKind::ChannelErrorSender) + let msg_type = header.msg_type(); + let payload = response.payload(); + + // Message types for extension negotiation: + // 0x00 = RequestExtensions + // 0x01 = RequestExtensionsSuccess + // 0x02 = RequestExtensionsError + match msg_type { + 0x01 => { + // RequestExtensionsSuccess + let msg: RequestExtensionsSuccess = + binary_sv2::from_bytes(payload).map_err(|e| { + error!("Failed to parse RequestExtensionsSuccess: {:?}", e); + TproxyError::fallback(TproxyErrorKind::BinarySv2(e)) + })?; + + let supported: Vec = msg.supported_extensions.into_inner(); + info!("Extension negotiation success: supported={:?}", supported); + + // Check if all required extensions are supported + let missing_required: Vec = self + .required_extensions + .iter() + .filter(|ext| !supported.contains(ext)) + .copied() + .collect(); + + if !missing_required.is_empty() { + error!( + "Server does not support required extensions: {:?}", + missing_required + ); + return Err(TproxyError::fallback( + TproxyErrorKind::RequiredExtensionsNotSupported(missing_required), + )); + } + + info!("Successfully negotiated extensions: {:?}", supported); + Ok(supported) + } + 0x02 => { + // RequestExtensionsError + let msg: RequestExtensionsError = binary_sv2::from_bytes(payload).map_err(|e| { + error!("Failed to parse RequestExtensionsError: {:?}", e); + TproxyError::fallback(TproxyErrorKind::BinarySv2(e)) })?; + + let unsupported: Vec = msg.unsupported_extensions.into_inner(); + let required_by_server: Vec = msg.required_extensions.into_inner(); + + error!( + "Extension negotiation error: unsupported={:?}, required_by_server={:?}", + unsupported, required_by_server + ); + + // Check if any of our required extensions were not supported + let missing_required: Vec = self + .required_extensions + .iter() + .filter(|ext| unsupported.contains(ext)) + .copied() + .collect(); + + if !missing_required.is_empty() { + error!( + "Server does not support required extensions: {:?}", + missing_required + ); + return Err(TproxyError::fallback( + TproxyErrorKind::RequiredExtensionsNotSupported(missing_required), + )); + } + + // If server requires extensions we don't support, fail + if !required_by_server.is_empty() { + error!( + "Server requires extensions that we don't support: {:?}", + required_by_server + ); + return Err(TproxyError::fallback( + TproxyErrorKind::ServerRequiresUnsupportedExtensions(required_by_server), + )); + } + + // No required extensions failed, return empty (negotiation succeeded with no + // extensions) + Ok(vec![]) + } + _ => { + error!( + "Unexpected message type during extension negotiation: {}", + msg_type + ); + Err(TproxyError::fallback(TproxyErrorKind::UnexpectedMessage( + header.ext_type(), + msg_type, + ))) + } } - Ok(()) } /// Processes incoming messages from the upstream SV2 server. From 17667ad9804e2d20bab0febdf8ae108b26f73d7a Mon Sep 17 00:00:00 2001 From: Gary Krause Date: Fri, 13 Feb 2026 13:16:14 -0500 Subject: [PATCH 2/8] jd-client: fix extension negotiation race condition Wait for RequestExtensionsSuccess/Error response before starting the downstream server. This ensures the ChannelManager knows which extensions are active BEFORE downstream miners can connect. Previously, RequestExtensions was sent asynchronously and the downstream server started immediately without waiting for the response. This caused a race condition where extension-dependent behavior could not be reliably determined. Changes: - setup_connection() now returns Vec of negotiated extensions - negotiate_extensions() handles the extension negotiation flow synchronously - handle_extension_response() parses RequestExtensionsSuccess/Error - Upstream::start() now returns the negotiated extensions - ChannelManager::set_negotiated_extensions() stores extensions before downstream server starts - Added ExtensionNegotiationTimeout error variant (30 second timeout) Fixes: stratum-mining/sv2-apps#264 --- .../jd-client/src/lib/channel_manager/mod.rs | 10 + miner-apps/jd-client/src/lib/error.rs | 15 +- miner-apps/jd-client/src/lib/mod.rs | 14 +- miner-apps/jd-client/src/lib/upstream/mod.rs | 215 +++++++++++++++--- 4 files changed, 220 insertions(+), 34 deletions(-) diff --git a/miner-apps/jd-client/src/lib/channel_manager/mod.rs b/miner-apps/jd-client/src/lib/channel_manager/mod.rs index 939e9731d..dbe7f1aa9 100644 --- a/miner-apps/jd-client/src/lib/channel_manager/mod.rs +++ b/miner-apps/jd-client/src/lib/channel_manager/mod.rs @@ -354,6 +354,16 @@ impl ChannelManager { Ok(channel_manager) } + /// Sets the negotiated extensions. + /// + /// This is used after upstream connection setup to store the extensions + /// that were successfully negotiated with the upstream server. + pub fn set_negotiated_extensions(&self, extensions: Vec) { + self.channel_manager_data.super_safe_lock(|data| { + data.negotiated_extensions = extensions; + }); + } + // Bootstraps a group channel with the given parameters. // Returns a `GroupChannel` if successful, otherwise returns `None`. // diff --git a/miner-apps/jd-client/src/lib/error.rs b/miner-apps/jd-client/src/lib/error.rs index cd245a3fa..e6a90ca24 100644 --- a/miner-apps/jd-client/src/lib/error.rs +++ b/miner-apps/jd-client/src/lib/error.rs @@ -224,9 +224,11 @@ pub enum JDCErrorKind { RequiredExtensionsNotSupported(Vec), /// Server requires extensions that the translator doesn't support ServerRequiresUnsupportedExtensions(Vec), - /// BitcoinCoreSv2TDP cancellation token activated - BitcoinCoreSv2TDPCancellationTokenActivated, - /// Failed to create BitcoinCoreSv2TDP tokio runtime + /// Extension negotiation timed out waiting for response + ExtensionNegotiationTimeout, + /// BitcoinCoreSv2 cancellation token activated + BitcoinCoreSv2CancellationTokenActivated, + /// Failed to create BitcoinCoreSv2 tokio runtime FailedToCreateBitcoinCoreTokioRuntime, /// Failed to send CoinbaseOutputConstraints message FailedToSendCoinbaseOutputConstraints, @@ -368,8 +370,11 @@ impl fmt::Display for JDCErrorKind { ServerRequiresUnsupportedExtensions(extensions) => { write!(f, "Server requires extensions that the translator doesn't support: {extensions:?}") } - BitcoinCoreSv2TDPCancellationTokenActivated => { - write!(f, "BitcoinCoreSv2TDP cancellation token activated") + ExtensionNegotiationTimeout => { + write!(f, "Extension negotiation timed out waiting for response") + } + BitcoinCoreSv2CancellationTokenActivated => { + write!(f, "BitcoinCoreSv2 cancellation token activated") } FailedToCreateBitcoinCoreTokioRuntime => { write!(f, "Failed to create BitcoinCoreSv2TDP tokio runtime") diff --git a/miner-apps/jd-client/src/lib/mod.rs b/miner-apps/jd-client/src/lib/mod.rs index afe94a853..2606db878 100644 --- a/miner-apps/jd-client/src/lib/mod.rs +++ b/miner-apps/jd-client/src/lib/mod.rs @@ -291,7 +291,10 @@ impl JobDeclaratorClient { .await { Ok((upstream, job_declarator)) => { - upstream + // Start upstream and wait for extension negotiation to complete + // This returns the negotiated extensions which we need to store + // BEFORE starting the downstream server + let negotiated_extensions = upstream .start( self.config.min_supported_version(), self.config.max_supported_version(), @@ -302,6 +305,15 @@ impl JobDeclaratorClient { ) .await; + // Store the negotiated extensions in the ChannelManager + // This ensures the ChannelManager knows which extensions are active + // BEFORE the downstream server starts accepting connections + info!( + "Upstream extension negotiation complete. Negotiated extensions: {:?}", + negotiated_extensions + ); + channel_manager_clone.set_negotiated_extensions(negotiated_extensions); + job_declarator .start( self.cancellation_token.clone(), diff --git a/miner-apps/jd-client/src/lib/upstream/mod.rs b/miner-apps/jd-client/src/lib/upstream/mod.rs index 34d3cb533..ace92d491 100644 --- a/miner-apps/jd-client/src/lib/upstream/mod.rs +++ b/miner-apps/jd-client/src/lib/upstream/mod.rs @@ -6,20 +6,27 @@ //! Responsibilities: //! - Establish a TCP + Noise encrypted connection to upstream //! - Perform `SetupConnection` handshake +//! - Negotiate extensions synchronously before returning //! - Forward SV2 mining messages between upstream and channel manager //! - Handle common messages from upstream -use std::{net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use async_channel::{unbounded, Receiver, Sender}; use bitcoin_core_sv2::template_distribution_protocol::CancellationToken; use stratum_apps::{ custom_mutex::Mutex, fallback_coordinator::FallbackCoordinator, - network_helpers::{connect_with_noise, resolve_host}, + key_utils::Secp256k1PublicKey, + network_helpers::{connect_with_noise, noise_stream::NoiseTcpStream, resolve_host}, stratum_core::{ - binary_sv2::Seq064K, extensions_sv2::RequestExtensions, framing_sv2, - handlers_sv2::HandleCommonMessagesFromServerAsync, parsers_sv2::AnyMessage, + binary_sv2::{self, Seq064K}, + codec_sv2::HandshakeRole, + extensions_sv2::{RequestExtensions, RequestExtensionsError, RequestExtensionsSuccess}, + framing_sv2, + handlers_sv2::HandleCommonMessagesFromServerAsync, + noise_sv2::Initiator, + parsers_sv2::AnyMessage, }, task_manager::TaskManager, utils::{ @@ -37,6 +44,9 @@ use crate::{ utils::{get_setup_connection_message, UpstreamEntry}, }; +/// Timeout for extension negotiation response (30 seconds) +const EXTENSION_NEGOTIATION_TIMEOUT_SECS: u64 = 30; + mod message_handler; /// Placeholder for future upstream-specific data/state. @@ -151,11 +161,17 @@ impl Upstream { /// Perform `SetupConnection` handshake with upstream. /// /// Sends [`SetupConnection`] and awaits response. + /// If required extensions are configured, negotiates them synchronously + /// before returning. + /// + /// # Returns + /// * `Ok(Vec)` - The list of negotiated extensions (empty if none were requested) + /// * `Err(JDCError)` - Error during handshake or extension negotiation pub async fn setup_connection( &mut self, min_version: u16, max_version: u16, - ) -> JDCResult<(), error::Upstream> { + ) -> JDCResult, error::Upstream> { info!("Upstream: initiating SV2 handshake..."); let setup_connection = get_setup_connection_message(min_version, max_version, &self.address) @@ -197,25 +213,26 @@ impl Upstream { .await?; // Send RequestExtensions after successful SetupConnection if there are required extensions + // and wait for the response before returning if !self.required_extensions.is_empty() { - self.send_request_extensions().await?; + let negotiated = self.negotiate_extensions().await?; + return Ok(negotiated); } - Ok(()) + Ok(vec![]) } - /// Send `RequestExtensions` message to upstream. - /// The supported extensions are stored for potential retry if the server requires additional - /// extensions. - async fn send_request_extensions(&mut self) -> JDCResult<(), error::Upstream> { - info!( - "Sending RequestExtensions to upstream with required extensions: {:?}", - self.required_extensions - ); - if self.required_extensions.is_empty() { - return Ok(()); - } - + /// Sends RequestExtensions and waits for the response. + /// + /// This method handles the extension negotiation flow: + /// 1. Sends RequestExtensions with required extensions + /// 2. Waits for RequestExtensionsSuccess or RequestExtensionsError + /// 3. Validates that all required extensions are supported + /// + /// # Returns + /// * `Ok(Vec)` - The list of successfully negotiated extensions + /// * `Err(JDCError)` - Extension negotiation failed + async fn negotiate_extensions(&mut self) -> JDCResult, error::Upstream> { let requested_extensions = Seq064K::new(self.required_extensions.clone()).map_err(JDCError::shutdown)?; @@ -242,18 +259,149 @@ impl Upstream { JDCError::fallback(JDCErrorKind::ChannelErrorSender) })?; - info!("Sent RequestExtensions to upstream"); - Ok(()) + // Wait for extension negotiation response with timeout + let response = tokio::time::timeout( + Duration::from_secs(EXTENSION_NEGOTIATION_TIMEOUT_SECS), + self.upstream_channel.upstream_receiver.recv(), + ) + .await + .map_err(|_| { + error!( + "Extension negotiation timed out after {} seconds", + EXTENSION_NEGOTIATION_TIMEOUT_SECS + ); + JDCError::fallback(JDCErrorKind::ExtensionNegotiationTimeout) + })? + .map_err(|e| { + error!("Failed to receive extension negotiation response: {}", e); + JDCError::fallback(e) + })?; + + self.handle_extension_response(response).await + } + + /// Handles the extension negotiation response (Success or Error). + async fn handle_extension_response( + &mut self, + mut response: Sv2Frame, + ) -> JDCResult, error::Upstream> { + let header = response.get_header().ok_or_else(|| { + error!("Extension response frame missing header"); + JDCError::fallback(JDCErrorKind::UnexpectedMessage(0, 0)) + })?; + + let msg_type = header.msg_type(); + let payload = response.payload(); + + // Message types for extension negotiation: + // 0x00 = RequestExtensions + // 0x01 = RequestExtensionsSuccess + // 0x02 = RequestExtensionsError + match msg_type { + 0x01 => { + // RequestExtensionsSuccess + let msg: RequestExtensionsSuccess = + binary_sv2::from_bytes(payload).map_err(|e| { + error!("Failed to parse RequestExtensionsSuccess: {:?}", e); + JDCError::fallback(JDCErrorKind::BinarySv2(e)) + })?; + + let supported: Vec = msg.supported_extensions.into_inner(); + info!("Extension negotiation success: supported={:?}", supported); + + // Check if all required extensions are supported + let missing_required: Vec = self + .required_extensions + .iter() + .filter(|ext| !supported.contains(ext)) + .copied() + .collect(); + + if !missing_required.is_empty() { + error!( + "Server does not support required extensions: {:?}", + missing_required + ); + return Err(JDCError::fallback( + JDCErrorKind::RequiredExtensionsNotSupported(missing_required), + )); + } + + info!("Successfully negotiated extensions: {:?}", supported); + Ok(supported) + } + 0x02 => { + // RequestExtensionsError + let msg: RequestExtensionsError = binary_sv2::from_bytes(payload).map_err(|e| { + error!("Failed to parse RequestExtensionsError: {:?}", e); + JDCError::fallback(JDCErrorKind::BinarySv2(e)) + })?; + + let unsupported: Vec = msg.unsupported_extensions.into_inner(); + let required_by_server: Vec = msg.required_extensions.into_inner(); + + error!( + "Extension negotiation error: unsupported={:?}, required_by_server={:?}", + unsupported, required_by_server + ); + + // Check if any of our required extensions were not supported + let missing_required: Vec = self + .required_extensions + .iter() + .filter(|ext| unsupported.contains(ext)) + .copied() + .collect(); + + if !missing_required.is_empty() { + error!( + "Server does not support required extensions: {:?}", + missing_required + ); + return Err(JDCError::fallback( + JDCErrorKind::RequiredExtensionsNotSupported(missing_required), + )); + } + + // If server requires extensions we don't support, fail + if !required_by_server.is_empty() { + error!( + "Server requires extensions that we don't support: {:?}", + required_by_server + ); + return Err(JDCError::fallback( + JDCErrorKind::ServerRequiresUnsupportedExtensions(required_by_server), + )); + } + + // No required extensions failed, return empty + Ok(vec![]) + } + _ => { + error!( + "Unexpected message type during extension negotiation: {}", + msg_type + ); + Err(JDCError::fallback(JDCErrorKind::UnexpectedMessage( + header.ext_type(), + msg_type, + ))) + } + } } /// Start unified upstream loop. /// /// Responsibilities: - /// - Run `setup_connection` + /// - Run `setup_connection` (including extension negotiation) /// - Handle messages from upstream (pool) and channel manager /// - React to shutdown signals /// - /// This function spawns an async task and returns immediately. + /// This function spawns an async task and returns the negotiated extensions. + /// + /// # Returns + /// * `Vec` - The list of negotiated extensions (empty if none were requested or setup + /// failed) #[allow(clippy::too_many_arguments)] pub async fn start( mut self, @@ -263,13 +411,22 @@ impl Upstream { fallback_coordinator: FallbackCoordinator, status_sender: Sender, task_manager: Arc, - ) { + ) -> Vec { let status_sender = StatusSender::Upstream(status_sender); - if let Err(e) = self.setup_connection(min_version, max_version).await { - error!(error = ?e, "Upstream: connection setup failed."); - return; - } + let negotiated_extensions = match self.setup_connection(min_version, max_version).await { + Ok(extensions) => { + info!( + "Upstream: extension negotiation complete. Extensions: {:?}", + extensions + ); + extensions + } + Err(e) => { + error!(error = ?e, "Upstream: connection setup failed."); + return vec![]; + } + }; task_manager.spawn(async move { // we just spawned a new task that's relevant to fallback coordination @@ -315,6 +472,8 @@ impl Upstream { // signal fallback coordinator that this task has completed its cleanup fallback_handler.done(); }); + + negotiated_extensions } // Handle incoming frames from upstream (pool). From d9ae6ac8925ab601d74dfe87bfdf7e1a85792026 Mon Sep 17 00:00:00 2001 From: Gary Krause Date: Fri, 13 Feb 2026 13:19:17 -0500 Subject: [PATCH 3/8] translator: truncate username to 32 bytes for UserIdentity TLV When a miner's username exceeds 32 bytes (the protocol limit for the UserIdentity TLV in SV2 extensions), truncate it at a valid UTF-8 character boundary and log a warning. This ensures usernames are passed through to the pool even when they exceed the limit, preserving user recognition on pool dashboards while maintaining protocol compliance. --- .../sv1_server/downstream_message_handler.rs | 50 ++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs b/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs index 4a47a265a..d87ed6d86 100644 --- a/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs +++ b/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs @@ -12,6 +12,33 @@ use crate::{ utils::{validate_sv1_share, AGGREGATED_CHANNEL_ID}, }; +/// Maximum length for user identity in bytes. +/// This is a protocol limit from the UserIdentity TLV in SV2 extensions. +const MAX_USER_IDENTITY_BYTES: usize = 32; + +/// Truncates a string to a maximum byte length, respecting UTF-8 character boundaries. +/// +/// If the input string exceeds `max_bytes`, it is truncated at the last valid +/// UTF-8 character boundary before or at `max_bytes`. +/// +/// # Arguments +/// * `s` - The input string to potentially truncate +/// * `max_bytes` - Maximum number of bytes allowed +/// +/// # Returns +/// A string slice that is at most `max_bytes` long +fn truncate_to_bytes(s: &str, max_bytes: usize) -> &str { + if s.len() <= max_bytes { + return s; + } + // Find the last valid UTF-8 char boundary at or before max_bytes + let mut end = max_bytes; + while end > 0 && !s.is_char_boundary(end) { + end -= 1; + } + &s[..end] +} + // Implements `IsServer` for `Sv1Server` to handle the Sv1 messages. #[hotpath::measure_all] impl IsServer<'static> for Sv1Server { @@ -192,6 +219,9 @@ impl IsServer<'static> for Sv1Server { } /// Authorizes a Downstream role. + /// + /// If the username exceeds 32 bytes (the protocol limit for UserIdentity TLV), + /// it will be truncated and a warning will be logged. fn authorize(&mut self, client_id: Option, name: &str) { let downstream_id = client_id.expect("Downstream id should exist"); let downstream = self @@ -200,11 +230,29 @@ impl IsServer<'static> for Sv1Server { .expect("Downstream should exist"); let is_authorized = self.is_authorized(client_id, name); + + // Truncate user_identity if it exceeds the protocol limit (32 bytes) + let user_identity = if name.len() > MAX_USER_IDENTITY_BYTES { + let truncated = truncate_to_bytes(name, MAX_USER_IDENTITY_BYTES); + warn!( + "Downstream {}: Username '{}' exceeds {} bytes ({} bytes), truncating to '{}'. \ + Consider using a shorter username for full visibility on the pool dashboard.", + downstream_id, + name, + MAX_USER_IDENTITY_BYTES, + name.len(), + truncated + ); + truncated.to_string() + } else { + name.to_string() + }; + downstream.downstream_data.super_safe_lock(|data| { if !is_authorized { data.authorized_worker_name = name.to_string(); } - data.user_identity = name.to_string(); + data.user_identity = user_identity.clone(); debug!( "Down: Set user_identity to '{}' for downstream {}", data.user_identity, downstream_id From 833ba036040ce99f9d2eb8f41f788c070e32f8cb Mon Sep 17 00:00:00 2001 From: Gary Krause Date: Tue, 17 Feb 2026 09:55:27 -0500 Subject: [PATCH 4/8] refactor: use named constants for extension message types Replace magic numbers 0x01 and 0x02 with MESSAGE_TYPE_REQUEST_EXTENSIONS_SUCCESS and MESSAGE_TYPE_REQUEST_EXTENSIONS_ERROR constants for better readability. --- miner-apps/jd-client/src/lib/upstream/mod.rs | 15 ++++++--------- .../translator/src/lib/sv2/upstream/upstream.rs | 15 ++++++--------- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/miner-apps/jd-client/src/lib/upstream/mod.rs b/miner-apps/jd-client/src/lib/upstream/mod.rs index ace92d491..3fd0f3c57 100644 --- a/miner-apps/jd-client/src/lib/upstream/mod.rs +++ b/miner-apps/jd-client/src/lib/upstream/mod.rs @@ -22,7 +22,10 @@ use stratum_apps::{ stratum_core::{ binary_sv2::{self, Seq064K}, codec_sv2::HandshakeRole, - extensions_sv2::{RequestExtensions, RequestExtensionsError, RequestExtensionsSuccess}, + extensions_sv2::{ + RequestExtensions, RequestExtensionsError, RequestExtensionsSuccess, + MESSAGE_TYPE_REQUEST_EXTENSIONS_ERROR, MESSAGE_TYPE_REQUEST_EXTENSIONS_SUCCESS, + }, framing_sv2, handlers_sv2::HandleCommonMessagesFromServerAsync, noise_sv2::Initiator, @@ -293,13 +296,8 @@ impl Upstream { let msg_type = header.msg_type(); let payload = response.payload(); - // Message types for extension negotiation: - // 0x00 = RequestExtensions - // 0x01 = RequestExtensionsSuccess - // 0x02 = RequestExtensionsError match msg_type { - 0x01 => { - // RequestExtensionsSuccess + MESSAGE_TYPE_REQUEST_EXTENSIONS_SUCCESS => { let msg: RequestExtensionsSuccess = binary_sv2::from_bytes(payload).map_err(|e| { error!("Failed to parse RequestExtensionsSuccess: {:?}", e); @@ -330,8 +328,7 @@ impl Upstream { info!("Successfully negotiated extensions: {:?}", supported); Ok(supported) } - 0x02 => { - // RequestExtensionsError + MESSAGE_TYPE_REQUEST_EXTENSIONS_ERROR => { let msg: RequestExtensionsError = binary_sv2::from_bytes(payload).map_err(|e| { error!("Failed to parse RequestExtensionsError: {:?}", e); JDCError::fallback(JDCErrorKind::BinarySv2(e)) diff --git a/miner-apps/translator/src/lib/sv2/upstream/upstream.rs b/miner-apps/translator/src/lib/sv2/upstream/upstream.rs index aa913eab1..663dbc66d 100644 --- a/miner-apps/translator/src/lib/sv2/upstream/upstream.rs +++ b/miner-apps/translator/src/lib/sv2/upstream/upstream.rs @@ -14,7 +14,10 @@ use stratum_apps::{ binary_sv2::{self, Seq064K}, codec_sv2::HandshakeRole, common_messages_sv2::{Protocol, SetupConnection}, - extensions_sv2::{RequestExtensions, RequestExtensionsError, RequestExtensionsSuccess}, + extensions_sv2::{ + RequestExtensions, RequestExtensionsError, RequestExtensionsSuccess, + MESSAGE_TYPE_REQUEST_EXTENSIONS_ERROR, MESSAGE_TYPE_REQUEST_EXTENSIONS_SUCCESS, + }, handlers_sv2::HandleCommonMessagesFromServerAsync, parsers_sv2::{AnyMessage, Mining}, }, @@ -376,13 +379,8 @@ impl Upstream { let msg_type = header.msg_type(); let payload = response.payload(); - // Message types for extension negotiation: - // 0x00 = RequestExtensions - // 0x01 = RequestExtensionsSuccess - // 0x02 = RequestExtensionsError match msg_type { - 0x01 => { - // RequestExtensionsSuccess + MESSAGE_TYPE_REQUEST_EXTENSIONS_SUCCESS => { let msg: RequestExtensionsSuccess = binary_sv2::from_bytes(payload).map_err(|e| { error!("Failed to parse RequestExtensionsSuccess: {:?}", e); @@ -413,8 +411,7 @@ impl Upstream { info!("Successfully negotiated extensions: {:?}", supported); Ok(supported) } - 0x02 => { - // RequestExtensionsError + MESSAGE_TYPE_REQUEST_EXTENSIONS_ERROR => { let msg: RequestExtensionsError = binary_sv2::from_bytes(payload).map_err(|e| { error!("Failed to parse RequestExtensionsError: {:?}", e); TproxyError::fallback(TproxyErrorKind::BinarySv2(e)) From 1e4857e1a733acab2807d410d5f5b32db1547386 Mon Sep 17 00:00:00 2001 From: Gary Krause Date: Tue, 17 Feb 2026 10:10:23 -0500 Subject: [PATCH 5/8] refactor: remove UpstreamInitResult wrapper, use Vec directly Align Translator with JDC by returning Vec directly from initialize_upstream() instead of wrapping it in UpstreamInitResult. This simplifies the code and makes both implementations consistent. --- miner-apps/translator/src/lib/mod.rs | 41 ++++++++++------------------ 1 file changed, 15 insertions(+), 26 deletions(-) diff --git a/miner-apps/translator/src/lib/mod.rs b/miner-apps/translator/src/lib/mod.rs index c44c9b42f..9e1adfeaf 100644 --- a/miner-apps/translator/src/lib/mod.rs +++ b/miner-apps/translator/src/lib/mod.rs @@ -134,7 +134,7 @@ impl TranslatorSv2 { info!("Initializing upstream connection..."); - let init_result = match self + let negotiated_extensions = match self .initialize_upstream( &mut upstream_addresses, channel_manager_to_upstream_receiver.clone(), @@ -148,7 +148,7 @@ impl TranslatorSv2 { ) .await { - Ok(result) => result, + Ok(extensions) => extensions, Err(e) => { error!("Failed to initialize any upstream connection: {e:?}"); self.shutdown_notify.notify_waiters(); @@ -168,7 +168,7 @@ impl TranslatorSv2 { status_sender.clone(), self.config.supported_extensions.clone(), self.config.required_extensions.clone(), - init_result.negotiated_extensions, + negotiated_extensions, )); info!("Launching ChannelManager tasks..."); @@ -303,10 +303,10 @@ impl TranslatorSv2 { sv1_server.clone(), self.config.required_extensions.clone(), ).await { - Ok(fallback_result) => { + Ok(negotiated_extensions) => { info!( "Upstream restarted successfully with extensions: {:?}", - fallback_result.negotiated_extensions + negotiated_extensions ); channel_manager = Arc::new(ChannelManager::new( @@ -317,7 +317,7 @@ impl TranslatorSv2 { status_sender.clone(), self.config.supported_extensions.clone(), self.config.required_extensions.clone(), - fallback_result.negotiated_extensions, + negotiated_extensions, )); } Err(e) => { @@ -442,7 +442,7 @@ impl TranslatorSv2 { /// to avoid hammering known-bad endpoints during failover. /// /// # Returns - /// * `Ok(UpstreamInitResult)` - Contains the negotiated extensions to be passed to ChannelManager + /// * `Ok(Vec)` - The negotiated extensions to be passed to ChannelManager /// * `Err(TproxyErrorKind)` - All upstreams failed #[allow(clippy::too_many_arguments)] pub async fn initialize_upstream( @@ -456,7 +456,7 @@ impl TranslatorSv2 { task_manager: Arc, sv1_server_instance: Arc, required_extensions: Vec, - ) -> Result { + ) -> Result, TproxyErrorKind> { const MAX_RETRIES: usize = 3; let upstream_len = upstreams.len(); for (i, upstream_entry) in upstreams.iter_mut().enumerate() { @@ -492,10 +492,10 @@ impl TranslatorSv2 { ) .await { - Ok(init_result) => { + Ok(negotiated_extensions) => { info!( "Extension negotiation complete. Negotiated extensions: {:?}", - init_result.negotiated_extensions + negotiated_extensions ); // Now that extensions are negotiated, start the SV1 server @@ -514,7 +514,7 @@ impl TranslatorSv2 { } upstream_entry.tried_or_flagged = true; - return Ok(init_result); + return Ok(negotiated_extensions); } Err(e) => { warn!( @@ -538,14 +538,6 @@ impl TranslatorSv2 { } } -/// Result of successful upstream initialization. -/// Contains the negotiated extensions that should be passed to the ChannelManager. -pub struct UpstreamInitResult { - /// Extensions that were successfully negotiated with the upstream server. - /// This should be stored in the ChannelManager before starting the SV1 server. - pub negotiated_extensions: Vec, -} - // Attempts to initialize a single upstream. #[allow(clippy::too_many_arguments)] #[cfg_attr(not(test), hotpath::measure)] @@ -558,7 +550,7 @@ async fn try_initialize_upstream( status_sender: Sender, task_manager: Arc, required_extensions: Vec, -) -> Result { +) -> Result, TproxyErrorKind> { let upstream = Upstream::new( upstream_addr, upstream_to_channel_manager_sender, @@ -570,18 +562,15 @@ async fn try_initialize_upstream( ) .await?; - let negotiated_extensions = upstream + upstream .start( cancellation_token, fallback_coordinator, status_sender, task_manager, ) - .await?; - - Ok(UpstreamInitResult { - negotiated_extensions, - }) + .await + .map_err(|e| e.kind) } /// Defines the operational mode for Translator Proxy. From 5962f24b1ce41846f69cd1b5ef2339b86cc9776b Mon Sep 17 00:00:00 2001 From: average-gary <165834679+average-gary@users.noreply.github.com> Date: Thu, 5 Mar 2026 11:31:05 -0500 Subject: [PATCH 6/8] refactor: extension negotiation timeout Co-authored-by: Gabriele Vernetti <62447440+GitGab19@users.noreply.github.com> --- miner-apps/jd-client/src/lib/upstream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/miner-apps/jd-client/src/lib/upstream/mod.rs b/miner-apps/jd-client/src/lib/upstream/mod.rs index 3fd0f3c57..5084925b5 100644 --- a/miner-apps/jd-client/src/lib/upstream/mod.rs +++ b/miner-apps/jd-client/src/lib/upstream/mod.rs @@ -48,7 +48,7 @@ use crate::{ }; /// Timeout for extension negotiation response (30 seconds) -const EXTENSION_NEGOTIATION_TIMEOUT_SECS: u64 = 30; +const EXTENSION_NEGOTIATION_TIMEOUT_SECS: u64 = 10; mod message_handler; From eb7585866893fc1d3356db2bb4869f9b0f140fb8 Mon Sep 17 00:00:00 2001 From: Gary Krause Date: Wed, 11 Mar 2026 10:36:51 -0400 Subject: [PATCH 7/8] refactor: delegate extension negotiation to HandleExtensionsFromServerAsync trait MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In both JDC and Translator, `handle_extension_response` was manually reimplementing the parsing and validation logic already present in each `ChannelManager`'s `extensions_message_handler.rs` trait impl. Replace that duplicated logic by calling `handle_extensions_message_frame_from_server` directly on the `ChannelManager`, which internally dispatches to the existing `handle_request_extensions_success` / `handle_request_extensions_error` handlers. To support this, thread `channel_manager: &mut ChannelManager` through the call chains: start() → setup_connection() → negotiate_extensions() → handle_extension_response() The `ChannelManager` is now created before `initialize_upstream` / `upstream.start()` so the trait impl has access to it during extension negotiation and can store the result internally. The caller no longer needs to pass negotiated extensions back to the `ChannelManager` after the fact. A retry forwarding loop is added to `negotiate_extensions`: after each `handle_extension_response` call, if the trait impl sent a retry `RequestExtensions` frame via its upstream channel, the frame is detected via `try_recv()` and forwarded to the pool before looping to await the next response. --- miner-apps/jd-client/src/lib/mod.rs | 10 +- miner-apps/jd-client/src/lib/upstream/mod.rs | 203 +++++++---------- miner-apps/translator/src/lib/mod.rs | 98 +++++---- .../src/lib/sv2/upstream/upstream.rs | 208 +++++++----------- 4 files changed, 215 insertions(+), 304 deletions(-) diff --git a/miner-apps/jd-client/src/lib/mod.rs b/miner-apps/jd-client/src/lib/mod.rs index 2606db878..3ca80b9a4 100644 --- a/miner-apps/jd-client/src/lib/mod.rs +++ b/miner-apps/jd-client/src/lib/mod.rs @@ -291,9 +291,7 @@ impl JobDeclaratorClient { .await { Ok((upstream, job_declarator)) => { - // Start upstream and wait for extension negotiation to complete - // This returns the negotiated extensions which we need to store - // BEFORE starting the downstream server + // Start upstream and wait for extension negotiation to complete. let negotiated_extensions = upstream .start( self.config.min_supported_version(), @@ -302,17 +300,14 @@ impl JobDeclaratorClient { fallback_coordinator.clone(), status_sender.clone(), task_manager.clone(), + &mut channel_manager_clone, ) .await; - // Store the negotiated extensions in the ChannelManager - // This ensures the ChannelManager knows which extensions are active - // BEFORE the downstream server starts accepting connections info!( "Upstream extension negotiation complete. Negotiated extensions: {:?}", negotiated_extensions ); - channel_manager_clone.set_negotiated_extensions(negotiated_extensions); job_declarator .start( @@ -471,6 +466,7 @@ impl JobDeclaratorClient { fallback_coordinator.clone(), status_sender.clone(), task_manager.clone(), + &mut channel_manager, ) .await; diff --git a/miner-apps/jd-client/src/lib/upstream/mod.rs b/miner-apps/jd-client/src/lib/upstream/mod.rs index 5084925b5..fa4cb7300 100644 --- a/miner-apps/jd-client/src/lib/upstream/mod.rs +++ b/miner-apps/jd-client/src/lib/upstream/mod.rs @@ -20,15 +20,10 @@ use stratum_apps::{ key_utils::Secp256k1PublicKey, network_helpers::{connect_with_noise, noise_stream::NoiseTcpStream, resolve_host}, stratum_core::{ - binary_sv2::{self, Seq064K}, - codec_sv2::HandshakeRole, - extensions_sv2::{ - RequestExtensions, RequestExtensionsError, RequestExtensionsSuccess, - MESSAGE_TYPE_REQUEST_EXTENSIONS_ERROR, MESSAGE_TYPE_REQUEST_EXTENSIONS_SUCCESS, - }, + binary_sv2::Seq064K, + extensions_sv2::RequestExtensions, framing_sv2, - handlers_sv2::HandleCommonMessagesFromServerAsync, - noise_sv2::Initiator, + handlers_sv2::{HandleCommonMessagesFromServerAsync, HandleExtensionsFromServerAsync}, parsers_sv2::AnyMessage, }, task_manager::TaskManager, @@ -41,13 +36,14 @@ use tokio::net::TcpStream; use tracing::{debug, error, info, warn}; use crate::{ + channel_manager::ChannelManager, error::{self, JDCError, JDCErrorKind, JDCResult}, io_task::spawn_io_tasks, status::{handle_error, Status, StatusSender}, utils::{get_setup_connection_message, UpstreamEntry}, }; -/// Timeout for extension negotiation response (30 seconds) +/// Timeout for extension negotiation response (10 seconds) const EXTENSION_NEGOTIATION_TIMEOUT_SECS: u64 = 10; mod message_handler; @@ -174,6 +170,7 @@ impl Upstream { &mut self, min_version: u16, max_version: u16, + channel_manager: &mut ChannelManager, ) -> JDCResult, error::Upstream> { info!("Upstream: initiating SV2 handshake..."); let setup_connection = @@ -218,7 +215,7 @@ impl Upstream { // Send RequestExtensions after successful SetupConnection if there are required extensions // and wait for the response before returning if !self.required_extensions.is_empty() { - let negotiated = self.negotiate_extensions().await?; + let negotiated = self.negotiate_extensions(channel_manager).await?; return Ok(negotiated); } @@ -230,12 +227,18 @@ impl Upstream { /// This method handles the extension negotiation flow: /// 1. Sends RequestExtensions with required extensions /// 2. Waits for RequestExtensionsSuccess or RequestExtensionsError - /// 3. Validates that all required extensions are supported + /// 3. Delegates response handling to the `ChannelManager` via its + /// `HandleExtensionsFromServerAsync` trait implementation + /// 4. If the server requires additional extensions we support, the ChannelManager + /// sends a retry `RequestExtensions`; this method detects and forwards it /// /// # Returns /// * `Ok(Vec)` - The list of successfully negotiated extensions /// * `Err(JDCError)` - Extension negotiation failed - async fn negotiate_extensions(&mut self) -> JDCResult, error::Upstream> { + async fn negotiate_extensions( + &mut self, + channel_manager: &mut ChannelManager, + ) -> JDCResult, error::Upstream> { let requested_extensions = Seq064K::new(self.required_extensions.clone()).map_err(JDCError::shutdown)?; @@ -262,129 +265,77 @@ impl Upstream { JDCError::fallback(JDCErrorKind::ChannelErrorSender) })?; - // Wait for extension negotiation response with timeout - let response = tokio::time::timeout( - Duration::from_secs(EXTENSION_NEGOTIATION_TIMEOUT_SECS), - self.upstream_channel.upstream_receiver.recv(), - ) - .await - .map_err(|_| { - error!( - "Extension negotiation timed out after {} seconds", - EXTENSION_NEGOTIATION_TIMEOUT_SECS - ); - JDCError::fallback(JDCErrorKind::ExtensionNegotiationTimeout) - })? - .map_err(|e| { - error!("Failed to receive extension negotiation response: {}", e); - JDCError::fallback(e) - })?; + loop { + // Wait for extension negotiation response with timeout + let response = tokio::time::timeout( + Duration::from_secs(EXTENSION_NEGOTIATION_TIMEOUT_SECS), + self.upstream_channel.upstream_receiver.recv(), + ) + .await + .map_err(|_| { + error!( + "Extension negotiation timed out after {} seconds", + EXTENSION_NEGOTIATION_TIMEOUT_SECS + ); + JDCError::fallback(JDCErrorKind::ExtensionNegotiationTimeout) + })? + .map_err(|e| { + error!("Failed to receive extension negotiation response: {}", e); + JDCError::fallback(e) + })?; + + // Delegate response handling to the ChannelManager's trait implementation. + // This checks the frame, validates required extensions, and may send a + // retry RequestExtensions if the server requires extensions we support. + self.handle_extension_response(response, channel_manager) + .await?; - self.handle_extension_response(response).await + // If the ChannelManager sent a retry RequestExtensions (via its upstream_sender), + // pick it up and forward it directly to the pool, then loop to await the next response. + if let Ok(retry_frame) = self.upstream_channel.channel_manager_receiver.try_recv() { + info!("Forwarding retry RequestExtensions to upstream pool..."); + self.upstream_channel + .upstream_sender + .send(retry_frame) + .await + .map_err(|e| { + error!(?e, "Failed to forward retry RequestExtensions to pool"); + JDCError::fallback(JDCErrorKind::ChannelErrorSender) + })?; + continue; + } + + // No retry pending — negotiation is complete. + // Return the extensions stored by the ChannelManager's trait implementation. + return channel_manager + .get_negotiated_extensions_with_server(None) + .map_err(|e| JDCError::fallback(e.kind)); + } } - /// Handles the extension negotiation response (Success or Error). + /// Checks that the response is an extension message and delegates handling to the + /// `ChannelManager` via its `HandleExtensionsFromServerAsync` trait implementation. + /// + /// The ChannelManager's implementation in `extensions_message_handler.rs` performs + /// validation and stores the negotiated extensions. On a `RequestExtensionsError` + /// where the server requires extensions we support, it sends a retry `RequestExtensions` + /// via its upstream channel. async fn handle_extension_response( &mut self, mut response: Sv2Frame, - ) -> JDCResult, error::Upstream> { + channel_manager: &mut ChannelManager, + ) -> JDCResult<(), error::Upstream> { let header = response.get_header().ok_or_else(|| { error!("Extension response frame missing header"); JDCError::fallback(JDCErrorKind::UnexpectedMessage(0, 0)) })?; - let msg_type = header.msg_type(); - let payload = response.payload(); - - match msg_type { - MESSAGE_TYPE_REQUEST_EXTENSIONS_SUCCESS => { - let msg: RequestExtensionsSuccess = - binary_sv2::from_bytes(payload).map_err(|e| { - error!("Failed to parse RequestExtensionsSuccess: {:?}", e); - JDCError::fallback(JDCErrorKind::BinarySv2(e)) - })?; - - let supported: Vec = msg.supported_extensions.into_inner(); - info!("Extension negotiation success: supported={:?}", supported); - - // Check if all required extensions are supported - let missing_required: Vec = self - .required_extensions - .iter() - .filter(|ext| !supported.contains(ext)) - .copied() - .collect(); - - if !missing_required.is_empty() { - error!( - "Server does not support required extensions: {:?}", - missing_required - ); - return Err(JDCError::fallback( - JDCErrorKind::RequiredExtensionsNotSupported(missing_required), - )); - } - - info!("Successfully negotiated extensions: {:?}", supported); - Ok(supported) - } - MESSAGE_TYPE_REQUEST_EXTENSIONS_ERROR => { - let msg: RequestExtensionsError = binary_sv2::from_bytes(payload).map_err(|e| { - error!("Failed to parse RequestExtensionsError: {:?}", e); - JDCError::fallback(JDCErrorKind::BinarySv2(e)) - })?; - - let unsupported: Vec = msg.unsupported_extensions.into_inner(); - let required_by_server: Vec = msg.required_extensions.into_inner(); - - error!( - "Extension negotiation error: unsupported={:?}, required_by_server={:?}", - unsupported, required_by_server - ); - - // Check if any of our required extensions were not supported - let missing_required: Vec = self - .required_extensions - .iter() - .filter(|ext| unsupported.contains(ext)) - .copied() - .collect(); - - if !missing_required.is_empty() { - error!( - "Server does not support required extensions: {:?}", - missing_required - ); - return Err(JDCError::fallback( - JDCErrorKind::RequiredExtensionsNotSupported(missing_required), - )); - } - - // If server requires extensions we don't support, fail - if !required_by_server.is_empty() { - error!( - "Server requires extensions that we don't support: {:?}", - required_by_server - ); - return Err(JDCError::fallback( - JDCErrorKind::ServerRequiresUnsupportedExtensions(required_by_server), - )); - } + channel_manager + .handle_extensions_message_frame_from_server(None, header, response.payload()) + .await + .map_err(|e| JDCError::fallback(e.kind))?; - // No required extensions failed, return empty - Ok(vec![]) - } - _ => { - error!( - "Unexpected message type during extension negotiation: {}", - msg_type - ); - Err(JDCError::fallback(JDCErrorKind::UnexpectedMessage( - header.ext_type(), - msg_type, - ))) - } - } + Ok(()) } /// Start unified upstream loop. @@ -408,10 +359,14 @@ impl Upstream { fallback_coordinator: FallbackCoordinator, status_sender: Sender, task_manager: Arc, + channel_manager: &mut ChannelManager, ) -> Vec { let status_sender = StatusSender::Upstream(status_sender); - let negotiated_extensions = match self.setup_connection(min_version, max_version).await { + let negotiated_extensions = match self + .setup_connection(min_version, max_version, channel_manager) + .await + { Ok(extensions) => { info!( "Upstream: extension negotiation complete. Extensions: {:?}", diff --git a/miner-apps/translator/src/lib/mod.rs b/miner-apps/translator/src/lib/mod.rs index 9e1adfeaf..618436864 100644 --- a/miner-apps/translator/src/lib/mod.rs +++ b/miner-apps/translator/src/lib/mod.rs @@ -134,7 +134,21 @@ impl TranslatorSv2 { info!("Initializing upstream connection..."); - let negotiated_extensions = match self + // Create ChannelManager BEFORE initialize_upstream so the HandleExtensionsFromServerAsync + // trait impl can store negotiated extensions in it during extension negotiation. + // The ChannelManager holds the CM-side channel ends; Upstream-side ends go to initialize_upstream. + let mut channel_manager_raw = ChannelManager::new( + channel_manager_to_upstream_sender, + upstream_to_channel_manager_receiver, + channel_manager_to_sv1_server_sender.clone(), + sv1_server_to_channel_manager_receiver, + status_sender.clone(), + self.config.supported_extensions.clone(), + self.config.required_extensions.clone(), + vec![], + ); + + if let Err(e) = self .initialize_upstream( &mut upstream_addresses, channel_manager_to_upstream_receiver.clone(), @@ -145,31 +159,18 @@ impl TranslatorSv2 { task_manager.clone(), sv1_server.clone(), self.config.required_extensions.clone(), + &mut channel_manager_raw, ) .await { - Ok(extensions) => extensions, - Err(e) => { - error!("Failed to initialize any upstream connection: {e:?}"); - self.shutdown_notify.notify_waiters(); - self.is_alive.store(false, Ordering::Relaxed); - return; - } - }; + error!("Failed to initialize any upstream connection: {e:?}"); + self.shutdown_notify.notify_waiters(); + self.is_alive.store(false, Ordering::Relaxed); + return; + } - // Create ChannelManager with the negotiated extensions from upstream - // This ensures the ChannelManager knows which extensions are active - // BEFORE the SV1 server starts accepting connections - let mut channel_manager: Arc = Arc::new(ChannelManager::new( - channel_manager_to_upstream_sender, - upstream_to_channel_manager_receiver, - channel_manager_to_sv1_server_sender.clone(), - sv1_server_to_channel_manager_receiver, - status_sender.clone(), - self.config.supported_extensions.clone(), - self.config.required_extensions.clone(), - negotiated_extensions, - )); + // Wrap in Arc now that negotiation is complete and extensions are stored in channel_manager_raw + let mut channel_manager: Arc = Arc::new(channel_manager_raw); info!("Launching ChannelManager tasks..."); ChannelManager::run_channel_manager_tasks( @@ -292,6 +293,19 @@ impl TranslatorSv2 { self.config.clone(), )); + // Create ChannelManager BEFORE initialize_upstream so the trait + // impl can store negotiated extensions in it during negotiation. + let mut channel_manager_raw = ChannelManager::new( + channel_manager_to_upstream_sender, + upstream_to_channel_manager_receiver, + channel_manager_to_sv1_server_sender, + sv1_server_to_channel_manager_receiver, + status_sender.clone(), + self.config.supported_extensions.clone(), + self.config.required_extensions.clone(), + vec![], + ); + match self.initialize_upstream( &mut upstream_addresses, channel_manager_to_upstream_receiver, @@ -302,23 +316,11 @@ impl TranslatorSv2 { task_manager.clone(), sv1_server.clone(), self.config.required_extensions.clone(), + &mut channel_manager_raw, ).await { - Ok(negotiated_extensions) => { - info!( - "Upstream restarted successfully with extensions: {:?}", - negotiated_extensions - ); - - channel_manager = Arc::new(ChannelManager::new( - channel_manager_to_upstream_sender, - upstream_to_channel_manager_receiver, - channel_manager_to_sv1_server_sender, - sv1_server_to_channel_manager_receiver, - status_sender.clone(), - self.config.supported_extensions.clone(), - self.config.required_extensions.clone(), - negotiated_extensions, - )); + Ok(()) => { + info!("Upstream restarted successfully."); + channel_manager = Arc::new(channel_manager_raw); } Err(e) => { error!("Couldn't perform fallback, shutting system down: {e:?}"); @@ -442,7 +444,7 @@ impl TranslatorSv2 { /// to avoid hammering known-bad endpoints during failover. /// /// # Returns - /// * `Ok(Vec)` - The negotiated extensions to be passed to ChannelManager + /// * `Ok(())` - Upstream connected and extensions negotiated (stored in `channel_manager`) /// * `Err(TproxyErrorKind)` - All upstreams failed #[allow(clippy::too_many_arguments)] pub async fn initialize_upstream( @@ -456,7 +458,8 @@ impl TranslatorSv2 { task_manager: Arc, sv1_server_instance: Arc, required_extensions: Vec, - ) -> Result, TproxyErrorKind> { + channel_manager: &mut ChannelManager, + ) -> Result<(), TproxyErrorKind> { const MAX_RETRIES: usize = 3; let upstream_len = upstreams.len(); for (i, upstream_entry) in upstreams.iter_mut().enumerate() { @@ -489,14 +492,12 @@ impl TranslatorSv2 { status_sender.clone(), task_manager.clone(), required_extensions.clone(), + channel_manager, ) .await { - Ok(negotiated_extensions) => { - info!( - "Extension negotiation complete. Negotiated extensions: {:?}", - negotiated_extensions - ); + Ok(()) => { + info!("Extension negotiation complete."); // Now that extensions are negotiated, start the SV1 server if let Err(e) = sv1_server_instance @@ -514,7 +515,7 @@ impl TranslatorSv2 { } upstream_entry.tried_or_flagged = true; - return Ok(negotiated_extensions); + return Ok(()); } Err(e) => { warn!( @@ -550,7 +551,8 @@ async fn try_initialize_upstream( status_sender: Sender, task_manager: Arc, required_extensions: Vec, -) -> Result, TproxyErrorKind> { + channel_manager: &mut ChannelManager, +) -> Result<(), TproxyErrorKind> { let upstream = Upstream::new( upstream_addr, upstream_to_channel_manager_sender, @@ -568,8 +570,10 @@ async fn try_initialize_upstream( fallback_coordinator, status_sender, task_manager, + channel_manager, ) .await + .map(|_| ()) .map_err(|e| e.kind) } diff --git a/miner-apps/translator/src/lib/sv2/upstream/upstream.rs b/miner-apps/translator/src/lib/sv2/upstream/upstream.rs index 663dbc66d..ea091a0de 100644 --- a/miner-apps/translator/src/lib/sv2/upstream/upstream.rs +++ b/miner-apps/translator/src/lib/sv2/upstream/upstream.rs @@ -2,7 +2,7 @@ use crate::{ error::{self, TproxyError, TproxyErrorKind, TproxyResult}, io_task::spawn_io_tasks, status::{handle_error, Status, StatusSender}, - sv2::upstream::channel::UpstreamChannelState, + sv2::{channel_manager::ChannelManager, upstream::channel::UpstreamChannelState}, utils::UpstreamEntry, }; use async_channel::{unbounded, Receiver, Sender}; @@ -11,14 +11,10 @@ use stratum_apps::{ fallback_coordinator::FallbackCoordinator, network_helpers::{self, connect_with_noise, resolve_host}, stratum_core::{ - binary_sv2::{self, Seq064K}, - codec_sv2::HandshakeRole, + binary_sv2::Seq064K, common_messages_sv2::{Protocol, SetupConnection}, - extensions_sv2::{ - RequestExtensions, RequestExtensionsError, RequestExtensionsSuccess, - MESSAGE_TYPE_REQUEST_EXTENSIONS_ERROR, MESSAGE_TYPE_REQUEST_EXTENSIONS_SUCCESS, - }, - handlers_sv2::HandleCommonMessagesFromServerAsync, + extensions_sv2::RequestExtensions, + handlers_sv2::{HandleCommonMessagesFromServerAsync, HandleExtensionsFromServerAsync}, parsers_sv2::{AnyMessage, Mining}, }, task_manager::TaskManager, @@ -199,13 +195,14 @@ impl Upstream { fallback_coordinator: FallbackCoordinator, status_sender: Sender, task_manager: Arc, + channel_manager: &mut ChannelManager, ) -> TproxyResult, error::Upstream> { let fallback_token: CancellationToken = fallback_coordinator.token(); let negotiated_extensions; // Wait for connection setup or cancellation signal tokio::select! { - result = self.setup_connection() => { + result = self.setup_connection(channel_manager) => { match result { Ok(extensions) => { negotiated_extensions = extensions; @@ -254,7 +251,10 @@ impl Upstream { /// # Returns /// * `Ok(Vec)` - The list of negotiated extensions (empty if none were requested) /// * `Err(TproxyError)` - Error during handshake or extension negotiation - pub async fn setup_connection(&mut self) -> TproxyResult, error::Upstream> { + pub async fn setup_connection( + &mut self, + channel_manager: &mut ChannelManager, + ) -> TproxyResult, error::Upstream> { debug!("Upstream: initiating SV2 handshake..."); // Build SetupConnection message let setup_conn_msg = Self::get_setup_connection_message(2, 2, &self.address, false) @@ -303,7 +303,7 @@ impl Upstream { // Send RequestExtensions message if there are any required extensions // and wait for the response before returning if !self.required_extensions.is_empty() { - let negotiated = self.negotiate_extensions().await?; + let negotiated = self.negotiate_extensions(channel_manager).await?; return Ok(negotiated); } @@ -315,13 +315,18 @@ impl Upstream { /// This method handles the extension negotiation flow: /// 1. Sends RequestExtensions with required extensions /// 2. Waits for RequestExtensionsSuccess or RequestExtensionsError - /// 3. Validates that all required extensions are supported - /// 4. Handles retry if server requires additional extensions we support + /// 3. Delegates response handling to the `ChannelManager` via its + /// `HandleExtensionsFromServerAsync` trait implementation + /// 4. If the server requires additional extensions we support, the ChannelManager + /// sends a retry `RequestExtensions`; this method detects and forwards it /// /// # Returns /// * `Ok(Vec)` - The list of successfully negotiated extensions /// * `Err(TproxyError)` - Extension negotiation failed - async fn negotiate_extensions(&mut self) -> TproxyResult, error::Upstream> { + async fn negotiate_extensions( + &mut self, + channel_manager: &mut ChannelManager, + ) -> TproxyResult, error::Upstream> { let request_extensions = RequestExtensions { request_id: 1, requested_extensions: Seq064K::new(self.required_extensions.clone()).unwrap(), @@ -345,130 +350,81 @@ impl Upstream { TproxyError::fallback(TproxyErrorKind::ChannelErrorSender) })?; - // Wait for extension negotiation response with timeout - let response = tokio::time::timeout( - Duration::from_secs(EXTENSION_NEGOTIATION_TIMEOUT_SECS), - self.upstream_channel_state.upstream_receiver.recv(), - ) - .await - .map_err(|_| { - error!( - "Extension negotiation timed out after {} seconds", - EXTENSION_NEGOTIATION_TIMEOUT_SECS - ); - TproxyError::fallback(TproxyErrorKind::ExtensionNegotiationTimeout) - })? - .map_err(|e| { - error!("Failed to receive extension negotiation response: {}", e); - TproxyError::fallback(e) - })?; + loop { + // Wait for extension negotiation response with timeout + let response = tokio::time::timeout( + Duration::from_secs(EXTENSION_NEGOTIATION_TIMEOUT_SECS), + self.upstream_channel_state.upstream_receiver.recv(), + ) + .await + .map_err(|_| { + error!( + "Extension negotiation timed out after {} seconds", + EXTENSION_NEGOTIATION_TIMEOUT_SECS + ); + TproxyError::fallback(TproxyErrorKind::ExtensionNegotiationTimeout) + })? + .map_err(|e| { + error!("Failed to receive extension negotiation response: {}", e); + TproxyError::fallback(e) + })?; + + // Delegate response handling to the ChannelManager's trait implementation. + // This validates required extensions and may send a retry RequestExtensions + // if the server requires extensions we support. + self.handle_extension_response(response, channel_manager) + .await?; + + // If the ChannelManager sent a retry RequestExtensions (via its upstream_sender), + // pick it up and forward it directly to the pool, then loop to await the next response. + if let Ok(retry_frame) = self + .upstream_channel_state + .channel_manager_receiver + .try_recv() + { + info!("Forwarding retry RequestExtensions to upstream pool..."); + self.upstream_channel_state + .upstream_sender + .send(retry_frame) + .await + .map_err(|e| { + error!("Failed to forward retry RequestExtensions to pool: {:?}", e); + TproxyError::fallback(TproxyErrorKind::ChannelErrorSender) + })?; + continue; + } - self.handle_extension_response(response).await + // No retry pending — negotiation is complete. + // Return the extensions stored by the ChannelManager's trait implementation. + return channel_manager + .get_negotiated_extensions_with_server(None) + .map_err(|e| TproxyError::fallback(e.kind)); + } } - /// Handles the extension negotiation response (Success or Error). + /// Checks that the response is an extension message and delegates handling to the + /// `ChannelManager` via its `HandleExtensionsFromServerAsync` trait implementation. + /// + /// The ChannelManager's implementation in `extensions_message_handler.rs` performs + /// validation and stores the negotiated extensions. On a `RequestExtensionsError` + /// where the server requires extensions we support, it sends a retry `RequestExtensions` + /// via its upstream channel. async fn handle_extension_response( &mut self, mut response: Sv2Frame, - ) -> TproxyResult, error::Upstream> { + channel_manager: &mut ChannelManager, + ) -> TproxyResult<(), error::Upstream> { let header = response.get_header().ok_or_else(|| { error!("Extension response frame missing header"); TproxyError::fallback(TproxyErrorKind::UnexpectedMessage(0, 0)) })?; - let msg_type = header.msg_type(); - let payload = response.payload(); - - match msg_type { - MESSAGE_TYPE_REQUEST_EXTENSIONS_SUCCESS => { - let msg: RequestExtensionsSuccess = - binary_sv2::from_bytes(payload).map_err(|e| { - error!("Failed to parse RequestExtensionsSuccess: {:?}", e); - TproxyError::fallback(TproxyErrorKind::BinarySv2(e)) - })?; - - let supported: Vec = msg.supported_extensions.into_inner(); - info!("Extension negotiation success: supported={:?}", supported); - - // Check if all required extensions are supported - let missing_required: Vec = self - .required_extensions - .iter() - .filter(|ext| !supported.contains(ext)) - .copied() - .collect(); - - if !missing_required.is_empty() { - error!( - "Server does not support required extensions: {:?}", - missing_required - ); - return Err(TproxyError::fallback( - TproxyErrorKind::RequiredExtensionsNotSupported(missing_required), - )); - } - - info!("Successfully negotiated extensions: {:?}", supported); - Ok(supported) - } - MESSAGE_TYPE_REQUEST_EXTENSIONS_ERROR => { - let msg: RequestExtensionsError = binary_sv2::from_bytes(payload).map_err(|e| { - error!("Failed to parse RequestExtensionsError: {:?}", e); - TproxyError::fallback(TproxyErrorKind::BinarySv2(e)) - })?; - - let unsupported: Vec = msg.unsupported_extensions.into_inner(); - let required_by_server: Vec = msg.required_extensions.into_inner(); - - error!( - "Extension negotiation error: unsupported={:?}, required_by_server={:?}", - unsupported, required_by_server - ); - - // Check if any of our required extensions were not supported - let missing_required: Vec = self - .required_extensions - .iter() - .filter(|ext| unsupported.contains(ext)) - .copied() - .collect(); - - if !missing_required.is_empty() { - error!( - "Server does not support required extensions: {:?}", - missing_required - ); - return Err(TproxyError::fallback( - TproxyErrorKind::RequiredExtensionsNotSupported(missing_required), - )); - } - - // If server requires extensions we don't support, fail - if !required_by_server.is_empty() { - error!( - "Server requires extensions that we don't support: {:?}", - required_by_server - ); - return Err(TproxyError::fallback( - TproxyErrorKind::ServerRequiresUnsupportedExtensions(required_by_server), - )); - } + channel_manager + .handle_extensions_message_frame_from_server(None, header, response.payload()) + .await + .map_err(|e| TproxyError::fallback(e.kind))?; - // No required extensions failed, return empty (negotiation succeeded with no - // extensions) - Ok(vec![]) - } - _ => { - error!( - "Unexpected message type during extension negotiation: {}", - msg_type - ); - Err(TproxyError::fallback(TproxyErrorKind::UnexpectedMessage( - header.ext_type(), - msg_type, - ))) - } - } + Ok(()) } /// Processes incoming messages from the upstream SV2 server. From 56e060bfff5ab1bbfaabaa83afdf86e65721525a Mon Sep 17 00:00:00 2001 From: Test User Date: Fri, 27 Mar 2026 14:46:16 -0400 Subject: [PATCH 8/8] refactor: implement shared extension negotiation utilities and error handling --- miner-apps/jd-client/src/lib/error.rs | 15 ++ miner-apps/jd-client/src/lib/mod.rs | 2 +- .../src/lib/template_receiver/bitcoin_core.rs | 2 +- miner-apps/jd-client/src/lib/upstream/mod.rs | 131 ++---------------- miner-apps/translator/src/lib/error.rs | 15 ++ .../src/lib/sv2/upstream/upstream.rs | 125 ++--------------- stratum-apps/src/extensions_negotiation.rs | 123 ++++++++++++++++ stratum-apps/src/lib.rs | 4 + 8 files changed, 186 insertions(+), 231 deletions(-) create mode 100644 stratum-apps/src/extensions_negotiation.rs diff --git a/miner-apps/jd-client/src/lib/error.rs b/miner-apps/jd-client/src/lib/error.rs index e6a90ca24..2d77c3417 100644 --- a/miner-apps/jd-client/src/lib/error.rs +++ b/miner-apps/jd-client/src/lib/error.rs @@ -17,6 +17,7 @@ use std::{ marker::PhantomData, }; use stratum_apps::{ + extensions_negotiation::ExtensionNegotiationError, network_helpers, stratum_core::{ binary_sv2, bitcoin, @@ -509,6 +510,20 @@ impl From for JDCErrorKind { } } +impl From for JDCErrorKind { + fn from(e: ExtensionNegotiationError) -> Self { + match e { + ExtensionNegotiationError::SendError => JDCErrorKind::ChannelErrorSender, + ExtensionNegotiationError::ReceiveError(_) => JDCErrorKind::UnexpectedMessage(0, 0), + ExtensionNegotiationError::Timeout => JDCErrorKind::ExtensionNegotiationTimeout, + ExtensionNegotiationError::UnexpectedMessage(ext, msg) => { + JDCErrorKind::UnexpectedMessage(ext, msg as u8) + } + ExtensionNegotiationError::HandlerError(_) => JDCErrorKind::UnexpectedMessage(0, 0), + } + } +} + impl HandlerErrorType for JDCError { fn parse_error(error: ParserError) -> Self { Self { diff --git a/miner-apps/jd-client/src/lib/mod.rs b/miner-apps/jd-client/src/lib/mod.rs index 3ca80b9a4..e2116c4ba 100644 --- a/miner-apps/jd-client/src/lib/mod.rs +++ b/miner-apps/jd-client/src/lib/mod.rs @@ -171,7 +171,7 @@ impl JobDeclaratorClient { }); } - let channel_manager_clone = channel_manager.clone(); + let mut channel_manager_clone = channel_manager.clone(); let mut bitcoin_core_sv2_join_handle: Option> = None; match self.config.template_provider_type().clone() { diff --git a/miner-apps/jd-client/src/lib/template_receiver/bitcoin_core.rs b/miner-apps/jd-client/src/lib/template_receiver/bitcoin_core.rs index aa2171ccc..0116b4373 100644 --- a/miner-apps/jd-client/src/lib/template_receiver/bitcoin_core.rs +++ b/miner-apps/jd-client/src/lib/template_receiver/bitcoin_core.rs @@ -39,7 +39,7 @@ pub async fn connect_to_bitcoin_core( let status_sender = StatusSender::TemplateReceiver(status_sender_clone); handle_error( &status_sender, - JDCError::::shutdown(JDCErrorKind::BitcoinCoreSv2TDPCancellationTokenActivated), + JDCError::::shutdown(JDCErrorKind::BitcoinCoreSv2CancellationTokenActivated), ) .await; } diff --git a/miner-apps/jd-client/src/lib/upstream/mod.rs b/miner-apps/jd-client/src/lib/upstream/mod.rs index fa4cb7300..f25ca35d0 100644 --- a/miner-apps/jd-client/src/lib/upstream/mod.rs +++ b/miner-apps/jd-client/src/lib/upstream/mod.rs @@ -10,22 +10,16 @@ //! - Forward SV2 mining messages between upstream and channel manager //! - Handle common messages from upstream -use std::{net::SocketAddr, sync::Arc, time::Duration}; +use std::{net::SocketAddr, sync::Arc}; use async_channel::{unbounded, Receiver, Sender}; use bitcoin_core_sv2::template_distribution_protocol::CancellationToken; use stratum_apps::{ custom_mutex::Mutex, + extensions_negotiation::negotiate_extensions, fallback_coordinator::FallbackCoordinator, - key_utils::Secp256k1PublicKey, - network_helpers::{connect_with_noise, noise_stream::NoiseTcpStream, resolve_host}, - stratum_core::{ - binary_sv2::Seq064K, - extensions_sv2::RequestExtensions, - framing_sv2, - handlers_sv2::{HandleCommonMessagesFromServerAsync, HandleExtensionsFromServerAsync}, - parsers_sv2::AnyMessage, - }, + network_helpers::{connect_with_noise, resolve_host}, + stratum_core::{framing_sv2, handlers_sv2::HandleCommonMessagesFromServerAsync}, task_manager::TaskManager, utils::{ protocol_message_type::{protocol_message_type, MessageType}, @@ -43,9 +37,6 @@ use crate::{ utils::{get_setup_connection_message, UpstreamEntry}, }; -/// Timeout for extension negotiation response (10 seconds) -const EXTENSION_NEGOTIATION_TIMEOUT_SECS: u64 = 10; - mod message_handler; /// Placeholder for future upstream-specific data/state. @@ -224,13 +215,7 @@ impl Upstream { /// Sends RequestExtensions and waits for the response. /// - /// This method handles the extension negotiation flow: - /// 1. Sends RequestExtensions with required extensions - /// 2. Waits for RequestExtensionsSuccess or RequestExtensionsError - /// 3. Delegates response handling to the `ChannelManager` via its - /// `HandleExtensionsFromServerAsync` trait implementation - /// 4. If the server requires additional extensions we support, the ChannelManager - /// sends a retry `RequestExtensions`; this method detects and forwards it + /// Delegates to the shared [`stratum_apps::extensions_negotiation::negotiate_extensions`] function. /// /// # Returns /// * `Ok(Vec)` - The list of successfully negotiated extensions @@ -239,103 +224,15 @@ impl Upstream { &mut self, channel_manager: &mut ChannelManager, ) -> JDCResult, error::Upstream> { - let requested_extensions = - Seq064K::new(self.required_extensions.clone()).map_err(JDCError::shutdown)?; - - let request_extensions = RequestExtensions { - request_id: 0, - requested_extensions, - }; - - info!( - "Sending RequestExtensions to upstream with required extensions: {:?}", - self.required_extensions - ); - - let sv2_frame: Sv2Frame = AnyMessage::Extensions(request_extensions.into_static().into()) - .try_into() - .map_err(JDCError::shutdown)?; - - self.upstream_channel - .upstream_sender - .send(sv2_frame) - .await - .map_err(|e| { - error!(?e, "Failed to send RequestExtensions to upstream"); - JDCError::fallback(JDCErrorKind::ChannelErrorSender) - })?; - - loop { - // Wait for extension negotiation response with timeout - let response = tokio::time::timeout( - Duration::from_secs(EXTENSION_NEGOTIATION_TIMEOUT_SECS), - self.upstream_channel.upstream_receiver.recv(), - ) - .await - .map_err(|_| { - error!( - "Extension negotiation timed out after {} seconds", - EXTENSION_NEGOTIATION_TIMEOUT_SECS - ); - JDCError::fallback(JDCErrorKind::ExtensionNegotiationTimeout) - })? - .map_err(|e| { - error!("Failed to receive extension negotiation response: {}", e); - JDCError::fallback(e) - })?; - - // Delegate response handling to the ChannelManager's trait implementation. - // This checks the frame, validates required extensions, and may send a - // retry RequestExtensions if the server requires extensions we support. - self.handle_extension_response(response, channel_manager) - .await?; - - // If the ChannelManager sent a retry RequestExtensions (via its upstream_sender), - // pick it up and forward it directly to the pool, then loop to await the next response. - if let Ok(retry_frame) = self.upstream_channel.channel_manager_receiver.try_recv() { - info!("Forwarding retry RequestExtensions to upstream pool..."); - self.upstream_channel - .upstream_sender - .send(retry_frame) - .await - .map_err(|e| { - error!(?e, "Failed to forward retry RequestExtensions to pool"); - JDCError::fallback(JDCErrorKind::ChannelErrorSender) - })?; - continue; - } - - // No retry pending — negotiation is complete. - // Return the extensions stored by the ChannelManager's trait implementation. - return channel_manager - .get_negotiated_extensions_with_server(None) - .map_err(|e| JDCError::fallback(e.kind)); - } - } - - /// Checks that the response is an extension message and delegates handling to the - /// `ChannelManager` via its `HandleExtensionsFromServerAsync` trait implementation. - /// - /// The ChannelManager's implementation in `extensions_message_handler.rs` performs - /// validation and stores the negotiated extensions. On a `RequestExtensionsError` - /// where the server requires extensions we support, it sends a retry `RequestExtensions` - /// via its upstream channel. - async fn handle_extension_response( - &mut self, - mut response: Sv2Frame, - channel_manager: &mut ChannelManager, - ) -> JDCResult<(), error::Upstream> { - let header = response.get_header().ok_or_else(|| { - error!("Extension response frame missing header"); - JDCError::fallback(JDCErrorKind::UnexpectedMessage(0, 0)) - })?; - - channel_manager - .handle_extensions_message_frame_from_server(None, header, response.payload()) - .await - .map_err(|e| JDCError::fallback(e.kind))?; - - Ok(()) + negotiate_extensions( + self.required_extensions.clone(), + self.upstream_channel.upstream_sender.clone(), + self.upstream_channel.upstream_receiver.clone(), + self.upstream_channel.channel_manager_receiver.clone(), + channel_manager, + ) + .await + .map_err(|e| JDCError::fallback(JDCErrorKind::from(e))) } /// Start unified upstream loop. diff --git a/miner-apps/translator/src/lib/error.rs b/miner-apps/translator/src/lib/error.rs index 4f7653bac..e6a467ace 100644 --- a/miner-apps/translator/src/lib/error.rs +++ b/miner-apps/translator/src/lib/error.rs @@ -15,6 +15,7 @@ use std::{ sync::PoisonError, }; use stratum_apps::{ + extensions_negotiation::ExtensionNegotiationError, stratum_core::{ binary_sv2, channels_sv2::client::error::GroupChannelError, @@ -401,6 +402,20 @@ impl HandlerErrorType for TproxyErrorKind { } } +impl From for TproxyErrorKind { + fn from(e: ExtensionNegotiationError) -> Self { + match e { + ExtensionNegotiationError::SendError => TproxyErrorKind::ChannelErrorSender, + ExtensionNegotiationError::ReceiveError(_) => TproxyErrorKind::UnexpectedMessage(0, 0), + ExtensionNegotiationError::Timeout => TproxyErrorKind::ExtensionNegotiationTimeout, + ExtensionNegotiationError::UnexpectedMessage(ext, msg) => { + TproxyErrorKind::UnexpectedMessage(ext, msg as u8) + } + ExtensionNegotiationError::HandlerError(_) => TproxyErrorKind::UnexpectedMessage(0, 0), + } + } +} + impl HandlerErrorType for TproxyError { fn parse_error(error: ParserError) -> Self { Self { diff --git a/miner-apps/translator/src/lib/sv2/upstream/upstream.rs b/miner-apps/translator/src/lib/sv2/upstream/upstream.rs index ea091a0de..32b5410bf 100644 --- a/miner-apps/translator/src/lib/sv2/upstream/upstream.rs +++ b/miner-apps/translator/src/lib/sv2/upstream/upstream.rs @@ -6,15 +6,14 @@ use crate::{ utils::UpstreamEntry, }; use async_channel::{unbounded, Receiver, Sender}; -use std::{net::SocketAddr, sync::Arc, time::Duration}; +use std::{net::SocketAddr, sync::Arc}; use stratum_apps::{ + extensions_negotiation::negotiate_extensions, fallback_coordinator::FallbackCoordinator, network_helpers::{self, connect_with_noise, resolve_host}, stratum_core::{ - binary_sv2::Seq064K, common_messages_sv2::{Protocol, SetupConnection}, - extensions_sv2::RequestExtensions, - handlers_sv2::{HandleCommonMessagesFromServerAsync, HandleExtensionsFromServerAsync}, + handlers_sv2::HandleCommonMessagesFromServerAsync, parsers_sv2::{AnyMessage, Mining}, }, task_manager::TaskManager, @@ -28,9 +27,6 @@ use tokio::net::TcpStream; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; -/// Timeout for extension negotiation response (30 seconds) -const EXTENSION_NEGOTIATION_TIMEOUT_SECS: u64 = 30; - /// Manages the upstream SV2 connection to a mining pool or proxy. /// /// This struct handles the SV2 protocol communication with upstream servers, @@ -312,13 +308,7 @@ impl Upstream { /// Sends RequestExtensions and waits for the response. /// - /// This method handles the extension negotiation flow: - /// 1. Sends RequestExtensions with required extensions - /// 2. Waits for RequestExtensionsSuccess or RequestExtensionsError - /// 3. Delegates response handling to the `ChannelManager` via its - /// `HandleExtensionsFromServerAsync` trait implementation - /// 4. If the server requires additional extensions we support, the ChannelManager - /// sends a retry `RequestExtensions`; this method detects and forwards it + /// Delegates to the shared [`stratum_apps::extensions_negotiation::negotiate_extensions`] function. /// /// # Returns /// * `Ok(Vec)` - The list of successfully negotiated extensions @@ -327,104 +317,15 @@ impl Upstream { &mut self, channel_manager: &mut ChannelManager, ) -> TproxyResult, error::Upstream> { - let request_extensions = RequestExtensions { - request_id: 1, - requested_extensions: Seq064K::new(self.required_extensions.clone()).unwrap(), - }; - - let sv2_frame: Sv2Frame = AnyMessage::Extensions(request_extensions.into_static().into()) - .try_into() - .map_err(TproxyError::shutdown)?; - - info!( - "Sending RequestExtensions to upstream with required extensions: {:?}", - self.required_extensions - ); - - self.upstream_channel_state - .upstream_sender - .send(sv2_frame) - .await - .map_err(|e| { - error!("Failed to send RequestExtensions to upstream: {:?}", e); - TproxyError::fallback(TproxyErrorKind::ChannelErrorSender) - })?; - - loop { - // Wait for extension negotiation response with timeout - let response = tokio::time::timeout( - Duration::from_secs(EXTENSION_NEGOTIATION_TIMEOUT_SECS), - self.upstream_channel_state.upstream_receiver.recv(), - ) - .await - .map_err(|_| { - error!( - "Extension negotiation timed out after {} seconds", - EXTENSION_NEGOTIATION_TIMEOUT_SECS - ); - TproxyError::fallback(TproxyErrorKind::ExtensionNegotiationTimeout) - })? - .map_err(|e| { - error!("Failed to receive extension negotiation response: {}", e); - TproxyError::fallback(e) - })?; - - // Delegate response handling to the ChannelManager's trait implementation. - // This validates required extensions and may send a retry RequestExtensions - // if the server requires extensions we support. - self.handle_extension_response(response, channel_manager) - .await?; - - // If the ChannelManager sent a retry RequestExtensions (via its upstream_sender), - // pick it up and forward it directly to the pool, then loop to await the next response. - if let Ok(retry_frame) = self - .upstream_channel_state - .channel_manager_receiver - .try_recv() - { - info!("Forwarding retry RequestExtensions to upstream pool..."); - self.upstream_channel_state - .upstream_sender - .send(retry_frame) - .await - .map_err(|e| { - error!("Failed to forward retry RequestExtensions to pool: {:?}", e); - TproxyError::fallback(TproxyErrorKind::ChannelErrorSender) - })?; - continue; - } - - // No retry pending — negotiation is complete. - // Return the extensions stored by the ChannelManager's trait implementation. - return channel_manager - .get_negotiated_extensions_with_server(None) - .map_err(|e| TproxyError::fallback(e.kind)); - } - } - - /// Checks that the response is an extension message and delegates handling to the - /// `ChannelManager` via its `HandleExtensionsFromServerAsync` trait implementation. - /// - /// The ChannelManager's implementation in `extensions_message_handler.rs` performs - /// validation and stores the negotiated extensions. On a `RequestExtensionsError` - /// where the server requires extensions we support, it sends a retry `RequestExtensions` - /// via its upstream channel. - async fn handle_extension_response( - &mut self, - mut response: Sv2Frame, - channel_manager: &mut ChannelManager, - ) -> TproxyResult<(), error::Upstream> { - let header = response.get_header().ok_or_else(|| { - error!("Extension response frame missing header"); - TproxyError::fallback(TproxyErrorKind::UnexpectedMessage(0, 0)) - })?; - - channel_manager - .handle_extensions_message_frame_from_server(None, header, response.payload()) - .await - .map_err(|e| TproxyError::fallback(e.kind))?; - - Ok(()) + negotiate_extensions( + self.required_extensions.clone(), + self.upstream_channel_state.upstream_sender.clone(), + self.upstream_channel_state.upstream_receiver.clone(), + self.upstream_channel_state.channel_manager_receiver.clone(), + channel_manager, + ) + .await + .map_err(|e| TproxyError::fallback(TproxyErrorKind::from(e))) } /// Processes incoming messages from the upstream SV2 server. diff --git a/stratum-apps/src/extensions_negotiation.rs b/stratum-apps/src/extensions_negotiation.rs new file mode 100644 index 000000000..996d0d76b --- /dev/null +++ b/stratum-apps/src/extensions_negotiation.rs @@ -0,0 +1,123 @@ +//! Extension negotiation utilities shared by JDC and Translator +//! +//! This module provides the shared logic for negotiating SV2 extensions with +//! an upstream server, handling the RequestExtensions/Response flow. + +use async_channel::{Receiver, Sender}; +use std::time::Duration; +use tracing::{error, info}; + +use stratum_core::{ + binary_sv2::Seq064K, codec_sv2::StandardSv2Frame, extensions_sv2::RequestExtensions, + handlers_sv2::HandleExtensionsFromServerAsync, parsers_sv2::AnyMessage, +}; + +use crate::utils::types::Message; + +const EXTENSION_NEGOTIATION_TIMEOUT_SECS: u64 = 30; + +#[derive(Debug)] +pub enum ExtensionNegotiationError { + SendError, + ReceiveError(async_channel::RecvError), + Timeout, + UnexpectedMessage(u16, u16), + HandlerError(String), +} + +impl std::fmt::Display for ExtensionNegotiationError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ExtensionNegotiationError::SendError => write!(f, "Failed to send RequestExtensions"), + ExtensionNegotiationError::ReceiveError(e) => { + write!(f, "Failed to receive extension response: {}", e) + } + ExtensionNegotiationError::Timeout => write!(f, "Extension negotiation timed out"), + ExtensionNegotiationError::UnexpectedMessage(ext, msg) => { + write!(f, "Unexpected message: ext={}, msg={}", ext, msg) + } + ExtensionNegotiationError::HandlerError(s) => { + write!(f, "Handler error: {}", s) + } + } + } +} + +impl std::error::Error for ExtensionNegotiationError {} + +pub async fn negotiate_extensions( + required_extensions: Vec, + upstream_sender: Sender>, + upstream_receiver: Receiver>, + channel_manager_receiver: Receiver>, + channel_manager: &mut CM, +) -> Result, ExtensionNegotiationError> +where + CM: HandleExtensionsFromServerAsync + Send, + E: std::fmt::Debug, +{ + let requested_extensions = Seq064K::new(required_extensions.clone()).map_err(|e| { + ExtensionNegotiationError::HandlerError(format!("Failed to create Seq064K: {:?}", e)) + })?; + + let request_extensions = RequestExtensions { + request_id: 0, + requested_extensions, + }; + + let sv2_frame: StandardSv2Frame = + AnyMessage::Extensions(request_extensions.into_static().into()) + .try_into() + .map_err(|e| { + ExtensionNegotiationError::HandlerError(format!("Failed to frame: {:?}", e)) + })?; + + info!( + "Sending RequestExtensions to upstream with required extensions: {:?}", + required_extensions + ); + + upstream_sender + .send(sv2_frame) + .await + .map_err(|_| ExtensionNegotiationError::SendError)?; + + loop { + let mut response = tokio::time::timeout( + Duration::from_secs(EXTENSION_NEGOTIATION_TIMEOUT_SECS), + upstream_receiver.recv(), + ) + .await + .map_err(|_| { + error!( + "Extension negotiation timed out after {} seconds", + EXTENSION_NEGOTIATION_TIMEOUT_SECS + ); + ExtensionNegotiationError::Timeout + })? + .map_err(ExtensionNegotiationError::ReceiveError)?; + + let header = response.get_header().ok_or_else(|| { + error!("Extension response frame missing header"); + ExtensionNegotiationError::UnexpectedMessage(0, 0) + })?; + + channel_manager + .handle_extensions_message_frame_from_server(None, header, response.payload()) + .await + .map_err(|e| ExtensionNegotiationError::HandlerError(format!("{:?}", e)))?; + + if let Ok(retry_frame) = channel_manager_receiver.try_recv() { + info!("Forwarding retry RequestExtensions to upstream pool..."); + upstream_sender + .send(retry_frame) + .await + .map_err(|_| ExtensionNegotiationError::SendError)?; + continue; + } + + return channel_manager + .get_negotiated_extensions_with_server(None) + .map_err(|e| ExtensionNegotiationError::HandlerError(format!("{:?}", e))); + } +} diff --git a/stratum-apps/src/lib.rs b/stratum-apps/src/lib.rs index 308a9e917..a6bc84c0d 100644 --- a/stratum-apps/src/lib.rs +++ b/stratum-apps/src/lib.rs @@ -78,3 +78,7 @@ pub mod coinbase_output_constraints; /// Fallback coordinator pub mod fallback_coordinator; + +/// Extension negotiation utilities shared by JDC and Translator +pub mod extensions_negotiation; +pub use extensions_negotiation::{negotiate_extensions, ExtensionNegotiationError};