Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/api/bitcoin_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use tracing::{debug, info};

const RPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
const RPC_REQUEST_TIMEOUT: Duration = Duration::from_secs(15);
const DEFAULT_RPC_FEE_DELTA: i64 = 100_000_000;

pub(crate) struct BitcoindRpc {
url: String,
Expand All @@ -16,9 +15,7 @@ pub(crate) struct BitcoindRpc {
}

impl BitcoindRpc {
pub(crate) fn new(url: String, user: String, pwd: String, fee_delta: Option<i64>) -> Self {
let fee_delta = fee_delta.unwrap_or(DEFAULT_RPC_FEE_DELTA);

pub(crate) fn new(url: String, user: String, pwd: String, fee_delta: i64) -> Self {
Self {
url,
user,
Expand Down
20 changes: 10 additions & 10 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,28 @@ pub struct AppState {
router: Router,
stats_sender: StatsSender,
downstream_handoff: crate::DownstreamHandoffSender,
rpc: Arc<BitcoindRpc>,
rpc: Option<Arc<BitcoindRpc>>,
}

pub(crate) async fn start(
router: Router,
stats_sender: StatsSender,
downstream_handoff: crate::DownstreamHandoffSender,
) {
let rpc_url = Configuration::rpc_url();
let rpc_user = Configuration::rpc_user();
let rpc_pwd = Configuration::rpc_pwd();
let rpc = Configuration::bitcoind_rpc_config().map(|config| {
Arc::new(BitcoindRpc::new(
config.url,
config.user,
config.pwd,
config.fee_delta,
))
});

let state = AppState {
router,
stats_sender,
downstream_handoff,
rpc: Arc::new(BitcoindRpc::new(
rpc_url,
rpc_user,
rpc_pwd,
Configuration::rpc_fee_delta(),
)),
rpc,
};
let app = AxumRouter::new()
.route("/api/health", get(Api::health_check))
Expand Down
50 changes: 43 additions & 7 deletions src/api/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use axum::{
Json,
};
use serde::Serialize;
use tracing::{error, info};
use tracing::{error, info, warn};

pub struct Api {}

Expand Down Expand Up @@ -156,7 +156,17 @@ impl Api {
State(state): State<AppState>,
Path(tx): Path<String>,
) -> impl IntoResponse {
match state.rpc.submit_transaction(&tx).await {
let Some(rpc) = state.rpc.as_ref() else {
warn!("PRIORITIZING TXS NOT ENABLED");
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(APIResponse::error(Some(
"PRIORITIZING TXS NOT ENABLED".to_string(),
))),
);
};

match rpc.submit_transaction(&tx).await {
Ok(txid) => {
info!("transaction sent to bitcoind: {txid}");
(StatusCode::OK, Json(APIResponse::success(Some(txid))))
Expand Down Expand Up @@ -232,15 +242,41 @@ async fn health_check_reports_full_translator_handoff() {
router,
stats_sender: crate::api::stats::StatsSender::new(),
downstream_handoff: handoff_tx,
rpc: std::sync::Arc::new(crate::api::bitcoin_rpc::BitcoindRpc::new(
"http://127.0.0.1:8332".to_string(),
"user".to_string(),
"password".to_string(),
None,
rpc: Some(std::sync::Arc::new(
crate::api::bitcoin_rpc::BitcoindRpc::new(
"http://127.0.0.1:8332".to_string(),
"user".to_string(),
"password".to_string(),
100_000_000,
),
)),
};

let response = Api::health_check(State(state)).await.into_response();

assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
}

#[tokio::test]
async fn send_tx_reports_unavailable_when_rpc_is_disabled() {
use axum::extract::{Path, State};
use axum::response::IntoResponse;
use tokio::sync::mpsc;

let auth_pub_k = crate::AUTH_PUB_KEY.parse().expect("Invalid public key");
let router = crate::router::Router::new(vec![], auth_pub_k, None, None);

let (handoff_tx, _handoff_rx) = mpsc::channel(1);
let state = AppState {
router,
stats_sender: crate::api::stats::StatsSender::new(),
downstream_handoff: handoff_tx,
rpc: None,
};

let response = Api::send_tx_to_bitcoind(State(state), Path("00".to_string()))
.await
.into_response();

assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
}
156 changes: 128 additions & 28 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
sync::OnceLock,
time::Duration,
};
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};

use crate::{
shared::{
Expand Down Expand Up @@ -170,11 +170,18 @@ pub struct Configuration {
auto_update: bool,
signature: String,
miner_name: Option<String>,
rpc_url: String,
rpc_user: String,
rpc_pwd: String,
rpc_fee_delta: Option<i64>,
prioritizing_txs_config: Option<BitcoindRpcConfig>,
missing_prioritizing_txs_variables: Vec<&'static str>,
}

#[derive(Clone, Debug)]
pub(crate) struct BitcoindRpcConfig {
pub url: String,
pub user: String,
pub pwd: String,
pub fee_delta: i64,
}

impl Configuration {
fn validate_supported_delay(delay: u64) -> Result<(), String> {
if delay == 0 {
Expand Down Expand Up @@ -224,12 +231,15 @@ and make that test pass."
rpc_url: String,
rpc_user: String,
rpc_pwd: String,
rpc_fee_delta: Option<i64>,
rpc_fee_delta: String,
) -> Self {
if let Err(error) = Self::validate_supported_delay(delay) {
panic!("{error}");
}

let (prioritizing_txs_config, missing_prioritizing_txs_variables) =
Self::build_prioritizing_txs_config(rpc_url, rpc_user, rpc_pwd, rpc_fee_delta);

Configuration {
token,
tp_address,
Expand All @@ -255,11 +265,49 @@ and make that test pass."
auto_update,
signature,
miner_name,
rpc_url,
rpc_user,
rpc_pwd,
rpc_fee_delta,
prioritizing_txs_config,
missing_prioritizing_txs_variables,
}
}

fn build_prioritizing_txs_config(
url: String,
user: String,
pwd: String,
fee_delta: String,
) -> (Option<BitcoindRpcConfig>, Vec<&'static str>) {
let mut missing = Vec::new();
if url.trim().is_empty() {
missing.push("RPC_URL");
}
if user.trim().is_empty() {
missing.push("RPC_USER");
}
if pwd.trim().is_empty() {
missing.push("RPC_PWD");
}
if fee_delta.trim().is_empty() {
missing.push("RPC_FEE_DELTA");
}

if !missing.is_empty() {
return (None, missing);
}

let fee_delta = match fee_delta.trim().parse() {
Ok(fee_delta) => fee_delta,
Err(_) => return (None, vec!["RPC_FEE_DELTA"]),
};

(
Some(BitcoindRpcConfig {
url,
user,
pwd,
fee_delta,
}),
missing,
)
}

pub(crate) fn init(config: Configuration) {
Expand Down Expand Up @@ -298,7 +346,7 @@ and make that test pass."
"http://127.0.0.1:8332".to_string(),
"user".to_string(),
"password".to_string(),
None,
"100000000".to_string(),
)
}

Expand Down Expand Up @@ -462,20 +510,32 @@ and make that test pass."
Self::cfg().miner_name.clone()
}

pub fn rpc_url() -> String {
Self::cfg().rpc_url.clone()
pub(crate) fn prioritizing_txs_enabled() -> bool {
Self::cfg().prioritizing_txs_config.is_some()
}

pub fn rpc_user() -> String {
Self::cfg().rpc_user.clone()
}
pub(crate) fn bitcoind_rpc_config() -> Option<BitcoindRpcConfig> {
if !Self::prioritizing_txs_enabled() {
return None;
}

pub fn rpc_pwd() -> String {
Self::cfg().rpc_pwd.clone()
Self::cfg().prioritizing_txs_config.clone()
}

pub fn rpc_fee_delta() -> Option<i64> {
Self::cfg().rpc_fee_delta
pub(crate) fn log_prioritizing_txs_status() {
let missing = &Self::cfg().missing_prioritizing_txs_variables;
if missing.is_empty() {
return;
}

if missing.len() == 4 {
warn!("PRIORITIZING TXS NOT ENABLED");
} else {
error!(
missing_env_variables = %missing.join(", "),
"PRIORITIZING TXS NOT ENABLED, missing env variable"
);
}
}

// Loads config from CLI args, config file, and env vars with precedence: CLI > file > env.
Expand Down Expand Up @@ -530,22 +590,23 @@ and make that test pass."
.rpc_url
.or(config.rpc_url)
.or_else(|| std::env::var("RPC_URL").ok())
.expect("RPC_URL is not set");
.unwrap_or_default();
let rpc_user = args
.rpc_user
.or(config.rpc_user)
.or_else(|| std::env::var("RPC_USER").ok())
.expect("RPC_USER is not set");
.unwrap_or_default();
let rpc_pwd = args
.rpc_pwd
.or(config.rpc_pwd)
.or_else(|| std::env::var("RPC_PWD").ok())
.expect("RPC_PWD is not set");
let rpc_fee_delta = args.rpc_fee_delta.or(config.rpc_fee_delta).or_else(|| {
std::env::var("RPC_FEE_DELTA")
.ok()
.and_then(|s| s.parse().ok())
});
.unwrap_or_default();
let rpc_fee_delta = args
.rpc_fee_delta
.map(|value| value.to_string())
.or_else(|| config.rpc_fee_delta.map(|value| value.to_string()))
.or_else(|| std::env::var("RPC_FEE_DELTA").ok())
.unwrap_or_default();
if let Some(ref miner_name) = miner_name {
validate_miner_name(miner_name).unwrap_or_else(|e| panic!("{e}"));
}
Expand Down Expand Up @@ -899,4 +960,43 @@ mod tests {
.contains("positive_delay_does_not_replay_stale_bootstrap_difficulty_after_retarget"));
assert!(error.contains("Do not use `--delay`, `DELAY`, or config `delay`"));
}

#[test]
fn prioritizing_txs_requires_all_rpc_values() {
let (config, missing) = Configuration::build_prioritizing_txs_config(
"http://127.0.0.1:8332".to_string(),
"user".to_string(),
"password".to_string(),
"42".to_string(),
);

assert!(missing.is_empty());
assert_eq!(config.expect("config should be enabled").fee_delta, 42);

let (config, missing) = Configuration::build_prioritizing_txs_config(
"http://127.0.0.1:8332".to_string(),
"".to_string(),
"password".to_string(),
"42".to_string(),
);

assert!(config.is_none());
assert_eq!(missing, vec!["RPC_USER"]);
}

#[test]
fn prioritizing_txs_reports_all_missing_rpc_values() {
let (config, missing) = Configuration::build_prioritizing_txs_config(
"".to_string(),
"".to_string(),
"".to_string(),
"".to_string(),
);

assert!(config.is_none());
assert_eq!(
missing,
vec!["RPC_URL", "RPC_USER", "RPC_PWD", "RPC_FEE_DELTA"]
);
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ async fn start_internal() {
if let Err(e) = tracing_init_result {
eprintln!("Tracing subscriber already set, skipping: {e}");
}
Configuration::log_prioritizing_txs_status();

Configuration::token().expect("TOKEN is not set");

Expand Down
2 changes: 1 addition & 1 deletion tests/library_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ async fn library_init_sv2_setup_connection() {
"http://127.0.0.1:8332".to_string(),
"user".to_string(),
"password".to_string(),
None,
"100000000".to_string(),
);

let proxy = tokio::spawn(dmnd_client::start(config));
Expand Down
Loading