fix(network): Limit number of peer connections per IP address, Ignore new peer connections from the same IP and port (#6980)
* Limits num peer conns per ip * Update zebra-network/src/peer_set/set.rs * Update zebra-network/src/constants.rs * Apply suggestions from code review Co-authored-by: teor <teor@riseup.net> * Keep old peer connections, rather than replacing them with new connections * Adds max_conns_per_ip field Configures the max to usize::MAX for some tests. * Adds a test to check that max_conns_per_ip is enforced --------- Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
parent
0da2dcb503
commit
73ce8fbef0
|
@ -67,6 +67,10 @@ pub const INBOUND_PEER_LIMIT_MULTIPLIER: usize = 5;
|
||||||
/// See [`INBOUND_PEER_LIMIT_MULTIPLIER`] for details.
|
/// See [`INBOUND_PEER_LIMIT_MULTIPLIER`] for details.
|
||||||
pub const OUTBOUND_PEER_LIMIT_MULTIPLIER: usize = 3;
|
pub const OUTBOUND_PEER_LIMIT_MULTIPLIER: usize = 3;
|
||||||
|
|
||||||
|
/// The maximum number of peer connections Zebra will keep for a given IP address
|
||||||
|
/// before it drops any additional peer connections with that IP.
|
||||||
|
pub const MAX_CONNS_PER_IP: usize = 3;
|
||||||
|
|
||||||
/// The buffer size for the peer set.
|
/// The buffer size for the peer set.
|
||||||
///
|
///
|
||||||
/// This should be greater than 1 to avoid sender contention, but also reasonably
|
/// This should be greater than 1 to avoid sender contention, but also reasonably
|
||||||
|
|
|
@ -179,6 +179,7 @@ where
|
||||||
inv_receiver,
|
inv_receiver,
|
||||||
address_metrics,
|
address_metrics,
|
||||||
MinimumPeerVersion::new(latest_chain_tip, config.network),
|
MinimumPeerVersion::new(latest_chain_tip, config.network),
|
||||||
|
None,
|
||||||
);
|
);
|
||||||
let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);
|
let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);
|
||||||
|
|
||||||
|
|
|
@ -251,6 +251,10 @@ where
|
||||||
|
|
||||||
/// The last time we logged a message about the peer set size
|
/// The last time we logged a message about the peer set size
|
||||||
last_peer_log: Option<Instant>,
|
last_peer_log: Option<Instant>,
|
||||||
|
|
||||||
|
/// The configured maximum number of peers that can be in the
|
||||||
|
/// peer set per IP, defaults to [`crate::constants::MAX_CONNS_PER_IP`]
|
||||||
|
max_conns_per_ip: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<D, C> Drop for PeerSet<D, C>
|
impl<D, C> Drop for PeerSet<D, C>
|
||||||
|
@ -270,6 +274,7 @@ where
|
||||||
D::Error: Into<BoxError>,
|
D::Error: Into<BoxError>,
|
||||||
C: ChainTip,
|
C: ChainTip,
|
||||||
{
|
{
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
/// Construct a peerset which uses `discover` to manage peer connections.
|
/// Construct a peerset which uses `discover` to manage peer connections.
|
||||||
///
|
///
|
||||||
/// Arguments:
|
/// Arguments:
|
||||||
|
@ -282,6 +287,9 @@ where
|
||||||
/// - `inv_stream`: receives inventory changes from peers,
|
/// - `inv_stream`: receives inventory changes from peers,
|
||||||
/// allowing the peer set to direct inventory requests;
|
/// allowing the peer set to direct inventory requests;
|
||||||
/// - `address_book`: when peer set is busy, it logs address book diagnostics.
|
/// - `address_book`: when peer set is busy, it logs address book diagnostics.
|
||||||
|
/// - `minimum_peer_version`: endpoint to see the minimum peer protocol version in real time.
|
||||||
|
/// - `max_conns_per_ip`: configured maximum number of peers that can be in the
|
||||||
|
/// peer set per IP, defaults to [`crate::constants::MAX_CONNS_PER_IP`].
|
||||||
pub fn new(
|
pub fn new(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
discover: D,
|
discover: D,
|
||||||
|
@ -290,6 +298,7 @@ where
|
||||||
inv_stream: broadcast::Receiver<InventoryChange>,
|
inv_stream: broadcast::Receiver<InventoryChange>,
|
||||||
address_metrics: watch::Receiver<AddressMetrics>,
|
address_metrics: watch::Receiver<AddressMetrics>,
|
||||||
minimum_peer_version: MinimumPeerVersion<C>,
|
minimum_peer_version: MinimumPeerVersion<C>,
|
||||||
|
max_conns_per_ip: Option<usize>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
// New peers
|
// New peers
|
||||||
|
@ -317,6 +326,8 @@ where
|
||||||
// Metrics
|
// Metrics
|
||||||
last_peer_log: None,
|
last_peer_log: None,
|
||||||
address_metrics,
|
address_metrics,
|
||||||
|
|
||||||
|
max_conns_per_ip: max_conns_per_ip.unwrap_or(crate::constants::MAX_CONNS_PER_IP),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -476,6 +487,26 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the number of peer connections Zebra already has with
|
||||||
|
/// the provided IP address
|
||||||
|
///
|
||||||
|
/// # Performance
|
||||||
|
///
|
||||||
|
/// This method is `O(connected peers)`, so it should not be called from a loop
|
||||||
|
/// that is already iterating through the peer set.
|
||||||
|
fn num_peers_with_ip(&self, ip: IpAddr) -> usize {
|
||||||
|
self.ready_services
|
||||||
|
.keys()
|
||||||
|
.chain(self.cancel_handles.keys())
|
||||||
|
.filter(|addr| addr.ip() == ip)
|
||||||
|
.count()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns `true` if Zebra is already connected to the IP and port in `addr`.
|
||||||
|
fn has_peer_with_addr(&self, addr: PeerSocketAddr) -> bool {
|
||||||
|
self.ready_services.contains_key(&addr) || self.cancel_handles.contains_key(&addr)
|
||||||
|
}
|
||||||
|
|
||||||
/// Checks for newly inserted or removed services.
|
/// Checks for newly inserted or removed services.
|
||||||
///
|
///
|
||||||
/// Puts inserted services in the unready list.
|
/// Puts inserted services in the unready list.
|
||||||
|
@ -496,7 +527,25 @@ where
|
||||||
// - always do the same checks on every ready peer, and
|
// - always do the same checks on every ready peer, and
|
||||||
// - check for any errors that happened right after the handshake
|
// - check for any errors that happened right after the handshake
|
||||||
trace!(?key, "got Change::Insert from Discover");
|
trace!(?key, "got Change::Insert from Discover");
|
||||||
self.remove(&key);
|
|
||||||
|
// # Security
|
||||||
|
//
|
||||||
|
// Drop the new peer if we are already connected to it.
|
||||||
|
// Preferring old connections avoids connection thrashing.
|
||||||
|
if self.has_peer_with_addr(key) {
|
||||||
|
std::mem::drop(svc);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// # Security
|
||||||
|
//
|
||||||
|
// drop the new peer if there are already `MAX_CONNS_PER_IP` peers with
|
||||||
|
// the same IP address in the peer set.
|
||||||
|
if self.num_peers_with_ip(key.ip()) >= self.max_conns_per_ip {
|
||||||
|
std::mem::drop(svc);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
self.push_unready(key, svc);
|
self.push_unready(key, svc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -117,6 +117,7 @@ struct PeerSetBuilder<D, C> {
|
||||||
inv_stream: Option<broadcast::Receiver<InventoryChange>>,
|
inv_stream: Option<broadcast::Receiver<InventoryChange>>,
|
||||||
address_book: Option<Arc<std::sync::Mutex<AddressBook>>>,
|
address_book: Option<Arc<std::sync::Mutex<AddressBook>>>,
|
||||||
minimum_peer_version: Option<MinimumPeerVersion<C>>,
|
minimum_peer_version: Option<MinimumPeerVersion<C>>,
|
||||||
|
max_conns_per_ip: Option<usize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PeerSetBuilder<(), ()> {
|
impl PeerSetBuilder<(), ()> {
|
||||||
|
@ -137,6 +138,7 @@ impl<D, C> PeerSetBuilder<D, C> {
|
||||||
inv_stream: self.inv_stream,
|
inv_stream: self.inv_stream,
|
||||||
address_book: self.address_book,
|
address_book: self.address_book,
|
||||||
minimum_peer_version: self.minimum_peer_version,
|
minimum_peer_version: self.minimum_peer_version,
|
||||||
|
max_conns_per_ip: self.max_conns_per_ip,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,13 +148,33 @@ impl<D, C> PeerSetBuilder<D, C> {
|
||||||
minimum_peer_version: MinimumPeerVersion<NewC>,
|
minimum_peer_version: MinimumPeerVersion<NewC>,
|
||||||
) -> PeerSetBuilder<D, NewC> {
|
) -> PeerSetBuilder<D, NewC> {
|
||||||
PeerSetBuilder {
|
PeerSetBuilder {
|
||||||
minimum_peer_version: Some(minimum_peer_version),
|
|
||||||
config: self.config,
|
config: self.config,
|
||||||
discover: self.discover,
|
discover: self.discover,
|
||||||
demand_signal: self.demand_signal,
|
demand_signal: self.demand_signal,
|
||||||
handle_rx: self.handle_rx,
|
handle_rx: self.handle_rx,
|
||||||
inv_stream: self.inv_stream,
|
inv_stream: self.inv_stream,
|
||||||
address_book: self.address_book,
|
address_book: self.address_book,
|
||||||
|
minimum_peer_version: Some(minimum_peer_version),
|
||||||
|
max_conns_per_ip: self.max_conns_per_ip,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Use the provided [`MinimumPeerVersion`] instance when constructing the [`PeerSet`] instance.
|
||||||
|
pub fn max_conns_per_ip(self, max_conns_per_ip: usize) -> PeerSetBuilder<D, C> {
|
||||||
|
assert!(
|
||||||
|
max_conns_per_ip > 0,
|
||||||
|
"max_conns_per_ip must be greater than zero"
|
||||||
|
);
|
||||||
|
|
||||||
|
PeerSetBuilder {
|
||||||
|
config: self.config,
|
||||||
|
discover: self.discover,
|
||||||
|
demand_signal: self.demand_signal,
|
||||||
|
handle_rx: self.handle_rx,
|
||||||
|
inv_stream: self.inv_stream,
|
||||||
|
address_book: self.address_book,
|
||||||
|
minimum_peer_version: self.minimum_peer_version,
|
||||||
|
max_conns_per_ip: Some(max_conns_per_ip),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -175,6 +197,7 @@ where
|
||||||
let minimum_peer_version = self
|
let minimum_peer_version = self
|
||||||
.minimum_peer_version
|
.minimum_peer_version
|
||||||
.expect("`minimum_peer_version` must be set");
|
.expect("`minimum_peer_version` must be set");
|
||||||
|
let max_conns_per_ip = self.max_conns_per_ip;
|
||||||
|
|
||||||
let demand_signal = self
|
let demand_signal = self
|
||||||
.demand_signal
|
.demand_signal
|
||||||
|
@ -196,6 +219,7 @@ where
|
||||||
inv_stream,
|
inv_stream,
|
||||||
address_metrics,
|
address_metrics,
|
||||||
minimum_peer_version,
|
minimum_peer_version,
|
||||||
|
max_conns_per_ip,
|
||||||
);
|
);
|
||||||
|
|
||||||
(peer_set, guard)
|
(peer_set, guard)
|
||||||
|
|
|
@ -42,6 +42,7 @@ proptest! {
|
||||||
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
|
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
|
||||||
.with_discover(discovered_peers)
|
.with_discover(discovered_peers)
|
||||||
.with_minimum_peer_version(minimum_peer_version)
|
.with_minimum_peer_version(minimum_peer_version)
|
||||||
|
.max_conns_per_ip(usize::MAX)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
check_if_only_up_to_date_peers_are_live(
|
check_if_only_up_to_date_peers_are_live(
|
||||||
|
@ -72,6 +73,7 @@ proptest! {
|
||||||
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
|
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
|
||||||
.with_discover(discovered_peers)
|
.with_discover(discovered_peers)
|
||||||
.with_minimum_peer_version(minimum_peer_version.clone())
|
.with_minimum_peer_version(minimum_peer_version.clone())
|
||||||
|
.max_conns_per_ip(usize::MAX)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
check_if_only_up_to_date_peers_are_live(
|
check_if_only_up_to_date_peers_are_live(
|
||||||
|
@ -122,6 +124,7 @@ proptest! {
|
||||||
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
|
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
|
||||||
.with_discover(discovered_peers)
|
.with_discover(discovered_peers)
|
||||||
.with_minimum_peer_version(minimum_peer_version.clone())
|
.with_minimum_peer_version(minimum_peer_version.clone())
|
||||||
|
.max_conns_per_ip(usize::MAX)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
// Get the total number of active peers
|
// Get the total number of active peers
|
||||||
|
@ -197,6 +200,7 @@ proptest! {
|
||||||
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
|
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
|
||||||
.with_discover(discovered_peers)
|
.with_discover(discovered_peers)
|
||||||
.with_minimum_peer_version(minimum_peer_version.clone())
|
.with_minimum_peer_version(minimum_peer_version.clone())
|
||||||
|
.max_conns_per_ip(usize::MAX)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
// Remove peers, test broadcast until there is only 1 peer left in the peerset
|
// Remove peers, test broadcast until there is only 1 peer left in the peerset
|
||||||
|
@ -267,6 +271,7 @@ proptest! {
|
||||||
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
|
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
|
||||||
.with_discover(discovered_peers)
|
.with_discover(discovered_peers)
|
||||||
.with_minimum_peer_version(minimum_peer_version.clone())
|
.with_minimum_peer_version(minimum_peer_version.clone())
|
||||||
|
.max_conns_per_ip(usize::MAX)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
// Remove peers
|
// Remove peers
|
||||||
|
|
|
@ -174,6 +174,55 @@ fn peer_set_ready_multiple_connections() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn peer_set_rejects_connections_past_per_ip_limit() {
|
||||||
|
const NUM_PEER_VERSIONS: usize = crate::constants::MAX_CONNS_PER_IP + 1;
|
||||||
|
|
||||||
|
// Use three peers with the same version
|
||||||
|
let peer_version = Version::min_specified_for_upgrade(Network::Mainnet, NetworkUpgrade::Nu5);
|
||||||
|
let peer_versions = PeerVersions {
|
||||||
|
peer_versions: [peer_version; NUM_PEER_VERSIONS].into_iter().collect(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Start the runtime
|
||||||
|
let (runtime, _init_guard) = zebra_test::init_async();
|
||||||
|
let _guard = runtime.enter();
|
||||||
|
|
||||||
|
// Pause the runtime's timer so that it advances automatically.
|
||||||
|
//
|
||||||
|
// CORRECTNESS: This test does not depend on external resources that could really timeout, like
|
||||||
|
// real network connections.
|
||||||
|
tokio::time::pause();
|
||||||
|
|
||||||
|
// Get peers and client handles of them
|
||||||
|
let (discovered_peers, handles) = peer_versions.mock_peer_discovery();
|
||||||
|
let (minimum_peer_version, _best_tip_height) =
|
||||||
|
MinimumPeerVersion::with_mock_chain_tip(Network::Mainnet);
|
||||||
|
|
||||||
|
// Make sure we have the right number of peers
|
||||||
|
assert_eq!(handles.len(), NUM_PEER_VERSIONS);
|
||||||
|
|
||||||
|
runtime.block_on(async move {
|
||||||
|
// Build a peerset
|
||||||
|
let (mut peer_set, _peer_set_guard) = PeerSetBuilder::new()
|
||||||
|
.with_discover(discovered_peers)
|
||||||
|
.with_minimum_peer_version(minimum_peer_version.clone())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// Get peerset ready
|
||||||
|
let peer_ready = peer_set
|
||||||
|
.ready()
|
||||||
|
.await
|
||||||
|
.expect("peer set service is always ready");
|
||||||
|
|
||||||
|
// Check we have the right amount of ready services
|
||||||
|
assert_eq!(
|
||||||
|
peer_ready.ready_services.len(),
|
||||||
|
crate::constants::MAX_CONNS_PER_IP
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/// Check that a peer set with an empty inventory registry routes requests to a random ready peer.
|
/// Check that a peer set with an empty inventory registry routes requests to a random ready peer.
|
||||||
#[test]
|
#[test]
|
||||||
fn peer_set_route_inv_empty_registry() {
|
fn peer_set_route_inv_empty_registry() {
|
||||||
|
|
Loading…
Reference in New Issue