diff --git a/crates/floresta-wire/src/p2p_wire/error.rs b/crates/floresta-wire/src/p2p_wire/error.rs index 1d05ba395..f9145fcb8 100644 --- a/crates/floresta-wire/src/p2p_wire/error.rs +++ b/crates/floresta-wire/src/p2p_wire/error.rs @@ -90,6 +90,9 @@ pub enum WireError { /// Couldn't find the leaf data for a block LeafDataNotFound, + + /// Exceeded the max number of outbound peers + OutboundPeersExceeded, } impl Display for WireError { @@ -134,6 +137,9 @@ impl Display for WireError { "We tried to work on a block that we don't have a proof for yet" ), WireError::LeafDataNotFound => write!(f, "Couldn't find the leaf data for a block"), + WireError::OutboundPeersExceeded => { + write!(f, "Exceeded the max number of outbound peers") + } } } } diff --git a/crates/floresta-wire/src/p2p_wire/node/conn.rs b/crates/floresta-wire/src/p2p_wire/node/conn.rs index 901896cb8..b75f7465e 100644 --- a/crates/floresta-wire/src/p2p_wire/node/conn.rs +++ b/crates/floresta-wire/src/p2p_wire/node/conn.rs @@ -80,7 +80,8 @@ where // Get the peer's `ServiceFlags`. let required_services = match conn_kind { - ConnectionKind::Regular(services) => services, + ConnectionKind::OutboundFullRelay(services) + | ConnectionKind::BlockRelayOnly(services) => services, _ => ServiceFlags::NONE, }; @@ -157,7 +158,7 @@ where /// /// `kind` may or may not be a [`ConnectionKind::Feeler`], a special connection type /// that is used to learn about good peers, but are not kept after handshake - /// (others are [`ConnectionKind::Regular`], [`ConnectionKind::Manual`] and [`ConnectionKind::Extra`]). + /// (others are [`ConnectionKind::OutboundFullRelay`], [`ConnectionKind::Manual`] and [`ConnectionKind::Extra`]). /// /// We will always try to open a V2 connection first. If the `allow_v1_fallback` is set, /// we may retry the connection with the old V1 protocol if the V2 connection fails. @@ -242,7 +243,9 @@ where match kind { ConnectionKind::Feeler => self.last_feeler = Instant::now(), - ConnectionKind::Regular(_) => self.last_connection = Instant::now(), + ConnectionKind::OutboundFullRelay(_) | ConnectionKind::BlockRelayOnly(_) => { + self.last_connection = Instant::now() + } // Note: Creating a manual peer intentionally doesn't affect the `last_connection` // timer, since they don't necessarily follow our connection logic, and we may still // need more utreexo/CBS peers @@ -585,7 +588,7 @@ where for address in anchors { self.open_connection( - ConnectionKind::Regular(service_flags::UTREEXO.into()), + ConnectionKind::OutboundFullRelay(service_flags::UTREEXO.into()), address.id, address, // Using V1 transport fallback as utreexo nodes have limited support @@ -608,7 +611,23 @@ where return Ok(()); } - let connection_kind = ConnectionKind::Regular(required_service); + let full_relay_count = self + .peers + .values() + .filter(|p| matches!(p.kind, ConnectionKind::OutboundFullRelay(_))) + .count(); + let block_relay_count = self + .peers + .values() + .filter(|p| matches!(p.kind, ConnectionKind::BlockRelayOnly(_))) + .count(); + let connection_kind = if full_relay_count < T::MAX_FULL_RELAY_PEERS { + ConnectionKind::OutboundFullRelay(required_service) + } else if block_relay_count < T::MAX_BLOCKS_ONLY_PEERS { + ConnectionKind::BlockRelayOnly(required_service) + } else { + return Ok(()); + }; // If the user passes in a `--connect` cli argument, we only connect with // that particular peer. @@ -636,6 +655,12 @@ where if self.added_peers.is_empty() { return Ok(()); } + + let connected_manual = self.peers.values().filter(|p| p.is_manual_peer()).count(); + if connected_manual >= T::MAX_MANUAL_PEERS { + return Ok(()); + } + let peers_count = self.peer_id_count; for added_peer in self.added_peers.clone() { let matching_peer = self.peers.values().find(|peer| { diff --git a/crates/floresta-wire/src/p2p_wire/node/mod.rs b/crates/floresta-wire/src/p2p_wire/node/mod.rs index ee1bb0db2..87ed6a62f 100644 --- a/crates/floresta-wire/src/p2p_wire/node/mod.rs +++ b/crates/floresta-wire/src/p2p_wire/node/mod.rs @@ -138,14 +138,23 @@ pub(crate) enum InflightRequests { /// /// Core's counterpart: . pub enum ConnectionKind { + /// A regular outbound connection that relays transactions and blocks. + OutboundFullRelay(ServiceFlags), + + /// An outbound connection that only relays blocks, no transactions. + BlockRelayOnly(ServiceFlags), + + /// A connection manually requested by the user, exempt from banning and service requirements. + Manual, + /// A feeler connection is a short-lived connection used to check whether this peer is alive. /// /// After handshake, we ask for addresses and when we receive an answer we just disconnect, /// marking this peer as alive in our address manager. Feeler, - /// A regular peer, used to send requests to and learn about transactions and blocks. - Regular(ServiceFlags), + /// A short-lived connection used to solicit addresses from peers. + AddrFetch, /// An extra peer specially created if our tip hasn't moved for too long. /// @@ -153,11 +162,6 @@ pub enum ConnectionKind { /// last processed block, we use this to make sure we are not in a partitioned subnet, /// unable to learn about new blocks. Extra, - - /// A connection that was manually requested by our user. This type of peer won't be banned on - /// misbehaving, and won't respect the [`ServiceFlags`] requirements when creating a - /// connection. - Manual, } impl Serialize for ConnectionKind { @@ -166,10 +170,13 @@ impl Serialize for ConnectionKind { S: serde::Serializer, { match self { - ConnectionKind::Feeler => serializer.serialize_str("feeler"), - ConnectionKind::Regular(_) => serializer.serialize_str("regular"), - ConnectionKind::Extra => serializer.serialize_str("extra"), + ConnectionKind::OutboundFullRelay(_) => serializer.serialize_str("outbound-full-relay"), + ConnectionKind::BlockRelayOnly(_) | ConnectionKind::Extra => { + serializer.serialize_str("block-relay-only") + } ConnectionKind::Manual => serializer.serialize_str("manual"), + ConnectionKind::Feeler => serializer.serialize_str("feeler"), + ConnectionKind::AddrFetch => serializer.serialize_str("addr-fetch"), } } } @@ -233,9 +240,12 @@ impl LocalPeerView { matches!(self.kind, ConnectionKind::Manual) } - /// Whether this is a regular peer + /// Whether this is a regular outbound peer pub(crate) const fn is_regular_peer(&self) -> bool { - matches!(self.kind, ConnectionKind::Regular(_)) + matches!( + self.kind, + ConnectionKind::OutboundFullRelay(_) | ConnectionKind::BlockRelayOnly(_) + ) } // Connections expected to remain open if the peer doesn't die diff --git a/crates/floresta-wire/src/p2p_wire/node/peer_man.rs b/crates/floresta-wire/src/p2p_wire/node/peer_man.rs index a8902cade..bfe78962b 100644 --- a/crates/floresta-wire/src/p2p_wire/node/peer_man.rs +++ b/crates/floresta-wire/src/p2p_wire/node/peer_man.rs @@ -230,7 +230,7 @@ where let good_peers_count = self.connected_peers(); if good_peers_count > T::MAX_OUTGOING_PEERS { // We allow utreexo, extra and manual peers to bypass our connection limits - let is_utreexo_peer = matches!(version.kind, ConnectionKind::Regular(services) if services.has(service_flags::UTREEXO.into())); + let is_utreexo_peer = matches!(version.kind, ConnectionKind::OutboundFullRelay(services) if services.has(service_flags::UTREEXO.into())); let is_manual_peer = version.kind == ConnectionKind::Manual; let is_extra = version.kind == ConnectionKind::Extra; @@ -286,10 +286,13 @@ where peer_data.transport_protocol = version.transport_protocol; // If this peer doesn't have basic services, we disconnect it - if let ConnectionKind::Regular(needs) = version.kind { + if let ConnectionKind::OutboundFullRelay(needs) + | ConnectionKind::BlockRelayOnly(needs) = version.kind + { if !Self::is_peer_good(peer_data, needs) { info!( - "Disconnecting peer {peer} for not having the required services. has={} needs={}", peer_data.services, needs + "Disconnecting peer {peer} for not having the required services. has={} needs={}", + peer_data.services, needs ); peer_data.channel.send(NodeRequest::Shutdown)?; self.address_man.update_set_state( diff --git a/crates/floresta-wire/src/p2p_wire/node/running_ctx.rs b/crates/floresta-wire/src/p2p_wire/node/running_ctx.rs index 30d73d065..82a660614 100644 --- a/crates/floresta-wire/src/p2p_wire/node/running_ctx.rs +++ b/crates/floresta-wire/src/p2p_wire/node/running_ctx.rs @@ -570,12 +570,13 @@ where /// This function checks how many time has passed since our last tip update, if it's /// been more than 15 minutes, try to update it. fn check_for_stale_tip(&mut self) -> Result<(), WireError> { - warn!("Potential stale tip detected, trying extra peers"); + warn!("Potential stale tip detected, will try using extra outbound peer"); // this catches an edge-case where all our utreexo peers are gone, and the GetData // times-out. That yields an error, but doesn't ask the block again. Our last_block_request // will be pointing to a block that will never arrive, so we basically deadlock. - self.last_block_request = self.chain.get_validation_index().unwrap(); + self.last_block_request = self.chain.get_validation_index()?; + // update this or we'll get this warning every second after 15 minutes without a block, // until we get a new block. self.last_tip_update = Instant::now(); @@ -681,7 +682,11 @@ where ); self.inflight.remove(&InflightRequests::Headers); - let peer_info = self.peers.get(&peer).cloned().expect("Peer not found"); + let peer_info = match self.peers.get(&peer).cloned() { + Some(peer) => peer, + None => return Err(WireError::PeerNotFound), + }; + let is_extra = matches!(peer_info.kind, ConnectionKind::Extra); if is_extra { @@ -691,25 +696,56 @@ where return Ok(()); } - // this peer got us a new block, we should disconnect one of our regular peers - // and keep this one. - let peer_to_disconnect = self + // this peer got us a new block, we should disconnect one + // of our OutboundFullRelay/BlockRelayOnly peers + let full_relay_count = self .peers .iter() - // Don't disconnect manual connections - .filter(|(_, info)| info.is_regular_peer()) - .min_by_key(|(k, _)| self.get_peer_score(**k)) - .map(|(peer, _)| *peer); + .filter(|(_, info)| { + matches!(info.kind, ConnectionKind::OutboundFullRelay(_)) + }) + .count(); - // disconnect the peer with the lowest score - if let Some(peer) = peer_to_disconnect { - self.send_to_peer(peer, NodeRequest::Shutdown)?; - } + let wants_full = + full_relay_count < RunningNode::MAX_FULL_RELAY_PEERS; - // update the peer info - self.peers.entry(peer).and_modify(|info| { - info.kind = ConnectionKind::Regular(peer_info.services); - }); + match self + .peers + .iter() + .filter(|(_, info)| { + if wants_full { + matches!(info.kind, ConnectionKind::OutboundFullRelay(_)) + } else { + matches!(info.kind, ConnectionKind::BlockRelayOnly(_)) + } + }) + .min_by_key(|(k, _)| self.get_peer_score(**k)) + .map(|(peer, _)| *peer) + { + Some(peer_to_disconnect) => { + self.send_to_peer(peer_to_disconnect, NodeRequest::Shutdown)?; + + self.peers.entry(peer).and_modify(|info| { + info.kind = if wants_full { + ConnectionKind::OutboundFullRelay(peer_info.services) + } else { + ConnectionKind::BlockRelayOnly(peer_info.services) + } + }); + } + None => { + // No peer of the target type to displace; promote anyway + // since the Extra peer already holds a connection slot. + warn!("No peer to displace for extra peer promotion, promoting in place"); + self.peers.entry(peer).and_modify(|info| { + info.kind = if wants_full { + ConnectionKind::OutboundFullRelay(peer_info.services) + } else { + ConnectionKind::BlockRelayOnly(peer_info.services) + } + }); + } + } } for header in headers.iter() { diff --git a/crates/floresta-wire/src/p2p_wire/node/sync_ctx.rs b/crates/floresta-wire/src/p2p_wire/node/sync_ctx.rs index b2df2d24d..9ba48b0a0 100644 --- a/crates/floresta-wire/src/p2p_wire/node/sync_ctx.rs +++ b/crates/floresta-wire/src/p2p_wire/node/sync_ctx.rs @@ -51,6 +51,10 @@ impl NodeContext for SyncNode { const REQUEST_TIMEOUT: u64 = 60 * 2; // 2 minutes const MAX_INFLIGHT_REQUESTS: usize = 100; // double the default + // During IBD we only need block data, no tx relay + const MAX_FULL_RELAY_PEERS: usize = 0; + const MAX_BLOCKS_ONLY_PEERS: usize = 10; + // A more conservative value than the default of 1 second, since we'll have many peer messages const MAINTENANCE_TICK: Duration = Duration::from_secs(5); } diff --git a/crates/floresta-wire/src/p2p_wire/node_context.rs b/crates/floresta-wire/src/p2p_wire/node_context.rs index cfc491195..e7de9cc18 100644 --- a/crates/floresta-wire/src/p2p_wire/node_context.rs +++ b/crates/floresta-wire/src/p2p_wire/node_context.rs @@ -39,6 +39,15 @@ pub trait NodeContext { /// Max number of simultaneous connections we initiates we are willing to hold const MAX_OUTGOING_PEERS: usize = 10; + /// Max number of outbound-full-relay connections we are willing to hold + const MAX_FULL_RELAY_PEERS: usize = 8; + + /// Max number of block-relay-only connections we are willing to hold + const MAX_BLOCKS_ONLY_PEERS: usize = 2; + + /// Max number of manual (addnode) connections we are willing to hold + const MAX_MANUAL_PEERS: usize = 8; + /// We ask for peers every ASK_FOR_PEERS_INTERVAL seconds const ASK_FOR_PEERS_INTERVAL: u64 = 60 * 60; // One hour diff --git a/crates/floresta-wire/src/p2p_wire/tests/utils.rs b/crates/floresta-wire/src/p2p_wire/tests/utils.rs index cf0773400..485cd5fed 100644 --- a/crates/floresta-wire/src/p2p_wire/tests/utils.rs +++ b/crates/floresta-wire/src/p2p_wire/tests/utils.rs @@ -82,7 +82,7 @@ impl SimulatedPeer { | service_flags::UTREEXO_ARCHIVE.into() | ServiceFlags::WITNESS | ServiceFlags::COMPACT_FILTERS, - kind: ConnectionKind::Regular(service_flags::UTREEXO.into()), + kind: ConnectionKind::OutboundFullRelay(service_flags::UTREEXO.into()), transport_protocol: TransportProtocol::V2, }; @@ -182,7 +182,7 @@ pub fn create_peer( state: PeerStatus::Ready, channel: sender, port: 8333, - kind: ConnectionKind::Regular(service_flags::UTREEXO.into()), + kind: ConnectionKind::OutboundFullRelay(service_flags::UTREEXO.into()), banscore: 0, address_id: 0, _last_message: Instant::now(),