From 6914f3cec350ddf56fa1f19766cafaa9f2b6e64c Mon Sep 17 00:00:00 2001 From: lorban Date: Tue, 5 May 2026 17:37:57 +0200 Subject: [PATCH] Add Bitcoin RPC transaction prioritization and template checks Submit raw transactions through Bitcoin Core RPC, prioritize accepted txids with the configured fee delta, and log the raw prioritization response. Require finalized RPC URL, user, and password config before constructing BitcoindRpc, and build the reqwest client per request instead of storing it. Track prioritized txids in memory and, when template transaction data arrives, log an error for any prioritized txid missing from the template tx list. --- src/api/bitcoin_rpc.rs | 104 +++++++++++++++------------- src/api/routes.rs | 5 +- src/config.rs | 33 +++++---- src/jd_client/job_declarator/mod.rs | 54 ++++++++++++++- src/lib.rs | 1 + src/prioritized_transactions.rs | 26 +++++++ tests/library_init.rs | 6 +- 7 files changed, 161 insertions(+), 68 deletions(-) create mode 100644 src/prioritized_transactions.rs diff --git a/src/api/bitcoin_rpc.rs b/src/api/bitcoin_rpc.rs index c4b13a3c..845bc4be 100644 --- a/src/api/bitcoin_rpc.rs +++ b/src/api/bitcoin_rpc.rs @@ -1,41 +1,28 @@ use axum::http::StatusCode; use serde::Deserialize; use serde_json::{json, Value}; -use std::{fmt, time::Duration}; -use tracing::warn; +use std::{error::Error as StdError, fmt, time::Duration}; +use tracing::{debug, info}; -const DEFAULT_RPC_URL: &str = "http://127.0.0.1:8332"; const RPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(5); const RPC_REQUEST_TIMEOUT: Duration = Duration::from_secs(15); const DEFAULT_RPC_FEE_DELTA: i64 = 100_000_000; pub(crate) struct BitcoindRpc { - url: Option, - user: Option, - pwd: Option, - client: reqwest::Client, + url: String, + user: String, + pwd: String, fee_delta: i64, } impl BitcoindRpc { - pub(crate) fn new( - url: Option, - user: Option, - pwd: Option, - fee_delta: Option, - ) -> Self { - let client = reqwest::Client::builder() - .connect_timeout(RPC_CONNECT_TIMEOUT) - .timeout(RPC_REQUEST_TIMEOUT) - .build() - .expect("Failed to build client"); + pub(crate) fn new(url: String, user: String, pwd: String, fee_delta: Option) -> Self { let fee_delta = fee_delta.unwrap_or(DEFAULT_RPC_FEE_DELTA); Self { url, user, pwd, - client, fee_delta, } } @@ -49,6 +36,7 @@ impl BitcoindRpc { BitcoindRpcError::InvalidResponse("empty response from bitcoind".to_string()) })?; self.prioritise_transaction(&txid).await?; + crate::prioritized_transactions::record(&txid); Ok(txid) } @@ -56,9 +44,26 @@ impl BitcoindRpc { let (status, text) = self .send_request("prioritisetransaction", json!([txid, 0, self.fee_delta])) .await?; - RpcResponse::from_response(status, &text) + info!( + txid, + fee_delta = self.fee_delta, + %status, + response = %text, + "bitcoind prioritisetransaction response" + ); + + let result = RpcResponse::from_response(status, &text) .map_err(|e| BitcoindRpcError::Prioritize(e.to_string()))?; - Ok(()) + + match result.and_then(|value| value.as_bool()) { + Some(true) => Ok(()), + Some(false) => Err(BitcoindRpcError::Prioritize(format!( + "bitcoind returned false for prioritisetransaction: {text}" + ))), + None => Err(BitcoindRpcError::Prioritize(format!( + "invalid prioritisetransaction response from bitcoind: {text}" + ))), + } } async fn send_request( @@ -66,22 +71,6 @@ impl BitcoindRpc { method: &str, params: Value, ) -> Result<(StatusCode, String), BitcoindRpcError> { - let (user, pwd) = match (&self.user, &self.pwd) { - (Some(u), Some(p)) => (u.as_str(), p.as_str()), - _ => { - let msg = match (&self.user, &self.pwd) { - (None, None) => "RPC_USER and RPC_PWD are not set.".to_string(), - (None, _) => "RPC_USER is not set.".to_string(), - _ => "RPC_PWD is not set.".to_string(), - }; - return Err(BitcoindRpcError::MissingConfig(msg)); - } - }; - let url = self.url.as_deref().unwrap_or_else(|| { - warn!("RPC_URL not configured, using default: {DEFAULT_RPC_URL}"); - DEFAULT_RPC_URL - }); - let body = json!({ "jsonrpc": "1.0", "id": "dmnd-client", @@ -89,16 +78,30 @@ impl BitcoindRpc { "params": params }); - let response = self - .client - .post(url) - .basic_auth(user, Some(pwd)) + debug!( + method, + url = self.url.as_str(), + rpc_user = self.user.as_str(), + fee_delta = self.fee_delta, + "sending bitcoind RPC request" + ); + + let client = reqwest::Client::builder() + .connect_timeout(RPC_CONNECT_TIMEOUT) + .timeout(RPC_REQUEST_TIMEOUT) + .build() + .expect("Failed to build client"); + + let response = client + .post(&self.url) + .basic_auth(&self.user, Some(&self.pwd)) .json(&body) .send() .await?; let status = response.status(); let text = response.text().await?; + debug!(method, %status, response = %text, "received bitcoind RPC response"); Ok((status, text)) } } @@ -160,10 +163,9 @@ fn validate_transaction_hex(tx: &str) -> Result<&str, BitcoindRpcError> { Ok(tx) } -#[derive(Debug)] +#[derive(Clone, Debug)] pub(crate) enum BitcoindRpcError { InvalidTransaction(String), - MissingConfig(String), Rejected(String), Timeout(String), Other(String), @@ -177,7 +179,6 @@ impl BitcoindRpcError { BitcoindRpcError::InvalidTransaction(_) | BitcoindRpcError::Rejected(_) => { StatusCode::BAD_REQUEST } - BitcoindRpcError::MissingConfig(_) => StatusCode::SERVICE_UNAVAILABLE, BitcoindRpcError::Timeout(_) => StatusCode::GATEWAY_TIMEOUT, BitcoindRpcError::Other(_) | BitcoindRpcError::InvalidResponse(_) @@ -188,10 +189,20 @@ impl BitcoindRpcError { impl From for BitcoindRpcError { fn from(error: reqwest::Error) -> Self { + let source = error + .source() + .map(|source| format!("; source: {source}")) + .unwrap_or_default(); if error.is_timeout() { - BitcoindRpcError::Timeout(format!("timed out while connecting to bitcoind: {error}")) + BitcoindRpcError::Timeout(format!( + "timed out while connecting to bitcoind: {error}{source}" + )) + } else if error.is_builder() { + BitcoindRpcError::Other(format!( + "failed to build bitcoind RPC request: {error}{source}" + )) } else { - BitcoindRpcError::Other(format!("failed to connect to bitcoind: {error}")) + BitcoindRpcError::Other(format!("failed to connect to bitcoind: {error}{source}")) } } } @@ -200,7 +211,6 @@ impl fmt::Display for BitcoindRpcError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { BitcoindRpcError::InvalidTransaction(msg) - | BitcoindRpcError::MissingConfig(msg) | BitcoindRpcError::Rejected(msg) | BitcoindRpcError::Timeout(msg) | BitcoindRpcError::Other(msg) diff --git a/src/api/routes.rs b/src/api/routes.rs index be1dba8c..06b8507e 100644 --- a/src/api/routes.rs +++ b/src/api/routes.rs @@ -233,7 +233,10 @@ async fn health_check_reports_full_translator_handoff() { stats_sender: crate::api::stats::StatsSender::new(), downstream_handoff: handoff_tx, rpc: std::sync::Arc::new(crate::api::bitcoin_rpc::BitcoindRpc::new( - None, None, None, None, + "http://127.0.0.1:8332".to_string(), + "user".to_string(), + "password".to_string(), + None, )), }; diff --git a/src/config.rs b/src/config.rs index 7efbde84..a4df163d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -170,9 +170,9 @@ pub struct Configuration { auto_update: bool, signature: String, miner_name: Option, - rpc_url: Option, - rpc_user: Option, - rpc_pwd: Option, + rpc_url: String, + rpc_user: String, + rpc_pwd: String, rpc_fee_delta: Option, } impl Configuration { @@ -221,9 +221,9 @@ and make that test pass." auto_update: bool, signature: String, miner_name: Option, - rpc_url: Option, - rpc_user: Option, - rpc_pwd: Option, + rpc_url: String, + rpc_user: String, + rpc_pwd: String, rpc_fee_delta: Option, ) -> Self { if let Err(error) = Self::validate_supported_delay(delay) { @@ -295,9 +295,9 @@ and make that test pass." false, "DDxDD".to_string(), None, - None, - None, - None, + "http://127.0.0.1:8332".to_string(), + "user".to_string(), + "password".to_string(), None, ) } @@ -462,15 +462,15 @@ and make that test pass." Self::cfg().miner_name.clone() } - pub fn rpc_url() -> Option { + pub fn rpc_url() -> String { Self::cfg().rpc_url.clone() } - pub fn rpc_user() -> Option { + pub fn rpc_user() -> String { Self::cfg().rpc_user.clone() } - pub fn rpc_pwd() -> Option { + pub fn rpc_pwd() -> String { Self::cfg().rpc_pwd.clone() } @@ -529,15 +529,18 @@ and make that test pass." let rpc_url = args .rpc_url .or(config.rpc_url) - .or_else(|| std::env::var("RPC_URL").ok()); + .or_else(|| std::env::var("RPC_URL").ok()) + .expect("RPC_URL is not set"); let rpc_user = args .rpc_user .or(config.rpc_user) - .or_else(|| std::env::var("RPC_USER").ok()); + .or_else(|| std::env::var("RPC_USER").ok()) + .expect("RPC_USER is not set"); let rpc_pwd = args .rpc_pwd .or(config.rpc_pwd) - .or_else(|| std::env::var("RPC_PWD").ok()); + .or_else(|| std::env::var("RPC_PWD").ok()) + .expect("RPC_PWD is not set"); let rpc_fee_delta = args.rpc_fee_delta.or(config.rpc_fee_delta).or_else(|| { std::env::var("RPC_FEE_DELTA") .ok() diff --git a/src/jd_client/job_declarator/mod.rs b/src/jd_client/job_declarator/mod.rs index 5d017657..4eeb234b 100644 --- a/src/jd_client/job_declarator/mod.rs +++ b/src/jd_client/job_declarator/mod.rs @@ -12,7 +12,10 @@ use roles_logic_sv2::{ template_distribution_sv2::SetNewPrevHash, utils::Mutex, }; -use std::{collections::HashMap, convert::TryInto}; +use std::{ + collections::{HashMap, HashSet}, + convert::TryInto, +}; use task_manager::TaskManager; use tokio::sync::mpsc::{Receiver as TReceiver, Sender as TSender}; use tracing::{error, info}; @@ -242,13 +245,17 @@ impl JobDeclarator { .safe_lock(|s| (s.req_ids.next(), s.min_extranonce_size, s.sender.clone())) .map_err(|_| Error::JobDeclaratorMutexCorrupted)?; + let template_transactions = tx_list_.to_vec(); + let prioritized_txids = crate::prioritized_transactions::snapshot(); + let mut template_txids = HashSet::with_capacity(template_transactions.len()); let mut tx_list: Vec = Vec::new(); let mut tx_ids = vec![]; - for tx in tx_list_.to_vec() { + for tx in template_transactions { let transaction: Result = bitcoin::consensus::deserialize(&tx); match transaction { Ok(tx) => { + template_txids.insert(tx.compute_txid().to_string()); let w_tx_id: U256 = tx.compute_wtxid().to_raw_hash().to_byte_array().into(); tx_list.push(tx); tx_ids.push(w_tx_id); @@ -259,6 +266,13 @@ impl JobDeclarator { } } } + for txid in missing_prioritized_txids(&prioritized_txids, &template_txids) { + error!( + txid, + template_id = template.template_id, + "prioritized transaction is missing from template transaction list" + ); + } let tx_ids: Seq064K<'static, U256> = Seq064K::from(tx_ids); let coinbase_prefix = self_mutex @@ -581,3 +595,39 @@ impl JobDeclarator { }) } } + +fn missing_prioritized_txids( + prioritized_txids: &[String], + template_txids: &HashSet, +) -> Vec { + prioritized_txids + .iter() + .filter(|txid| !template_txids.contains(*txid)) + .cloned() + .collect() +} + +#[cfg(test)] +mod tests { + use super::missing_prioritized_txids; + use std::collections::HashSet; + + #[test] + fn no_missing_prioritized_txids_when_all_are_in_template() { + let prioritized = vec!["a".to_string(), "b".to_string()]; + let template = HashSet::from(["a".to_string(), "b".to_string(), "c".to_string()]); + + assert!(missing_prioritized_txids(&prioritized, &template).is_empty()); + } + + #[test] + fn returns_only_prioritized_txids_missing_from_template() { + let prioritized = vec!["a".to_string(), "b".to_string(), "c".to_string()]; + let template = HashSet::from(["a".to_string(), "c".to_string()]); + + assert_eq!( + missing_prioritized_txids(&prioritized, &template), + vec!["b".to_string()] + ); + } +} diff --git a/src/lib.rs b/src/lib.rs index 956eb050..0c46bee6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,6 +34,7 @@ pub use config::Configuration; pub mod jd_client; mod minin_pool_connection; mod monitor; +mod prioritized_transactions; mod proxy_state; mod router; mod share_accounter; diff --git a/src/prioritized_transactions.rs b/src/prioritized_transactions.rs new file mode 100644 index 00000000..bec6050b --- /dev/null +++ b/src/prioritized_transactions.rs @@ -0,0 +1,26 @@ +use std::{ + collections::HashSet, + sync::{Mutex, OnceLock}, +}; + +static PRIORITIZED_TXIDS: OnceLock>> = OnceLock::new(); + +fn txids() -> &'static Mutex> { + PRIORITIZED_TXIDS.get_or_init(|| Mutex::new(HashSet::new())) +} + +pub(crate) fn record(txid: &str) { + txids() + .lock() + .expect("prioritized transactions mutex poisoned") + .insert(txid.to_string()); +} + +pub(crate) fn snapshot() -> Vec { + txids() + .lock() + .expect("prioritized transactions mutex poisoned") + .iter() + .cloned() + .collect() +} diff --git a/tests/library_init.rs b/tests/library_init.rs index b2c3ffd4..96e96704 100644 --- a/tests/library_init.rs +++ b/tests/library_init.rs @@ -146,9 +146,9 @@ async fn library_init_sv2_setup_connection() { false, "DDxDD".to_string(), None, - None, - None, - None, + "http://127.0.0.1:8332".to_string(), + "user".to_string(), + "password".to_string(), None, );