diff --git a/src/api/bitcoin_rpc.rs b/src/api/bitcoin_rpc.rs index c4b13a3..845bc4b 100644 --- a/src/api/bitcoin_rpc.rs +++ b/src/api/bitcoin_rpc.rs @@ -1,41 +1,28 @@ use axum::http::StatusCode; use serde::Deserialize; use serde_json::{json, Value}; -use std::{fmt, time::Duration}; -use tracing::warn; +use std::{error::Error as StdError, fmt, time::Duration}; +use tracing::{debug, info}; -const DEFAULT_RPC_URL: &str = "http://127.0.0.1:8332"; const RPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(5); const RPC_REQUEST_TIMEOUT: Duration = Duration::from_secs(15); const DEFAULT_RPC_FEE_DELTA: i64 = 100_000_000; pub(crate) struct BitcoindRpc { - url: Option, - user: Option, - pwd: Option, - client: reqwest::Client, + url: String, + user: String, + pwd: String, fee_delta: i64, } impl BitcoindRpc { - pub(crate) fn new( - url: Option, - user: Option, - pwd: Option, - fee_delta: Option, - ) -> Self { - let client = reqwest::Client::builder() - .connect_timeout(RPC_CONNECT_TIMEOUT) - .timeout(RPC_REQUEST_TIMEOUT) - .build() - .expect("Failed to build client"); + pub(crate) fn new(url: String, user: String, pwd: String, fee_delta: Option) -> Self { let fee_delta = fee_delta.unwrap_or(DEFAULT_RPC_FEE_DELTA); Self { url, user, pwd, - client, fee_delta, } } @@ -49,6 +36,7 @@ impl BitcoindRpc { BitcoindRpcError::InvalidResponse("empty response from bitcoind".to_string()) })?; self.prioritise_transaction(&txid).await?; + crate::prioritized_transactions::record(&txid); Ok(txid) } @@ -56,9 +44,26 @@ impl BitcoindRpc { let (status, text) = self .send_request("prioritisetransaction", json!([txid, 0, self.fee_delta])) .await?; - RpcResponse::from_response(status, &text) + info!( + txid, + fee_delta = self.fee_delta, + %status, + response = %text, + "bitcoind prioritisetransaction response" + ); + + let result = RpcResponse::from_response(status, &text) .map_err(|e| BitcoindRpcError::Prioritize(e.to_string()))?; - Ok(()) + + match result.and_then(|value| value.as_bool()) { + Some(true) => Ok(()), + Some(false) => Err(BitcoindRpcError::Prioritize(format!( + "bitcoind returned false for prioritisetransaction: {text}" + ))), + None => Err(BitcoindRpcError::Prioritize(format!( + "invalid prioritisetransaction response from bitcoind: {text}" + ))), + } } async fn send_request( @@ -66,22 +71,6 @@ impl BitcoindRpc { method: &str, params: Value, ) -> Result<(StatusCode, String), BitcoindRpcError> { - let (user, pwd) = match (&self.user, &self.pwd) { - (Some(u), Some(p)) => (u.as_str(), p.as_str()), - _ => { - let msg = match (&self.user, &self.pwd) { - (None, None) => "RPC_USER and RPC_PWD are not set.".to_string(), - (None, _) => "RPC_USER is not set.".to_string(), - _ => "RPC_PWD is not set.".to_string(), - }; - return Err(BitcoindRpcError::MissingConfig(msg)); - } - }; - let url = self.url.as_deref().unwrap_or_else(|| { - warn!("RPC_URL not configured, using default: {DEFAULT_RPC_URL}"); - DEFAULT_RPC_URL - }); - let body = json!({ "jsonrpc": "1.0", "id": "dmnd-client", @@ -89,16 +78,30 @@ impl BitcoindRpc { "params": params }); - let response = self - .client - .post(url) - .basic_auth(user, Some(pwd)) + debug!( + method, + url = self.url.as_str(), + rpc_user = self.user.as_str(), + fee_delta = self.fee_delta, + "sending bitcoind RPC request" + ); + + let client = reqwest::Client::builder() + .connect_timeout(RPC_CONNECT_TIMEOUT) + .timeout(RPC_REQUEST_TIMEOUT) + .build() + .expect("Failed to build client"); + + let response = client + .post(&self.url) + .basic_auth(&self.user, Some(&self.pwd)) .json(&body) .send() .await?; let status = response.status(); let text = response.text().await?; + debug!(method, %status, response = %text, "received bitcoind RPC response"); Ok((status, text)) } } @@ -160,10 +163,9 @@ fn validate_transaction_hex(tx: &str) -> Result<&str, BitcoindRpcError> { Ok(tx) } -#[derive(Debug)] +#[derive(Clone, Debug)] pub(crate) enum BitcoindRpcError { InvalidTransaction(String), - MissingConfig(String), Rejected(String), Timeout(String), Other(String), @@ -177,7 +179,6 @@ impl BitcoindRpcError { BitcoindRpcError::InvalidTransaction(_) | BitcoindRpcError::Rejected(_) => { StatusCode::BAD_REQUEST } - BitcoindRpcError::MissingConfig(_) => StatusCode::SERVICE_UNAVAILABLE, BitcoindRpcError::Timeout(_) => StatusCode::GATEWAY_TIMEOUT, BitcoindRpcError::Other(_) | BitcoindRpcError::InvalidResponse(_) @@ -188,10 +189,20 @@ impl BitcoindRpcError { impl From for BitcoindRpcError { fn from(error: reqwest::Error) -> Self { + let source = error + .source() + .map(|source| format!("; source: {source}")) + .unwrap_or_default(); if error.is_timeout() { - BitcoindRpcError::Timeout(format!("timed out while connecting to bitcoind: {error}")) + BitcoindRpcError::Timeout(format!( + "timed out while connecting to bitcoind: {error}{source}" + )) + } else if error.is_builder() { + BitcoindRpcError::Other(format!( + "failed to build bitcoind RPC request: {error}{source}" + )) } else { - BitcoindRpcError::Other(format!("failed to connect to bitcoind: {error}")) + BitcoindRpcError::Other(format!("failed to connect to bitcoind: {error}{source}")) } } } @@ -200,7 +211,6 @@ impl fmt::Display for BitcoindRpcError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { BitcoindRpcError::InvalidTransaction(msg) - | BitcoindRpcError::MissingConfig(msg) | BitcoindRpcError::Rejected(msg) | BitcoindRpcError::Timeout(msg) | BitcoindRpcError::Other(msg) diff --git a/src/api/routes.rs b/src/api/routes.rs index be1dba8..06b8507 100644 --- a/src/api/routes.rs +++ b/src/api/routes.rs @@ -233,7 +233,10 @@ async fn health_check_reports_full_translator_handoff() { stats_sender: crate::api::stats::StatsSender::new(), downstream_handoff: handoff_tx, rpc: std::sync::Arc::new(crate::api::bitcoin_rpc::BitcoindRpc::new( - None, None, None, None, + "http://127.0.0.1:8332".to_string(), + "user".to_string(), + "password".to_string(), + None, )), }; diff --git a/src/config.rs b/src/config.rs index 7efbde8..a4df163 100644 --- a/src/config.rs +++ b/src/config.rs @@ -170,9 +170,9 @@ pub struct Configuration { auto_update: bool, signature: String, miner_name: Option, - rpc_url: Option, - rpc_user: Option, - rpc_pwd: Option, + rpc_url: String, + rpc_user: String, + rpc_pwd: String, rpc_fee_delta: Option, } impl Configuration { @@ -221,9 +221,9 @@ and make that test pass." auto_update: bool, signature: String, miner_name: Option, - rpc_url: Option, - rpc_user: Option, - rpc_pwd: Option, + rpc_url: String, + rpc_user: String, + rpc_pwd: String, rpc_fee_delta: Option, ) -> Self { if let Err(error) = Self::validate_supported_delay(delay) { @@ -295,9 +295,9 @@ and make that test pass." false, "DDxDD".to_string(), None, - None, - None, - None, + "http://127.0.0.1:8332".to_string(), + "user".to_string(), + "password".to_string(), None, ) } @@ -462,15 +462,15 @@ and make that test pass." Self::cfg().miner_name.clone() } - pub fn rpc_url() -> Option { + pub fn rpc_url() -> String { Self::cfg().rpc_url.clone() } - pub fn rpc_user() -> Option { + pub fn rpc_user() -> String { Self::cfg().rpc_user.clone() } - pub fn rpc_pwd() -> Option { + pub fn rpc_pwd() -> String { Self::cfg().rpc_pwd.clone() } @@ -529,15 +529,18 @@ and make that test pass." let rpc_url = args .rpc_url .or(config.rpc_url) - .or_else(|| std::env::var("RPC_URL").ok()); + .or_else(|| std::env::var("RPC_URL").ok()) + .expect("RPC_URL is not set"); let rpc_user = args .rpc_user .or(config.rpc_user) - .or_else(|| std::env::var("RPC_USER").ok()); + .or_else(|| std::env::var("RPC_USER").ok()) + .expect("RPC_USER is not set"); let rpc_pwd = args .rpc_pwd .or(config.rpc_pwd) - .or_else(|| std::env::var("RPC_PWD").ok()); + .or_else(|| std::env::var("RPC_PWD").ok()) + .expect("RPC_PWD is not set"); let rpc_fee_delta = args.rpc_fee_delta.or(config.rpc_fee_delta).or_else(|| { std::env::var("RPC_FEE_DELTA") .ok() diff --git a/src/jd_client/job_declarator/mod.rs b/src/jd_client/job_declarator/mod.rs index 5d01765..4eeb234 100644 --- a/src/jd_client/job_declarator/mod.rs +++ b/src/jd_client/job_declarator/mod.rs @@ -12,7 +12,10 @@ use roles_logic_sv2::{ template_distribution_sv2::SetNewPrevHash, utils::Mutex, }; -use std::{collections::HashMap, convert::TryInto}; +use std::{ + collections::{HashMap, HashSet}, + convert::TryInto, +}; use task_manager::TaskManager; use tokio::sync::mpsc::{Receiver as TReceiver, Sender as TSender}; use tracing::{error, info}; @@ -242,13 +245,17 @@ impl JobDeclarator { .safe_lock(|s| (s.req_ids.next(), s.min_extranonce_size, s.sender.clone())) .map_err(|_| Error::JobDeclaratorMutexCorrupted)?; + let template_transactions = tx_list_.to_vec(); + let prioritized_txids = crate::prioritized_transactions::snapshot(); + let mut template_txids = HashSet::with_capacity(template_transactions.len()); let mut tx_list: Vec = Vec::new(); let mut tx_ids = vec![]; - for tx in tx_list_.to_vec() { + for tx in template_transactions { let transaction: Result = bitcoin::consensus::deserialize(&tx); match transaction { Ok(tx) => { + template_txids.insert(tx.compute_txid().to_string()); let w_tx_id: U256 = tx.compute_wtxid().to_raw_hash().to_byte_array().into(); tx_list.push(tx); tx_ids.push(w_tx_id); @@ -259,6 +266,13 @@ impl JobDeclarator { } } } + for txid in missing_prioritized_txids(&prioritized_txids, &template_txids) { + error!( + txid, + template_id = template.template_id, + "prioritized transaction is missing from template transaction list" + ); + } let tx_ids: Seq064K<'static, U256> = Seq064K::from(tx_ids); let coinbase_prefix = self_mutex @@ -581,3 +595,39 @@ impl JobDeclarator { }) } } + +fn missing_prioritized_txids( + prioritized_txids: &[String], + template_txids: &HashSet, +) -> Vec { + prioritized_txids + .iter() + .filter(|txid| !template_txids.contains(*txid)) + .cloned() + .collect() +} + +#[cfg(test)] +mod tests { + use super::missing_prioritized_txids; + use std::collections::HashSet; + + #[test] + fn no_missing_prioritized_txids_when_all_are_in_template() { + let prioritized = vec!["a".to_string(), "b".to_string()]; + let template = HashSet::from(["a".to_string(), "b".to_string(), "c".to_string()]); + + assert!(missing_prioritized_txids(&prioritized, &template).is_empty()); + } + + #[test] + fn returns_only_prioritized_txids_missing_from_template() { + let prioritized = vec!["a".to_string(), "b".to_string(), "c".to_string()]; + let template = HashSet::from(["a".to_string(), "c".to_string()]); + + assert_eq!( + missing_prioritized_txids(&prioritized, &template), + vec!["b".to_string()] + ); + } +} diff --git a/src/lib.rs b/src/lib.rs index 956eb05..0c46bee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,6 +34,7 @@ pub use config::Configuration; pub mod jd_client; mod minin_pool_connection; mod monitor; +mod prioritized_transactions; mod proxy_state; mod router; mod share_accounter; diff --git a/src/prioritized_transactions.rs b/src/prioritized_transactions.rs new file mode 100644 index 0000000..bec6050 --- /dev/null +++ b/src/prioritized_transactions.rs @@ -0,0 +1,26 @@ +use std::{ + collections::HashSet, + sync::{Mutex, OnceLock}, +}; + +static PRIORITIZED_TXIDS: OnceLock>> = OnceLock::new(); + +fn txids() -> &'static Mutex> { + PRIORITIZED_TXIDS.get_or_init(|| Mutex::new(HashSet::new())) +} + +pub(crate) fn record(txid: &str) { + txids() + .lock() + .expect("prioritized transactions mutex poisoned") + .insert(txid.to_string()); +} + +pub(crate) fn snapshot() -> Vec { + txids() + .lock() + .expect("prioritized transactions mutex poisoned") + .iter() + .cloned() + .collect() +} diff --git a/tests/library_init.rs b/tests/library_init.rs index b2c3ffd..96e9670 100644 --- a/tests/library_init.rs +++ b/tests/library_init.rs @@ -146,9 +146,9 @@ async fn library_init_sv2_setup_connection() { false, "DDxDD".to_string(), None, - None, - None, - None, + "http://127.0.0.1:8332".to_string(), + "user".to_string(), + "password".to_string(), None, );