From 26b73012e925d10405211ca03fe4f0f81f099688 Mon Sep 17 00:00:00 2001 From: bennyhodl Date: Wed, 14 May 2025 15:29:10 -0500 Subject: [PATCH 1/2] Unified error type --- ddk-node/src/command.rs | 3 +- ddk-node/src/lib.rs | 2 +- ddk/examples/lightning.rs | 9 +- ddk/examples/nostr.rs | 7 +- ddk/examples/postgres.rs | 7 +- ddk/src/builder.rs | 14 +- ddk/src/chain/esplora.rs | 4 +- ddk/src/ddk.rs | 88 ++++++------ ddk/src/error.rs | 121 +++++++++++++++-- ddk/src/json.rs | 16 ++- ddk/src/lib.rs | 5 +- ddk/src/nostr/messages.rs | 31 +++-- ddk/src/nostr/mod.rs | 16 +-- ddk/src/oracle/kormir.rs | 49 ++++--- ddk/src/oracle/nostr.rs | 32 ++--- ddk/src/oracle/p2p_derivatives.rs | 44 +++--- ddk/src/storage/postgres/mod.rs | 89 ++++++++---- ddk/src/storage/sqlx.rs | 3 + ddk/src/transport/lightning/mod.rs | 8 +- ddk/src/transport/lightning/peer_manager.rs | 29 ++-- ddk/src/transport/memory.rs | 4 +- ddk/src/transport/nostr/mod.rs | 5 +- ddk/src/transport/nostr/relay_handler.rs | 24 +++- ddk/src/wallet.rs | 143 ++++++++++++-------- testconfig/nostr-relay.toml | 6 +- 25 files changed, 487 insertions(+), 272 deletions(-) diff --git a/ddk-node/src/command.rs b/ddk-node/src/command.rs index 5825e542..cea6f43e 100644 --- a/ddk-node/src/command.rs +++ b/ddk-node/src/command.rs @@ -18,6 +18,7 @@ use ddk_manager::contract::contract_input::{ContractInput, ContractInputInfo, Or use ddk_manager::contract::enum_descriptor::EnumDescriptor; use ddk_manager::contract::offered_contract::OfferedContract; use ddk_manager::contract::{Contract, ContractDescriptor}; +use ddk_manager::Oracle; use dlc::{EnumerationPayout, Payout}; use dlc_messages::oracle_msgs::{EventDescriptor, OracleAnnouncement}; use dlc_messages::{AcceptDlc, OfferDlc}; @@ -227,7 +228,7 @@ async fn generate_contract_input() -> anyhow::Result { .await?; let oracle_input = OracleInput { - public_keys: vec![kormir.get_pubkey().await?], + public_keys: vec![kormir.get_public_key()], event_id: announcement.oracle_event.event_id, threshold: 1, }; diff --git a/ddk-node/src/lib.rs b/ddk-node/src/lib.rs index 2c96b778..4fe04dd0 100644 --- a/ddk-node/src/lib.rs +++ b/ddk-node/src/lib.rs @@ -308,7 +308,7 @@ impl DdkRpc for DdkNode { &self, _request: Request, ) -> Result, Status> { - let pubkey = self.node.oracle.get_pubkey().await.unwrap().to_string(); + let pubkey = self.node.oracle.get_public_key().to_string(); let name = self.node.oracle.name(); Ok(Response::new(ListOraclesResponse { name, pubkey })) } diff --git a/ddk/examples/lightning.rs b/ddk/examples/lightning.rs index 3a0334fb..3127bef4 100644 --- a/ddk/examples/lightning.rs +++ b/ddk/examples/lightning.rs @@ -1,4 +1,3 @@ -use anyhow::Result; use bitcoin::key::rand::Fill; use ddk::builder::Builder; use ddk::oracle::kormir::KormirOracleClient; @@ -10,13 +9,15 @@ use std::sync::Arc; type ApplicationDdk = ddk::DlcDevKit; #[tokio::main] -async fn main() -> Result<()> { +async fn main() -> Result<(), ddk::error::Error> { let transport = Arc::new(LightningTransport::new(&[0u8; 32], 1776)?); - let storage = Arc::new(SledStorage::new(current_dir()?.to_str().unwrap())?); + let storage = Arc::new(SledStorage::new(current_dir().unwrap().to_str().unwrap()).unwrap()); let oracle_client = Arc::new(KormirOracleClient::new("host", None).await?); let mut seed_bytes = [0u8; 32]; - seed_bytes.try_fill(&mut bitcoin::key::rand::thread_rng())?; + seed_bytes + .try_fill(&mut bitcoin::key::rand::thread_rng()) + .unwrap(); let mut builder = Builder::new(); builder.set_seed_bytes(seed_bytes); diff --git a/ddk/examples/nostr.rs b/ddk/examples/nostr.rs index 654ccb01..c99f07fa 100644 --- a/ddk/examples/nostr.rs +++ b/ddk/examples/nostr.rs @@ -1,4 +1,3 @@ -use anyhow::Result; use bitcoin::key::rand::Fill; use bitcoin::Network; use ddk::builder::Builder; @@ -10,9 +9,11 @@ use std::sync::Arc; type NostrDdk = ddk::DlcDevKit; #[tokio::main] -async fn main() -> Result<()> { +async fn main() -> Result<(), ddk::error::Error> { let mut seed_bytes = [0u8; 32]; - seed_bytes.try_fill(&mut bitcoin::key::rand::thread_rng())?; + seed_bytes + .try_fill(&mut bitcoin::key::rand::thread_rng()) + .unwrap(); let transport = Arc::new(NostrDlc::new(&seed_bytes, "wss://nostr.dlcdevkit.com", Network::Regtest).await?); diff --git a/ddk/examples/postgres.rs b/ddk/examples/postgres.rs index eef5cd68..cc299575 100644 --- a/ddk/examples/postgres.rs +++ b/ddk/examples/postgres.rs @@ -1,4 +1,3 @@ -use anyhow::Result; use bitcoin::key::rand::Fill; use ddk::builder::Builder; use ddk::oracle::kormir::KormirOracleClient; @@ -9,7 +8,7 @@ use std::sync::Arc; type ApplicationDdk = ddk::DlcDevKit; #[tokio::main] -async fn main() -> Result<()> { +async fn main() -> Result<(), ddk::error::Error> { let transport = Arc::new(LightningTransport::new(&[0u8; 32], 1776)?); let storage = Arc::new( PostgresStore::new( @@ -23,7 +22,9 @@ async fn main() -> Result<()> { Arc::new(KormirOracleClient::new("https://kormir.dlcdevkit.com", None).await?); let mut seed_bytes = [0u8; 32]; - seed_bytes.try_fill(&mut bitcoin::key::rand::thread_rng())?; + seed_bytes + .try_fill(&mut bitcoin::key::rand::thread_rng()) + .unwrap(); let mut builder = Builder::new(); builder.set_seed_bytes(seed_bytes); diff --git a/ddk/src/builder.rs b/ddk/src/builder.rs index 97dff168..3c101033 100644 --- a/ddk/src/builder.rs +++ b/ddk/src/builder.rs @@ -7,9 +7,9 @@ use std::sync::{Arc, RwLock}; use crate::chain::EsploraClient; use crate::ddk::{DlcDevKit, DlcManagerMessage}; +use crate::error::{BuilderError, Error}; use crate::wallet::DlcDevKitWallet; use crate::{Oracle, Storage, Transport}; -use thiserror::Error; const DEFAULT_ESPLORA_HOST: &str = "https://mutinynet.com/api"; const DEFAULT_NETWORK: Network = Network::Signet; @@ -26,16 +26,6 @@ pub struct Builder { seed_bytes: [u8; 32], } -/// An error that could be thrown while building [`crate::ddk::DlcDevKit`] -#[derive(Debug, Clone, Copy, Error)] -pub enum BuilderError { - #[error("A transport was not provided.")] - NoTransport, - #[error("A storage implementation was not provided.")] - NoStorage, - #[error("An oracle client was not provided.")] - NoOracle, -} /// Defaults when creating a DDK application /// Transport, storage, and oracle is set to none. /// @@ -111,7 +101,7 @@ impl Builder { } /// Builds the `DlcDevKit` instance. Fails if any components are missing. - pub async fn finish(&self) -> anyhow::Result> { + pub async fn finish(&self) -> Result, Error> { tracing::info!( network = self.network.to_string(), esplora = self.esplora_host, diff --git a/ddk/src/chain/esplora.rs b/ddk/src/chain/esplora.rs index 380700ee..473f747c 100644 --- a/ddk/src/chain/esplora.rs +++ b/ddk/src/chain/esplora.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use crate::error::esplora_err_to_manager_err; +use crate::error::{esplora_err_to_manager_err, Error}; use bdk_esplora::esplora_client::Error as EsploraError; use bdk_esplora::esplora_client::{AsyncClient, BlockingClient, Builder}; use bitcoin::Network; @@ -20,7 +20,7 @@ pub struct EsploraClient { } impl EsploraClient { - pub fn new(esplora_host: &str, network: Network) -> Result { + pub fn new(esplora_host: &str, network: Network) -> Result { let builder = Builder::new(esplora_host).timeout(Duration::from_secs(5).as_secs()); let blocking_client = builder.clone().build_blocking(); let async_client = builder.build_async()?; diff --git a/ddk/src/ddk.rs b/ddk/src/ddk.rs index 092727a0..c3d3cfcf 100644 --- a/ddk/src/ddk.rs +++ b/ddk/src/ddk.rs @@ -1,14 +1,13 @@ use crate::chain::EsploraClient; +use crate::error::Error; use crate::wallet::DlcDevKitWallet; #[cfg(feature = "marketplace")] use crate::{nostr::marketplace::*, DEFAULT_NOSTR_RELAY}; use crate::{Oracle, Storage, Transport}; -use anyhow::anyhow; use bitcoin::secp256k1::PublicKey; use bitcoin::{Amount, Network}; use crossbeam::channel::{unbounded, Receiver, Sender}; use ddk_manager::contract::Contract; -use ddk_manager::error::Error; use ddk_manager::{ contract::contract_input::ContractInput, CachedContractSignerProvider, ContractId, SimpleSigner, SystemTimeProvider, @@ -32,17 +31,22 @@ pub type DlcDevKitDlcManager = ddk_manager::manager::Manager< SimpleSigner, >; +type Result = std::result::Result; +type StdResult = std::result::Result; + #[derive(Debug)] pub enum DlcManagerMessage { AcceptDlc { contract: ContractId, - responder: Sender>, + responder: Sender< + std::result::Result<(ContractId, PublicKey, AcceptDlc), ddk_manager::error::Error>, + >, }, OfferDlc { contract_input: ContractInput, counter_party: PublicKey, oracle_announcements: Vec, - responder: Sender, + responder: Sender>, }, PeriodicCheck, } @@ -67,7 +71,7 @@ where S: Storage, O: Oracle, { - pub fn start(&self) -> anyhow::Result<()> { + pub fn start(&self) -> Result<()> { let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() @@ -75,11 +79,11 @@ where self.start_with_runtime(runtime) } - pub fn start_with_runtime(&self, runtime: Runtime) -> anyhow::Result<()> { + pub fn start_with_runtime(&self, runtime: Runtime) -> Result<()> { let mut runtime_lock = self.runtime.write().unwrap(); if runtime_lock.is_some() { - return Err(anyhow!("DDK is still running.")); + return Err(Error::RuntimeExists); } let manager_clone = self.manager.clone(); @@ -128,21 +132,21 @@ where }); } - // TODO: connect stored peers. - *runtime_lock = Some(runtime); Ok(()) } - pub fn stop(&self) -> anyhow::Result<()> { + pub fn stop(&self) -> Result<()> { tracing::warn!("Shutting down DDK runtime and listeners."); - self.stop_signal_sender.send(true)?; + self.stop_signal_sender + .send(true) + .map_err(|e| Error::ActorSendError(e.to_string()))?; let mut runtime_lock = self.runtime.write().unwrap(); if let Some(rt) = runtime_lock.take() { rt.shutdown_background(); Ok(()) } else { - Err(anyhow!("Runtime is not running.")) + Err(Error::NoRuntime) } } @@ -164,8 +168,7 @@ where counter_party, vec![oracle_announcements], ) - .await - .expect("can't create offerdlc"); + .await; responder.send(offer).expect("send offer error") } @@ -178,20 +181,12 @@ where responder.send(accept_dlc).expect("can't send") } DlcManagerMessage::PeriodicCheck => { - manager.periodic_check(false).await.unwrap(); + let _ = manager.periodic_check(false).await; } } } } - pub fn connect_if_necessary(&self) -> anyhow::Result<()> { - let _known_peers = self.storage.list_peers()?; - - // check from already connected - - Ok(()) - } - pub fn network(&self) -> Network { self.network } @@ -201,15 +196,21 @@ where contract_input: &ContractInput, counter_party: PublicKey, oracle_announcements: Vec, - ) -> anyhow::Result { + ) -> Result { let (responder, receiver) = unbounded(); - self.sender.send(DlcManagerMessage::OfferDlc { - contract_input: contract_input.to_owned(), - counter_party, - oracle_announcements, - responder, - })?; - let offer = receiver.recv()?; + self.sender + .send(DlcManagerMessage::OfferDlc { + contract_input: contract_input.to_owned(), + counter_party, + oracle_announcements, + responder, + }) + .map_err(|e| Error::ActorSendError(e.to_string()))?; + let offer = receiver + .recv() + .map_err(|e| Error::ActorReceiveError(e.to_string()))?; + + let offer = offer?; let contract_id = hex::encode(offer.temporary_contract_id); self.transport @@ -227,17 +228,20 @@ where pub async fn accept_dlc_offer( &self, contract: [u8; 32], - ) -> anyhow::Result<(String, String, AcceptDlc)> { + ) -> Result<(String, String, AcceptDlc)> { let (responder, receiver) = unbounded(); - self.sender.send(DlcManagerMessage::AcceptDlc { - contract, - responder, - })?; + self.sender + .send(DlcManagerMessage::AcceptDlc { + contract, + responder, + }) + .map_err(|e| Error::ActorSendError(e.to_string()))?; + + let received_message = receiver + .recv() + .map_err(|e| Error::ActorReceiveError(e.to_string()))?; - let (contract_id, public_key, accept_dlc) = receiver.recv()?.map_err(|e| { - tracing::error!(error=?e, "Could not accept offer."); - anyhow!("Could not accept dlc offer.") - })?; + let (contract_id, public_key, accept_dlc) = received_message?; self.transport .send_message(public_key, Message::Accept(accept_dlc.clone())) @@ -254,7 +258,7 @@ where Ok((contract_id, counter_party, accept_dlc)) } - pub async fn balance(&self) -> anyhow::Result { + pub async fn balance(&self) -> Result { let wallet_balance = self.wallet.get_balance()?; let contracts = self.storage.get_contracts().await?; @@ -279,7 +283,7 @@ where let contract_pnl = &contracts .iter() .map(|contract| match contract { - Contract::Closed(_) => 0_i64, + Contract::Closed(c) => c.pnl, Contract::PreClosed(p) => p .signed_contract .accepted_contract diff --git a/ddk/src/error.rs b/ddk/src/error.rs index 85703552..8e3ecd73 100644 --- a/ddk/src/error.rs +++ b/ddk/src/error.rs @@ -1,16 +1,107 @@ use bdk_esplora::esplora_client::Error as EsploraError; -use ddk_manager::error::Error as ManagerError; +use ddk_manager::error::Error as DlcManagerError; +use thiserror::Error; -pub fn esplora_err_to_manager_err(e: EsploraError) -> ManagerError { - ManagerError::BlockchainError(e.to_string()) +#[derive(Error, Debug)] +pub enum Error { + #[error("DDK runtime has already been initialized.")] + RuntimeExists, + #[error("DDK is not runnging.")] + NoRuntime, + #[error("WalletError: {0}")] + Wallet(#[from] WalletError), + #[error("TransportError: {0}")] + Transport(#[from] TransportError), + #[error("OracleError: {0}")] + Oracle(#[from] OracleError), + #[error("StorageError: {0}")] + Storage(#[from] StorageError), + #[error("ActorSendError: {0}")] + ActorSendError(String), + #[error("ActorReceiveError: {0}")] + ActorReceiveError(String), + #[error("ManagerError: {0}")] + Manager(#[from] DlcManagerError), + #[error("BuilderError: {0}")] + Builder(#[from] BuilderError), + #[error("EsploraError: {0}")] + Esplora(#[from] bdk_esplora::esplora_client::Error), + #[error("Generic error: {0}")] + Generic(String), + #[cfg(feature = "nostr")] + #[error("NostrError: {0}")] + Nostr(#[from] NostrError), } -pub fn wallet_err_to_manager_err(e: WalletError) -> ManagerError { - ManagerError::WalletError(Box::new(e)) +#[derive(Error, Debug)] +pub enum StorageError { + #[error("Storage initialization: {0}")] + Init(String), + #[error("Sqlx storage error: {0}")] + #[cfg(feature = "postgres")] + Sqlx(#[from] crate::storage::sqlx::SqlxError), +} + +#[derive(Error, Debug)] +pub enum OracleError { + #[error("Oracle initialization: {0}")] + Init(String), + #[error("Oracle announcement error: {0}")] + Announcement(String), + #[error("Oracle attestation error: {0}")] + Attestation(String), + #[error("Create oracle event error: {0}")] + CreateEvent(String), + #[error("Sign oracle event error: {0}")] + SignEvent(String), + #[error("Oracle error: {0}")] + Custom(String), + #[error("HTTP error: {0}")] + #[cfg(any(feature = "p2pderivatives", feature = "kormir"))] + Reqwest(#[from] reqwest::Error), +} + +#[derive(Error, Debug)] +pub enum ManagerError { + #[error("DlcManagerError: {0}")] + DlcManager(#[from] DlcManagerError), +} + +#[derive(Error, Debug)] +pub enum TransportError { + #[error("Transport initialization: {0}")] + Init(String), + #[error("Listen error: {0}")] + Listen(String), + #[error("Message processing error: {0}")] + MessageProcessing(String), +} + +#[derive(Error, Debug)] +pub enum NostrError { + #[error("Nostr nip4: {0}")] + Nip04(#[from] nostr_rs::nips::nip04::Error), + #[error("Message parsing error: {0}")] + MessageParsing(String), + #[error("Signing nostr event error: {0}")] + Signing(#[from] nostr_rs::event::builder::Error), + #[error("Nostr generic: {0}")] + Generic(String), +} + +/// An error that could be thrown while building [`crate::ddk::DlcDevKit`] +#[derive(Debug, Clone, Copy, Error)] +pub enum BuilderError { + #[error("A transport was not provided.")] + NoTransport, + #[error("A storage implementation was not provided.")] + NoStorage, + #[error("An oracle client was not provided.")] + NoOracle, } /// BDK and wallet storage errors -#[derive(thiserror::Error, Debug)] +#[derive(Error, Debug)] pub enum WalletError { #[error("Wallet Persistance error: {0}")] WalletPersistanceError(String), @@ -20,15 +111,17 @@ pub enum WalletError { Lock, #[error("Error syncing the internal BDK wallet.")] SyncError, - #[error("Storage error. {0}")] + #[error("Wallet Ssorage error. {0}")] StorageError(String), - #[error("Error with deriving signer: {0}")] + #[error("Signer error: {0}")] SignerError(String), + #[error("TxnBuilder: Failed to build transaction. {0}")] + TxnBuildeR(#[from] bdk_wallet::error::CreateTxError), #[error("Wallet call to esplora: {0}")] - Esplora(#[from] Box), + Esplora(String), #[error("Broadcast to esplora: {0}")] Broadcast(#[from] bdk_esplora::esplora_client::Error), - #[error("Could not extract txn from psbt.")] + #[error("Could not extract txn from psbt for sending.")] ExtractTx, #[error("Applying an update to the wallet.")] UtxoUpdate(#[from] bdk_chain::local_chain::CannotConnectError), @@ -50,3 +143,11 @@ where { ddk_manager::error::Error::StorageError(e.to_string()) } + +pub fn esplora_err_to_manager_err(e: EsploraError) -> DlcManagerError { + DlcManagerError::BlockchainError(e.to_string()) +} + +pub fn wallet_err_to_manager_err(e: WalletError) -> DlcManagerError { + DlcManagerError::WalletError(Box::new(e)) +} diff --git a/ddk/src/json.rs b/ddk/src/json.rs index 5337d0ba..f687ce24 100644 --- a/ddk/src/json.rs +++ b/ddk/src/json.rs @@ -4,9 +4,17 @@ use ddk_manager::contract::{ FailedSignContract, PreClosedContract, }; use dlc_messages::oracle_msgs::EventDescriptor; +use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::collections::HashSet; +#[derive(Debug, Serialize, Deserialize, Hash, Eq, PartialEq)] +pub struct OracleEvent { + pub event_id: String, + pub oracle_pubkey: String, + pub event_type: String, +} + pub fn offered_contract_to_value(offered_contract: &OfferedContract, state: &str) -> Value { let contract_id = hex::encode(offered_contract.id); let mut event_ids = HashSet::new(); @@ -16,11 +24,15 @@ pub fn offered_contract_to_value(offered_contract: &OfferedContract, state: &str EventDescriptor::EnumEvent(_) => "enum", EventDescriptor::DigitDecompositionEvent(_) => "numerical", }; - let oracle_event = json!({ "event_id": announcement.oracle_event.event_id, "oracle_pubkey": announcement.oracle_public_key.to_string(), "event_type": event_type}); + let oracle_event = OracleEvent { + event_id: announcement.oracle_event.event_id.to_string(), + oracle_pubkey: announcement.oracle_public_key.to_string(), + event_type: event_type.to_string(), + }; event_ids.insert(oracle_event); } } - let event_ids = event_ids.into_iter().collect::>(); + let event_ids = event_ids.into_iter().collect::>(); json!({ "state": state, "contract_id": contract_id, diff --git a/ddk/src/lib.rs b/ddk/src/lib.rs index 4ae0ffee..601dd082 100644 --- a/ddk/src/lib.rs +++ b/ddk/src/lib.rs @@ -40,6 +40,7 @@ use bitcoin::Amount; use ddk::DlcDevKitDlcManager; use dlc_messages::oracle_msgs::OracleAnnouncement; use dlc_messages::Message; +use error::TransportError; use error::WalletError; use std::sync::Arc; use tokio::sync::watch; @@ -57,7 +58,7 @@ pub trait Transport: Send + Sync + 'static { &self, mut stop_signal: watch::Receiver, manager: Arc>, - ) -> Result<(), anyhow::Error>; + ) -> Result<(), TransportError>; /// Send a message to a specific counterparty. async fn send_message(&self, counterparty: PublicKey, message: Message); /// Connect to another peer @@ -72,8 +73,10 @@ pub trait Storage: ddk_manager::Storage + Send + Sync + 'static { /// Save changeset to the wallet storage. async fn persist_bdk(&self, changeset: &ChangeSet) -> Result<(), WalletError>; /// Connected counterparties. + /// TODO: Remove fn list_peers(&self) -> anyhow::Result>; /// Persis counterparty. + /// TODO: Remove fn save_peer(&self, peer: PeerInformation) -> anyhow::Result<()>; // #[cfg(feature = "marketplace")] fn save_announcement(&self, announcement: OracleAnnouncement) -> anyhow::Result<()>; diff --git a/ddk/src/nostr/messages.rs b/ddk/src/nostr/messages.rs index 6e0acc66..6974ba6d 100644 --- a/ddk/src/nostr/messages.rs +++ b/ddk/src/nostr/messages.rs @@ -1,3 +1,4 @@ +use crate::error::NostrError; use crate::nostr::nostr_to_bitcoin_pubkey; use crate::nostr::{DLC_MESSAGE_KIND, ORACLE_ANNOUNCMENT_KIND, ORACLE_ATTESTATION_KIND}; use crate::util::ser::message_variant_name; @@ -27,23 +28,33 @@ pub fn create_oracle_message_filter(since: Timestamp) -> Filter { .since(since) } -pub fn parse_dlc_msg_event(event: &Event, secret_key: &SecretKey) -> anyhow::Result { +pub fn parse_dlc_msg_event(event: &Event, secret_key: &SecretKey) -> Result { let decrypt = nip04::decrypt(secret_key, &event.pubkey, &event.content)?; - let bytes = base64::decode(decrypt)?; + let bytes = base64::decode(decrypt).map_err(|e| NostrError::MessageParsing(e.to_string()))?; let mut cursor = lightning::io::Cursor::new(bytes); - let msg_type: u16 = Readable::read(&mut cursor).unwrap(); + let msg_type: u16 = + Readable::read(&mut cursor).map_err(|e| NostrError::Generic(e.to_string()))?; - let Some(wire) = read_dlc_message(msg_type, &mut cursor).unwrap() else { - return Err(anyhow::anyhow!("Couldn't read DLC message.")); + let Some(wire) = read_dlc_message(msg_type, &mut cursor) + .map_err(|e| NostrError::MessageParsing(e.to_string()))? + else { + return Err(NostrError::MessageParsing( + "Couldn't read DLC message.".to_string(), + )); }; let message = match wire { WireMessage::Message(msg) => Ok(msg), + // We could stll do segment chunks. Nostr relays can handle the large sizes, + // but I'm running a custom relay so generic relays won't be able to handle. WireMessage::SegmentStart(_) | WireMessage::SegmentChunk(_) => { - Err(anyhow::anyhow!("Blah blah, something with a wire")) + Err(NostrError::MessageParsing( + "DLC message is not a valid message. Nostr should not be chunking messages." + .to_string(), + )) } }?; @@ -59,9 +70,11 @@ pub fn parse_dlc_msg_event(event: &Event, secret_key: &SecretKey) -> anyhow::Res pub fn handle_dlc_msg_event( event: &Event, secret_key: &SecretKey, -) -> anyhow::Result<(SecpPublicKey, Message, Event)> { +) -> Result<(SecpPublicKey, Message, Event), NostrError> { if event.kind != Kind::Custom(8_888) { - return Err(anyhow::anyhow!("Event reveived was not DLC Message event.")); + return Err(NostrError::MessageParsing( + "Event reveived was not DLC Message event (kind 8_888).".to_string(), + )); } tracing::info!( kind = 8_888, @@ -81,7 +94,7 @@ pub fn create_dlc_msg_event( event_id: Option, msg: Message, keys: &Keys, -) -> anyhow::Result { +) -> Result { let mut bytes = msg.type_id().encode(); bytes.extend(msg.encode()); diff --git a/ddk/src/nostr/mod.rs b/ddk/src/nostr/mod.rs index aa3fd937..e75417b8 100644 --- a/ddk/src/nostr/mod.rs +++ b/ddk/src/nostr/mod.rs @@ -5,6 +5,8 @@ use lightning::io::Cursor; use lightning::util::ser::Readable; use nostr_rs::{Filter, Kind, PublicKey, Timestamp}; +use crate::error::Error; + /// Nostr [dlc_messages::oracle_msgs::OracleAnnouncement] marketplace. #[cfg(feature = "marketplace")] pub mod marketplace; @@ -41,18 +43,16 @@ pub fn create_oracle_message_filter(since: Timestamp) -> Filter { .since(since) } -pub fn oracle_announcement_from_str(content: &str) -> anyhow::Result { - let bytes = base64::decode(content)?; +pub fn oracle_announcement_from_str(content: &str) -> Result { + let bytes = base64::decode(content).map_err(|e| Error::Generic(e.to_string()))?; let mut cursor = Cursor::new(bytes); - OracleAnnouncement::read(&mut cursor) - .map_err(|_| anyhow::anyhow!("could not get oracle announcement")) + OracleAnnouncement::read(&mut cursor).map_err(|e| Error::Generic(e.to_string())) } -pub fn oracle_attestation_from_str(content: &str) -> anyhow::Result { - let bytes = base64::decode(content)?; +pub fn oracle_attestation_from_str(content: &str) -> Result { + let bytes = base64::decode(content).map_err(|e| Error::Generic(e.to_string()))?; let mut cursor = Cursor::new(bytes); - OracleAttestation::read(&mut cursor) - .map_err(|_| anyhow::anyhow!("could not read oracle attestation")) + OracleAttestation::read(&mut cursor).map_err(|e| Error::Generic(e.to_string())) } #[cfg(test)] diff --git a/ddk/src/oracle/kormir.rs b/ddk/src/oracle/kormir.rs index 33fb364d..35f97143 100644 --- a/ddk/src/oracle/kormir.rs +++ b/ddk/src/oracle/kormir.rs @@ -1,4 +1,3 @@ -use anyhow::anyhow; use bitcoin::key::XOnlyPublicKey; use dlc_messages::oracle_msgs::{OracleAnnouncement, OracleAttestation}; use hmac::{Hmac, Mac}; @@ -9,7 +8,9 @@ use serde::Serialize; use sha2::Sha256; use uuid::Uuid; -async fn get(host: &str, path: &str) -> anyhow::Result +use crate::error::OracleError; + +async fn get(host: &str, path: &str) -> Result where T: serde::de::DeserializeOwned, { @@ -68,8 +69,11 @@ impl KormirOracleClient { pub async fn new( host: &str, hmac_secret: Option>, - ) -> anyhow::Result { - let pubkey: XOnlyPublicKey = get::(host, "pubkey").await?.pubkey; + ) -> Result { + let pubkey: XOnlyPublicKey = get::(host, "pubkey") + .await + .map_err(|_| OracleError::Init("Could not get pubkey from Kormir.".to_string()))? + .pubkey; let client = reqwest::Client::new(); tracing::info!( host, @@ -85,18 +89,14 @@ impl KormirOracleClient { }) } - pub async fn get_pubkey(&self) -> anyhow::Result { - Ok(self.pubkey) - } - /// List all events stored with the connected Kormir server. /// /// Kormir events includes announcements info, nonce index, signatures /// if announcement has been signed, and nostr information. - pub async fn list_events(&self) -> anyhow::Result> { + pub async fn list_events(&self) -> Result, OracleError> { get(&self.host, "list-events").await.map_err(|e| { tracing::error!(error = e.to_string(), "Error getting all kormir events."); - anyhow!("List events") + OracleError::Announcement("Could not list events from Kormir.".to_string()) }) } @@ -107,7 +107,7 @@ impl KormirOracleClient { &self, outcomes: Vec, maturity: u32, - ) -> anyhow::Result { + ) -> Result { let event_id = Uuid::new_v4().to_string(); let create_event_request = CreateEnumEvent { @@ -138,7 +138,7 @@ impl KormirOracleClient { &self, event_id: String, outcome: String, - ) -> anyhow::Result { + ) -> Result { tracing::info!("Signing event. event_id={} outcome={}", event_id, outcome); let event = SignEnumEvent { @@ -175,7 +175,7 @@ impl KormirOracleClient { precision: Option, unit: String, maturity: u32, - ) -> anyhow::Result { + ) -> Result { let event_id = Uuid::new_v4().to_string(); let create_event_request = CreateNumericEvent { @@ -209,7 +209,7 @@ impl KormirOracleClient { &self, event_id: String, outcome: i64, - ) -> anyhow::Result { + ) -> Result { tracing::info!("Signing event. event_id={} outcome={}", event_id, outcome); let event = SignNumericEvent { @@ -237,19 +237,24 @@ impl KormirOracleClient { fn body_and_headers( &self, json: &T, - ) -> anyhow::Result<(Vec, HeaderMap)> { - let body = serde_json::to_vec(json)?; + ) -> Result<(Vec, HeaderMap), OracleError> { + let body = serde_json::to_vec(json).map_err(|e| OracleError::Custom(e.to_string()))?; let mut headers = HeaderMap::new(); headers.append(CONTENT_TYPE, HeaderValue::from_static("application/json")); if let Some(secret) = &self.hmac_secret { let hmac = Self::calculate_hmac(&body, secret)?; - headers.append("X-Signature", HeaderValue::from_bytes(hmac.as_bytes())?); + headers.append( + "X-Signature", + HeaderValue::from_bytes(hmac.as_bytes()) + .map_err(|e| OracleError::Custom(e.to_string()))?, + ); } Ok((body, headers)) } - fn calculate_hmac(payload: &[u8], secret: &[u8]) -> anyhow::Result { - let mut mac = Hmac::::new_from_slice(secret)?; + fn calculate_hmac(payload: &[u8], secret: &[u8]) -> Result { + let mut mac = Hmac::::new_from_slice(secret) + .map_err(|e| OracleError::Custom(e.to_string()))?; mac.update(payload); let result = mac.finalize().into_bytes(); Ok(hex::encode(result)) @@ -271,7 +276,7 @@ impl ddk_manager::Oracle for KormirOracleClient { .await .map_err(|e| { tracing::error!(error=?e, "Could not get attestation."); - ddk_manager::error::Error::OracleError("Could not get attestation".into()) + ddk_manager::error::Error::OracleError(format!("Could not get attestation: {e}")) })?; tracing::info!(event_id, attestation =? attestation, "Kormir attestation."); Ok(attestation) @@ -287,7 +292,9 @@ impl ddk_manager::Oracle for KormirOracleClient { .await .map_err(|e| { tracing::error!(error =? e, "Could not get announcement."); - ddk_manager::error::Error::OracleError("Could not get announcement".into()) + ddk_manager::error::Error::OracleError(format!( + "Could not get announcement: {e}" + )) })?; tracing::info!(event_id, announcement=?announcement, "Kormir announcement."); Ok(announcement) diff --git a/ddk/src/oracle/nostr.rs b/ddk/src/oracle/nostr.rs index 15d0c962..2e9b9c26 100644 --- a/ddk/src/oracle/nostr.rs +++ b/ddk/src/oracle/nostr.rs @@ -1,6 +1,7 @@ use std::str::FromStr; use std::time::Duration; +use crate::error::OracleError; use bitcoin::XOnlyPublicKey; use ddk_manager::error::Error as ManagerError; use kormir::{OracleAnnouncement, OracleAttestation}; @@ -18,16 +19,6 @@ use nostr_sdk::RelayPoolNotification; use tokio::sync::watch; use tokio::task::JoinHandle; -#[derive(Debug, thiserror::Error)] -pub enum NostrOracleError { - #[error("Failed to make subscription.")] - FailedToMakeSubscription, - #[error("Failed to convert Nostr public key to XOnlyPublicKey.")] - XonlyConversionError, - #[error("Failed to read event.")] - DlcReadError, -} - #[derive(Debug)] pub struct NostrOracle { client: Client, @@ -41,9 +32,13 @@ impl NostrOracle { relays: Vec, since: Option, nostr_oracle_pubkey: NostrPublicKey, - ) -> Result { + ) -> Result { let xonly_oracle_pubkey = XOnlyPublicKey::from_slice(nostr_oracle_pubkey.as_bytes()) - .map_err(|_| NostrOracleError::XonlyConversionError)?; + .map_err(|_| { + OracleError::Init( + "Failed to convert Nostr public key to XOnlyPublicKey.".to_string(), + ) + })?; let client = Client::default(); @@ -63,7 +58,7 @@ impl NostrOracle { client .subscribe(filter, None) .await - .map_err(|_| NostrOracleError::FailedToMakeSubscription)?; + .map_err(|_| OracleError::Init("Failed to make subscription.".to_string()))?; let db = MemoryDatabase::new(); @@ -78,7 +73,7 @@ impl NostrOracle { pub fn start( &self, mut stop_signal: watch::Receiver, - ) -> JoinHandle> { + ) -> JoinHandle> { tracing::info!( pubkey = self.nostr_oracle_pubkey.to_string(), "Starting Nostr Oracle listener." @@ -127,7 +122,7 @@ impl NostrOracle { } } } - Ok::<_, anyhow::Error>(()) + Ok::<_, OracleError>(()) }) } } @@ -205,10 +200,11 @@ impl ddk_manager::Oracle for NostrOracle { } } -fn decode_base64(content: &str) -> Result { - let bytes = base64::decode(content).map_err(|_| NostrOracleError::DlcReadError)?; +fn decode_base64(content: &str) -> Result { + let bytes = base64::decode(content) + .map_err(|_| OracleError::Custom("Failed to decode base64.".to_string()))?; let mut cursor = Cursor::new(bytes); - T::read(&mut cursor).map_err(|_| NostrOracleError::DlcReadError) + T::read(&mut cursor).map_err(|_| OracleError::Custom("Failed to read event.".to_string())) } #[cfg(test)] diff --git a/ddk/src/oracle/p2p_derivatives.rs b/ddk/src/oracle/p2p_derivatives.rs index 834726ec..728c72a3 100644 --- a/ddk/src/oracle/p2p_derivatives.rs +++ b/ddk/src/oracle/p2p_derivatives.rs @@ -3,13 +3,12 @@ //! # cg-oracle-client //! Http client wrapper for the Crypto Garage DLC oracle +use crate::{error::OracleError, Oracle}; use chrono::{DateTime, SecondsFormat, Utc}; use ddk_manager::error::Error as DlcManagerError; use dlc::secp256k1_zkp::{schnorr::Signature, XOnlyPublicKey}; use dlc_messages::oracle_msgs::{OracleAnnouncement, OracleAttestation}; -use crate::Oracle; - /// Enables interacting with a DLC oracle. pub struct P2PDOracleClient { host: String, @@ -55,20 +54,16 @@ struct AttestationResponse { values: Vec, } -async fn get(path: &str) -> Result +async fn get(path: &str) -> Result where T: serde::de::DeserializeOwned, { reqwest::get(path) .await - .map_err(|x| { - ddk_manager::error::Error::IOError( - std::io::Error::new(std::io::ErrorKind::Other, x).into(), - ) - })? + .map_err(|x| OracleError::Reqwest(x))? .json::() .await - .map_err(|e| ddk_manager::error::Error::OracleError(e.to_string())) + .map_err(|e| OracleError::Reqwest(e)) } fn pubkey_path(host: &str) -> String { @@ -97,11 +92,9 @@ impl P2PDOracleClient { /// Try to create an instance of an oracle client connecting to the provided /// host. Returns an error if the host could not be reached. Panics if the /// oracle uses an incompatible format. - pub async fn new(host: &str) -> Result { + pub async fn new(host: &str) -> Result { if host.is_empty() { - return Err(DlcManagerError::InvalidParameters( - "Invalid host".to_string(), - )); + return Err(OracleError::Init("Invalid host".to_string())); } let host = if !host.ends_with('/') { format!("{}{}", host, "/") @@ -117,19 +110,14 @@ impl P2PDOracleClient { } } -fn parse_event_id(event_id: &str) -> Result<(String, DateTime), DlcManagerError> { +fn parse_event_id(event_id: &str) -> Result<(String, DateTime), OracleError> { let asset_id = &event_id[..6]; let timestamp_str = &event_id[6..]; let timestamp: i64 = timestamp_str .parse() - .map_err(|_| DlcManagerError::OracleError("Invalid timestamp format".to_string()))?; + .map_err(|_| OracleError::Custom("Invalid timestamp format".to_string()))?; let naive_date_time = DateTime::from_timestamp(timestamp, 0) - .ok_or_else(|| { - DlcManagerError::InvalidParameters(format!( - "Invalid timestamp {} in event id", - timestamp - )) - })? + .ok_or_else(|| OracleError::Custom(format!("Invalid timestamp {} in event id", timestamp)))? .naive_utc(); let date_time = DateTime::from_naive_utc_and_offset(naive_date_time, Utc); Ok((asset_id.to_string(), date_time)) @@ -145,9 +133,12 @@ impl ddk_manager::Oracle for P2PDOracleClient { &self, event_id: &str, ) -> Result { - let (asset_id, date_time) = parse_event_id(event_id)?; + let (asset_id, date_time) = + parse_event_id(event_id).map_err(|e| DlcManagerError::OracleError(e.to_string()))?; let path = announcement_path(&self.host, &asset_id, &date_time); - let announcement = get(&path).await?; + let announcement = get(&path) + .await + .map_err(|e| DlcManagerError::OracleError(e.to_string()))?; Ok(announcement) } @@ -155,13 +146,16 @@ impl ddk_manager::Oracle for P2PDOracleClient { &self, event_id: &str, ) -> Result { - let (asset_id, date_time) = parse_event_id(event_id)?; + let (asset_id, date_time) = + parse_event_id(event_id).map_err(|e| DlcManagerError::OracleError(e.to_string()))?; let path = attestation_path(&self.host, &asset_id, &date_time); let AttestationResponse { event_id, signatures, values, - } = get::(&path).await?; + } = get::(&path) + .await + .map_err(|e| DlcManagerError::OracleError(e.to_string()))?; Ok(OracleAttestation { event_id, diff --git a/ddk/src/storage/postgres/mod.rs b/ddk/src/storage/postgres/mod.rs index 7f72fab9..57811d23 100644 --- a/ddk/src/storage/postgres/mod.rs +++ b/ddk/src/storage/postgres/mod.rs @@ -1,7 +1,7 @@ use std::str::FromStr; use super::sqlx::{ContractRowNoBytes, SqlxError}; -use crate::error::WalletError; +use crate::error::{StorageError, WalletError}; use crate::transport::PeerInformation; use crate::Storage; use crate::{ @@ -33,6 +33,7 @@ use ddk_manager::{ }; use dlc_messages::oracle_msgs::OracleAnnouncement; use serde_json::json; +use sqlx::pool::PoolOptions; use sqlx::postgres::PgRow; use sqlx::{FromRow, Pool, Postgres, Row, Transaction}; use std::sync::Arc; @@ -46,13 +47,22 @@ pub struct PostgresStore { } impl PostgresStore { - pub async fn new(url: &str, migrations: bool, wallet_name: String) -> Result { - let pool = Pool::::connect(url).await?; + pub async fn new( + url: &str, + migrations: bool, + wallet_name: String, + ) -> Result { + let pool = PoolOptions::::new() + .max_connections(5) + .connect(url) + .await + .map_err(|e| StorageError::Sqlx(e.into()))?; if migrations { tracing::info!("Migrating postgres"); sqlx::migrate!("src/storage/postgres/migrations") .run(&pool) - .await?; + .await + .map_err(|e| StorageError::Sqlx(e.into()))?; } Ok(Self { pool, wallet_name }) @@ -61,7 +71,7 @@ impl PostgresStore { pub async fn get_contract_rows( &self, states: Option>, - ) -> Result, SqlxError> { + ) -> Result, StorageError> { let rows = if let Some(states) = states { let placeholders = (1..=states.len()) .map(|i| format!("${i}")) @@ -76,29 +86,38 @@ impl PostgresStore { query = query.bind(state as i16); } - query.fetch_all(&self.pool).await? + query + .fetch_all(&self.pool) + .await + .map_err(|e| StorageError::Sqlx(e.into()))? } else { sqlx::query_as::( "SELECT id, state, is_offer_party, counter_party, offer_collateral, accept_collateral, total_collateral, fee_rate_per_vb, cet_locktime, refund_locktime, pnl FROM contracts" ) .fetch_all(&self.pool) - .await? + .await + .map_err(|e| StorageError::Sqlx(e.into()))? }; Ok(rows) } - pub async fn get_offer_rows(&self) -> Result, SqlxError> { + pub async fn get_offer_rows(&self) -> Result, StorageError> { let rows = sqlx::query_as::( "SELECT id, state, is_offer_party, counter_party, offer_collateral, accept_collateral, total_collateral, fee_rate_per_vb, cet_locktime, refund_locktime, pnl FROM contracts WHERE state = 1" ) .fetch_all(&self.pool) - .await?; + .await + .map_err(|e| StorageError::Sqlx(e.into()))?; Ok(rows) } #[tracing::instrument] - pub(crate) async fn read(&self) -> Result { - let mut tx = self.pool.begin().await?; + pub(crate) async fn read(&self) -> Result { + let mut tx = self + .pool + .begin() + .await + .map_err(|e| StorageError::Sqlx(e.into()))?; let mut changeset = ChangeSet::default(); let sql = "SELECT n.name as network, @@ -113,7 +132,8 @@ impl PostgresStore { let row = sqlx::query(sql) .bind(&self.wallet_name) .fetch_optional(&mut *tx) - .await?; + .await + .map_err(|e| StorageError::Sqlx(e.into()))?; if let Some(row) = row { Self::changeset_from_row(&mut tx, &mut changeset, row, &self.wallet_name).await?; @@ -128,7 +148,7 @@ impl PostgresStore { changeset: &mut ChangeSet, row: PgRow, wallet_name: &str, - ) -> Result<(), SqlxError> { + ) -> Result<(), StorageError> { tracing::info!("changeset from row"); let network: String = row.get("network"); @@ -140,7 +160,9 @@ impl PostgresStore { changeset.network = Some(Network::from_str(&network).expect("parse Network")); if let Some(desc_str) = external_desc_str { - let descriptor: Descriptor = desc_str.parse()?; + let descriptor: Descriptor = desc_str + .parse() + .map_err(|_| StorageError::Sqlx(SqlxError::Custom("parse descriptor".into())))?; let did = descriptor.descriptor_id(); changeset.descriptor = Some(descriptor); if let Some(last_rev) = external_last_revealed { @@ -149,7 +171,9 @@ impl PostgresStore { } if let Some(desc_str) = internal_desc_str { - let descriptor: Descriptor = desc_str.parse()?; + let descriptor: Descriptor = desc_str + .parse() + .map_err(|_| StorageError::Sqlx(SqlxError::Custom("parse descriptor".into())))?; let did = descriptor.descriptor_id(); changeset.change_descriptor = Some(descriptor); if let Some(last_rev) = internal_last_revealed { @@ -163,39 +187,56 @@ impl PostgresStore { } #[tracing::instrument] - pub(crate) async fn write(&self, changeset: &ChangeSet) -> Result<(), SqlxError> { + pub(crate) async fn write(&self, changeset: &ChangeSet) -> Result<(), StorageError> { tracing::info!("changeset write"); if changeset.is_empty() { return Ok(()); } let wallet_name = &self.wallet_name; - let mut tx = self.pool.begin().await?; + let mut tx = self + .pool + .begin() + .await + .map_err(|e| StorageError::Sqlx(e.into()))?; if let Some(ref descriptor) = changeset.descriptor { - insert_descriptor(&mut tx, wallet_name, descriptor, External).await?; + insert_descriptor(&mut tx, wallet_name, descriptor, External) + .await + .map_err(|e| StorageError::Sqlx(e.into()))?; } if let Some(ref change_descriptor) = changeset.change_descriptor { - insert_descriptor(&mut tx, wallet_name, change_descriptor, Internal).await?; + insert_descriptor(&mut tx, wallet_name, change_descriptor, Internal) + .await + .map_err(|e| StorageError::Sqlx(e.into()))?; } if let Some(network) = changeset.network { - insert_network(&mut tx, wallet_name, network).await?; + insert_network(&mut tx, wallet_name, network) + .await + .map_err(|e| StorageError::Sqlx(e.into()))?; } let last_revealed_indices = &changeset.indexer.last_revealed; if !last_revealed_indices.is_empty() { for (desc_id, index) in last_revealed_indices { - update_last_revealed(&mut tx, wallet_name, *desc_id, *index).await?; + update_last_revealed(&mut tx, wallet_name, *desc_id, *index) + .await + .map_err(|e| StorageError::Sqlx(e.into()))?; } } local_chain_changeset_persist_to_postgres(&mut tx, wallet_name, &changeset.local_chain) - .await?; - tx_graph_changeset_persist_to_postgres(&mut tx, wallet_name, &changeset.tx_graph).await?; + .await + .map_err(|e| StorageError::Sqlx(e.into()))?; + tx_graph_changeset_persist_to_postgres(&mut tx, wallet_name, &changeset.tx_graph) + .await + .map_err(|e| StorageError::Sqlx(e.into()))?; - tx.commit().await?; + tx.commit() + .await + .map_err(|e| StorageError::Sqlx(e.into()))?; Ok(()) } diff --git a/ddk/src/storage/sqlx.rs b/ddk/src/storage/sqlx.rs index af7c1834..773f7873 100644 --- a/ddk/src/storage/sqlx.rs +++ b/ddk/src/storage/sqlx.rs @@ -24,9 +24,12 @@ pub enum SqlxError { /// miniscript error #[error("miniscript error: {0}")] Miniscript(#[from] bdk_chain::miniscript::Error), + #[error("Custom error: {0}")] + Custom(String), } #[derive(Debug, Clone, FromRow, Serialize, Deserialize)] +/// todo: add fee rate and funding txid pub struct ContractRow { pub id: String, pub state: i16, diff --git a/ddk/src/transport/lightning/mod.rs b/ddk/src/transport/lightning/mod.rs index 8867cc3f..69ab0001 100644 --- a/ddk/src/transport/lightning/mod.rs +++ b/ddk/src/transport/lightning/mod.rs @@ -1,4 +1,4 @@ -use crate::{DlcDevKitDlcManager, Oracle, Storage, Transport}; +use crate::{error::TransportError, DlcDevKitDlcManager, Oracle, Storage, Transport}; use async_trait::async_trait; use bitcoin::secp256k1::PublicKey; use lightning_net_tokio::connect_outbound; @@ -38,7 +38,7 @@ impl Transport for LightningTransport { &self, mut stop_signal: watch::Receiver, manager: Arc>, - ) -> Result<(), anyhow::Error> { + ) -> Result<(), TransportError> { let listen_handle = self.listen(stop_signal.clone()); let process_handle = self.process_messages(stop_signal.clone(), manager.clone()); @@ -46,8 +46,8 @@ impl Transport for LightningTransport { // Wait for either task to complete or stop signal tokio::select! { _ = stop_signal.changed() => Ok(()), - res = listen_handle => res?, - res = process_handle => res?, + res = listen_handle => res.map_err(|e| TransportError::Listen(e.to_string()))?, + res = process_handle => res.map_err(|e| TransportError::MessageProcessing(e.to_string()))?, } } diff --git a/ddk/src/transport/lightning/peer_manager.rs b/ddk/src/transport/lightning/peer_manager.rs index 170e4231..ee08e54c 100644 --- a/ddk/src/transport/lightning/peer_manager.rs +++ b/ddk/src/transport/lightning/peer_manager.rs @@ -1,4 +1,3 @@ -use anyhow::anyhow; use bitcoin::{key::rand::Fill, secp256k1::PublicKey}; use dlc_messages::message_handler::MessageHandler as DlcMessageHandler; use lightning::{ @@ -16,7 +15,7 @@ use std::{ }; use tokio::{net::TcpListener, sync::watch, task::JoinHandle, time::interval}; -use crate::{ddk::DlcDevKitDlcManager, Oracle, Storage}; +use crate::{ddk::DlcDevKitDlcManager, error::TransportError, Oracle, Storage}; pub struct DlcDevKitLogger; @@ -58,12 +57,17 @@ pub struct LightningTransport { } impl LightningTransport { - pub fn new(seed_bytes: &[u8; 32], listening_port: u16) -> anyhow::Result { - let time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?; + pub fn new( + seed_bytes: &[u8; 32], + listening_port: u16, + ) -> Result { + let time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map_err(|e| TransportError::Init(e.to_string()))?; let key_signer = KeysManager::new(seed_bytes, time.as_secs(), time.as_nanos() as u32); let node_id = key_signer .get_node_id(lightning::sign::Recipient::Node) - .map_err(|_| anyhow!("Could not get node id."))?; + .map_err(|_| TransportError::Init("Could not get node id.".to_string()))?; let dlc_message_handler = Arc::new(DlcMessageHandler::new()); let message_handler = MessageHandler { @@ -74,7 +78,9 @@ impl LightningTransport { }; let mut ephmeral_data = [0u8; 32]; - ephmeral_data.try_fill(&mut bitcoin::key::rand::thread_rng())?; + ephmeral_data + .try_fill(&mut bitcoin::key::rand::thread_rng()) + .map_err(|e| TransportError::Init(e.to_string()))?; Ok(LightningTransport { peer_manager: Arc::new(LnPeerManager::new( @@ -93,14 +99,14 @@ impl LightningTransport { pub fn listen( &self, stop_signal: watch::Receiver, - ) -> JoinHandle> { + ) -> JoinHandle> { let listening_port = self.listening_port; let mut listen_stop = stop_signal.clone(); let peer_manager = Arc::clone(&self.peer_manager); tokio::spawn(async move { let listener = TcpListener::bind(format!("0.0.0.0:{}", listening_port)) .await - .expect("Coldn't get port."); + .map_err(|e| TransportError::Listen(e.to_string()))?; tracing::info!( addr =? listener.local_addr().unwrap(), @@ -133,7 +139,7 @@ impl LightningTransport { } } } - Ok::<_, anyhow::Error>(()) + Ok::<_, TransportError>(()) }) } @@ -141,14 +147,13 @@ impl LightningTransport { &self, stop_signal: watch::Receiver, manager: Arc>, - ) -> JoinHandle> { + ) -> JoinHandle> { let mut message_stop = stop_signal.clone(); let message_manager = Arc::clone(&manager); let peer_manager = Arc::clone(&self.peer_manager); let message_handler = Arc::clone(&self.message_handler); tokio::spawn(async move { let mut message_interval = interval(Duration::from_secs(20)); - // let mut event_interval = interval(Duration::from_secs(2)); loop { tokio::select! { _ = message_stop.changed() => { @@ -195,7 +200,7 @@ impl LightningTransport { } } } - Ok::<_, anyhow::Error>(()) + Ok::<_, TransportError>(()) }) } } diff --git a/ddk/src/transport/memory.rs b/ddk/src/transport/memory.rs index cabd1297..da622851 100644 --- a/ddk/src/transport/memory.rs +++ b/ddk/src/transport/memory.rs @@ -4,7 +4,7 @@ use std::{ time::Duration, }; -use crate::{ddk::DlcDevKitDlcManager, Oracle, Storage, Transport}; +use crate::{ddk::DlcDevKitDlcManager, error::TransportError, Oracle, Storage, Transport}; use bitcoin::{ key::{self, Keypair}, secp256k1::{All, PublicKey, Secp256k1}, @@ -66,7 +66,7 @@ impl Transport for MemoryTransport { &self, mut stop_receiver: watch::Receiver, manager: Arc>, - ) -> Result<(), anyhow::Error> { + ) -> Result<(), TransportError> { let mut timer = tokio::time::interval(Duration::from_secs(1)); loop { tokio::select! { diff --git a/ddk/src/transport/nostr/mod.rs b/ddk/src/transport/nostr/mod.rs index 0cafc5b6..319ecea0 100644 --- a/ddk/src/transport/nostr/mod.rs +++ b/ddk/src/transport/nostr/mod.rs @@ -3,6 +3,7 @@ mod relay_handler; pub use relay_handler::NostrDlc; use tokio::sync::watch; +use crate::error::TransportError; use crate::nostr; use crate::{DlcDevKitDlcManager, Oracle, Storage, Transport}; use async_trait::async_trait; @@ -25,13 +26,13 @@ impl Transport for NostrDlc { &self, mut stop_signal: watch::Receiver, manager: Arc>, - ) -> Result<(), anyhow::Error> { + ) -> Result<(), TransportError> { let listen_handle = self.start(stop_signal.clone(), manager); // Wait for either task to complete or stop signal tokio::select! { _ = stop_signal.changed() => Ok(()), - res = listen_handle => res?, + res = listen_handle => res.map_err(|e| TransportError::Listen(e.to_string()))?, } } /// Send a message to a specific counterparty. diff --git a/ddk/src/transport/nostr/relay_handler.rs b/ddk/src/transport/nostr/relay_handler.rs index 47471115..e093d162 100644 --- a/ddk/src/transport/nostr/relay_handler.rs +++ b/ddk/src/transport/nostr/relay_handler.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use crate::error::TransportError; use crate::nostr::messages::{create_dlc_msg_event, handle_dlc_msg_event}; use crate::DlcDevKitDlcManager; use crate::{nostr, Transport}; @@ -22,15 +23,21 @@ impl NostrDlc { seed_bytes: &[u8; 32], relay_host: &str, network: Network, - ) -> anyhow::Result { + ) -> Result { tracing::info!("Creating Nostr Dlc handler."); let secp = Secp256k1::new(); - let seed = Xpriv::new_master(network, seed_bytes)?; + let seed = Xpriv::new_master(network, seed_bytes) + .map_err(|e| TransportError::Init(e.to_string()))?; let keys = Keys::new_with_ctx(&secp, seed.private_key.into()); - let relay_url = relay_host.parse()?; + let relay_url = relay_host + .parse() + .map_err(|_| TransportError::Init("Could not parse relay url.".to_string()))?; let client = Client::new(keys.clone()); - client.add_relay(&relay_url).await?; + client + .add_relay(&relay_url) + .await + .map_err(|e| TransportError::Init(e.to_string()))?; client.connect().await; Ok(NostrDlc { @@ -44,7 +51,7 @@ impl NostrDlc { &self, mut stop_signal: watch::Receiver, manager: Arc>, - ) -> JoinHandle> { + ) -> JoinHandle> { tracing::info!( pubkey = self.keys.public_key().to_string(), transport_public_key = self.public_key().to_string(), @@ -56,7 +63,10 @@ impl NostrDlc { let since = Timestamp::now(); let msg_subscription = nostr::messages::create_dlc_message_filter(since, keys.public_key()); - nostr_client.subscribe(msg_subscription, None).await?; + nostr_client + .subscribe(msg_subscription, None) + .await + .map_err(|e| TransportError::Listen(e.to_string()))?; tracing::info!( "Listening for messages on {}", keys.public_key().to_string() @@ -114,7 +124,7 @@ impl NostrDlc { } } } - Ok::<_, anyhow::Error>(()) + Ok::<_, TransportError>(()) }) } } diff --git a/ddk/src/wallet.rs b/ddk/src/wallet.rs index 3db2cfe9..530f1564 100644 --- a/ddk/src/wallet.rs +++ b/ddk/src/wallet.rs @@ -37,7 +37,8 @@ use std::{ }; use tokio::sync::Mutex; -type FutureResult<'a, T, E> = Pin> + Send + 'a>>; +type FutureResult<'a, T, E> = Pin> + Send + 'a>>; +type Result = std::result::Result; /// Wrapper type to pass `crate::Storage` to a BDK wallet. #[derive(Clone)] @@ -89,7 +90,7 @@ impl DlcDevKitWallet { esplora_url: &str, network: Network, storage: Arc, - ) -> Result { + ) -> Result { let secp = Secp256k1::new(); let xprv = Xpriv::new_master(network, seed_bytes)?; @@ -121,37 +122,13 @@ impl DlcDevKitWallet { let wallet = Arc::new(Mutex::new(internal_wallet)); - let blockchain = - Arc::new(EsploraClient::new(esplora_url, network).map_err(WalletError::Esplora)?); - - // TODO: Actually get fees. I don't think it's used for regular DLCs though - let mut fees: HashMap = HashMap::new(); - fees.insert(ConfirmationTarget::UrgentOnChainSweep, AtomicU32::new(5000)); - fees.insert( - ConfirmationTarget::MinAllowedAnchorChannelRemoteFee, - AtomicU32::new(25 * 250), - ); - fees.insert( - ConfirmationTarget::MinAllowedAnchorChannelRemoteFee, - AtomicU32::new(MIN_FEERATE), - ); - fees.insert( - ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee, - AtomicU32::new(MIN_FEERATE), - ); - fees.insert( - ConfirmationTarget::AnchorChannelFee, - AtomicU32::new(MIN_FEERATE), + let blockchain = Arc::new( + EsploraClient::new(esplora_url, network) + .map_err(|e| WalletError::Esplora(e.to_string()))?, ); - fees.insert( - ConfirmationTarget::NonAnchorChannelFee, - AtomicU32::new(2000), - ); - fees.insert( - ConfirmationTarget::ChannelCloseMinimum, - AtomicU32::new(MIN_FEERATE), - ); - let fees = Arc::new(fees); + + // not used for regular DLCs. only for channels + let fees = Arc::new(fee_estimator()); Ok(DlcDevKitWallet { wallet, @@ -165,7 +142,7 @@ impl DlcDevKitWallet { }) } - pub async fn sync(&self) -> Result<(), WalletError> { + pub async fn sync(&self) -> Result<()> { let mut wallet = match self.wallet.try_lock() { Ok(w) => w, Err(e) => { @@ -192,7 +169,12 @@ impl DlcDevKitWallet { .spks_for_keychain(KeychainKind::External, spks.clone()) .chain_tip(prev_tip) .build(); - let sync = self.blockchain.async_client.full_scan(chain, 10, 1).await?; + let sync = self + .blockchain + .async_client + .full_scan(chain, 10, 1) + .await + .map_err(|e| WalletError::Esplora(e.to_string()))?; Update { last_active_indices: sync.last_active_indices, tx_update: sync.tx_update, @@ -203,7 +185,12 @@ impl DlcDevKitWallet { .start_sync_with_revealed_spks() .chain_tip(prev_tip) .build(); - let sync = self.blockchain.async_client.sync(spks, 1).await?; + let sync = self + .blockchain + .async_client + .sync(spks, 1) + .await + .map_err(|e| WalletError::Esplora(e.to_string()))?; let indices = wallet.derivation_index(KeychainKind::External).unwrap_or(0); let internal_index = wallet.derivation_index(KeychainKind::Internal).unwrap_or(0); let mut last_active_indices = BTreeMap::new(); @@ -228,7 +215,7 @@ impl DlcDevKitWallet { PublicKey::from_secret_key(&self.secp, &self.xprv.private_key) } - pub fn get_balance(&self) -> Result { + pub fn get_balance(&self) -> Result { let Ok(wallet) = self.wallet.try_lock() else { tracing::error!("Could not get lock to sync wallet."); return Err(WalletError::Lock); @@ -236,7 +223,7 @@ impl DlcDevKitWallet { Ok(wallet.balance()) } - pub async fn new_external_address(&self) -> Result { + pub async fn new_external_address(&self) -> Result { let Ok(mut wallet) = self.wallet.try_lock() else { tracing::error!("Could not get lock to sync wallet."); return Err(WalletError::Lock); @@ -247,7 +234,7 @@ impl DlcDevKitWallet { Ok(address) } - pub async fn new_change_address(&self) -> Result { + pub async fn new_change_address(&self) -> Result { let Ok(mut wallet) = self.wallet.try_lock() else { tracing::error!("Could not get lock to sync wallet."); return Err(WalletError::Lock); @@ -263,7 +250,7 @@ impl DlcDevKitWallet { address: Address, amount: Amount, fee_rate: FeeRate, - ) -> Result { + ) -> Result { let Ok(mut wallet) = self.wallet.try_lock() else { tracing::error!("Could not get lock to sync wallet."); return Err(WalletError::Lock); @@ -280,7 +267,7 @@ impl DlcDevKitWallet { .version(2) .fee_rate(fee_rate); - let mut psbt = txn_builder.finish().unwrap(); + let mut psbt = txn_builder.finish()?; wallet.sign(&mut psbt, SignOptions::default())?; @@ -291,7 +278,7 @@ impl DlcDevKitWallet { Ok(tx.compute_txid()) } - pub async fn send_all(&self, address: Address, fee_rate: FeeRate) -> Result { + pub async fn send_all(&self, address: Address, fee_rate: FeeRate) -> Result { let Ok(mut wallet) = self.wallet.try_lock() else { tracing::error!("Could not get lock to sync wallet."); return Err(WalletError::Lock); @@ -314,7 +301,7 @@ impl DlcDevKitWallet { Ok(tx.compute_txid()) } - pub fn get_transactions(&self) -> Result>, WalletError> { + pub fn get_transactions(&self) -> Result>> { let Ok(wallet) = self.wallet.try_lock() else { tracing::error!("Could not get lock to sync wallet."); return Err(WalletError::Lock); @@ -325,7 +312,7 @@ impl DlcDevKitWallet { .collect::>>()) } - pub fn list_utxos(&self) -> Result, WalletError> { + pub fn list_utxos(&self) -> Result> { let Ok(wallet) = self.wallet.try_lock() else { tracing::error!("Could not get lock to sync wallet."); return Err(WalletError::Lock); @@ -333,7 +320,7 @@ impl DlcDevKitWallet { Ok(wallet.list_unspent().map(|utxo| utxo.to_owned()).collect()) } - fn next_derivation_index(&self) -> Result { + fn next_derivation_index(&self) -> Result { let Ok(wallet) = self.wallet.try_lock() else { tracing::error!("Could not get lock to sync wallet."); return Err(WalletError::Lock); @@ -373,7 +360,10 @@ impl ddk_manager::ContractSignerProvider for DlcDevKitWallet { hash.to_byte_array() } - fn derive_contract_signer(&self, key_id: [u8; 32]) -> Result { + fn derive_contract_signer( + &self, + key_id: [u8; 32], + ) -> std::result::Result { let child_key = SecretKey::from_slice(&key_id).expect("correct size"); tracing::info!( key_id = hex::encode(key_id), @@ -382,18 +372,21 @@ impl ddk_manager::ContractSignerProvider for DlcDevKitWallet { Ok(SimpleSigner::new(child_key)) } - fn get_secret_key_for_pubkey(&self, _pubkey: &PublicKey) -> Result { + fn get_secret_key_for_pubkey( + &self, + _pubkey: &PublicKey, + ) -> std::result::Result { unreachable!("get_secret_key_for_pubkey is only used in channels.") } - fn get_new_secret_key(&self) -> Result { + fn get_new_secret_key(&self) -> std::result::Result { unreachable!("get_new_secret_key is only used for channels") } } #[async_trait::async_trait] impl ddk_manager::Wallet for DlcDevKitWallet { - async fn get_new_address(&self) -> Result { + async fn get_new_address(&self) -> std::result::Result { let address = self .new_external_address() .await @@ -405,7 +398,7 @@ impl ddk_manager::Wallet for DlcDevKitWallet { Ok(address.address) } - async fn get_new_change_address(&self) -> Result { + async fn get_new_change_address(&self) -> std::result::Result { let address = self .new_change_address() .await @@ -422,7 +415,7 @@ impl ddk_manager::Wallet for DlcDevKitWallet { &self, psbt: &mut bitcoin::psbt::Psbt, input_index: usize, - ) -> Result<(), ManagerError> { + ) -> std::result::Result<(), ManagerError> { tracing::info!( input_index, inputs = psbt.inputs.len(), @@ -450,11 +443,14 @@ impl ddk_manager::Wallet for DlcDevKitWallet { } // BDK does not have reserving UTXOs nor need it. - fn unreserve_utxos(&self, _outpoints: &[bitcoin::OutPoint]) -> Result<(), ManagerError> { + fn unreserve_utxos( + &self, + _outpoints: &[bitcoin::OutPoint], + ) -> std::result::Result<(), ManagerError> { Ok(()) } - fn import_address(&self, _address: &bitcoin::Address) -> Result<(), ManagerError> { + fn import_address(&self, _address: &bitcoin::Address) -> std::result::Result<(), ManagerError> { Ok(()) } @@ -464,7 +460,7 @@ impl ddk_manager::Wallet for DlcDevKitWallet { amount: u64, fee_rate: u64, _lock_utxos: bool, - ) -> Result, ManagerError> { + ) -> std::result::Result, ManagerError> { let local_utxos = self.list_utxos().map_err(wallet_err_to_manager_err)?; let utxos = local_utxos @@ -507,6 +503,36 @@ impl ddk_manager::Wallet for DlcDevKitWallet { } } +fn fee_estimator() -> HashMap { + let mut fees: HashMap = HashMap::new(); + fees.insert(ConfirmationTarget::UrgentOnChainSweep, AtomicU32::new(5000)); + fees.insert( + ConfirmationTarget::MinAllowedAnchorChannelRemoteFee, + AtomicU32::new(25 * 250), + ); + fees.insert( + ConfirmationTarget::MinAllowedAnchorChannelRemoteFee, + AtomicU32::new(MIN_FEERATE), + ); + fees.insert( + ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee, + AtomicU32::new(MIN_FEERATE), + ); + fees.insert( + ConfirmationTarget::AnchorChannelFee, + AtomicU32::new(MIN_FEERATE), + ); + fees.insert( + ConfirmationTarget::NonAnchorChannelFee, + AtomicU32::new(2000), + ); + fees.insert( + ConfirmationTarget::ChannelCloseMinimum, + AtomicU32::new(MIN_FEERATE), + ); + fees +} + #[cfg(test)] mod tests { use std::{str::FromStr, sync::Arc, time::Duration}; @@ -522,6 +548,7 @@ mod tests { use super::DlcDevKitWallet; async fn create_wallet() -> DlcDevKitWallet { + let esplora = std::env::var("ESPLORA_HOST").unwrap_or("http://localhost:30000".to_string()); let storage = Arc::new(MemoryStorage::new()); let mut entropy = [0u8; 64]; entropy @@ -531,7 +558,7 @@ mod tests { DlcDevKitWallet::new( "test".into(), &xpriv.private_key.secret_bytes(), - "http://localhost:30000", + &esplora, Network::Regtest, storage.clone(), ) @@ -541,8 +568,10 @@ mod tests { fn generate_blocks(num: u64) { tracing::warn!("Generating {} blocks.", num); + let bitcoind = + std::env::var("BITCOIND_HOST").unwrap_or("http://localhost:18443".to_string()); let auth = bitcoincore_rpc::Auth::UserPass("ddk".to_string(), "ddk".to_string()); - let client = bitcoincore_rpc::Client::new("http://127.0.0.1:18443", auth).unwrap(); + let client = bitcoincore_rpc::Client::new(&bitcoind, auth).unwrap(); let previous_height = client.get_block_count().unwrap(); let address = client.get_new_address(None, None).unwrap().assume_checked(); @@ -555,8 +584,10 @@ mod tests { } fn fund_address(address: &Address) { + let bitcoind = + std::env::var("BITCOIND_HOST").unwrap_or("http://localhost:18443".to_string()); let auth = bitcoincore_rpc::Auth::UserPass("ddk".to_string(), "ddk".to_string()); - let client = bitcoincore_rpc::Client::new("http://127.0.0.1:18443", auth).unwrap(); + let client = bitcoincore_rpc::Client::new(&bitcoind, auth).unwrap(); client .send_to_address( address, diff --git a/testconfig/nostr-relay.toml b/testconfig/nostr-relay.toml index b0b81d16..2ffb00a3 100644 --- a/testconfig/nostr-relay.toml +++ b/testconfig/nostr-relay.toml @@ -129,13 +129,13 @@ reject_future_seconds = 1800 # Limit the maximum size of an EVENT message. Defaults to 128 KB. # Set to 0 for unlimited. -#max_event_bytes = 131072 +max_event_bytes = 5283256 # Maximum WebSocket message in bytes. Defaults to 128 KB. -#max_ws_message_bytes = 131072 +max_ws_message_bytes = 5283256 # Maximum WebSocket frame size in bytes. Defaults to 128 KB. -#max_ws_frame_bytes = 131072 +max_ws_frame_bytes = 5283256 # Broadcast buffer size, in number of events. This prevents slow # readers from consuming memory. From ce1951db0371e33cf4daf87e8fd987067dcda451 Mon Sep 17 00:00:00 2001 From: bennyhodl Date: Wed, 14 May 2025 15:42:26 -0500 Subject: [PATCH 2/2] remove marketplace listener and anyhow crate --- Cargo.lock | 1 - ddk-node/Cargo.toml | 2 +- ddk-node/src/lib.rs | 12 +---- ddk/Cargo.toml | 3 -- ddk/src/ddk.rs | 13 ----- ddk/src/lib.rs | 14 +---- ddk/src/nostr/marketplace.rs | 60 --------------------- ddk/src/nostr/mod.rs | 10 ++-- ddk/src/storage/memory.rs | 28 ---------- ddk/src/storage/postgres/mod.rs | 30 +++-------- ddk/src/storage/sled/mod.rs | 58 +------------------- ddk/src/transport/lightning/peer_manager.rs | 17 ++++++ ddk/src/transport/mod.rs | 6 --- ddk/tests/test_util.rs | 23 ++++---- 14 files changed, 43 insertions(+), 234 deletions(-) delete mode 100644 ddk/src/nostr/marketplace.rs diff --git a/Cargo.lock b/Cargo.lock index 592dbff3..520130d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -975,7 +975,6 @@ checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" name = "ddk" version = "0.0.17" dependencies = [ - "anyhow", "async-trait", "base64 0.13.1", "bdk_chain", diff --git a/ddk-node/Cargo.toml b/ddk-node/Cargo.toml index 66de4e19..c86d18cf 100644 --- a/ddk-node/Cargo.toml +++ b/ddk-node/Cargo.toml @@ -8,7 +8,7 @@ repository = "https://github.com/bennyhodl/dlcdevkit" edition = "2021" [dependencies] -ddk = { version = "0.0.17", path = "../ddk", features = ["marketplace", "postgres", "kormir", "nostr"] } +ddk = { version = "0.0.17", path = "../ddk", features = ["postgres", "kormir", "nostr"] } ddk-manager = { version = "0.7.5", path = "../ddk-manager", features = ["use-serde"] } ddk-payouts = { version = "0.0.16", path = "../payouts" } diff --git a/ddk-node/src/lib.rs b/ddk-node/src/lib.rs index 4fe04dd0..bd365c28 100644 --- a/ddk-node/src/lib.rs +++ b/ddk-node/src/lib.rs @@ -12,7 +12,7 @@ use ddk::storage::postgres::PostgresStore; use ddk::transport::nostr::NostrDlc; use ddk::util::ser::serialize_contract; use ddk::DlcDevKit; -use ddk::{Oracle, Storage, Transport}; +use ddk::{Oracle, Transport}; use ddk_manager::contract::contract_input::ContractInput; use ddk_manager::Oracle as DlcOracle; use ddk_manager::Storage as DlcStorage; @@ -362,15 +362,7 @@ impl DdkRpc for DdkNode { &self, _request: Request, ) -> Result, Status> { - let announcements: Vec> = self - .node - .storage - .get_marketplace_announcements() - // TODO: fails if no announcements - .unwrap() - .iter() - .map(|ann| serde_json::to_vec(ann).unwrap()) - .collect(); + let announcements: Vec> = vec![]; Ok(Response::new(OracleAnnouncementsResponse { announcements })) } diff --git a/ddk/Cargo.toml b/ddk/Cargo.toml index 9cf87c15..bb2ff8d3 100644 --- a/ddk/Cargo.toml +++ b/ddk/Cargo.toml @@ -10,8 +10,6 @@ homepage = "https://dlcdevkit.com" readme = "../README.md" [features] -marketplace = ["dep:nostr-rs", "dep:nostr-sdk", "dep:base64"] - # transport features nostr = ["dep:nostr-rs", "dep:nostr-sdk", "dep:base64"] lightning = ["dep:lightning-net-tokio"] @@ -40,7 +38,6 @@ bdk_esplora = { version = "0.20.1", features = ["blocking-https", "async-https", # bdk_wallet = { version = "1.0.0-beta.5", features = ["bdk_file_store"] } bdk_wallet = "1.0.0" bdk_chain = "0.21.1" -anyhow = "1.0.75" lightning = { version = "0.0.125", default-features = false, features = ["grind_signatures", "std"] } serde = { version = "1.0.192", features = ["derive"] } serde_json = "1.0.108" diff --git a/ddk/src/ddk.rs b/ddk/src/ddk.rs index c3d3cfcf..a1e66d20 100644 --- a/ddk/src/ddk.rs +++ b/ddk/src/ddk.rs @@ -1,8 +1,6 @@ use crate::chain::EsploraClient; use crate::error::Error; use crate::wallet::DlcDevKitWallet; -#[cfg(feature = "marketplace")] -use crate::{nostr::marketplace::*, DEFAULT_NOSTR_RELAY}; use crate::{Oracle, Storage, Transport}; use bitcoin::secp256k1::PublicKey; use bitcoin::{Amount, Network}; @@ -121,17 +119,6 @@ where } }); - #[cfg(feature = "marketplace")] - { - let storage_clone = self.storage.clone(); - runtime.spawn(async move { - tracing::info!("Starting marketplace listener."); - marketplace_listener(&storage_clone, vec![DEFAULT_NOSTR_RELAY]) - .await - .unwrap(); - }); - } - *runtime_lock = Some(runtime); Ok(()) } diff --git a/ddk/src/lib.rs b/ddk/src/lib.rs index 601dd082..511bc74c 100644 --- a/ddk/src/lib.rs +++ b/ddk/src/lib.rs @@ -12,7 +12,7 @@ pub mod error; /// JSON structs pub mod json; /// Nostr related functions. -#[cfg(any(feature = "nostr", feature = "marketplace"))] +#[cfg(feature = "nostr")] pub mod nostr; /// Oracle clients. pub mod oracle; @@ -38,13 +38,11 @@ use bdk_wallet::ChangeSet; use bitcoin::secp256k1::{PublicKey, SecretKey}; use bitcoin::Amount; use ddk::DlcDevKitDlcManager; -use dlc_messages::oracle_msgs::OracleAnnouncement; use dlc_messages::Message; use error::TransportError; use error::WalletError; use std::sync::Arc; use tokio::sync::watch; -use transport::PeerInformation; #[async_trait] /// Allows ddk to open a listening connection and send/receive dlc messages functionality. @@ -72,16 +70,6 @@ pub trait Storage: ddk_manager::Storage + Send + Sync + 'static { async fn initialize_bdk(&self) -> Result; /// Save changeset to the wallet storage. async fn persist_bdk(&self, changeset: &ChangeSet) -> Result<(), WalletError>; - /// Connected counterparties. - /// TODO: Remove - fn list_peers(&self) -> anyhow::Result>; - /// Persis counterparty. - /// TODO: Remove - fn save_peer(&self, peer: PeerInformation) -> anyhow::Result<()>; - // #[cfg(feature = "marketplace")] - fn save_announcement(&self, announcement: OracleAnnouncement) -> anyhow::Result<()>; - // #[cfg(feature = "marketplace")] - fn get_marketplace_announcements(&self) -> anyhow::Result>; } /// Retrieval of key material for signing DLC transactions diff --git a/ddk/src/nostr/marketplace.rs b/ddk/src/nostr/marketplace.rs deleted file mode 100644 index bc7d2513..00000000 --- a/ddk/src/nostr/marketplace.rs +++ /dev/null @@ -1,60 +0,0 @@ -use crate::Storage; -use nostr_rs::Timestamp; -use nostr_sdk::{client::builder::ClientBuilder, Event, Kind, RelayPoolNotification}; -use std::ops::Deref; - -/// NIP-88 compliant oracle announcement listener. -/// -/// This process listens to the connected relays for oracles making oracle announcements. -/// Listener is passed with a storage object which then stores the announcement in storage -/// to be used later for creating an offer contract. -/// -/// The marketplace listener can be paired with `ddk::transport::NostDlc` to fetch announcements -/// and attestations from storage. -pub async fn marketplace_listener(storage: &S, relays: Vec<&str>) -> anyhow::Result<()> -where - S::Target: Storage, -{ - let client = ClientBuilder::new().build(); - for relay in relays { - client.add_relay(relay).await?; - } - client.connect().await; - let now = Timestamp::now(); - let oracle_filter = super::create_oracle_message_filter(now); - - client.subscribe(oracle_filter, None).await?; - - while let Ok(notification) = client.notifications().recv().await { - match notification { - RelayPoolNotification::Event { - relay_url: _, - subscription_id: _, - event, - } => handle_oracle_event(storage, *event), - RelayPoolNotification::Shutdown => { - tracing::error!("Relay disconnected.") - } - _ => (), - } - } - - Ok(()) -} - -fn handle_oracle_event(storage: &S, event: Event) -where - S::Target: Storage, -{ - match event.kind { - Kind::Custom(89) => { - tracing::info!("Oracle attestation. Saved to storage.") - } - Kind::Custom(88) => { - let announcement = super::oracle_announcement_from_str(&event.content).unwrap(); - storage.save_announcement(announcement).unwrap(); - tracing::info!("Oracle announcement. Saved to storage.") - } - _ => (), - } -} diff --git a/ddk/src/nostr/mod.rs b/ddk/src/nostr/mod.rs index e75417b8..6452c24b 100644 --- a/ddk/src/nostr/mod.rs +++ b/ddk/src/nostr/mod.rs @@ -1,3 +1,6 @@ +pub mod messages; + +use crate::error::Error; use bitcoin::key::Parity; use bitcoin::secp256k1::PublicKey as BitcoinPublicKey; use dlc_messages::oracle_msgs::{OracleAnnouncement, OracleAttestation}; @@ -5,13 +8,6 @@ use lightning::io::Cursor; use lightning::util::ser::Readable; use nostr_rs::{Filter, Kind, PublicKey, Timestamp}; -use crate::error::Error; - -/// Nostr [dlc_messages::oracle_msgs::OracleAnnouncement] marketplace. -#[cfg(feature = "marketplace")] -pub mod marketplace; -pub mod messages; - pub const DLC_MESSAGE_KIND: Kind = Kind::Custom(8_888); pub const ORACLE_ANNOUNCMENT_KIND: Kind = Kind::Custom(88); pub const ORACLE_ATTESTATION_KIND: Kind = Kind::Custom(89); diff --git a/ddk/src/storage/memory.rs b/ddk/src/storage/memory.rs index d8cf611a..5a5bb1a6 100644 --- a/ddk/src/storage/memory.rs +++ b/ddk/src/storage/memory.rs @@ -1,16 +1,12 @@ -use crate::transport::PeerInformation; use crate::Storage; use bdk_chain::Merge; use ddk_manager::{channel::Channel, contract::Contract, ChannelId, ContractId}; -use dlc_messages::oracle_msgs::OracleAnnouncement; use std::collections::HashMap; use std::sync::RwLock; #[derive(Default, Debug)] pub struct MemoryStorage { - peers: RwLock>, bdk_data: RwLock>, - announcements: RwLock>, contracts: RwLock>, channels: RwLock>, chain_monitor: RwLock>, @@ -19,9 +15,7 @@ pub struct MemoryStorage { impl MemoryStorage { pub fn new() -> Self { Self { - peers: RwLock::new(HashMap::new()), bdk_data: RwLock::new(None), - announcements: RwLock::new(Vec::new()), contracts: RwLock::new(HashMap::new()), channels: RwLock::new(HashMap::new()), chain_monitor: RwLock::new(None), @@ -31,19 +25,6 @@ impl MemoryStorage { #[async_trait::async_trait] impl Storage for MemoryStorage { - fn save_peer(&self, _peer: PeerInformation) -> anyhow::Result<()> { - // self.peers.write().unwrap().insert(peer.id.clone(), peer); - Ok(()) - } - - fn list_peers(&self) -> anyhow::Result> { - // Ok(self.peers.read().unwrap().values().cloned().collect()) - Ok(vec![PeerInformation { - pubkey: "".to_string(), - host: "".to_string(), - }]) - } - async fn persist_bdk( &self, changeset: &bdk_wallet::ChangeSet, @@ -57,15 +38,6 @@ impl Storage for MemoryStorage { async fn initialize_bdk(&self) -> Result { Ok(self.bdk_data.read().unwrap().clone().unwrap_or_default()) } - - fn save_announcement(&self, announcement: kormir::OracleAnnouncement) -> anyhow::Result<()> { - self.announcements.write().unwrap().push(announcement); - Ok(()) - } - - fn get_marketplace_announcements(&self) -> anyhow::Result> { - Ok(self.announcements.read().unwrap().clone()) - } } #[async_trait::async_trait] diff --git a/ddk/src/storage/postgres/mod.rs b/ddk/src/storage/postgres/mod.rs index 57811d23..b7a5eec6 100644 --- a/ddk/src/storage/postgres/mod.rs +++ b/ddk/src/storage/postgres/mod.rs @@ -2,7 +2,6 @@ use std::str::FromStr; use super::sqlx::{ContractRowNoBytes, SqlxError}; use crate::error::{StorageError, WalletError}; -use crate::transport::PeerInformation; use crate::Storage; use crate::{ error::to_storage_error, @@ -31,7 +30,6 @@ use ddk_manager::{ }, Storage as ManagerStorage, }; -use dlc_messages::oracle_msgs::OracleAnnouncement; use serde_json::json; use sqlx::pool::PoolOptions; use sqlx::postgres::PgRow; @@ -203,19 +201,19 @@ impl PostgresStore { if let Some(ref descriptor) = changeset.descriptor { insert_descriptor(&mut tx, wallet_name, descriptor, External) .await - .map_err(|e| StorageError::Sqlx(e.into()))?; + .map_err(StorageError::Sqlx)?; } if let Some(ref change_descriptor) = changeset.change_descriptor { insert_descriptor(&mut tx, wallet_name, change_descriptor, Internal) .await - .map_err(|e| StorageError::Sqlx(e.into()))?; + .map_err(StorageError::Sqlx)?; } if let Some(network) = changeset.network { insert_network(&mut tx, wallet_name, network) .await - .map_err(|e| StorageError::Sqlx(e.into()))?; + .map_err(StorageError::Sqlx)?; } let last_revealed_indices = &changeset.indexer.last_revealed; @@ -223,16 +221,16 @@ impl PostgresStore { for (desc_id, index) in last_revealed_indices { update_last_revealed(&mut tx, wallet_name, *desc_id, *index) .await - .map_err(|e| StorageError::Sqlx(e.into()))?; + .map_err(StorageError::Sqlx)?; } } local_chain_changeset_persist_to_postgres(&mut tx, wallet_name, &changeset.local_chain) .await - .map_err(|e| StorageError::Sqlx(e.into()))?; + .map_err(StorageError::Sqlx)?; tx_graph_changeset_persist_to_postgres(&mut tx, wallet_name, &changeset.tx_graph) .await - .map_err(|e| StorageError::Sqlx(e.into()))?; + .map_err(StorageError::Sqlx)?; tx.commit() .await @@ -258,22 +256,6 @@ impl Storage for PostgresStore { .await .map_err(|_| WalletError::StorageError("Did not persist bdk storage".to_string())) } - - fn list_peers(&self) -> anyhow::Result> { - unimplemented!("Not implemented to list peers") - } - - fn save_peer(&self, _peer: PeerInformation) -> anyhow::Result<()> { - unimplemented!("Not implemented to save peer") - } - - fn save_announcement(&self, _announcement: OracleAnnouncement) -> anyhow::Result<()> { - unimplemented!("Not implemented to save announcement") - } - - fn get_marketplace_announcements(&self) -> anyhow::Result> { - unimplemented!("Not implemented to get marketplace announcements") - } } #[async_trait::async_trait] diff --git a/ddk/src/storage/sled/mod.rs b/ddk/src/storage/sled/mod.rs index d2657678..a161d99b 100644 --- a/ddk/src/storage/sled/mod.rs +++ b/ddk/src/storage/sled/mod.rs @@ -5,18 +5,15 @@ mod contract; mod wallet; +use crate::error::WalletError; +use crate::Storage; use bdk_chain::Merge; use bdk_wallet::ChangeSet; use ddk_manager::contract::ser::Serializable; use ddk_manager::error::Error; -use dlc_messages::oracle_msgs::OracleAnnouncement; use lightning::io::{Cursor, Read}; use sled::{Db, Tree}; -use crate::error::WalletError; -use crate::transport::PeerInformation; -use crate::Storage; - const CONTRACT_TREE: u8 = 1; const CHANNEL_TREE: u8 = 2; pub const CHAIN_MONITOR_TREE: u8 = 3; @@ -130,57 +127,6 @@ impl Storage for SledStorage { }; Ok(changeset) } - - fn list_peers(&self) -> anyhow::Result> { - if let Some(bytes) = self.db.get(PEERS_KEY)? { - let peers: Vec<_> = serde_json::from_slice(&bytes)?; - Ok(peers) - } else { - Ok(vec![]) - } - } - - fn save_peer(&self, peer: PeerInformation) -> anyhow::Result<()> { - let mut known_peers = self.list_peers()?; - - if known_peers.contains(&peer) { - return Ok(()); - } - - known_peers.push(peer); - let peer_vec = serde_json::to_vec(&known_peers)?; - - self.db.insert("peers", peer_vec)?; - - Ok(()) - } - - fn save_announcement(&self, announcement: OracleAnnouncement) -> anyhow::Result<()> { - let marketplace = self.marketplace_tree().map_err(sled_to_wallet_error)?; - let stored_announcements: Vec = - match marketplace.get(MARKETPLACE_KEY)? { - Some(o) => serde_json::from_slice(&o)?, - None => vec![], - }; - let mut announcements = - crate::util::ser::filter_expired_oracle_announcements(stored_announcements); - announcements.push(announcement); - - let serialize_announcements = serde_json::to_vec(&announcements)?; - marketplace.insert(MARKETPLACE_KEY, serialize_announcements)?; - - Ok(()) - } - - fn get_marketplace_announcements(&self) -> anyhow::Result> { - let marketplace = self.marketplace_tree().map_err(sled_to_wallet_error)?; - let prev_announcements = match marketplace.get(MARKETPLACE_KEY)? { - Some(o) => o.to_vec(), - None => vec![], - }; - let announcements: Vec = serde_json::from_slice(&prev_announcements)?; - Ok(announcements) - } } fn sled_to_wallet_error(error: sled::Error) -> WalletError { diff --git a/ddk/src/transport/lightning/peer_manager.rs b/ddk/src/transport/lightning/peer_manager.rs index ee08e54c..ab1fe18c 100644 --- a/ddk/src/transport/lightning/peer_manager.rs +++ b/ddk/src/transport/lightning/peer_manager.rs @@ -43,6 +43,12 @@ pub type LnPeerManager = LdkPeerManager< Arc, >; +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)] +pub struct PeerInformation { + pub pubkey: String, + pub host: String, +} + /// BOLT-8 LightningTransport to manage TCP connections to communicate /// DLC contracts with another party. pub struct LightningTransport { @@ -203,6 +209,17 @@ impl LightningTransport { Ok::<_, TransportError>(()) }) } + + pub fn list_peers(&self) -> Vec { + self.peer_manager + .list_peers() + .into_iter() + .map(|p| PeerInformation { + pubkey: p.counterparty_node_id.to_string(), + host: p.socket_address.unwrap().to_string(), + }) + .collect() + } } #[cfg(test)] diff --git a/ddk/src/transport/mod.rs b/ddk/src/transport/mod.rs index 51d7b88a..8ab69589 100644 --- a/ddk/src/transport/mod.rs +++ b/ddk/src/transport/mod.rs @@ -3,9 +3,3 @@ pub mod lightning; pub mod memory; #[cfg(feature = "nostr")] pub mod nostr; - -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)] -pub struct PeerInformation { - pub pubkey: String, - pub host: String, -} diff --git a/ddk/tests/test_util.rs b/ddk/tests/test_util.rs index 5a229899..2880923b 100644 --- a/ddk/tests/test_util.rs +++ b/ddk/tests/test_util.rs @@ -153,23 +153,22 @@ pub async fn wait_for_offer_is_stored(contract_id: ContractId, storage: Arc anyhow::Result { +pub fn xprv_from_path(path: PathBuf, network: Network) -> Xpriv { let seed_path = path.join("seed.ddk"); - let seed = if Path::new(&seed_path).exists() { - let seed = std::fs::read(&seed_path)?; + if Path::new(&seed_path).exists() { + let seed = std::fs::read(&seed_path).unwrap(); let mut key = [0; 32]; key.copy_from_slice(&seed); - let xprv = Xpriv::new_master(network, &seed)?; + let xprv = Xpriv::new_master(network, &seed).unwrap(); xprv } else { - let mut file = File::create(&seed_path)?; + let mut file = File::create(&seed_path).unwrap(); let mut entropy = [0u8; 32]; - entropy.try_fill(&mut bitcoin::key::rand::thread_rng())?; - // let _mnemonic = Mnemonic::from_entropy(&entropy)?; - let xprv = Xpriv::new_master(network, &entropy)?; - file.write_all(&entropy)?; + entropy + .try_fill(&mut bitcoin::key::rand::thread_rng()) + .unwrap(); + let xprv = Xpriv::new_master(network, &entropy).unwrap(); + file.write_all(&entropy).unwrap(); xprv - }; - - Ok(seed) + } }