@@ -24,6 +24,17 @@ use crate::protocol;
2424use crate :: socks5;
2525use crate :: types:: * ;
2626
27+ /// Grouped listener callbacks passed to `block_listener_loop` and
28+ /// `handle_background_message` to avoid exceeding the argument limit.
29+ pub struct ListenerCallbacks {
30+ /// Callback fired when an unsolicited "tx" message arrives (mempool detection).
31+ pub on_mempool_tx_data : MempoolTxCallback ,
32+ /// Callback fired when an inv MSG_BLOCK arrives (new block mined).
33+ pub on_new_block : NewBlockCallback ,
34+ /// Callback fired when addr/addrv2 messages arrive with peer addresses.
35+ pub on_addr : AddrCallback ,
36+ }
37+
2738/// Maximum magic byte resyncs before declaring stream desync.
2839const MAX_RESYNCS : usize = 5 ;
2940/// Maximum bytes to scan for magic bytes during resync.
@@ -101,17 +112,17 @@ pub struct Peer {
101112 /// Wrapped in Arc<Mutex<>> so running block listeners share the same slot.
102113 /// When `set_on_mempool_tx_data` updates this, all running listeners see the change
103114 /// immediately — no need to restart listeners.
104- pub on_mempool_tx_data : Arc < Mutex < Option < Arc < dyn Fn ( Vec < u8 > ) + Send + Sync > > > > ,
115+ pub on_mempool_tx_data : MempoolTxCallback ,
105116
106117 /// Callback fired when an inv MSG_BLOCK arrives (new block mined).
107118 /// Signals the background sync loop to trigger an immediate sync instead
108119 /// of waiting for the 30s timer. Shared via Arc<Mutex<>> like on_mempool_tx_data.
109- pub on_new_block : Arc < Mutex < Option < Arc < dyn Fn ( ) + Send + Sync > > > > ,
120+ pub on_new_block : NewBlockCallback ,
110121
111122 /// Callback fired when addr/addrv2 messages arrive with peer addresses.
112123 /// Used for peer discovery — harvested addresses feed into PeerManager's
113124 /// known_addresses pool for future connections.
114- pub on_addr : Arc < Mutex < Option < Arc < dyn Fn ( Vec < ( String , u16 ) > ) + Send + Sync > > > > ,
125+ pub on_addr : AddrCallback ,
115126}
116127
117128impl Peer {
@@ -224,9 +235,11 @@ impl Peer {
224235 let cancel = CancellationToken :: new ( ) ;
225236 let cancel_clone = cancel. clone ( ) ;
226237 let peer_id = self . id . clone ( ) ;
227- let on_mempool_tx_data = self . on_mempool_tx_data . clone ( ) ; // shares the same Mutex slot
228- let on_new_block = self . on_new_block . clone ( ) ;
229- let on_addr = self . on_addr . clone ( ) ;
238+ let callbacks = ListenerCallbacks {
239+ on_mempool_tx_data : self . on_mempool_tx_data . clone ( ) , // shares the same Mutex slot
240+ on_new_block : self . on_new_block . clone ( ) ,
241+ on_addr : self . on_addr . clone ( ) ,
242+ } ;
230243
231244 let handle = tokio:: spawn ( async move {
232245 block_listener_loop (
@@ -235,9 +248,7 @@ impl Peer {
235248 dispatcher,
236249 cancel_clone,
237250 peer_id,
238- on_mempool_tx_data,
239- on_new_block,
240- on_addr,
251+ callbacks,
241252 )
242253 . await ;
243254 } ) ;
@@ -547,8 +558,8 @@ async fn do_handshake(
547558
548559/// Check if a protocol version is valid Zclassic (not Zcash).
549560fn is_valid_zclassic_version ( version : u32 ) -> bool {
550- ( version >= MIN_PEER_PROTOCOL_VERSION && version <= MAX_ZCLASSIC_PROTOCOL_VERSION )
551- || ( version >= ZCLASSIC_V2_MIN_VERSION && version <= ZCLASSIC_V2_MAX_VERSION )
561+ ( MIN_PEER_PROTOCOL_VERSION ..= MAX_ZCLASSIC_PROTOCOL_VERSION ) . contains ( & version )
562+ || ( ZCLASSIC_V2_MIN_VERSION ..= ZCLASSIC_V2_MAX_VERSION ) . contains ( & version )
552563}
553564
554565/// Configure TCP keepalive and no-delay on a stream.
@@ -613,7 +624,7 @@ pub async fn receive_message_timeout(
613624 tokio:: time:: timeout ( timeout, reader. read_exact ( & mut payload) )
614625 . await
615626 . map_err ( |_| NetworkError :: ResponseTimeout ) ?
616- . map_err ( |e| NetworkError :: Io ( e ) ) ?;
627+ . map_err ( NetworkError :: Io ) ?;
617628 }
618629
619630 // Verify checksum
@@ -682,9 +693,7 @@ async fn block_listener_loop(
682693 dispatcher : Arc < Mutex < Dispatcher > > ,
683694 cancel : CancellationToken ,
684695 peer_id : String ,
685- on_mempool_tx_data : Arc < Mutex < Option < Arc < dyn Fn ( Vec < u8 > ) + Send + Sync > > > > ,
686- on_new_block : Arc < Mutex < Option < Arc < dyn Fn ( ) + Send + Sync > > > > ,
687- on_addr : Arc < Mutex < Option < Arc < dyn Fn ( Vec < ( String , u16 ) > ) + Send + Sync > > > > ,
696+ callbacks : ListenerCallbacks ,
688697) {
689698 dispatcher. lock ( ) . unwrap ( ) . set_active ( true ) ;
690699
@@ -717,7 +726,7 @@ async fn block_listener_loop(
717726 eprintln!( "[ZipherX] Peer {} exceeded rate limit ({} unsolicited msgs/min), disconnecting" , peer_id, rl_count) ;
718727 break ;
719728 }
720- handle_background_message( & cmd, & payload, & writer, & peer_id, & on_mempool_tx_data , & on_new_block , & on_addr ) . await ;
729+ handle_background_message( & cmd, & payload, & writer, & peer_id, & callbacks ) . await ;
721730 }
722731 }
723732 Err ( e) => {
@@ -739,15 +748,14 @@ async fn block_listener_loop(
739748/// Event-driven mempool detection:
740749/// - "inv" MSG_TX → immediately send getdata back to the peer
741750/// - "tx" → fire `on_mempool_tx_data` callback with raw TX bytes
751+ ///
742752/// No separate task, no channel, no extra TCP connection.
743753async fn handle_background_message (
744754 command : & str ,
745755 payload : & [ u8 ] ,
746756 writer : & Arc < Mutex < Option < OwnedWriteHalf > > > ,
747757 _peer_id : & str ,
748- on_mempool_tx_data : & Arc < Mutex < Option < Arc < dyn Fn ( Vec < u8 > ) + Send + Sync > > > > ,
749- on_new_block : & Arc < Mutex < Option < Arc < dyn Fn ( ) + Send + Sync > > > > ,
750- on_addr : & Arc < Mutex < Option < Arc < dyn Fn ( Vec < ( String , u16 ) > ) + Send + Sync > > > > ,
758+ callbacks : & ListenerCallbacks ,
751759) {
752760 match command {
753761 // ── Keepalive ──
@@ -770,7 +778,7 @@ async fn handle_background_message(
770778 "[ZipherX] {}: inv MSG_BLOCK received — new block!" ,
771779 _peer_id
772780 ) ;
773- let cb = on_new_block. lock ( ) . unwrap ( ) . clone ( ) ;
781+ let cb = callbacks . on_new_block . lock ( ) . unwrap ( ) . clone ( ) ;
774782 if let Some ( cb) = cb {
775783 cb ( ) ;
776784 } else {
@@ -782,7 +790,7 @@ async fn handle_background_message(
782790 }
783791
784792 // MSG_TX: mempool detection — fetch TX for trial decryption
785- if on_mempool_tx_data. lock ( ) . unwrap ( ) . is_some ( ) {
793+ if callbacks . on_mempool_tx_data . lock ( ) . unwrap ( ) . is_some ( ) {
786794 let tx_items: Vec < crate :: types:: InvVector > = items
787795 . into_iter ( )
788796 . filter ( |item| item. inv_type == crate :: types:: InvType :: Tx )
@@ -797,15 +805,15 @@ async fn handle_background_message(
797805
798806 // ── Transaction data (mempool detection) ──
799807 "tx" => {
800- let cb = on_mempool_tx_data. lock ( ) . unwrap ( ) . clone ( ) ;
808+ let cb = callbacks . on_mempool_tx_data . lock ( ) . unwrap ( ) . clone ( ) ;
801809 if let Some ( cb) = cb {
802810 cb ( payload. to_vec ( ) ) ;
803811 }
804812 }
805813
806814 // ── Peer address discovery ──
807815 "addr" => {
808- let cb = on_addr. lock ( ) . unwrap ( ) . clone ( ) ;
816+ let cb = callbacks . on_addr . lock ( ) . unwrap ( ) . clone ( ) ;
809817 if let Some ( cb) = cb {
810818 if let Some ( addrs) = crate :: messages:: deserialize_addr ( payload) {
811819 let peers: Vec < ( String , u16 ) > = addrs
@@ -828,7 +836,7 @@ async fn handle_background_message(
828836 }
829837 }
830838 "addrv2" => {
831- let cb = on_addr. lock ( ) . unwrap ( ) . clone ( ) ;
839+ let cb = callbacks . on_addr . lock ( ) . unwrap ( ) . clone ( ) ;
832840 if let Some ( cb) = cb {
833841 if let Some ( addrs) = crate :: messages:: deserialize_addrv2 ( payload) {
834842 let peers: Vec < ( String , u16 ) > = addrs
@@ -953,7 +961,7 @@ mod tests {
953961 reader
954962 . read_exact ( & mut header_buf)
955963 . await
956- . map_err ( |e| NetworkError :: Io ( e ) ) ?;
964+ . map_err ( NetworkError :: Io ) ?;
957965
958966 let ( command, payload_len, checksum) = protocol:: parse_header ( & header_buf) ?;
959967 let len = payload_len as usize ;
@@ -962,7 +970,7 @@ mod tests {
962970 reader
963971 . read_exact ( & mut payload)
964972 . await
965- . map_err ( |e| NetworkError :: Io ( e ) ) ?;
973+ . map_err ( NetworkError :: Io ) ?;
966974 }
967975 assert ! ( protocol:: verify_checksum( & payload, & checksum) ) ;
968976 Ok ( ( command, payload) )
0 commit comments