diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index 0f9d2fac2..483b63ecf 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -67,6 +67,10 @@ pub const INBOUND_PEER_LIMIT_MULTIPLIER: usize = 5; /// See [`INBOUND_PEER_LIMIT_MULTIPLIER`] for details. pub const OUTBOUND_PEER_LIMIT_MULTIPLIER: usize = 3; +/// The maximum number of peer connections Zebra will keep for a given IP address +/// before it drops any additional peer connections with that IP. +pub const MAX_CONNS_PER_IP: usize = 3; + /// The buffer size for the peer set. /// /// This should be greater than 1 to avoid sender contention, but also reasonably diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 2d01437af..de27c8d37 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -179,6 +179,7 @@ where inv_receiver, address_metrics, MinimumPeerVersion::new(latest_chain_tip, config.network), + None, ); let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE); diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 0353d377f..0f52a067c 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -251,6 +251,10 @@ where /// The last time we logged a message about the peer set size last_peer_log: Option, + + /// The configured maximum number of peers that can be in the + /// peer set per IP, defaults to [`crate::constants::MAX_CONNS_PER_IP`] + max_conns_per_ip: usize, } impl Drop for PeerSet @@ -270,6 +274,7 @@ where D::Error: Into, C: ChainTip, { + #[allow(clippy::too_many_arguments)] /// Construct a peerset which uses `discover` to manage peer connections. /// /// Arguments: @@ -282,6 +287,9 @@ where /// - `inv_stream`: receives inventory changes from peers, /// allowing the peer set to direct inventory requests; /// - `address_book`: when peer set is busy, it logs address book diagnostics. + /// - `minimum_peer_version`: endpoint to see the minimum peer protocol version in real time. + /// - `max_conns_per_ip`: configured maximum number of peers that can be in the + /// peer set per IP, defaults to [`crate::constants::MAX_CONNS_PER_IP`]. pub fn new( config: &Config, discover: D, @@ -290,6 +298,7 @@ where inv_stream: broadcast::Receiver, address_metrics: watch::Receiver, minimum_peer_version: MinimumPeerVersion, + max_conns_per_ip: Option, ) -> Self { Self { // New peers @@ -317,6 +326,8 @@ where // Metrics last_peer_log: None, address_metrics, + + max_conns_per_ip: max_conns_per_ip.unwrap_or(crate::constants::MAX_CONNS_PER_IP), } } @@ -476,6 +487,26 @@ where } } + /// Returns the number of peer connections Zebra already has with + /// the provided IP address + /// + /// # Performance + /// + /// This method is `O(connected peers)`, so it should not be called from a loop + /// that is already iterating through the peer set. + fn num_peers_with_ip(&self, ip: IpAddr) -> usize { + self.ready_services + .keys() + .chain(self.cancel_handles.keys()) + .filter(|addr| addr.ip() == ip) + .count() + } + + /// Returns `true` if Zebra is already connected to the IP and port in `addr`. + fn has_peer_with_addr(&self, addr: PeerSocketAddr) -> bool { + self.ready_services.contains_key(&addr) || self.cancel_handles.contains_key(&addr) + } + /// Checks for newly inserted or removed services. /// /// Puts inserted services in the unready list. @@ -496,7 +527,25 @@ where // - always do the same checks on every ready peer, and // - check for any errors that happened right after the handshake trace!(?key, "got Change::Insert from Discover"); - self.remove(&key); + + // # Security + // + // Drop the new peer if we are already connected to it. + // Preferring old connections avoids connection thrashing. + if self.has_peer_with_addr(key) { + std::mem::drop(svc); + continue; + } + + // # Security + // + // drop the new peer if there are already `MAX_CONNS_PER_IP` peers with + // the same IP address in the peer set. + if self.num_peers_with_ip(key.ip()) >= self.max_conns_per_ip { + std::mem::drop(svc); + continue; + } + self.push_unready(key, svc); } } diff --git a/zebra-network/src/peer_set/set/tests.rs b/zebra-network/src/peer_set/set/tests.rs index a24330f98..d111792e0 100644 --- a/zebra-network/src/peer_set/set/tests.rs +++ b/zebra-network/src/peer_set/set/tests.rs @@ -117,6 +117,7 @@ struct PeerSetBuilder { inv_stream: Option>, address_book: Option>>, minimum_peer_version: Option>, + max_conns_per_ip: Option, } impl PeerSetBuilder<(), ()> { @@ -137,6 +138,7 @@ impl PeerSetBuilder { inv_stream: self.inv_stream, address_book: self.address_book, minimum_peer_version: self.minimum_peer_version, + max_conns_per_ip: self.max_conns_per_ip, } } @@ -146,13 +148,33 @@ impl PeerSetBuilder { minimum_peer_version: MinimumPeerVersion, ) -> PeerSetBuilder { PeerSetBuilder { - minimum_peer_version: Some(minimum_peer_version), config: self.config, discover: self.discover, demand_signal: self.demand_signal, handle_rx: self.handle_rx, inv_stream: self.inv_stream, address_book: self.address_book, + minimum_peer_version: Some(minimum_peer_version), + max_conns_per_ip: self.max_conns_per_ip, + } + } + + /// Use the provided [`MinimumPeerVersion`] instance when constructing the [`PeerSet`] instance. + pub fn max_conns_per_ip(self, max_conns_per_ip: usize) -> PeerSetBuilder { + assert!( + max_conns_per_ip > 0, + "max_conns_per_ip must be greater than zero" + ); + + PeerSetBuilder { + config: self.config, + discover: self.discover, + demand_signal: self.demand_signal, + handle_rx: self.handle_rx, + inv_stream: self.inv_stream, + address_book: self.address_book, + minimum_peer_version: self.minimum_peer_version, + max_conns_per_ip: Some(max_conns_per_ip), } } } @@ -175,6 +197,7 @@ where let minimum_peer_version = self .minimum_peer_version .expect("`minimum_peer_version` must be set"); + let max_conns_per_ip = self.max_conns_per_ip; let demand_signal = self .demand_signal @@ -196,6 +219,7 @@ where inv_stream, address_metrics, minimum_peer_version, + max_conns_per_ip, ); (peer_set, guard) diff --git a/zebra-network/src/peer_set/set/tests/prop.rs b/zebra-network/src/peer_set/set/tests/prop.rs index f8388880b..b7301fea2 100644 --- a/zebra-network/src/peer_set/set/tests/prop.rs +++ b/zebra-network/src/peer_set/set/tests/prop.rs @@ -42,6 +42,7 @@ proptest! { let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new() .with_discover(discovered_peers) .with_minimum_peer_version(minimum_peer_version) + .max_conns_per_ip(usize::MAX) .build(); check_if_only_up_to_date_peers_are_live( @@ -72,6 +73,7 @@ proptest! { let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new() .with_discover(discovered_peers) .with_minimum_peer_version(minimum_peer_version.clone()) + .max_conns_per_ip(usize::MAX) .build(); check_if_only_up_to_date_peers_are_live( @@ -122,6 +124,7 @@ proptest! { let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new() .with_discover(discovered_peers) .with_minimum_peer_version(minimum_peer_version.clone()) + .max_conns_per_ip(usize::MAX) .build(); // Get the total number of active peers @@ -197,6 +200,7 @@ proptest! { let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new() .with_discover(discovered_peers) .with_minimum_peer_version(minimum_peer_version.clone()) + .max_conns_per_ip(usize::MAX) .build(); // Remove peers, test broadcast until there is only 1 peer left in the peerset @@ -267,6 +271,7 @@ proptest! { let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new() .with_discover(discovered_peers) .with_minimum_peer_version(minimum_peer_version.clone()) + .max_conns_per_ip(usize::MAX) .build(); // Remove peers diff --git a/zebra-network/src/peer_set/set/tests/vectors.rs b/zebra-network/src/peer_set/set/tests/vectors.rs index 89ff294a8..44bf9c1aa 100644 --- a/zebra-network/src/peer_set/set/tests/vectors.rs +++ b/zebra-network/src/peer_set/set/tests/vectors.rs @@ -174,6 +174,55 @@ fn peer_set_ready_multiple_connections() { }); } +#[test] +fn peer_set_rejects_connections_past_per_ip_limit() { + const NUM_PEER_VERSIONS: usize = crate::constants::MAX_CONNS_PER_IP + 1; + + // Use three peers with the same version + let peer_version = Version::min_specified_for_upgrade(Network::Mainnet, NetworkUpgrade::Nu5); + let peer_versions = PeerVersions { + peer_versions: [peer_version; NUM_PEER_VERSIONS].into_iter().collect(), + }; + + // Start the runtime + let (runtime, _init_guard) = zebra_test::init_async(); + let _guard = runtime.enter(); + + // Pause the runtime's timer so that it advances automatically. + // + // CORRECTNESS: This test does not depend on external resources that could really timeout, like + // real network connections. + tokio::time::pause(); + + // Get peers and client handles of them + let (discovered_peers, handles) = peer_versions.mock_peer_discovery(); + let (minimum_peer_version, _best_tip_height) = + MinimumPeerVersion::with_mock_chain_tip(Network::Mainnet); + + // Make sure we have the right number of peers + assert_eq!(handles.len(), NUM_PEER_VERSIONS); + + runtime.block_on(async move { + // Build a peerset + let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new() + .with_discover(discovered_peers) + .with_minimum_peer_version(minimum_peer_version.clone()) + .build(); + + // Get peerset ready + let peer_ready = peer_set + .ready() + .await + .expect("peer set service is always ready"); + + // Check we have the right amount of ready services + assert_eq!( + peer_ready.ready_services.len(), + crate::constants::MAX_CONNS_PER_IP + ); + }); +} + /// Check that a peer set with an empty inventory registry routes requests to a random ready peer. #[test] fn peer_set_route_inv_empty_registry() {