From 950023976d220a40af9954835a0872e7fef73787 Mon Sep 17 00:00:00 2001 From: bitwalt Date: Fri, 1 May 2026 09:06:49 +0100 Subject: [PATCH] Remove bitcoind dependency Sync block/tx state via the configured indexer (electrum or esplora) through lightning-transaction-sync, dropping the lightning-block-sync RPC client and removing the bitcoind RPC fields from the unlock API, OpenAPI spec, README and Dockerfile. Async gossip verification is preserved on top of the new indexer client. Also stabilises two integration-test flakes that surface in long sequential runs: swap_roundtrip_multihop_asset_asset waits for all four channels to be usable before maker_init (both swap legs route over distinct RGB-asset channels), and upload_asset_media::fail tolerates the connection-reset that RequestBodyLimitLayer can produce mid-stream instead of a 413 response. --- Cargo.lock | 56 +- Cargo.toml | 5 +- README.md | 15 - openapi.yaml | 16 - src/bitcoind.rs | 358 --------- src/error.rs | 10 +- src/indexer.rs | 687 ++++++++++++++++++ src/ldk.rs | 258 +++---- src/main.rs | 2 +- src/routes.rs | 4 - src/test/mod.rs | 4 - .../swap_roundtrip_multihop_asset_asset.rs | 4 + src/test/upload_asset_media.rs | 21 +- 13 files changed, 817 insertions(+), 623 deletions(-) delete mode 100644 src/bitcoind.rs create mode 100644 src/indexer.rs diff --git a/Cargo.lock b/Cargo.lock index 8d2f31b7..9afeedda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -476,7 +476,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b59a3f7fbe678874fa34354097644a171276e02a49934c13b3d61c54610ddf39" dependencies = [ "bdk_core", - "electrum-client 0.24.1", + "electrum-client", ] [[package]] @@ -838,12 +838,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "chunked_transfer" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e4de3bc4ea267985becf712dc6d9eed8b04c953b3fcfb339ebc87acd9804901" - [[package]] name = "cipher" version = "0.4.4" @@ -1356,23 +1350,6 @@ dependencies = [ "serde", ] -[[package]] -name = "electrum-client" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c7b1f8783238bb18e6e137875b0a66f3dffe6c7ea84066e05d033cf180b150f" -dependencies = [ - "bitcoin", - "byteorder", - "libc", - "log", - "rustls 0.23.40", - "serde", - "serde_json", - "webpki-roots 0.25.4", - "winapi", -] - [[package]] name = "electrum-client" version = "0.24.1" @@ -2403,19 +2380,6 @@ dependencies = [ "possiblyrandom", ] -[[package]] -name = "lightning-block-sync" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee5069846b07a62aaecdaf25233e067bc69f245b7c8fd00cc9c217053221f875" -dependencies = [ - "bitcoin", - "chunked_transfer", - "lightning", - "serde_json", - "tokio", -] - [[package]] name = "lightning-dns-resolver" version = "0.3.0" @@ -2502,6 +2466,17 @@ dependencies = [ "lightning", ] +[[package]] +name = "lightning-transaction-sync" +version = "0.2.1" +dependencies = [ + "bitcoin", + "electrum-client", + "esplora-client", + "lightning", + "lightning-macros 0.2.1", +] + [[package]] name = "lightning-types" version = "0.3.1" @@ -3697,20 +3672,21 @@ dependencies = [ "clap", "dircmp", "dirs", - "electrum-client 0.20.0", + "electrum-client", + "esplora-client", "futures", "hex-conservative 0.3.2", "http", "lazy_static", "lightning", "lightning-background-processor", - "lightning-block-sync", "lightning-dns-resolver", "lightning-invoice", "lightning-macros 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "lightning-net-tokio", "lightning-persister", "lightning-rapid-gossip-sync", + "lightning-transaction-sync", "magic-crypt", "once_cell", "rand 0.8.6", @@ -3747,7 +3723,7 @@ dependencies = [ "baid64", "base85", "chrono", - "electrum-client 0.24.1", + "electrum-client", "esplora-client", "getrandom 0.3.4", "indexmap", diff --git a/Cargo.toml b/Cargo.toml index fed84e29..b7f49615 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,17 +22,19 @@ chacha20poly1305 = { version = "0.10.1", features = ["stream"] } chrono = { version = "0.4", default-features = false, features = ["clock"] } clap = "4.5.20" dirs = "5.0.1" +electrum-client = { version = "0.24.0", default-features = false, features = ["use-rustls"] } +esplora-client = { version = "0.12", default-features = false, features = ["blocking-https-rustls"] } futures = "0.3" hex = { package = "hex-conservative", version = "0.3.0", default-features = false } lightning = { version = "0.2.0", path = "./rust-lightning/lightning", features = ["dnssec"] } lightning-background-processor = { version = "0.2.0", path = "./rust-lightning/lightning-background-processor" } -lightning-block-sync = { version = "0.2.0", features = ["rpc-client", "tokio"] } lightning-dns-resolver = { version = "0.3.0", path = "./rust-lightning/lightning-dns-resolver" } lightning-invoice = { version = "0.34.0", features = ["std"], path = "./rust-lightning/lightning-invoice" } lightning-macros = { version = "0.2.0" } lightning-net-tokio = { version = "0.2.0" } lightning-persister = { version = "0.2.0", path = "./rust-lightning/lightning-persister", features = ["tokio"] } lightning-rapid-gossip-sync = { version = "0.2.0", path = "./rust-lightning/lightning-rapid-gossip-sync" } +lightning-transaction-sync = { version = "0.2.0", path = "./rust-lightning/lightning-transaction-sync", features = ["electrum", "esplora-blocking"] } magic-crypt = "4.0.1" rand = "0.8.5" regex = { version = "1.11", default-features = false } @@ -59,7 +61,6 @@ zip = { version = "2.2.0", default-features = false, features = ["time", "zstd"] [dev-dependencies] dircmp = "0.2.0" -electrum-client = "0.20.0" http = "1.4.0" lazy_static = { version = "1.5.0", default-features = false } lightning = { version = "0.2.0", path = "./rust-lightning/lightning", features = ["_rln_test_hooks"] } diff --git a/README.md b/README.md index 5a89e408..0d957bcd 100644 --- a/README.md +++ b/README.md @@ -46,14 +46,12 @@ docker build -t rgb-lightning-node . ## Run In order to operate, the node will need: -- a bitcoind node - an indexer instance (electrum or esplora) - an [RGB proxy server] instance Once services are running, daemons can be started. Each daemon needs to be started in a separate shell with `rgb-lightning-node`, specifying: -- bitcoind user, password, host and port - node data directory - node listening port - LN peer listening port @@ -129,18 +127,10 @@ For more info about regtest utility commands, run: ``` When unlocking regtest nodes use the following local services: -- bitcoind_rpc_username: user -- bitcoind_rpc_password: password -- bitcoind_rpc_host: localhost -- bitcoind_rpc_port: 18433 - indexer_url: 127.0.0.1:50001 - proxy_endpoint: rpc://127.0.0.1:3000/json-rpc To unlock a regtest nodes running in docker use the following local services: -- bitcoind_rpc_username: user -- bitcoind_rpc_password: password -- bitcoind_rpc_host: bitcoind -- bitcoind_rpc_port: 18433 - indexer_url: electrs:50001 - proxy_endpoint: rpc://proxy:3000/json-rpc @@ -172,10 +162,6 @@ rgb-lightning-node dataldk2/ --daemon-listening-port 3003 \ ``` When unlocking testnet3 nodes you can use the following services: -- bitcoind_rpc_username: user -- bitcoind_rpc_username: password -- bitcoind_rpc_host: electrum.iriswallet.com -- bitcoind_rpc_port: 18332 - indexer_url: ssl://electrum.iriswallet.com:50013 - proxy_endpoint: rpcs://proxy.iriswallet.com/0.2/json-rpc @@ -183,7 +169,6 @@ When unlocking testnet3 nodes you can use the following services: To run testnet4 use the same options as testnet3 except for: - CLI arg: `--network testnet4` -- bitcoind_rpc_port: 18443 - indexer_url: ssl://electrum.iriswallet.com:50053 ## Use diff --git a/openapi.yaml b/openapi.yaml index 16f81d0d..44210c7c 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -3164,27 +3164,11 @@ components: type: object required: - password - - bitcoind_rpc_username - - bitcoind_rpc_password - - bitcoind_rpc_host - - bitcoind_rpc_port - announce_addresses properties: password: type: string example: nodepassword - bitcoind_rpc_username: - type: string - example: user - bitcoind_rpc_password: - type: string - example: password - bitcoind_rpc_host: - type: string - example: localhost - bitcoind_rpc_port: - type: integer - example: 18443 indexer_url: type: - string diff --git a/src/bitcoind.rs b/src/bitcoind.rs deleted file mode 100644 index 3150ae2b..00000000 --- a/src/bitcoind.rs +++ /dev/null @@ -1,358 +0,0 @@ -use base64::{engine::general_purpose, Engine as _}; -use bitcoin::blockdata::transaction::Transaction; -use bitcoin::consensus::encode; -use bitcoin::hash_types::BlockHash; -use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; -use lightning::log_warn; -use lightning::util::logger::Logger; -use lightning_block_sync::http::HttpEndpoint; -use lightning_block_sync::http::JsonResponse; -use lightning_block_sync::rpc::RpcClient; -use lightning_block_sync::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource}; -use std::collections::HashMap; -use std::convert::TryInto; -use std::str::FromStr; -use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::Arc; -use std::time::Duration; - -use crate::disk::FilesystemLogger; -#[cfg(test)] -use crate::test::mock_fee; - -pub struct BitcoindClient { - pub(crate) bitcoind_rpc_client: Arc, - fees: Arc>, - handle: tokio::runtime::Handle, - logger: Arc, -} - -impl BlockSource for BitcoindClient { - fn get_header<'a>( - &'a self, - header_hash: &'a BlockHash, - height_hint: Option, - ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { - Box::pin(async move { - self.bitcoind_rpc_client - .get_header(header_hash, height_hint) - .await - }) - } - - fn get_block<'a>( - &'a self, - header_hash: &'a BlockHash, - ) -> AsyncBlockSourceResult<'a, BlockData> { - Box::pin(async move { self.bitcoind_rpc_client.get_block(header_hash).await }) - } - - fn get_best_block(&self) -> AsyncBlockSourceResult<'_, (BlockHash, Option)> { - Box::pin(async move { self.bitcoind_rpc_client.get_best_block().await }) - } -} - -pub struct MempoolMinFeeResponse { - pub feerate_sat_per_kw: Option, - pub errored: bool, -} - -impl TryInto for JsonResponse { - type Error = std::io::Error; - fn try_into(self) -> std::io::Result { - let errored = !self.0["errors"].is_null(); - assert_eq!(self.0["maxmempool"].as_u64(), Some(300000000)); - Ok(MempoolMinFeeResponse { - errored, - feerate_sat_per_kw: self.0["mempoolminfee"] - .as_f64() - .map(|feerate_btc_per_kvbyte| { - (feerate_btc_per_kvbyte * 100_000_000.0 / 4.0).round() as u32 - }), - }) - } -} - -pub struct BlockchainInfo { - pub latest_height: usize, - pub latest_blockhash: BlockHash, - pub chain: String, -} - -impl TryInto for JsonResponse { - type Error = std::io::Error; - fn try_into(self) -> std::io::Result { - Ok(BlockchainInfo { - latest_height: self.0["blocks"].as_u64().unwrap() as usize, - latest_blockhash: BlockHash::from_str(self.0["bestblockhash"].as_str().unwrap()) - .unwrap(), - chain: self.0["chain"].as_str().unwrap().to_string(), - }) - } -} - -pub struct FeeResponse { - pub feerate_sat_per_kw: Option, - pub errored: bool, -} - -impl TryInto for JsonResponse { - type Error = std::io::Error; - fn try_into(self) -> std::io::Result { - let errored = !self.0["errors"].is_null(); - Ok(FeeResponse { - errored, - feerate_sat_per_kw: self.0["feerate"].as_f64().map(|feerate_btc_per_kvbyte| { - (feerate_btc_per_kvbyte * 100_000_000.0 / 4.0).round() as u32 - }), - }) - } -} - -/// The minimum feerate we are allowed to send, as specify by LDK. -const MIN_FEERATE: u32 = 253; - -impl BitcoindClient { - pub(crate) async fn new( - host: String, - port: u16, - rpc_user: String, - rpc_password: String, - handle: tokio::runtime::Handle, - logger: Arc, - ) -> std::io::Result { - let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port); - let rpc_credentials = general_purpose::STANDARD.encode(format!( - "{}:{}", - rpc_user.clone(), - rpc_password.clone() - )); - let bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint); - let _dummy = bitcoind_rpc_client - .call_method::("getblockchaininfo", &[]) - .await - .map_err(|_| { - std::io::Error::new(std::io::ErrorKind::PermissionDenied, - "failed to make initial call to bitcoind - please check your RPC user/password and access settings") - })?; - let mut fees: HashMap = HashMap::new(); - fees.insert( - ConfirmationTarget::MaximumFeeEstimate, - AtomicU32::new(50000), - ); - fees.insert(ConfirmationTarget::UrgentOnChainSweep, AtomicU32::new(5000)); - fees.insert( - ConfirmationTarget::MinAllowedAnchorChannelRemoteFee, - AtomicU32::new(MIN_FEERATE), - ); - fees.insert( - ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee, - AtomicU32::new(MIN_FEERATE), - ); - fees.insert( - ConfirmationTarget::AnchorChannelFee, - AtomicU32::new(MIN_FEERATE), - ); - fees.insert( - ConfirmationTarget::NonAnchorChannelFee, - AtomicU32::new(2000), - ); - fees.insert( - ConfirmationTarget::ChannelCloseMinimum, - AtomicU32::new(MIN_FEERATE), - ); - fees.insert( - ConfirmationTarget::OutputSpendingFee, - AtomicU32::new(MIN_FEERATE), - ); - - let client = Self { - bitcoind_rpc_client: Arc::new(bitcoind_rpc_client), - fees: Arc::new(fees), - handle: handle.clone(), - logger, - }; - BitcoindClient::poll_for_fee_estimates( - client.fees.clone(), - client.bitcoind_rpc_client.clone(), - client.logger.clone(), - handle, - ); - Ok(client) - } - - fn poll_for_fee_estimates( - fees: Arc>, - rpc_client: Arc, - logger: Arc, - handle: tokio::runtime::Handle, - ) { - handle.spawn(async move { - async fn get_estimate( - rpc_client: &Arc, - logger: &Arc, - params: &[serde_json::Value], - default: u32, - ) -> u32 { - match rpc_client - .call_method::("estimatesmartfee", params) - .await - { - Ok(res) => match res.feerate_sat_per_kw { - Some(feerate) => Some(std::cmp::max(feerate, MIN_FEERATE)), - None => { - log_warn!(logger, "Fee estimation unavailable"); - None - } - }, - Err(e) => { - log_warn!(logger, "Error getting fee estimate: {}", e); - None - } - } - .unwrap_or(default) - } - - loop { - let mempoolmin_estimate = { - match rpc_client - .call_method::("getmempoolinfo", &[]) - .await - { - Ok(res) => match res.feerate_sat_per_kw { - Some(feerate) => Some(std::cmp::max(feerate, MIN_FEERATE)), - None => { - log_warn!(logger, "Mempool info unavailable"); - None - } - }, - Err(e) => { - log_warn!(logger, "Error getting mepool info: {}", e); - None - } - } - .unwrap_or(MIN_FEERATE) - }; - let background_estimate = get_estimate( - &rpc_client, - &logger, - &[serde_json::json!(144), serde_json::json!("ECONOMICAL")], - MIN_FEERATE, - ) - .await; - - let normal_estimate = get_estimate( - &rpc_client, - &logger, - &[serde_json::json!(18), serde_json::json!("ECONOMICAL")], - 2000, - ) - .await; - - let high_prio_estimate = get_estimate( - &rpc_client, - &logger, - &[serde_json::json!(6), serde_json::json!("CONSERVATIVE")], - 5000, - ) - .await; - - let very_high_prio_estimate = get_estimate( - &rpc_client, - &logger, - &[serde_json::json!(2), serde_json::json!("CONSERVATIVE")], - 50000, - ) - .await; - - fees.get(&ConfirmationTarget::MaximumFeeEstimate) - .unwrap() - .store(very_high_prio_estimate, Ordering::Release); - fees.get(&ConfirmationTarget::UrgentOnChainSweep) - .unwrap() - .store(high_prio_estimate, Ordering::Release); - fees.get(&ConfirmationTarget::MinAllowedAnchorChannelRemoteFee) - .unwrap() - .store(mempoolmin_estimate, Ordering::Release); - fees.get(&ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee) - .unwrap() - .store(background_estimate - 250, Ordering::Release); - fees.get(&ConfirmationTarget::AnchorChannelFee) - .unwrap() - .store(background_estimate, Ordering::Release); - fees.get(&ConfirmationTarget::NonAnchorChannelFee) - .unwrap() - .store(normal_estimate, Ordering::Release); - fees.get(&ConfirmationTarget::ChannelCloseMinimum) - .unwrap() - .store(background_estimate, Ordering::Release); - fees.get(&ConfirmationTarget::OutputSpendingFee) - .unwrap() - .store(background_estimate, Ordering::Release); - - tokio::time::sleep(Duration::from_secs(60)).await; - } - }); - } - - pub async fn get_blockchain_info(&self) -> BlockchainInfo { - self.bitcoind_rpc_client - .call_method::("getblockchaininfo", &[]) - .await - .unwrap() - } -} - -impl FeeEstimator for BitcoindClient { - fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { - let fee = self - .fees - .get(&confirmation_target) - .unwrap() - .load(Ordering::Acquire); - #[cfg(test)] - let fee = mock_fee(fee); - fee - } -} - -impl BroadcasterInterface for BitcoindClient { - fn broadcast_transactions(&self, txs: &[&Transaction]) { - // As of Bitcoin Core 28, using `submitpackage` allows us to broadcast multiple - // transactions at once and have them propagate through the network as a whole, avoiding - // some pitfalls with anchor channels where the first transaction doesn't make it into the - // mempool at all. Several older versions of Bitcoin Core also support `submitpackage`, - // however, so we just use it unconditionally here. - // Sadly, Bitcoin Core has an arbitrary restriction on `submitpackage` - it must actually - // contain a package (see https://github.com/bitcoin/bitcoin/issues/31085). - let txn = txs.iter().map(encode::serialize_hex).collect::>(); - let bitcoind_rpc_client = Arc::clone(&self.bitcoind_rpc_client); - let logger = Arc::clone(&self.logger); - self.handle.spawn(async move { - let res = if txn.len() == 1 { - let tx_json = serde_json::json!(txn[0]); - bitcoind_rpc_client - .call_method::("sendrawtransaction", &[tx_json]) - .await - } else { - let tx_json = serde_json::json!(txn); - bitcoind_rpc_client - .call_method::("submitpackage", &[tx_json]) - .await - }; - // This may error due to RL calling `broadcast_transactions` with the same transaction - // multiple times, but the error is safe to ignore. - match res { - Ok(_) => {} - Err(e) => { - let err_str = e.get_ref().unwrap().to_string(); - log_warn!(logger, - "Warning, failed to broadcast a transaction, this is likely okay but may indicate an error: {}\nTransactions: {:?}", - err_str, - txn); - print!("Warning, failed to broadcast a transaction, this is likely okay but may indicate an error: {err_str}\n> "); - } - } - }); - } -} diff --git a/src/error.rs b/src/error.rs index fd32d4af..03c5b908 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,7 +4,7 @@ use axum::{ response::{IntoResponse, Response}, Json, }; -use rgb_lib::{BitcoinNetwork, Error as RgbLibError}; +use rgb_lib::Error as RgbLibError; use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize)] @@ -56,9 +56,6 @@ pub enum APIError { #[error("Failed to sync BDK: {0}")] FailedBdkSync(String), - #[error("Failed to connect to bitcoind client: {0}")] - FailedBitcoindConnection(String), - #[error("Failed broadcast: {0}")] FailedBroadcast(String), @@ -245,9 +242,6 @@ pub enum APIError { #[error("Network error: {0}")] Network(String), - #[error("The network of the given bitcoind ({0}) doesn't match the node's chain ({1})")] - NetworkMismatch(String, BitcoinNetwork), - #[error("No uncolored UTXOs are available (hint: call createutxos)")] NoAvailableUtxos, @@ -498,7 +492,6 @@ impl IntoResponse for APIError { | APIError::ChangingState | APIError::DuplicatePayment(_) | APIError::FailedBdkSync(_) - | APIError::FailedBitcoindConnection(_) | APIError::FailedBroadcast(_) | APIError::FailedPeerConnection | APIError::InsufficientAssets @@ -510,7 +503,6 @@ impl IntoResponse for APIError { | APIError::LockedNode | APIError::MaxFeeExceeded(_) | APIError::MinFeeNotMet(_) - | APIError::NetworkMismatch(_, _) | APIError::NoAvailableUtxos | APIError::NoRoute | APIError::NotInitialized diff --git a/src/indexer.rs b/src/indexer.rs new file mode 100644 index 00000000..46e82c37 --- /dev/null +++ b/src/indexer.rs @@ -0,0 +1,687 @@ +use bitcoin::block::Header; +use bitcoin::blockdata::transaction::Transaction; +use bitcoin::consensus::encode; +use bitcoin::constants::ChainHash; +use bitcoin::{BlockHash, Network, Script, ScriptBuf, TxOut, Txid}; +use electrum_client::utils::validate_merkle_proof; +use electrum_client::{Client as ElectrumClient, ElectrumApi, Param}; +use esplora_client::blocking::BlockingClient as EsploraBlockingClient; +use esplora_client::Builder as EsploraBuilder; +use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; +use lightning::chain::{BestBlock, Confirm, Filter, WatchedOutput}; +use lightning::log_warn; +use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult}; +use lightning::util::logger::Logger; +use lightning_transaction_sync::{ElectrumSyncClient, EsploraSyncClient}; +use rgb_lib::wallet::rust_only::IndexerProtocol as RgbLibIndexerProtocol; +use std::collections::{BTreeMap, HashMap}; +use std::io; +use std::str::FromStr; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use crate::disk::FilesystemLogger; +use crate::ldk::PeerGossipSync; +#[cfg(test)] +use crate::test::mock_fee; + +type Confirmable = Arc; + +const MIN_FEERATE: u32 = 253; + +enum IndexerBackend { + Electrum(Arc), + Esplora(Arc), +} + +pub(crate) struct IndexerClient { + backend: IndexerBackend, + fees: Arc>, + network: Network, + handle: tokio::runtime::Handle, + logger: Arc, +} + +pub(crate) struct IndexerGossipVerifier { + client: Arc, + gossiper: Arc, + peer_manager_wake: Arc, +} + +pub(crate) enum IndexerSyncClient { + Electrum { + client: ElectrumSyncClient>, + registered_txs: Mutex>, + }, + Esplora(EsploraSyncClient>), +} + +pub(crate) struct RegisteredTx { + script_pubkey: ScriptBuf, + confirmed: Option<(u32, BlockHash)>, +} + +struct ConfirmedRegisteredTx { + tx: Transaction, + header: Header, + height: u32, + pos: usize, +} + +impl IndexerClient { + pub(crate) fn new( + server_url: String, + protocol: RgbLibIndexerProtocol, + network: Network, + handle: tokio::runtime::Handle, + logger: Arc, + ) -> io::Result { + let fees = Arc::new(default_fee_buckets()); + let backend = match protocol { + RgbLibIndexerProtocol::Electrum => { + let client = Arc::new(ElectrumClient::new(&server_url).map_err(|e| { + io::Error::other(format!("failed to connect to electrum server: {e}")) + })?); + client.server_features().map_err(|e| { + io::Error::other(format!("failed to query electrum server features: {e}")) + })?; + poll_electrum_fee_estimates( + fees.clone(), + client.clone(), + logger.clone(), + handle.clone(), + ); + IndexerBackend::Electrum(client) + } + RgbLibIndexerProtocol::Esplora => { + let client = Arc::new(EsploraBuilder::new(&server_url).build_blocking()); + client.get_tip_hash().map_err(|e| { + io::Error::other(format!("failed to connect to esplora server: {e}")) + })?; + client.get_height().map_err(|e| { + io::Error::other(format!("failed to query esplora tip height: {e}")) + })?; + poll_esplora_fee_estimates( + fees.clone(), + client.clone(), + logger.clone(), + handle.clone(), + ); + IndexerBackend::Esplora(client) + } + }; + + Ok(Self { + backend, + fees, + network, + handle, + logger, + }) + } + + pub(crate) fn get_best_block(&self) -> io::Result { + match &self.backend { + IndexerBackend::Electrum(client) => { + let tip = client.block_headers_subscribe().map_err(|e| { + io::Error::other(format!("failed to fetch electrum tip header: {e}")) + })?; + Ok(BestBlock::new(tip.header.block_hash(), tip.height as u32)) + } + IndexerBackend::Esplora(client) => { + let tip_hash = client.get_tip_hash().map_err(|e| { + io::Error::other(format!("failed to fetch esplora tip hash: {e}")) + })?; + let tip_height = client.get_height().map_err(|e| { + io::Error::other(format!("failed to fetch esplora tip height: {e}")) + })?; + Ok(BestBlock::new(tip_hash, tip_height)) + } + } + } + + fn lookup_utxo( + &self, + chain_hash: ChainHash, + short_channel_id: u64, + ) -> Result { + if chain_hash != ChainHash::using_genesis_block(self.network) { + return Err(UtxoLookupError::UnknownChain); + } + + let height = (short_channel_id >> 40) as u32; + let tx_index = ((short_channel_id >> 16) & 0x00ff_ffff) as usize; + let vout = (short_channel_id & 0xffff) as usize; + + let txout = match &self.backend { + IndexerBackend::Electrum(client) => { + match electrum_txid_from_pos(client, height as usize, tx_index) + .and_then(|txid| client.transaction_get(&txid)) + { + Ok(tx) => tx.output.get(vout).cloned(), + Err(_) => None, + } + } + IndexerBackend::Esplora(client) => client + .get_block_hash(height) + .and_then(|block_hash| client.get_txid_at_block_index(&block_hash, tx_index)) + .and_then(|txid| match txid { + Some(txid) => client.get_tx_no_opt(&txid).map(Some), + None => Ok(None), + }) + .ok() + .flatten() + .and_then(|tx| tx.output.get(vout).cloned()), + }; + + match txout { + Some(txout) => Ok(txout), + None => Err(UtxoLookupError::UnknownTx), + } + } +} + +impl UtxoLookup for IndexerClient { + fn get_utxo(&self, chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult { + UtxoResult::Sync(self.lookup_utxo(*chain_hash, short_channel_id)) + } +} + +impl IndexerGossipVerifier { + pub(crate) fn new( + client: Arc, + gossiper: Arc, + peer_manager_wake: Arc, + ) -> Self { + Self { + client, + gossiper, + peer_manager_wake, + } + } +} + +impl UtxoLookup for IndexerGossipVerifier { + fn get_utxo(&self, chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult { + let result = UtxoFuture::new(); + let future = result.clone(); + let chain_hash = *chain_hash; + let client = self.client.clone(); + let gossiper = self.gossiper.clone(); + let peer_manager_wake = self.peer_manager_wake.clone(); + self.client.handle.spawn(async move { + let lookup = tokio::task::spawn_blocking(move || { + client.lookup_utxo(chain_hash, short_channel_id) + }) + .await + .unwrap_or(Err(UtxoLookupError::UnknownTx)); + future.resolve(gossiper.network_graph(), &*gossiper, lookup); + peer_manager_wake(); + }); + UtxoResult::Async(result) + } +} + +fn electrum_txid_from_pos( + client: &ElectrumClient, + height: usize, + tx_pos: usize, +) -> Result { + let value = client.raw_call( + "blockchain.transaction.id_from_pos", + [ + Param::Usize(height), + Param::Usize(tx_pos), + Param::Bool(true), + ], + )?; + let txid = value + .as_str() + .or_else(|| value.get("tx_hash").and_then(serde_json::Value::as_str)) + .or_else(|| value.get("txid").and_then(serde_json::Value::as_str)) + .or_else(|| value.get("tx_id").and_then(serde_json::Value::as_str)) + .map(str::to_owned) + .ok_or_else(|| electrum_client::Error::InvalidResponse(value.clone()))?; + + Txid::from_str(&txid).map_err(|_| electrum_client::Error::InvalidResponse(value)) +} + +impl IndexerSyncClient { + pub(crate) fn new( + server_url: String, + protocol: RgbLibIndexerProtocol, + logger: Arc, + ) -> io::Result { + match protocol { + RgbLibIndexerProtocol::Electrum => { + let client = ElectrumSyncClient::new(server_url, logger).map_err(|e| { + io::Error::other(format!("failed to initialize electrum sync client: {e}")) + })?; + Ok(Self::Electrum { + client, + registered_txs: Mutex::new(HashMap::new()), + }) + } + RgbLibIndexerProtocol::Esplora => { + Ok(Self::Esplora(EsploraSyncClient::new(server_url, logger))) + } + } + } + + pub(crate) fn sync( + &self, + confirmables: Vec, + ) -> Result<(), Box> { + match self { + Self::Electrum { + client, + registered_txs, + } => { + client + .sync(confirmables.clone()) + .map_err(|e| -> Box { Box::new(e) })?; + sync_electrum_registered_txs(client.client(), registered_txs, &confirmables) + } + Self::Esplora(client) => client + .sync(confirmables) + .map_err(|e| -> Box { Box::new(e) }), + } + } +} + +impl Filter for IndexerSyncClient { + fn register_tx(&self, txid: &Txid, script_pubkey: &Script) { + match self { + Self::Electrum { + client, + registered_txs, + } => { + registered_txs.lock().unwrap().insert( + *txid, + RegisteredTx { + script_pubkey: script_pubkey.to_owned(), + confirmed: None, + }, + ); + client.register_tx(txid, script_pubkey); + } + Self::Esplora(client) => client.register_tx(txid, script_pubkey), + } + } + + fn register_output(&self, output: WatchedOutput) { + match self { + Self::Electrum { client, .. } => client.register_output(output), + Self::Esplora(client) => client.register_output(output), + } + } +} + +fn sync_electrum_registered_txs( + client: Arc, + registered_txs: &Mutex>, + confirmables: &[Confirmable], +) -> Result<(), Box> { + let mut confirmed = Vec::new(); + let mut unconfirmed = Vec::new(); + + { + let mut registered_txs = registered_txs.lock().unwrap(); + for (txid, registered_tx) in registered_txs.iter_mut() { + let history = client.script_get_history(®istered_tx.script_pubkey)?; + let confirmed_history = history + .iter() + .find(|history| history.tx_hash == *txid && history.height > 0); + + let Some(confirmed_history) = confirmed_history else { + if registered_tx.confirmed.take().is_some() { + unconfirmed.push(*txid); + } + continue; + }; + + let height = confirmed_history.height as u32; + let tx = client.transaction_get(txid)?; + let merkle_res = client.transaction_get_merkle(txid, height as usize)?; + let header = client.block_header(height as usize)?; + if !validate_merkle_proof(txid, &header.merkle_root, &merkle_res) { + return Err(Box::new(io::Error::other(format!( + "invalid merkle proof for transaction {txid}" + )))); + } + + let block_hash = header.block_hash(); + if registered_tx.confirmed == Some((height, block_hash)) { + continue; + } + registered_tx.confirmed = Some((height, block_hash)); + confirmed.push(ConfirmedRegisteredTx { + tx, + header, + height, + pos: merkle_res.pos, + }); + } + for confirmed_tx in &confirmed { + registered_txs.remove(&confirmed_tx.tx.compute_txid()); + } + } + + for txid in unconfirmed { + for confirmable in confirmables { + confirmable.transaction_unconfirmed(&txid); + } + } + for confirmed_tx in confirmed { + for confirmable in confirmables { + confirmable.transactions_confirmed( + &confirmed_tx.header, + &[(confirmed_tx.pos, &confirmed_tx.tx)], + confirmed_tx.height, + ); + } + } + + Ok(()) +} + +impl FeeEstimator for IndexerClient { + fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { + let fee = self + .fees + .get(&confirmation_target) + .unwrap() + .load(Ordering::Acquire); + #[cfg(test)] + let fee = mock_fee(fee); + fee + } +} + +impl BroadcasterInterface for IndexerClient { + fn broadcast_transactions(&self, txs: &[&Transaction]) { + match &self.backend { + IndexerBackend::Electrum(client) => { + let txs = txs + .iter() + .map(|tx| encode::serialize(*tx)) + .collect::>(); + let client = client.clone(); + let logger = self.logger.clone(); + self.handle.spawn(async move { + let res = tokio::task::spawn_blocking(move || { + let mut last_error = None; + for tx in txs { + if let Err(e) = client.transaction_broadcast_raw(&tx) { + last_error = Some(e.to_string()); + } + } + last_error.map_or(Ok(()), Err) + }) + .await; + + match res { + Ok(Ok(())) => {} + Ok(Err(e)) => { + log_warn!( + logger, + "Warning, failed to broadcast transaction(s) via electrum: {}", + e + ); + } + Err(e) => { + log_warn!( + logger, + "Warning, failed to spawn electrum broadcaster task: {}", + e + ); + } + } + }); + } + IndexerBackend::Esplora(client) => { + let txs = txs.iter().map(|tx| (*tx).clone()).collect::>(); + let client = client.clone(); + let logger = self.logger.clone(); + self.handle.spawn(async move { + let res = tokio::task::spawn_blocking(move || { + let mut last_error = None; + for tx in txs { + if let Err(e) = client.broadcast(&tx) { + last_error = Some(e.to_string()); + } + } + last_error.map_or(Ok(()), Err) + }) + .await; + + match res { + Ok(Ok(())) => {} + Ok(Err(e)) => { + log_warn!( + logger, + "Warning, failed to broadcast transaction(s) via esplora: {}", + e + ); + } + Err(e) => { + log_warn!( + logger, + "Warning, failed to spawn esplora broadcaster task: {}", + e + ); + } + } + }); + } + } + } +} + +fn default_fee_buckets() -> HashMap { + let mut fees = HashMap::new(); + fees.insert( + ConfirmationTarget::MaximumFeeEstimate, + AtomicU32::new(50000), + ); + fees.insert(ConfirmationTarget::UrgentOnChainSweep, AtomicU32::new(5000)); + fees.insert( + ConfirmationTarget::MinAllowedAnchorChannelRemoteFee, + AtomicU32::new(MIN_FEERATE), + ); + fees.insert( + ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee, + AtomicU32::new(MIN_FEERATE), + ); + fees.insert( + ConfirmationTarget::AnchorChannelFee, + AtomicU32::new(MIN_FEERATE), + ); + fees.insert( + ConfirmationTarget::NonAnchorChannelFee, + AtomicU32::new(2000), + ); + fees.insert( + ConfirmationTarget::ChannelCloseMinimum, + AtomicU32::new(MIN_FEERATE), + ); + fees.insert( + ConfirmationTarget::OutputSpendingFee, + AtomicU32::new(MIN_FEERATE), + ); + fees +} + +fn poll_electrum_fee_estimates( + fees: Arc>, + client: Arc, + logger: Arc, + handle: tokio::runtime::Handle, +) { + handle.spawn(async move { + loop { + let res = tokio::task::spawn_blocking({ + let client = client.clone(); + move || { + Ok::<_, electrum_client::Error>(( + client.estimate_fee(144)?, + client.estimate_fee(18)?, + client.estimate_fee(6)?, + client.estimate_fee(2)?, + )) + } + }) + .await; + + match res { + Ok(Ok((background, normal, high_prio, very_high_prio))) => { + let background_estimate = + fee_rate_from_btc_per_kb(background, MIN_FEERATE).unwrap_or(MIN_FEERATE); + let normal_estimate = fee_rate_from_btc_per_kb(normal, 2000).unwrap_or(2000); + let high_prio_estimate = + fee_rate_from_btc_per_kb(high_prio, 5000).unwrap_or(5000); + let very_high_prio_estimate = + fee_rate_from_btc_per_kb(very_high_prio, 50000).unwrap_or(50000); + + fees.get(&ConfirmationTarget::MaximumFeeEstimate) + .unwrap() + .store(very_high_prio_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::UrgentOnChainSweep) + .unwrap() + .store(high_prio_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::MinAllowedAnchorChannelRemoteFee) + .unwrap() + .store(MIN_FEERATE, Ordering::Release); + fees.get(&ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee) + .unwrap() + .store(background_estimate.saturating_sub(250), Ordering::Release); + fees.get(&ConfirmationTarget::AnchorChannelFee) + .unwrap() + .store(background_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::NonAnchorChannelFee) + .unwrap() + .store(normal_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::ChannelCloseMinimum) + .unwrap() + .store(background_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::OutputSpendingFee) + .unwrap() + .store(background_estimate, Ordering::Release); + } + Ok(Err(e)) => { + log_warn!(logger, "Error getting fee estimate from electrum: {}", e); + } + Err(e) => { + log_warn!(logger, "Error polling electrum fee estimates: {}", e); + } + } + + tokio::time::sleep(Duration::from_secs(60)).await; + } + }); +} + +fn poll_esplora_fee_estimates( + fees: Arc>, + client: Arc, + logger: Arc, + handle: tokio::runtime::Handle, +) { + handle.spawn(async move { + loop { + let res = tokio::task::spawn_blocking({ + let client = client.clone(); + move || client.get_fee_estimates() + }) + .await; + + match res { + Ok(Ok(estimate_map)) => { + let background_estimate = + estimate_fee_rate_sat_per_kw(&estimate_map, 144, MIN_FEERATE); + let normal_estimate = estimate_fee_rate_sat_per_kw(&estimate_map, 18, 2000); + let high_prio_estimate = estimate_fee_rate_sat_per_kw(&estimate_map, 6, 5000); + let very_high_prio_estimate = + estimate_fee_rate_sat_per_kw(&estimate_map, 2, 50000); + + fees.get(&ConfirmationTarget::MaximumFeeEstimate) + .unwrap() + .store(very_high_prio_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::UrgentOnChainSweep) + .unwrap() + .store(high_prio_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::MinAllowedAnchorChannelRemoteFee) + .unwrap() + .store(MIN_FEERATE, Ordering::Release); + fees.get(&ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee) + .unwrap() + .store(background_estimate.saturating_sub(250), Ordering::Release); + fees.get(&ConfirmationTarget::AnchorChannelFee) + .unwrap() + .store(background_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::NonAnchorChannelFee) + .unwrap() + .store(normal_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::ChannelCloseMinimum) + .unwrap() + .store(background_estimate, Ordering::Release); + fees.get(&ConfirmationTarget::OutputSpendingFee) + .unwrap() + .store(background_estimate, Ordering::Release); + } + Ok(Err(e)) => { + log_warn!(logger, "Error getting fee estimate from esplora: {}", e) + } + Err(e) => log_warn!(logger, "Error polling esplora fee estimates: {}", e), + } + + tokio::time::sleep(Duration::from_secs(60)).await; + } + }); +} + +fn estimate_fee_rate_sat_per_kw( + fee_estimates: &HashMap, + blocks: u16, + default: u32, +) -> u32 { + let Some(sat_per_vb) = interpolate_fee_rate(fee_estimates, blocks) else { + return default; + }; + std::cmp::max((sat_per_vb * 250.0).round() as u32, MIN_FEERATE) +} + +fn interpolate_fee_rate(fee_estimates: &HashMap, blocks: u16) -> Option { + if blocks == 0 || fee_estimates.is_empty() { + return None; + } + + let estimate_map = BTreeMap::from_iter(fee_estimates.iter().map(|(k, v)| (*k, *v))); + if let Some(estimate) = estimate_map.get(&blocks) { + return Some(*estimate); + } + + let lower_key = estimate_map.range(..blocks).next_back().map(|(k, _)| *k); + let upper_key = estimate_map.range(blocks..).next().map(|(k, _)| *k); + + match (lower_key, upper_key) { + (Some(x1), Some(x2)) if x1 != x2 => { + let y1 = estimate_map[&x1]; + let y2 = estimate_map[&x2]; + Some(y1 + (blocks as f64 - x1 as f64) / (x2 as f64 - x1 as f64) * (y2 - y1)) + } + (Some(x), _) | (_, Some(x)) => estimate_map.get(&x).copied(), + _ => None, + } +} + +fn fee_rate_from_btc_per_kb(feerate_btc_per_kb: f64, default: u32) -> Option { + if !feerate_btc_per_kb.is_finite() || feerate_btc_per_kb.is_sign_negative() { + return Some(default); + } + Some(std::cmp::max( + (feerate_btc_per_kb * 100_000_000.0 / 4.0).round() as u32, + MIN_FEERATE, + )) +} diff --git a/src/ldk.rs b/src/ldk.rs index 0c2e51e3..a6375dff 100644 --- a/src/ldk.rs +++ b/src/ldk.rs @@ -3,13 +3,14 @@ use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::hex::DisplayHex; use bitcoin::psbt::{ExtractTxError, Psbt}; use bitcoin::secp256k1::{All, PublicKey, Secp256k1}; +use bitcoin::TxOut; use bitcoin::{io, Amount, Network}; -use bitcoin::{BlockHash, TxOut}; use bitcoin_bech32::WitnessProgram; use lightning::chain::{chainmonitor, ChannelMonitorUpdateStatus}; -use lightning::chain::{BestBlock, Filter}; +use lightning::chain::{BestBlock, Confirm, Filter}; use lightning::events::bump_transaction::{BumpTransactionEventHandler, Wallet}; use lightning::events::{Event, PaymentFailureReason, PaymentPurpose, ReplayEvent}; +use lightning::impl_writeable_tlv_based; use lightning::ln::channelmanager::{self, PaymentId, RecentPaymentDetails}; use lightning::ln::channelmanager::{ ChainParameters, ChannelManagerReadArgs, SimpleArcChannelManager, @@ -32,6 +33,7 @@ use lightning::routing::gossip; use lightning::routing::gossip::{NodeId, P2PGossipSync}; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters}; +use lightning::routing::utxo::UtxoLookup; use lightning::sign::{ EntropySource, InMemorySigner, KeysManager, NodeSigner, OutputSpender, SpendableOutputDescriptor, @@ -46,13 +48,7 @@ use lightning::util::persist::{ }; use lightning::util::ser::{ReadableArgs, Writeable}; use lightning::util::sweep as ldk_sweep; -use lightning::{chain, impl_writeable_tlv_based}; use lightning_background_processor::{process_events_async, GossipSync, NO_LIQUIDITY_MANAGER}; -use lightning_block_sync::gossip::TokioSpawner; -use lightning_block_sync::init; -use lightning_block_sync::poll; -use lightning_block_sync::SpvClient; -use lightning_block_sync::UnboundedCache; use lightning_dns_resolver::OMDomainResolver; use lightning_invoice::PaymentSecret; use lightning_net_tokio::SocketDescriptor; @@ -93,12 +89,12 @@ use tokio::runtime::Handle; use tokio::sync::watch::Sender; use tokio::task::JoinHandle; -use crate::bitcoind::BitcoindClient; use crate::disk::{ self, FilesystemLogger, CHANNEL_IDS_FNAME, CHANNEL_PEER_DATA, INBOUND_PAYMENTS_FNAME, MAKER_SWAPS_FNAME, OUTBOUND_PAYMENTS_FNAME, OUTPUT_SPENDER_TXES, TAKER_SWAPS_FNAME, }; use crate::error::APIError; +use crate::indexer::{IndexerClient, IndexerGossipVerifier, IndexerSyncClient}; use crate::rgb::{check_rgb_proxy_endpoint, get_rgb_channel_info_optional, RgbLibWalletWrapper}; use crate::routes::{HTLCStatus, SwapStatus, UnlockRequest, DUST_LIMIT_MSAT}; use crate::swap::SwapData; @@ -442,8 +438,8 @@ impl UnlockedAppState { pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, Arc, - Arc, - Arc, + Arc, + Arc, Arc, Arc< MonitorUpdatingPersister< @@ -451,23 +447,22 @@ pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< Arc, Arc, Arc, - Arc, - Arc, + Arc, + Arc, >, >, Arc, >; -pub(crate) type GossipVerifier = lightning_block_sync::gossip::GossipVerifier< - TokioSpawner, - Arc, - Arc, ->; +pub(crate) type ChainSource = IndexerSyncClient; + +pub(crate) type PeerGossipSync = + P2PGossipSync, Arc, Arc>; pub(crate) type PeerManager = LdkPeerManager< SocketDescriptor, Arc, - Arc, Arc, Arc>>, + Arc, Arc, Arc, IgnoringMessageHandler, @@ -487,7 +482,7 @@ pub(crate) type Router = DefaultRouter< >; pub(crate) type ChannelManager = - SimpleArcChannelManager; + SimpleArcChannelManager; pub(crate) type NetworkGraph = gossip::NetworkGraph>; @@ -504,7 +499,7 @@ pub(crate) type OnionMessenger = LdkOnionMessenger< >; pub(crate) type BumpTxEventHandler = BumpTransactionEventHandler< - Arc, + Arc, Arc, Arc>>, Arc, Arc, @@ -522,9 +517,9 @@ pub(crate) struct RgbOutputSpender { } pub(crate) type OutputSweeper = ldk_sweep::OutputSweeper< - Arc, + Arc, Arc, - Arc, + Arc, Arc, Arc, Arc, @@ -1745,37 +1740,6 @@ pub(crate) async fn start_ldk( let network: Network = bitcoin_network.into(); let ldk_peer_listening_port = static_state.ldk_peer_listening_port; - // Initialize our bitcoind client. - let bitcoind_client = match BitcoindClient::new( - unlock_request.bitcoind_rpc_host.clone(), - unlock_request.bitcoind_rpc_port, - unlock_request.bitcoind_rpc_username.clone(), - unlock_request.bitcoind_rpc_password.clone(), - tokio::runtime::Handle::current(), - Arc::clone(&logger), - ) - .await - { - Ok(client) => Arc::new(client), - Err(e) => { - return Err(APIError::FailedBitcoindConnection(e.to_string())); - } - }; - - // Check that the bitcoind we've connected to is running the network we expect - let bitcoind_chain = bitcoind_client.get_blockchain_info().await.chain; - if bitcoind_chain - != match bitcoin_network { - BitcoinNetwork::Mainnet => "main", - BitcoinNetwork::Testnet => "test", - BitcoinNetwork::Testnet4 => "testnet4", - BitcoinNetwork::Regtest => "regtest", - BitcoinNetwork::Signet | BitcoinNetwork::SignetCustom => "signet", - } - { - return Err(APIError::NetworkMismatch(bitcoind_chain, bitcoin_network)); - } - // RGB setup let indexer_url = if let Some(indexer_url) = &unlock_request.indexer_url { let indexer_protocol = check_indexer_url(indexer_url, bitcoin_network)?; @@ -1822,14 +1786,35 @@ pub(crate) async fn start_ldk( ) .expect("able to write"); + let indexer_protocol = check_indexer_url(indexer_url, bitcoin_network)?; + let indexer_client = Arc::new( + IndexerClient::new( + indexer_url.to_string(), + indexer_protocol.clone(), + network, + tokio::runtime::Handle::current(), + Arc::clone(&logger), + ) + .map_err(|e| APIError::InvalidIndexer(e.to_string()))?, + ); + let tx_sync = Arc::new( + ChainSource::new( + indexer_url.to_string(), + indexer_protocol, + Arc::clone(&logger), + ) + .map_err(|e| APIError::InvalidIndexer(e.to_string()))?, + ); + let chain_source: Arc = tx_sync.clone(); + let chain_tip = indexer_client + .get_best_block() + .map_err(|e| APIError::InvalidIndexer(e.to_string()))?; + // Initialize the FeeEstimator - // BitcoindClient implements the FeeEstimator trait, so it'll act as our fee estimator. - let fee_estimator = bitcoind_client.clone(); + let fee_estimator = indexer_client.clone(); // Initialize the BroadcasterInterface - // BitcoindClient implements the BroadcasterInterface trait, so it'll act as our transaction - // broadcaster. - let broadcaster = bitcoind_client.clone(); + let broadcaster = indexer_client.clone(); // Initialize the KeysManager // The key seed that we use to derive the node privkey (that corresponds to the node pubkey) and @@ -1864,13 +1849,13 @@ pub(crate) async fn start_ldk( 1000, Arc::clone(&keys_manager), Arc::clone(&keys_manager), - Arc::clone(&bitcoind_client), - Arc::clone(&bitcoind_client), + Arc::clone(&indexer_client), + Arc::clone(&indexer_client), )); // Initialize the ChainMonitor let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( - None, + Some(chain_source.clone()), Arc::clone(&broadcaster), Arc::clone(&logger), Arc::clone(&fee_estimator), @@ -1880,12 +1865,7 @@ pub(crate) async fn start_ldk( )); // Read ChannelMonitor state from disk - let mut channelmonitors = persister.read_all_channel_monitors_with_updates().unwrap(); - - // Poll for the best chain tip, which may be used by the channel manager & spv client - let polled_chain_tip = init::validate_best_block_header(bitcoind_client.as_ref()) - .await - .expect("Failed to fetch best block header and best block"); + let channelmonitors = persister.read_all_channel_monitors_with_updates().unwrap(); // Initialize routing ProbabilisticScorer let network_graph_path = ldk_data_dir.join("network_graph"); @@ -1925,8 +1905,7 @@ pub(crate) async fn start_ldk( .channel_handshake_config .negotiate_anchors_zero_fee_htlc_tx = true; user_config.manually_accept_inbound_channels = true; - let mut restarting_node = true; - let (channel_manager_blockhash, channel_manager) = { + let channel_manager = { if let Ok(f) = fs::File::open(ldk_data_dir.join("manager")) { let mut channel_monitor_references = Vec::new(); for (_, channel_monitor) in channelmonitors.iter() { @@ -1946,18 +1925,17 @@ pub(crate) async fn start_ldk( channel_monitor_references, ldk_data_dir_path.clone(), ); - <(BlockHash, ChannelManager)>::read(&mut BufReader::new(f), read_args).unwrap() + let (_, channel_manager) = + <(bitcoin::BlockHash, ChannelManager)>::read(&mut BufReader::new(f), read_args) + .unwrap(); + channel_manager } else { // We're starting a fresh node. - restarting_node = false; - - let polled_best_block = polled_chain_tip.to_best_block(); - let polled_best_block_hash = polled_best_block.block_hash; let chain_params = ChainParameters { network, - best_block: polled_best_block, + best_block: chain_tip, }; - let fresh_channel_manager = channelmanager::ChannelManager::new( + channelmanager::ChannelManager::new( fee_estimator.clone(), chain_monitor.clone(), broadcaster.clone(), @@ -1971,8 +1949,7 @@ pub(crate) async fn start_ldk( chain_params, cur.as_secs() as u32, ldk_data_dir_path.clone(), - ); - (polled_best_block_hash, fresh_channel_manager) + ) } }; @@ -2065,7 +2042,7 @@ pub(crate) async fn start_ldk( txes, proxy_endpoint: proxy_endpoint.to_string(), }); - let (sweeper_best_block, output_sweeper) = match fs_store.read( + let (_sweeper_best_block, output_sweeper) = match fs_store.read( OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY, @@ -2075,7 +2052,7 @@ pub(crate) async fn start_ldk( channel_manager.current_best_block(), broadcaster.clone(), fee_estimator.clone(), - None, + Some(chain_source.clone()), rgb_output_spender, rgb_wallet_wrapper.clone(), fs_store.clone(), @@ -2087,7 +2064,7 @@ pub(crate) async fn start_ldk( let read_args = ( broadcaster.clone(), fee_estimator.clone(), - None, + Some(chain_source.clone()), rgb_output_spender.clone(), rgb_wallet_wrapper.clone(), fs_store.clone(), @@ -2100,71 +2077,8 @@ pub(crate) async fn start_ldk( Err(e) => panic!("Failed to read OutputSweeper with {e}"), }; - // Sync ChannelMonitors, ChannelManager and OutputSweeper to chain tip - let mut chain_listener_channel_monitors = Vec::new(); - let mut cache = UnboundedCache::new(); - let chain_tip = if restarting_node { - let mut chain_listeners = vec![ - ( - channel_manager_blockhash, - &channel_manager as &(dyn chain::Listen + Send + Sync), - ), - ( - sweeper_best_block.block_hash, - &output_sweeper as &(dyn chain::Listen + Send + Sync), - ), - ]; - - for (blockhash, channel_monitor) in channelmonitors.drain(..) { - let outpoint = channel_monitor.get_funding_txo(); - chain_listener_channel_monitors.push(( - blockhash, - ( - channel_monitor, - broadcaster.clone(), - fee_estimator.clone(), - logger.clone(), - ), - outpoint, - )); - } - - for monitor_listener_info in chain_listener_channel_monitors.iter_mut() { - chain_listeners.push(( - monitor_listener_info.0, - &monitor_listener_info.1 as &(dyn chain::Listen + Send + Sync), - )); - } - - let mut attempts = 3; - loop { - match init::synchronize_listeners( - bitcoind_client.as_ref(), - network, - &mut cache, - chain_listeners.clone(), - ) - .await - { - Ok(res) => break res, - Err(e) => { - tracing::error!("Error synchronizing chain: {:?}", e); - attempts -= 1; - if attempts == 0 { - return Err(APIError::FailedBitcoindConnection( - e.into_inner().to_string(), - )); - } - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - } - } else { - polled_chain_tip - }; - // Give ChannelMonitors to ChainMonitor - for (_, (channel_monitor, _, _, _), _) in chain_listener_channel_monitors { + for (_, channel_monitor) in channelmonitors { let channel_id = channel_monitor.channel_id(); assert_eq!( chain_monitor.load_existing_monitor(channel_id, channel_monitor), @@ -2173,7 +2087,7 @@ pub(crate) async fn start_ldk( } // Optional: Initialize the P2PGossipSync - let gossip_sync = Arc::new(P2PGossipSync::new( + let gossip_sync: Arc = Arc::new(P2PGossipSync::new( Arc::clone(&network_graph), None, Arc::clone(&logger), @@ -2223,15 +2137,16 @@ pub(crate) async fn start_ldk( logger.clone(), Arc::clone(&keys_manager), )); - - // Install a GossipVerifier in in the P2PGossipSync - let utxo_lookup = GossipVerifier::new( - Arc::clone(&bitcoind_client.bitcoind_rpc_client), - TokioSpawner, + let peer_manager_wake = Arc::new({ + let peer_manager = Arc::clone(&peer_manager); + move || peer_manager.process_events() + }); + let utxo_lookup: Arc = Arc::new(IndexerGossipVerifier::new( + Arc::clone(&indexer_client), Arc::clone(&gossip_sync), - Arc::clone(&peer_manager), - ); - gossip_sync.add_utxo_lookup(Some(Arc::new(utxo_lookup))); + peer_manager_wake, + )); + gossip_sync.add_utxo_lookup(Some(utxo_lookup)); // ## Running LDK // Initialize networking @@ -2260,26 +2175,24 @@ pub(crate) async fn start_ldk( } }); - // Connect and Disconnect Blocks let output_sweeper: Arc = Arc::new(output_sweeper); - let channel_manager_listener = channel_manager.clone(); - let chain_monitor_listener = chain_monitor.clone(); - let output_sweeper_listener = output_sweeper.clone(); - let bitcoind_block_source = bitcoind_client.clone(); + let confirmables: Vec> = vec![ + channel_manager.clone(), + chain_monitor.clone(), + output_sweeper.clone(), + ]; + sync_chain_data(tx_sync.clone(), confirmables.clone()) + .await + .map_err(|e| APIError::InvalidIndexer(e.to_string()))?; + let stop_listen = Arc::clone(&stop_processing); tokio::spawn(async move { - let chain_poller = poll::ChainPoller::new(bitcoind_block_source.as_ref(), network); - let chain_listener = ( - chain_monitor_listener, - &(channel_manager_listener, output_sweeper_listener), - ); - let mut spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener); loop { if stop_listen.load(Ordering::Acquire) { return; } - if let Err(e) = spv_client.poll_best_tip().await { - tracing::error!("Error while polling best tip: {:?}", e); + if let Err(e) = sync_chain_data(tx_sync.clone(), confirmables.clone()).await { + tracing::error!("Error while syncing via indexer: {:?}", e); } tokio::time::sleep(Duration::from_secs(1)).await; } @@ -2496,6 +2409,15 @@ pub(crate) async fn start_ldk( )) } +async fn sync_chain_data( + tx_sync: Arc, + confirmables: Vec>, +) -> Result<(), Box> { + tokio::task::spawn_blocking(move || tx_sync.sync(confirmables)) + .await + .map_err(|e| -> Box { Box::new(e) })? +} + impl AppState { fn stop_ldk(&self) -> Option>> { let mut ldk_background_services = self.get_ldk_background_services(); diff --git a/src/main.rs b/src/main.rs index 4fcc5bbd..d3fdfe64 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,9 @@ mod args; mod auth; mod backup; -mod bitcoind; mod disk; mod error; +mod indexer; mod ldk; mod rgb; mod routes; diff --git a/src/routes.rs b/src/routes.rs index ae6c8ed0..8e2c7862 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -1313,10 +1313,6 @@ pub(crate) enum TransportType { #[derive(Deserialize, Serialize)] pub(crate) struct UnlockRequest { pub(crate) password: String, - pub(crate) bitcoind_rpc_username: String, - pub(crate) bitcoind_rpc_password: String, - pub(crate) bitcoind_rpc_host: String, - pub(crate) bitcoind_rpc_port: u16, pub(crate) indexer_url: Option, pub(crate) proxy_endpoint: Option, pub(crate) announce_addresses: Vec, diff --git a/src/test/mod.rs b/src/test/mod.rs index 0e3d9c66..9f0adeb6 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -1730,10 +1730,6 @@ async fn taker(node_address: SocketAddr, swapstring: String) -> EmptyResponse { fn unlock_req(password: &str) -> UnlockRequest { UnlockRequest { password: password.to_string(), - bitcoind_rpc_username: s!("user"), - bitcoind_rpc_password: s!("password"), - bitcoind_rpc_host: s!("localhost"), - bitcoind_rpc_port: 18443, indexer_url: Some(ELECTRUM_URL_REGTEST.to_string()), proxy_endpoint: Some(PROXY_ENDPOINT_LOCAL.to_string()), announce_addresses: vec![], diff --git a/src/test/swap_roundtrip_multihop_asset_asset.rs b/src/test/swap_roundtrip_multihop_asset_asset.rs index dcac6f0e..3018d4b0 100644 --- a/src/test/swap_roundtrip_multihop_asset_asset.rs +++ b/src/test/swap_roundtrip_multihop_asset_asset.rs @@ -103,6 +103,10 @@ async fn swap_roundtrip_multihop_asset_asset() { ) .await; + wait_for_usable_channels(node1_addr, 2).await; + wait_for_usable_channels(node2_addr, 4).await; + wait_for_usable_channels(node3_addr, 2).await; + let channels_1_before = list_channels(node1_addr).await; let channels_2_before = list_channels(node2_addr).await; let channels_3_before = list_channels(node3_addr).await; diff --git a/src/test/upload_asset_media.rs b/src/test/upload_asset_media.rs index 641669ea..9b588341 100644 --- a/src/test/upload_asset_media.rs +++ b/src/test/upload_asset_media.rs @@ -101,13 +101,22 @@ async fn fail() { "file", reqwest::multipart::Part::bytes(file_bytes).headers([].into_iter().collect()), ); - let res = reqwest::Client::new() + // RequestBodyLimitLayer can either reply 413 or close the connection mid-stream + // depending on how much of the body the server has read; both prove the limit triggered. + let result = reqwest::Client::new() .post(format!("http://{node1_addr}/postassetmedia")) .multipart(form) .send() - .await - .unwrap(); - assert_eq!(res.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE); - let api_error_response = res.text().await.unwrap(); - assert_eq!(api_error_response, "length limit exceeded"); + .await; + match result { + Ok(res) => { + assert_eq!(res.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE); + let api_error_response = res.text().await.unwrap(); + assert_eq!(api_error_response, "length limit exceeded"); + } + Err(e) => assert!( + e.is_request() || e.is_body(), + "expected payload-too-large rejection, got: {e:?}" + ), + } }