Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 57 additions & 47 deletions src/api/bitcoin_rpc.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,28 @@
use axum::http::StatusCode;
use serde::Deserialize;
use serde_json::{json, Value};
use std::{fmt, time::Duration};
use tracing::warn;
use std::{error::Error as StdError, fmt, time::Duration};
use tracing::{debug, info};

const DEFAULT_RPC_URL: &str = "http://127.0.0.1:8332";
const RPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
const RPC_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
const DEFAULT_RPC_FEE_DELTA: i64 = 100_000_000;

pub(crate) struct BitcoindRpc {
url: Option<String>,
user: Option<String>,
pwd: Option<String>,
client: reqwest::Client,
url: String,
user: String,
pwd: String,
fee_delta: i64,
}

impl BitcoindRpc {
pub(crate) fn new(
url: Option<String>,
user: Option<String>,
pwd: Option<String>,
fee_delta: Option<i64>,
) -> Self {
let client = reqwest::Client::builder()
.connect_timeout(RPC_CONNECT_TIMEOUT)
.timeout(RPC_REQUEST_TIMEOUT)
.build()
.expect("Failed to build client");
pub(crate) fn new(url: String, user: String, pwd: String, fee_delta: Option<i64>) -> Self {
let fee_delta = fee_delta.unwrap_or(DEFAULT_RPC_FEE_DELTA);

Self {
url,
user,
pwd,
client,
fee_delta,
}
}
Expand All @@ -49,56 +36,72 @@ impl BitcoindRpc {
BitcoindRpcError::InvalidResponse("empty response from bitcoind".to_string())
})?;
self.prioritise_transaction(&txid).await?;
crate::prioritized_transactions::record(&txid);
Ok(txid)
}

async fn prioritise_transaction(&self, txid: &str) -> Result<(), BitcoindRpcError> {
let (status, text) = self
.send_request("prioritisetransaction", json!([txid, 0, self.fee_delta]))
.await?;
RpcResponse::from_response(status, &text)
info!(
txid,
fee_delta = self.fee_delta,
%status,
response = %text,
"bitcoind prioritisetransaction response"
);

let result = RpcResponse::from_response(status, &text)
.map_err(|e| BitcoindRpcError::Prioritize(e.to_string()))?;
Ok(())

match result.and_then(|value| value.as_bool()) {
Some(true) => Ok(()),
Some(false) => Err(BitcoindRpcError::Prioritize(format!(
"bitcoind returned false for prioritisetransaction: {text}"
))),
None => Err(BitcoindRpcError::Prioritize(format!(
"invalid prioritisetransaction response from bitcoind: {text}"
))),
}
}

async fn send_request(
&self,
method: &str,
params: Value,
) -> Result<(StatusCode, String), BitcoindRpcError> {
let (user, pwd) = match (&self.user, &self.pwd) {
(Some(u), Some(p)) => (u.as_str(), p.as_str()),
_ => {
let msg = match (&self.user, &self.pwd) {
(None, None) => "RPC_USER and RPC_PWD are not set.".to_string(),
(None, _) => "RPC_USER is not set.".to_string(),
_ => "RPC_PWD is not set.".to_string(),
};
return Err(BitcoindRpcError::MissingConfig(msg));
}
};
let url = self.url.as_deref().unwrap_or_else(|| {
warn!("RPC_URL not configured, using default: {DEFAULT_RPC_URL}");
DEFAULT_RPC_URL
});

let body = json!({
"jsonrpc": "1.0",
"id": "dmnd-client",
"method": method,
"params": params
});

let response = self
.client
.post(url)
.basic_auth(user, Some(pwd))
debug!(
method,
url = self.url.as_str(),
rpc_user = self.user.as_str(),
fee_delta = self.fee_delta,
"sending bitcoind RPC request"
);

let client = reqwest::Client::builder()
.connect_timeout(RPC_CONNECT_TIMEOUT)
.timeout(RPC_REQUEST_TIMEOUT)
.build()
.expect("Failed to build client");

let response = client
.post(&self.url)
.basic_auth(&self.user, Some(&self.pwd))
.json(&body)
.send()
.await?;

let status = response.status();
let text = response.text().await?;
debug!(method, %status, response = %text, "received bitcoind RPC response");
Ok((status, text))
}
}
Expand Down Expand Up @@ -160,10 +163,9 @@ fn validate_transaction_hex(tx: &str) -> Result<&str, BitcoindRpcError> {
Ok(tx)
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub(crate) enum BitcoindRpcError {
InvalidTransaction(String),
MissingConfig(String),
Rejected(String),
Timeout(String),
Other(String),
Expand All @@ -177,7 +179,6 @@ impl BitcoindRpcError {
BitcoindRpcError::InvalidTransaction(_) | BitcoindRpcError::Rejected(_) => {
StatusCode::BAD_REQUEST
}
BitcoindRpcError::MissingConfig(_) => StatusCode::SERVICE_UNAVAILABLE,
BitcoindRpcError::Timeout(_) => StatusCode::GATEWAY_TIMEOUT,
BitcoindRpcError::Other(_)
| BitcoindRpcError::InvalidResponse(_)
Expand All @@ -188,10 +189,20 @@ impl BitcoindRpcError {

impl From<reqwest::Error> for BitcoindRpcError {
fn from(error: reqwest::Error) -> Self {
let source = error
.source()
.map(|source| format!("; source: {source}"))
.unwrap_or_default();
if error.is_timeout() {
BitcoindRpcError::Timeout(format!("timed out while connecting to bitcoind: {error}"))
BitcoindRpcError::Timeout(format!(
"timed out while connecting to bitcoind: {error}{source}"
))
} else if error.is_builder() {
BitcoindRpcError::Other(format!(
"failed to build bitcoind RPC request: {error}{source}"
))
} else {
BitcoindRpcError::Other(format!("failed to connect to bitcoind: {error}"))
BitcoindRpcError::Other(format!("failed to connect to bitcoind: {error}{source}"))
}
}
}
Expand All @@ -200,7 +211,6 @@ impl fmt::Display for BitcoindRpcError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
BitcoindRpcError::InvalidTransaction(msg)
| BitcoindRpcError::MissingConfig(msg)
| BitcoindRpcError::Rejected(msg)
| BitcoindRpcError::Timeout(msg)
| BitcoindRpcError::Other(msg)
Expand Down
5 changes: 4 additions & 1 deletion src/api/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,10 @@ async fn health_check_reports_full_translator_handoff() {
stats_sender: crate::api::stats::StatsSender::new(),
downstream_handoff: handoff_tx,
rpc: std::sync::Arc::new(crate::api::bitcoin_rpc::BitcoindRpc::new(
None, None, None, None,
"http://127.0.0.1:8332".to_string(),
"user".to_string(),
"password".to_string(),
None,
)),
};

Expand Down
33 changes: 18 additions & 15 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ pub struct Configuration {
auto_update: bool,
signature: String,
miner_name: Option<String>,
rpc_url: Option<String>,
rpc_user: Option<String>,
rpc_pwd: Option<String>,
rpc_url: String,
rpc_user: String,
rpc_pwd: String,
rpc_fee_delta: Option<i64>,
}
impl Configuration {
Expand Down Expand Up @@ -221,9 +221,9 @@ and make that test pass."
auto_update: bool,
signature: String,
miner_name: Option<String>,
rpc_url: Option<String>,
rpc_user: Option<String>,
rpc_pwd: Option<String>,
rpc_url: String,
rpc_user: String,
rpc_pwd: String,
rpc_fee_delta: Option<i64>,
) -> Self {
if let Err(error) = Self::validate_supported_delay(delay) {
Expand Down Expand Up @@ -295,9 +295,9 @@ and make that test pass."
false,
"DDxDD".to_string(),
None,
None,
None,
None,
"http://127.0.0.1:8332".to_string(),
"user".to_string(),
"password".to_string(),
None,
)
}
Expand Down Expand Up @@ -462,15 +462,15 @@ and make that test pass."
Self::cfg().miner_name.clone()
}

pub fn rpc_url() -> Option<String> {
pub fn rpc_url() -> String {
Self::cfg().rpc_url.clone()
}

pub fn rpc_user() -> Option<String> {
pub fn rpc_user() -> String {
Self::cfg().rpc_user.clone()
}

pub fn rpc_pwd() -> Option<String> {
pub fn rpc_pwd() -> String {
Self::cfg().rpc_pwd.clone()
}

Expand Down Expand Up @@ -529,15 +529,18 @@ and make that test pass."
let rpc_url = args
.rpc_url
.or(config.rpc_url)
.or_else(|| std::env::var("RPC_URL").ok());
.or_else(|| std::env::var("RPC_URL").ok())
.expect("RPC_URL is not set");
let rpc_user = args
.rpc_user
.or(config.rpc_user)
.or_else(|| std::env::var("RPC_USER").ok());
.or_else(|| std::env::var("RPC_USER").ok())
.expect("RPC_USER is not set");
let rpc_pwd = args
.rpc_pwd
.or(config.rpc_pwd)
.or_else(|| std::env::var("RPC_PWD").ok());
.or_else(|| std::env::var("RPC_PWD").ok())
.expect("RPC_PWD is not set");
let rpc_fee_delta = args.rpc_fee_delta.or(config.rpc_fee_delta).or_else(|| {
std::env::var("RPC_FEE_DELTA")
.ok()
Expand Down
54 changes: 52 additions & 2 deletions src/jd_client/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use roles_logic_sv2::{
template_distribution_sv2::SetNewPrevHash,
utils::Mutex,
};
use std::{collections::HashMap, convert::TryInto};
use std::{
collections::{HashMap, HashSet},
convert::TryInto,
};
use task_manager::TaskManager;
use tokio::sync::mpsc::{Receiver as TReceiver, Sender as TSender};
use tracing::{error, info};
Expand Down Expand Up @@ -242,13 +245,17 @@ impl JobDeclarator {
.safe_lock(|s| (s.req_ids.next(), s.min_extranonce_size, s.sender.clone()))
.map_err(|_| Error::JobDeclaratorMutexCorrupted)?;

let template_transactions = tx_list_.to_vec();
let prioritized_txids = crate::prioritized_transactions::snapshot();
let mut template_txids = HashSet::with_capacity(template_transactions.len());
let mut tx_list: Vec<Transaction> = Vec::new();
let mut tx_ids = vec![];
for tx in tx_list_.to_vec() {
for tx in template_transactions {
let transaction: Result<Transaction, bitcoin::consensus::encode::Error> =
bitcoin::consensus::deserialize(&tx);
match transaction {
Ok(tx) => {
template_txids.insert(tx.compute_txid().to_string());
let w_tx_id: U256 = tx.compute_wtxid().to_raw_hash().to_byte_array().into();
tx_list.push(tx);
tx_ids.push(w_tx_id);
Expand All @@ -259,6 +266,13 @@ impl JobDeclarator {
}
}
}
for txid in missing_prioritized_txids(&prioritized_txids, &template_txids) {
error!(
txid,
template_id = template.template_id,
"prioritized transaction is missing from template transaction list"
);
}
let tx_ids: Seq064K<'static, U256> = Seq064K::from(tx_ids);

let coinbase_prefix = self_mutex
Expand Down Expand Up @@ -581,3 +595,39 @@ impl JobDeclarator {
})
}
}

fn missing_prioritized_txids(
prioritized_txids: &[String],
template_txids: &HashSet<String>,
) -> Vec<String> {
prioritized_txids
.iter()
.filter(|txid| !template_txids.contains(*txid))
.cloned()
.collect()
}

#[cfg(test)]
mod tests {
use super::missing_prioritized_txids;
use std::collections::HashSet;

#[test]
fn no_missing_prioritized_txids_when_all_are_in_template() {
let prioritized = vec!["a".to_string(), "b".to_string()];
let template = HashSet::from(["a".to_string(), "b".to_string(), "c".to_string()]);

assert!(missing_prioritized_txids(&prioritized, &template).is_empty());
}

#[test]
fn returns_only_prioritized_txids_missing_from_template() {
let prioritized = vec!["a".to_string(), "b".to_string(), "c".to_string()];
let template = HashSet::from(["a".to_string(), "c".to_string()]);

assert_eq!(
missing_prioritized_txids(&prioritized, &template),
vec!["b".to_string()]
);
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub use config::Configuration;
pub mod jd_client;
mod minin_pool_connection;
mod monitor;
mod prioritized_transactions;
mod proxy_state;
mod router;
mod share_accounter;
Expand Down
Loading