diff --git a/crates/bcr-ebill-api/src/service/company_service.rs b/crates/bcr-ebill-api/src/service/company_service.rs index ce44aeb1..6b12594e 100644 --- a/crates/bcr-ebill-api/src/service/company_service.rs +++ b/crates/bcr-ebill-api/src/service/company_service.rs @@ -63,6 +63,7 @@ use bitcoin::base58; use log::{debug, error, info}; use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use tokio_with_wasm::alias as tokio; use uuid::Uuid; #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] @@ -1595,6 +1596,37 @@ impl CompanyServiceApi for CompanyService { .add_identity(id, &company_keys) .await?; + // Ensure all company signatories and the company itself are in our nostr contacts + // so that we can receive and decrypt messages from them. + for signatory in company.signatories.iter() { + if signatory.node_id != our_node_id { + self.transport_service + .contact_transport() + .ensure_nostr_contact(&signatory.node_id) + .await; + } + } + self.transport_service + .contact_transport() + .ensure_nostr_contact(id) + .await; + + // Process any historical bill invites that were sent to this company + // before we joined, so we can construct those bills in our store. + let transport = self.transport_service.clone(); + let company_id = id.clone(); + tokio::spawn(async move { + if let Err(e) = transport + .process_company_historical_bill_invites(&company_id) + .await + { + error!( + "Failed to process historical bill invites for company {}: {}", + company_id, e + ); + } + }); + // company block let previous_block = company_chain.get_latest_block(); let new_block = CompanyBlock::create_block_for_accept_signatory_invite( @@ -2975,6 +3007,7 @@ pub mod tests { #[tokio::test] async fn accept_company_invite_baseline() { + crate::tests::tests::init_test_cfg(); let ( mut storage, file_upload_store, @@ -3050,6 +3083,13 @@ pub mod tests { .expect_add_identity() .returning(|_, _| Ok(())) .times(1); + transport.expect_on_contact_transport(|t| { + t.expect_ensure_nostr_contact().returning(|_| ()).times(1); + }); + transport + .expect_process_company_historical_bill_invites() + .returning(|_| Ok(())) + .times(1); let service = get_service( storage, diff --git a/crates/bcr-ebill-api/src/service/file_reference_helper.rs b/crates/bcr-ebill-api/src/service/file_reference_helper.rs index df9505dd..0ce7fa02 100644 --- a/crates/bcr-ebill-api/src/service/file_reference_helper.rs +++ b/crates/bcr-ebill-api/src/service/file_reference_helper.rs @@ -365,6 +365,7 @@ mod tests { async fn sync_relays(&self) -> crate::service::transport_service::Result<()>; async fn retry_failed_syncs(&self) -> crate::service::transport_service::Result<()>; async fn add_identity(&self, node_id: &bcr_common::core::NodeId, keys: &bcr_ebill_core::protocol::crypto::BcrKeys) -> crate::service::transport_service::Result<()>; + async fn process_company_historical_bill_invites(&self, company_id: &bcr_common::core::NodeId) -> crate::service::transport_service::Result<()>; async fn publish_file_metadata(&self, node_id: &bcr_common::core::NodeId, plaintext_hash: &str, encrypted_hash: &str, server_urls: Vec, mime_type: Option) -> crate::service::transport_service::Result<()>; async fn query_file_metadata_events(&self, file_hash: &str, nostr_hash: &str) -> crate::service::transport_service::Result>; } diff --git a/crates/bcr-ebill-api/src/service/transport_service/transport.rs b/crates/bcr-ebill-api/src/service/transport_service/transport.rs index 381d977f..138e32af 100644 --- a/crates/bcr-ebill-api/src/service/transport_service/transport.rs +++ b/crates/bcr-ebill-api/src/service/transport_service/transport.rs @@ -132,6 +132,10 @@ pub trait TransportServiceApi: ServiceTraitBounds { /// This will also add a subscription for direct messages to this identity. async fn add_identity(&self, node_id: &NodeId, keys: &BcrKeys) -> Result<()>; + /// Queries historical private messages for a company identity and processes any + /// bill chain invites found, constructing the bills in the local store. + async fn process_company_historical_bill_invites(&self, company_id: &NodeId) -> Result<()>; + /// Publishes file metadata (kind:1063) for the specified file. /// This is idempotent - it will only publish if the server URLs have changed. async fn publish_file_metadata( diff --git a/crates/bcr-ebill-api/src/service/transport_service/transport_client.rs b/crates/bcr-ebill-api/src/service/transport_service/transport_client.rs index ce180b49..f8ec0efd 100644 --- a/crates/bcr-ebill-api/src/service/transport_service/transport_client.rs +++ b/crates/bcr-ebill-api/src/service/transport_service/transport_client.rs @@ -52,9 +52,19 @@ pub trait TransportClientApi: ServiceTraitBounds { ) -> Result>; /// Adds a new Nostr subscription on the primary client for an added contact async fn add_contact_subscription(&self, contact: &NodeId) -> Result<()>; - /// Resolves all private messages matching the filter + /// Resolves private messages matching the filter for all local identities. async fn resolve_private_events(&self, filter: Filter) -> Result>; + /// Resolves events matching the given filter. The filter is passed through as-is. + async fn resolve_events(&self, filter: Filter) -> Result>; + + /// Tries to decrypt a private direct message event with any of the local signers. + /// Returns the recipient NodeId, decrypted EventEnvelope, and sender public key if successful. + async fn try_decrypt_private_event( + &self, + event: &Event, + ) -> Result>; + /// Publishes the metadata (contact info) via the Nostr client for the specified identity async fn publish_metadata( &self, diff --git a/crates/bcr-ebill-transport/src/lib.rs b/crates/bcr-ebill-transport/src/lib.rs index a26453da..6f11c895 100644 --- a/crates/bcr-ebill-transport/src/lib.rs +++ b/crates/bcr-ebill-transport/src/lib.rs @@ -218,6 +218,7 @@ pub async fn create_transport_service( notification_transport, contact_transport, block_transport, + bill_invite_handler, ))) } diff --git a/crates/bcr-ebill-transport/src/nostr.rs b/crates/bcr-ebill-transport/src/nostr.rs index e13935a8..ee5b4d27 100644 --- a/crates/bcr-ebill-transport/src/nostr.rs +++ b/crates/bcr-ebill-transport/src/nostr.rs @@ -54,6 +54,7 @@ pub enum SortOrder { const BLOSSOM_SERVER_LIST_KIND: Kind = Kind::Custom(10063); const FILE_METADATA_KIND: Kind = Kind::Custom(1063); +const DM_BACKFILL_LIMIT: usize = 1000; /// Check the output of sending an event to Nostr relays. /// Logs warnings for individual relay failures and returns an error if no relay @@ -649,6 +650,32 @@ impl TransportClientApi for NostrClient { .await?) } + async fn resolve_events(&self, filter: Filter) -> Result> { + Ok(self + .fetch_events(filter, Some(SortOrder::Asc), None) + .await?) + } + + async fn try_decrypt_private_event( + &self, + event: &nostr::event::Event, + ) -> Result> { + match event.kind { + Kind::EncryptedDirectMessage | Kind::GiftWrap => { + let keys_to_try = prioritized_signers_for_event(event, &self.signers); + for (node_id, nostr_keys) in keys_to_try { + if let Some((envelope, sender, _, _)) = + unwrap_direct_message(event, &*nostr_keys).await + { + return Ok(Some((node_id, envelope, sender))); + } + } + Ok(None) + } + _ => Ok(None), + } + } + async fn publish_metadata(&self, node_id: &NodeId, data: &Metadata) -> Result<()> { // Get the signer for this identity let signer = self.get_signer(node_id)?; @@ -850,8 +877,17 @@ impl TransportClientApi for NostrClient { vec![Kind::GiftWrap] }; debug!("Adding subscription for direct messages to identity: {node_id}"); - self.subscribe(Filter::new().pubkey(node_id.npub()).kinds(kinds)) + self.subscribe( + Filter::new() + .pubkey(node_id.npub()) + .kinds(kinds) + .limit(DM_BACKFILL_LIMIT), + ) + .await?; + debug!("Adding subscription for public blocks messages from identity: {node_id}"); + self.subscribe(Filter::new().author(node_id.npub()).kind(Kind::TextNote)) .await?; + let relay_urls: Vec = self .relays .iter() @@ -1042,7 +1078,7 @@ impl NostrConsumer { Filter::new() .pubkeys(local_pubkeys.clone()) .kinds(vec![Kind::EncryptedDirectMessage, Kind::GiftWrap]) - .limit(1000), + .limit(DM_BACKFILL_LIMIT), ) .await .map_err(|e| { diff --git a/crates/bcr-ebill-transport/src/test_utils.rs b/crates/bcr-ebill-transport/src/test_utils.rs index cea7e18c..3dd36311 100644 --- a/crates/bcr-ebill-transport/src/test_utils.rs +++ b/crates/bcr-ebill-transport/src/test_utils.rs @@ -589,6 +589,8 @@ mockall::mock! { async fn resolve_public_chain(&self, id: &str, chain_type: BlockchainType) -> Result>; async fn add_contact_subscription(&self, contact: &NodeId) -> Result<()>; async fn resolve_private_events(&self, filter: nostr::Filter) -> Result>; + async fn resolve_events(&self, filter: nostr::Filter) -> Result>; + async fn try_decrypt_private_event(&self, event: &nostr::event::Event) -> Result>; async fn publish_metadata(&self, node_id: &NodeId, data: &nostr::nips::nip01::Metadata) -> Result<()>; async fn publish_relay_list(&self, node_id: &NodeId, relays: Vec) -> Result<()>; async fn publish_blossom_server_list(&self, node_id: &NodeId, blossom_servers: Vec) -> Result<()>; diff --git a/crates/bcr-ebill-transport/src/transport_service.rs b/crates/bcr-ebill-transport/src/transport_service.rs index 7adffcf9..2166750d 100644 --- a/crates/bcr-ebill-transport/src/transport_service.rs +++ b/crates/bcr-ebill-transport/src/transport_service.rs @@ -13,10 +13,12 @@ use bcr_ebill_core::protocol::blockchain::bill::participant::{ use bcr_ebill_core::protocol::crypto::BcrKeys; use bcr_ebill_core::protocol::event::{BillChainEvent, BillChainEventPayload, Event}; +use super::handler::NotificationHandlerApi; use super::nostr_transport::NostrTransportService; use bcr_ebill_api::service::transport_service::Result; use bcr_ebill_core::application::ServiceTraitBounds; -use bcr_ebill_core::protocol::event::{ActionType, BillEventType}; +use bcr_ebill_core::protocol::event::{ActionType, BillEventType, EventType}; +use log::{debug, error, info, warn}; use std::sync::Arc; pub struct TransportService { @@ -24,6 +26,7 @@ pub struct TransportService { notification_transport_service: Arc, contact_transport_service: Arc, block_transport_service: Arc, + bill_invite_handler: Arc, } impl TransportService { @@ -32,12 +35,14 @@ impl TransportService { notification_transport_service: Arc, contact_transport_service: Arc, block_transport_service: Arc, + bill_invite_handler: Arc, ) -> Self { Self { nostr_transport, notification_transport_service, contact_transport_service, block_transport_service, + bill_invite_handler, } } } @@ -339,6 +344,61 @@ impl TransportServiceApi for TransportService { .await } + async fn process_company_historical_bill_invites(&self, company_id: &NodeId) -> Result<()> { + let node = self.nostr_transport.get_first_transport(); + if !node.has_local_signer(company_id) { + warn!("Try to process_company_historical_bill_invites without signer in transport"); + return Ok(()); + }; + let events = node + .resolve_events( + nostr::Filter::new() + .pubkey(company_id.npub()) + .kinds(vec![ + nostr::Kind::EncryptedDirectMessage, + nostr::Kind::GiftWrap, + ]) + .since(nostr::types::Timestamp::zero()), + ) + .await?; + info!( + "found {} private events for company {} historical bill invite processing", + events.len(), + company_id + ); + + for event in events { + match node.try_decrypt_private_event(&event).await { + Ok(Some((_recipient_id, envelope, sender))) => { + if envelope.event_type == EventType::BillChainInvite + && let Err(e) = self + .bill_invite_handler + .handle_event(envelope, company_id, Some(sender), Some(Box::new(event))) + .await + { + error!( + "Failed to process historical bill invite for company {}: {}", + company_id, e + ); + } + } + Ok(None) => { + debug!( + "Could not decrypt event {} for company {}", + event.id, company_id + ); + } + Err(e) => { + error!( + "Error decrypting event {} for company {}: {}", + event.id, company_id, e + ); + } + } + } + Ok(()) + } + async fn publish_file_metadata( &self, node_id: &NodeId, @@ -406,7 +466,9 @@ mod tests { node_id_test_other2, private_key_test, valid_payment_address_testnet, }; - use super::super::test_utils::{get_identity_public_data, get_test_bitcredit_bill}; + use super::super::test_utils::{ + MockNotificationHandler, get_identity_public_data, get_test_bitcredit_bill, + }; use super::*; fn check_chain_payload(event: &EventEnvelope, bill_event_type: BillEventType) -> bool { @@ -462,6 +524,30 @@ mod tests { mock_notification_transport: MockNotificationTransportService, mock_contact_transport: MockContactTransportService, mock_block_transport: MockBlockTransportService, + ) -> TransportService { + get_transport_with_handler( + mock_transport, + contact_store, + nostr_contact_store, + queued_message_store, + chain_events, + mock_notification_transport, + mock_contact_transport, + mock_block_transport, + Arc::new(MockNotificationHandler::new()), + ) + } + + fn get_transport_with_handler( + mock_transport: MockNotificationJsonTransport, + contact_store: MockContactStore, + nostr_contact_store: MockNostrContactStore, + queued_message_store: MockNostrQueuedMessageStore, + chain_events: MockNostrChainEventStore, + mock_notification_transport: MockNotificationTransportService, + mock_contact_transport: MockContactTransportService, + mock_block_transport: MockBlockTransportService, + handler: Arc, ) -> TransportService { TransportService::new( Arc::new(get_nostr_transport( @@ -474,6 +560,7 @@ mod tests { Arc::new(mock_notification_transport), Arc::new(mock_contact_transport), Arc::new(mock_block_transport), + handler, ) } @@ -2496,4 +2583,161 @@ mod tests { let result = service.send_retry_messages().await; assert!(result.is_ok()); } + + #[tokio::test] + async fn test_process_company_historical_bill_invites_success() { + init_test_cfg(); + let company_id = node_id_test(); + let sender_npub = nostr::PublicKey::from_hex( + "22886f449bec154764401cfb139b80f108a39a91c7e7609f9ffd8a4592b86d38", + ) + .unwrap(); + + let mut mock_transport = MockNotificationJsonTransport::new(); + mock_transport + .expect_has_local_signer() + .with(eq(company_id.clone())) + .returning(|_| true); + + let event = nostr::event::EventBuilder::text_note("test") + .sign_with_keys(&nostr::key::Keys::generate()) + .expect(" Could not create test event"); + + let invite = ChainInvite::bill(bill_id_test().to_string(), BcrKeys::new()); + let envelope: EventEnvelope = Event::new(EventType::BillChainInvite, invite.clone()) + .try_into() + .unwrap(); + + let company_id_for_decrypt = company_id.clone(); + mock_transport + .expect_resolve_events() + .returning(move |_| Ok(vec![event.clone()])); + mock_transport + .expect_try_decrypt_private_event() + .returning(move |_| { + Ok(Some(( + company_id_for_decrypt.clone(), + envelope.clone(), + sender_npub, + ))) + }); + + let mut handler = MockNotificationHandler::new(); + let company_id_for_handler = company_id.clone(); + handler + .expect_handle_event() + .withf(move |env, node_id, sender, _original_event| { + env.event_type == EventType::BillChainInvite + && *node_id == company_id_for_handler + && sender.is_some() + }) + .returning(|_, _, _, _| Ok(())) + .times(1); + + let service = get_transport_with_handler( + mock_transport, + MockContactStore::new(), + MockNostrContactStore::new(), + MockNostrQueuedMessageStore::new(), + MockNostrChainEventStore::new(), + MockNotificationTransportService::new(), + MockContactTransportService::new(), + MockBlockTransportService::new(), + Arc::new(handler), + ); + + let result = service + .process_company_historical_bill_invites(&company_id) + .await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_process_company_historical_bill_invites_skips_non_invite() { + init_test_cfg(); + let company_id = node_id_test(); + let sender_npub = nostr::PublicKey::from_hex( + "22886f449bec154764401cfb139b80f108a39a91c7e7609f9ffd8a4592b86d38", + ) + .unwrap(); + + let mut mock_transport = MockNotificationJsonTransport::new(); + mock_transport + .expect_has_local_signer() + .with(eq(company_id.clone())) + .returning(|_| true); + + let event = nostr::event::EventBuilder::text_note("test") + .sign_with_keys(&nostr::key::Keys::generate()) + .expect(" Could not create test event"); + + let non_invite = ChainInvite::company(node_id_test().to_string(), BcrKeys::new()); + let envelope: EventEnvelope = Event::new(EventType::CompanyChainInvite, non_invite.clone()) + .try_into() + .unwrap(); + + let company_id_for_decrypt = company_id.clone(); + mock_transport + .expect_resolve_events() + .returning(move |_| Ok(vec![event.clone()])); + mock_transport + .expect_try_decrypt_private_event() + .returning(move |_| { + Ok(Some(( + company_id_for_decrypt.clone(), + envelope.clone(), + sender_npub, + ))) + }); + + let mut handler = MockNotificationHandler::new(); + handler.expect_handle_event().times(0); + + let service = get_transport_with_handler( + mock_transport, + MockContactStore::new(), + MockNostrContactStore::new(), + MockNostrQueuedMessageStore::new(), + MockNostrChainEventStore::new(), + MockNotificationTransportService::new(), + MockContactTransportService::new(), + MockBlockTransportService::new(), + Arc::new(handler), + ); + + let result = service + .process_company_historical_bill_invites(&company_id) + .await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_process_company_historical_bill_invites_no_signer() { + init_test_cfg(); + let company_id = node_id_test(); + + let mut mock_transport = MockNotificationJsonTransport::new(); + mock_transport + .expect_has_local_signer() + .with(eq(company_id.clone())) + .returning(|_| false); + mock_transport.expect_resolve_events().times(0); + + let service = get_transport_with_handler( + mock_transport, + MockContactStore::new(), + MockNostrContactStore::new(), + MockNostrQueuedMessageStore::new(), + MockNostrChainEventStore::new(), + MockNotificationTransportService::new(), + MockContactTransportService::new(), + MockBlockTransportService::new(), + Arc::new(MockNotificationHandler::new()), + ); + + let result = service + .process_company_historical_bill_invites(&company_id) + .await; + assert!(result.is_ok()); + } }