fix(network): Rate-limit inbound connections per IP. (#7041)

* Adds RecentByIp

* Removes new [cfg(not(test))]s, supports configurable max_conn_per_ip in RecentByIp and account_inbound_connections

Updates tests

* Uses self.time_limit instead of constant

* Adds sleep after dropping connections

Uses partition_point & split_off

Moves tests to separate module

* Apply suggestions from code review

Co-authored-by: teor <teor@riseup.net>

* Always prune before adding

* Tweak comments

* Move the time calculation outside the binary search closure

---------

Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Arya 2023-07-13 22:26:46 -04:00 committed by GitHub
parent be5cfad07f
commit 7b0dedd3a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 194 additions and 9 deletions

View File

@ -51,6 +51,8 @@ use crate::{
#[cfg(test)]
mod tests;
mod recent_by_ip;
/// A successful outbound peer connection attempt or inbound connection handshake.
///
/// The [`Handshake`](peer::Handshake) service returns a [`Result`]. Only successful connections
@ -576,6 +578,9 @@ where
+ Clone,
S::Future: Send + 'static,
{
let mut recent_inbound_connections =
recent_by_ip::RecentByIp::new(None, Some(config.max_connections_per_ip));
let mut active_inbound_connections = ActiveConnectionCounter::new_counter_with(
config.peerset_inbound_connection_limit(),
"Inbound Connections",
@ -605,10 +610,14 @@ where
if active_inbound_connections.update_count()
>= config.peerset_inbound_connection_limit()
|| recent_inbound_connections.is_past_limit_or_add(addr.ip())
{
// Too many open inbound connections or pending handshakes already.
// Close the connection.
std::mem::drop(tcp_stream);
// Allow invalid connections to be cleared quickly,
// but still put a limit on our CPU and network usage from failed connections.
tokio::time::sleep(constants::MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL).await;
continue;
}

View File

@ -0,0 +1,94 @@
//! A set of IPs from recent connection attempts.
use std::{
collections::{HashMap, VecDeque},
net::IpAddr,
time::{Duration, Instant},
};
use crate::constants;
#[cfg(test)]
mod tests;
#[derive(Debug)]
/// Stores IPs of recently attempted inbound connections.
pub struct RecentByIp {
/// The list of IPs in decreasing connection age order.
pub by_time: VecDeque<(IpAddr, Instant)>,
/// Stores IPs for recently attempted inbound connections.
pub by_ip: HashMap<IpAddr, usize>,
/// The maximum number of peer connections Zebra will keep for a given IP address
/// before it drops any additional peer connections with that IP.
pub max_connections_per_ip: usize,
/// The duration to wait after an entry is added before removing it.
pub time_limit: Duration,
}
impl Default for RecentByIp {
fn default() -> Self {
Self::new(None, None)
}
}
impl RecentByIp {
/// Creates a new [`RecentByIp`]
pub fn new(time_limit: Option<Duration>, max_connections_per_ip: Option<usize>) -> Self {
let (by_time, by_ip) = Default::default();
Self {
by_time,
by_ip,
time_limit: time_limit.unwrap_or(constants::MIN_PEER_RECONNECTION_DELAY),
max_connections_per_ip: max_connections_per_ip
.unwrap_or(constants::DEFAULT_MAX_CONNS_PER_IP),
}
}
/// Prunes outdated entries, checks if there's a recently attempted inbound connection with
/// this IP, and adds the entry to `by_time`, and `by_ip` if needed.
///
/// Returns true if the recently attempted inbound connection count is past the configured limit.
pub fn is_past_limit_or_add(&mut self, ip: IpAddr) -> bool {
let now = Instant::now();
self.prune_by_time(now);
let count = self.by_ip.entry(ip).or_default();
if *count >= self.max_connections_per_ip {
true
} else {
*count += 1;
self.by_time.push_back((ip, now));
false
}
}
/// Prunes entries older than `time_limit`, decrementing or removing their counts in `by_ip`.
fn prune_by_time(&mut self, now: Instant) {
// Currently saturates to zero:
// <https://doc.rust-lang.org/std/time/struct.Instant.html#monotonicity>
//
// This discards the whole structure if the time limit is very large,
// which is unexpected, but stops this list growing without limit.
// After the handshake, the peer set will remove any duplicate connections over the limit.
let age_limit = now - self.time_limit;
// `by_time` must be sorted for this to work.
let split_off_idx = self.by_time.partition_point(|&(_, time)| time <= age_limit);
let updated_by_time = self.by_time.split_off(split_off_idx);
for (ip, _) in &self.by_time {
if let Some(count) = self.by_ip.get_mut(ip) {
*count -= 1;
if *count == 0 {
self.by_ip.remove(ip);
}
}
}
self.by_time = updated_by_time;
}
}

View File

@ -0,0 +1,69 @@
//! Fixed test vectors for recent IP limits.
use std::time::Duration;
use crate::peer_set::initialize::recent_by_ip::RecentByIp;
#[test]
fn old_connection_attempts_are_pruned() {
const TEST_TIME_LIMIT: Duration = Duration::from_secs(5);
let _init_guard = zebra_test::init();
let mut recent_connections = RecentByIp::new(Some(TEST_TIME_LIMIT), None);
let ip = "127.0.0.1".parse().expect("should parse");
assert!(
!recent_connections.is_past_limit_or_add(ip),
"should not be past limit"
);
assert!(
recent_connections.is_past_limit_or_add(ip),
"should be past max_connections_per_ip limit"
);
std::thread::sleep(TEST_TIME_LIMIT / 3);
assert!(
recent_connections.is_past_limit_or_add(ip),
"should still contain entry after a third of the time limit"
);
std::thread::sleep(3 * TEST_TIME_LIMIT / 4);
assert!(
!recent_connections.is_past_limit_or_add(ip),
"should prune entry after 13/12 * time_limit"
);
const TEST_MAX_CONNS_PER_IP: usize = 3;
let mut recent_connections =
RecentByIp::new(Some(TEST_TIME_LIMIT), Some(TEST_MAX_CONNS_PER_IP));
for _ in 0..TEST_MAX_CONNS_PER_IP {
assert!(
!recent_connections.is_past_limit_or_add(ip),
"should not be past limit"
);
}
assert!(
recent_connections.is_past_limit_or_add(ip),
"should be past max_connections_per_ip limit"
);
std::thread::sleep(TEST_TIME_LIMIT / 3);
assert!(
recent_connections.is_past_limit_or_add(ip),
"should still be past limit after a third of the reconnection delay"
);
std::thread::sleep(3 * TEST_TIME_LIMIT / 4);
assert!(
!recent_connections.is_past_limit_or_add(ip),
"should prune entry after 13/12 * time_limit"
);
}

View File

@ -727,7 +727,7 @@ async fn listener_peer_limit_zero_handshake_panic() {
});
let (_config, mut peerset_rx) =
spawn_inbound_listener_with_peer_limit(0, unreachable_inbound_handshaker).await;
spawn_inbound_listener_with_peer_limit(0, None, unreachable_inbound_handshaker).await;
let peer_result = peerset_rx.try_next();
assert!(
@ -752,7 +752,7 @@ async fn listener_peer_limit_one_handshake_error() {
service_fn(|_| async { Err("test inbound handshaker always returns errors".into()) });
let (_config, mut peerset_rx) =
spawn_inbound_listener_with_peer_limit(1, error_inbound_handshaker).await;
spawn_inbound_listener_with_peer_limit(1, None, error_inbound_handshaker).await;
let peer_result = peerset_rx.try_next();
assert!(
@ -794,8 +794,12 @@ async fn listener_peer_limit_one_handshake_ok_then_drop() {
Ok(fake_client)
});
let (config, mut peerset_rx) =
spawn_inbound_listener_with_peer_limit(1, success_disconnect_inbound_handshaker).await;
let (config, mut peerset_rx) = spawn_inbound_listener_with_peer_limit(
1,
usize::MAX,
success_disconnect_inbound_handshaker,
)
.await;
let mut peer_count: usize = 0;
loop {
@ -853,7 +857,7 @@ async fn listener_peer_limit_one_handshake_ok_stay_open() {
});
let (config, mut peerset_rx) =
spawn_inbound_listener_with_peer_limit(1, success_stay_open_inbound_handshaker).await;
spawn_inbound_listener_with_peer_limit(1, None, success_stay_open_inbound_handshaker).await;
let mut peer_change_count: usize = 0;
loop {
@ -917,7 +921,7 @@ async fn listener_peer_limit_default_handshake_error() {
service_fn(|_| async { Err("test inbound handshaker always returns errors".into()) });
let (_config, mut peerset_rx) =
spawn_inbound_listener_with_peer_limit(None, error_inbound_handshaker).await;
spawn_inbound_listener_with_peer_limit(None, None, error_inbound_handshaker).await;
let peer_result = peerset_rx.try_next();
assert!(
@ -963,8 +967,12 @@ async fn listener_peer_limit_default_handshake_ok_then_drop() {
Ok(fake_client)
});
let (config, mut peerset_rx) =
spawn_inbound_listener_with_peer_limit(None, success_disconnect_inbound_handshaker).await;
let (config, mut peerset_rx) = spawn_inbound_listener_with_peer_limit(
None,
usize::MAX,
success_disconnect_inbound_handshaker,
)
.await;
let mut peer_count: usize = 0;
loop {
@ -1022,7 +1030,8 @@ async fn listener_peer_limit_default_handshake_ok_stay_open() {
});
let (config, mut peerset_rx) =
spawn_inbound_listener_with_peer_limit(None, success_stay_open_inbound_handshaker).await;
spawn_inbound_listener_with_peer_limit(None, None, success_stay_open_inbound_handshaker)
.await;
let mut peer_change_count: usize = 0;
loop {
@ -1609,6 +1618,7 @@ where
/// Returns the generated [`Config`], and the peer set receiver.
async fn spawn_inbound_listener_with_peer_limit<S>(
peerset_initial_target_size: impl Into<Option<usize>>,
max_connections_per_ip: impl Into<Option<usize>>,
listen_handshaker: S,
) -> (Config, mpsc::Receiver<DiscoveredPeer>)
where
@ -1623,6 +1633,9 @@ where
let listen_addr = "127.0.0.1:0".parse().unwrap();
let mut config = Config {
listen_addr,
max_connections_per_ip: max_connections_per_ip
.into()
.unwrap_or(constants::DEFAULT_MAX_CONNS_PER_IP),
..Config::default()
};