From 0d50d973d28518731d15186dc1b445b2b3dddb9f Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 18 Apr 2023 18:13:19 +1000 Subject: [PATCH] fix(net): Limit the number of leftover nonces in the self-connection nonce set (#6534) * Use a stricter connection rate limit for successful inbound peer connections * Limit the number of nonces in the self-connection nonce set * Rate-limit failed inbound connections as well * Justify the sleep and the yield_now * Use the configured connection limit rather than a constant * Tests that the number of nonces is limited (#37) * Tests that the number of nonces is limited * removes unused constant * test that it reaches the nonce limit --------- Co-authored-by: Arya --- zebra-network/src/constants.rs | 63 ++++++++--- zebra-network/src/peer/handshake.rs | 51 +++++++-- zebra-network/src/peer/handshake/tests.rs | 15 +++ zebra-network/src/peer_set/candidate_set.rs | 7 +- .../src/peer_set/candidate_set/tests/prop.rs | 15 +-- zebra-network/src/peer_set/initialize.rs | 37 ++++--- .../src/peer_set/initialize/tests/vectors.rs | 100 +++++++++++++++++- 7 files changed, 244 insertions(+), 44 deletions(-) create mode 100644 zebra-network/src/peer/handshake/tests.rs diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index 3b06a8360..e053fdec5 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -149,14 +149,48 @@ pub const MAX_RECENT_PEER_AGE: Duration32 = Duration32::from_days(3); /// Using a prime number makes sure that heartbeats don't synchronise with crawls. pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(59); -/// The minimum time between successive calls to +/// The minimum time between outbound peer connections, implemented by /// [`CandidateSet::next`][crate::peer_set::CandidateSet::next]. /// /// ## Security /// -/// Zebra resists distributed denial of service attacks by making sure that new peer connections -/// are initiated at least [`MIN_PEER_CONNECTION_INTERVAL`] apart. -pub const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(25); +/// Zebra resists distributed denial of service attacks by making sure that new outbound peer +/// connections are only initiated after this minimum time has elapsed. +/// +/// It also enforces a minimum per-peer reconnection interval, and filters failed outbound peers. +pub const MIN_OUTBOUND_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(50); + +/// The minimum time between _successful_ inbound peer connections, implemented by +/// `peer_set::initialize::accept_inbound_connections`. +/// +/// To support multiple peers connecting simultaneously, this is less than the +/// [`HANDSHAKE_TIMEOUT`]. +/// +/// ## Security +/// +/// Zebra resists distributed denial of service attacks by limiting the inbound connection rate. +/// After a _successful_ inbound connection, new inbound peer connections are only accepted, +/// and our side of the handshake initiated, after this minimum time has elapsed. +/// +/// The inbound interval is much longer than the outbound interval, because Zebra does not +/// control the selection or reconnections of inbound peers. +pub const MIN_INBOUND_PEER_CONNECTION_INTERVAL: Duration = Duration::from_secs(1); + +/// The minimum time between _failed_ inbound peer connections, implemented by +/// `peer_set::initialize::accept_inbound_connections`. +/// +/// This is a tradeoff between: +/// - the memory, CPU, and network usage of each new connection attempt, and +/// - denying service to honest peers due to an attack which makes many inbound connections. +/// +/// Attacks that reach this limit should be managed using a firewall or intrusion prevention system. +/// +/// ## Security +/// +/// Zebra resists distributed denial of service attacks by limiting the inbound connection rate. +/// After a _failed_ inbound connection, new inbound peer connections are only accepted, +/// and our side of the handshake initiated, after this minimum time has elapsed. +pub const MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL: Duration = Duration::from_millis(10); /// The minimum time between successive calls to /// [`CandidateSet::update`][crate::peer_set::CandidateSet::update]. @@ -324,9 +358,6 @@ pub mod magics { #[cfg(test)] mod tests { - - use std::convert::TryFrom; - use zebra_chain::parameters::POST_BLOSSOM_POW_TARGET_SPACING; use super::*; @@ -363,18 +394,20 @@ mod tests { "The EWMA decay time should be higher than the request timeout, so timed out peers are penalised by the EWMA."); assert!( - u32::try_from(MAX_ADDRS_IN_ADDRESS_BOOK).expect("fits in u32") - * MIN_PEER_CONNECTION_INTERVAL - < MIN_PEER_RECONNECTION_DELAY, - "each peer should get at least one connection attempt in each connection interval", + MIN_PEER_RECONNECTION_DELAY.as_secs() as f32 + / (u32::try_from(MAX_ADDRS_IN_ADDRESS_BOOK).expect("fits in u32") + * MIN_OUTBOUND_PEER_CONNECTION_INTERVAL) + .as_secs() as f32 + >= 0.5, + "most peers should get a connection attempt in each connection interval", ); assert!( - MIN_PEER_RECONNECTION_DELAY.as_secs() + MIN_PEER_RECONNECTION_DELAY.as_secs() as f32 / (u32::try_from(MAX_ADDRS_IN_ADDRESS_BOOK).expect("fits in u32") - * MIN_PEER_CONNECTION_INTERVAL) - .as_secs() - <= 2, + * MIN_OUTBOUND_PEER_CONNECTION_INTERVAL) + .as_secs() as f32 + <= 2.0, "each peer should only have a few connection attempts in each connection interval", ); } diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index b4e273477..cb4793e95 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -2,7 +2,6 @@ use std::{ cmp::min, - collections::HashSet, fmt, future::Future, net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -14,6 +13,7 @@ use std::{ use chrono::{TimeZone, Utc}; use futures::{channel::oneshot, future, pin_mut, FutureExt, SinkExt, StreamExt}; +use indexmap::IndexSet; use tokio::{ io::{AsyncRead, AsyncWrite}, sync::broadcast, @@ -48,6 +48,9 @@ use crate::{ BoxError, Config, VersionMessage, }; +#[cfg(test)] +mod tests; + /// A [`Service`] that handshakes with a remote peer and constructs a /// client/server pair. /// @@ -71,7 +74,7 @@ where address_book_updater: tokio::sync::mpsc::Sender, inv_collector: broadcast::Sender, minimum_peer_version: MinimumPeerVersion, - nonces: Arc>>, + nonces: Arc>>, parent_span: Span, } @@ -515,7 +518,7 @@ where let (tx, _rx) = tokio::sync::mpsc::channel(1); tx }); - let nonces = Arc::new(futures::lock::Mutex::new(HashSet::new())); + let nonces = Arc::new(futures::lock::Mutex::new(IndexSet::new())); let user_agent = self.user_agent.unwrap_or_default(); let our_services = self.our_services.unwrap_or_else(PeerServices::empty); let relay = self.relay.unwrap_or(false); @@ -572,7 +575,7 @@ pub async fn negotiate_version( peer_conn: &mut Framed, connected_addr: &ConnectedAddr, config: Config, - nonces: Arc>>, + nonces: Arc>>, user_agent: String, our_services: PeerServices, relay: bool, @@ -583,12 +586,43 @@ where { // Create a random nonce for this connection let local_nonce = Nonce::default(); + + // Insert the nonce for this handshake into the shared nonce set. + // Each connection has its own connection state, and handshakes execute concurrently. + // // # Correctness // // It is ok to wait for the lock here, because handshakes have a short // timeout, and the async mutex will be released when the task times // out. - nonces.lock().await.insert(local_nonce); + // + // Duplicate nonces don't matter here, because 64-bit random collisions are very rare. + // If they happen, we're probably replacing a leftover nonce from a failed connection, + // which wasn't cleaned up when it closed. + { + let mut locked_nonces = nonces.lock().await; + locked_nonces.insert(local_nonce); + + // # Security + // + // Limit the amount of memory used for nonces. + // Nonces can be left in the set if the connection fails or times out between + // the nonce being inserted, and it being removed. + // + // Zebra has strict connection limits, so we limit the number of nonces to + // the configured connection limit. + // This is a tradeoff between: + // - avoiding memory denial of service attacks which make large numbers of connections, + // for example, 100 failed inbound connections takes 1 second. + // - memory usage: 16 bytes per `Nonce`, 3.2 kB for 200 nonces + // - collision probability: 2^32 has ~50% collision probability, so we use a lower limit + // + while locked_nonces.len() > config.peerset_total_connection_limit() { + locked_nonces.shift_remove_index(0); + } + + std::mem::drop(locked_nonces); + }; // Don't leak our exact clock skew to our peers. On the other hand, // we can't deviate too much, or zcashd will get confused. @@ -684,10 +718,15 @@ where // We must wait for the lock before we continue with the connection, to avoid // self-connection. If the connection times out, the async lock will be // released. + // + // # Security + // + // Connections that get a `Version` message will remove their nonces here, + // but connections that fail before this point can leave their nonces in the nonce set. let nonce_reuse = { let mut locked_nonces = nonces.lock().await; let nonce_reuse = locked_nonces.contains(&remote.nonce); - // Regardless of whether we observed nonce reuse, clean up the nonce set. + // Regardless of whether we observed nonce reuse, remove our own nonce from the nonce set. locked_nonces.remove(&local_nonce); nonce_reuse }; diff --git a/zebra-network/src/peer/handshake/tests.rs b/zebra-network/src/peer/handshake/tests.rs new file mode 100644 index 000000000..bf175c2c9 --- /dev/null +++ b/zebra-network/src/peer/handshake/tests.rs @@ -0,0 +1,15 @@ +//! Implements methods for testing [`Handshake`] + +use super::*; + +impl Handshake +where + S: Service + Clone + Send + 'static, + S::Future: Send, + C: ChainTip + Clone + Send + 'static, +{ + /// Returns a count of how many connection nonces are stored in this [`Handshake`] + pub async fn nonce_count(&self) -> usize { + self.nonces.lock().await.len() + } +} diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index e072b37e7..3a3d89338 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -1,3 +1,5 @@ +//! Candidate peer selection for outbound connections using the [`CandidateSet`]. + use std::{cmp::min, sync::Arc}; use chrono::Utc; @@ -361,7 +363,8 @@ where /// /// Zebra resists distributed denial of service attacks by making sure that /// new peer connections are initiated at least - /// [`MIN_PEER_CONNECTION_INTERVAL`][constants::MIN_PEER_CONNECTION_INTERVAL] apart. + /// [`MIN_OUTBOUND_PEER_CONNECTION_INTERVAL`][constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL] + /// apart. /// /// [`Responded`]: crate::PeerAddrState::Responded pub async fn next(&mut self) -> Option { @@ -397,7 +400,7 @@ where // Security: rate-limit new outbound peer connections sleep_until(self.min_next_handshake).await; - self.min_next_handshake = Instant::now() + constants::MIN_PEER_CONNECTION_INTERVAL; + self.min_next_handshake = Instant::now() + constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL; Some(next_peer) } diff --git a/zebra-network/src/peer_set/candidate_set/tests/prop.rs b/zebra-network/src/peer_set/candidate_set/tests/prop.rs index a54446d21..394e35df6 100644 --- a/zebra-network/src/peer_set/candidate_set/tests/prop.rs +++ b/zebra-network/src/peer_set/candidate_set/tests/prop.rs @@ -1,3 +1,5 @@ +//! Randomised property tests for candidate peer selection. + use std::{ env, net::SocketAddr, @@ -13,7 +15,7 @@ use tracing::Span; use zebra_chain::{parameters::Network::*, serialization::DateTime32}; use crate::{ - constants::MIN_PEER_CONNECTION_INTERVAL, + constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL, meta_addr::{MetaAddr, MetaAddrChange}, AddressBook, BoxError, Request, Response, }; @@ -75,7 +77,7 @@ proptest! { // // Check that it takes less than the peer set candidate delay, // and hope that is enough time for test machines with high CPU load. - let less_than_min_interval = MIN_PEER_CONNECTION_INTERVAL - Duration::from_millis(1); + let less_than_min_interval = MIN_OUTBOUND_PEER_CONNECTION_INTERVAL - Duration::from_millis(1); assert_eq!(runtime.block_on(timeout(less_than_min_interval, candidate_set.next())), Ok(None)); } } @@ -112,7 +114,7 @@ proptest! { // Check rate limiting for initial peers check_candidates_rate_limiting(&mut candidate_set, initial_candidates).await; // Sleep more than the rate limiting delay - sleep(MAX_TEST_CANDIDATES * MIN_PEER_CONNECTION_INTERVAL).await; + sleep(MAX_TEST_CANDIDATES * MIN_OUTBOUND_PEER_CONNECTION_INTERVAL).await; // Check that the next peers are still respecting the rate limiting, without causing a // burst of reconnections check_candidates_rate_limiting(&mut candidate_set, extra_candidates).await; @@ -121,7 +123,7 @@ proptest! { // Allow enough time for the maximum number of candidates, // plus some extra time for test machines with high CPU load. // (The max delay asserts usually fail before hitting this timeout.) - let max_rate_limit_sleep = 3 * MAX_TEST_CANDIDATES * MIN_PEER_CONNECTION_INTERVAL; + let max_rate_limit_sleep = 3 * MAX_TEST_CANDIDATES * MIN_OUTBOUND_PEER_CONNECTION_INTERVAL; let max_extra_delay = (2 * MAX_TEST_CANDIDATES + 1) * MAX_SLEEP_EXTRA_DELAY; assert!(runtime.block_on(timeout(max_rate_limit_sleep + max_extra_delay, checks)).is_ok()); } @@ -161,7 +163,8 @@ where "rate-limited candidates should not be delayed too long: now: {now:?} max: {maximum_reconnect_instant:?}. Hint: is the test machine overloaded?", ); - minimum_reconnect_instant = now + MIN_PEER_CONNECTION_INTERVAL; - maximum_reconnect_instant = now + MIN_PEER_CONNECTION_INTERVAL + MAX_SLEEP_EXTRA_DELAY; + minimum_reconnect_instant = now + MIN_OUTBOUND_PEER_CONNECTION_INTERVAL; + maximum_reconnect_instant = + now + MIN_OUTBOUND_PEER_CONNECTION_INTERVAL + MAX_SLEEP_EXTRA_DELAY; } } diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index ba745756b..4abf769bb 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -7,6 +7,7 @@ use std::{ collections::{BTreeMap, HashSet}, net::SocketAddr, sync::Arc, + time::Duration, }; use futures::{ @@ -175,6 +176,7 @@ where let listen_fut = accept_inbound_connections( config.clone(), tcp_listener, + constants::MIN_INBOUND_PEER_CONNECTION_INTERVAL, listen_handshaker, peerset_tx.clone(), ); @@ -273,9 +275,7 @@ where // # Security // // Resists distributed denial of service attacks by making sure that - // new peer connections are initiated at least - // [`MIN_PEER_CONNECTION_INTERVAL`][constants::MIN_PEER_CONNECTION_INTERVAL] - // apart. + // new peer connections are initiated at least `MIN_OUTBOUND_PEER_CONNECTION_INTERVAL` apart. // // # Correctness // @@ -297,9 +297,13 @@ where // Spawn a new task to make the outbound connection. tokio::spawn( async move { - // Only spawn one outbound connector per `MIN_PEER_CONNECTION_INTERVAL`, - // sleeping for an interval according to its index in the list. - sleep(constants::MIN_PEER_CONNECTION_INTERVAL.saturating_mul(i as u32)).await; + // Only spawn one outbound connector per + // `MIN_OUTBOUND_PEER_CONNECTION_INTERVAL`, + // by sleeping for the interval multiplied by the peer's index in the list. + sleep( + constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL.saturating_mul(i as u32), + ) + .await; // As soon as we create the connector future, // the handshake starts running as a spawned task. @@ -507,11 +511,13 @@ pub(crate) async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) /// Uses `handshaker` to perform a Zcash network protocol handshake, and sends /// the [`peer::Client`] result over `peerset_tx`. /// -/// Limit the number of active inbound connections based on `config`. +/// Limits the number of active inbound connections based on `config`, +/// and waits `min_inbound_peer_connection_interval` between connections. #[instrument(skip(config, listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))] async fn accept_inbound_connections( config: Config, listener: TcpListener, + min_inbound_peer_connection_interval: Duration, mut handshaker: S, peerset_tx: futures::channel::mpsc::Sender, ) -> Result<(), BoxError> @@ -596,28 +602,35 @@ where handshakes.push(Box::pin(handshake_task)); } - // Only spawn one inbound connection handshake per `MIN_PEER_CONNECTION_INTERVAL`. - // But clear out failed connections as fast as possible. + // Rate-limit inbound connection handshakes. + // But sleep longer after a successful connection, + // so we can clear out failed connections at a higher rate. // // If there is a flood of connections, // this stops Zebra overloading the network with handshake data. // // Zebra can't control how many queued connections are waiting, // but most OSes also limit the number of queued inbound connections on a listener port. - tokio::time::sleep(constants::MIN_PEER_CONNECTION_INTERVAL).await; + tokio::time::sleep(min_inbound_peer_connection_interval).await; } else { + // Allow invalid connections to be cleared quickly, + // but still put a limit on our CPU and network usage from failed connections. debug!(?inbound_result, "error accepting inbound connection"); + tokio::time::sleep(constants::MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL).await; } // Security: Let other tasks run after each connection is processed. // - // Avoids remote peers starving other Zebra tasks using inbound connection successes or errors. + // Avoids remote peers starving other Zebra tasks using inbound connection successes or + // errors. + // + // Preventing a denial of service is important in this code, so we want to sleep *and* make + // the next connection after other tasks have run. (Sleeps are not guaranteed to do that.) tokio::task::yield_now().await; } } /// An action that the peer crawler can take. -#[allow(dead_code)] enum CrawlerAction { /// Drop the demand signal because there are too many pending handshakes. DemandDrop, diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index c557a4f12..ff249feef 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -22,8 +22,8 @@ use std::{ use chrono::Utc; use futures::{channel::mpsc, FutureExt, StreamExt}; use indexmap::IndexSet; -use tokio::{net::TcpStream, task::JoinHandle}; -use tower::{service_fn, Service}; +use tokio::{io::AsyncWriteExt, net::TcpStream, task::JoinHandle}; +use tower::{service_fn, Layer, Service, ServiceExt}; use tracing::Span; use zebra_chain::{chain_tip::NoChainTip, parameters::Network, serialization::DateTime32}; @@ -58,6 +58,9 @@ const CRAWLER_TEST_DURATION: Duration = Duration::from_secs(10); /// Using a very short time can make the listener not run at all. const LISTENER_TEST_DURATION: Duration = Duration::from_secs(10); +/// The amount of time to make the inbound connection acceptor wait between peer connections. +const MIN_INBOUND_PEER_CONNECTION_INTERVAL_FOR_TESTS: Duration = Duration::from_millis(25); + /// Test that zebra-network discovers dynamic bind-to-all-interfaces listener ports, /// and sends them to the `AddressBook`. /// @@ -1067,7 +1070,9 @@ async fn add_initial_peers_is_rate_limited() { assert_eq!(connections.len(), PEER_COUNT); // Make sure the rate limiting worked by checking if it took long enough assert!( - elapsed > constants::MIN_PEER_CONNECTION_INTERVAL.saturating_mul((PEER_COUNT - 1) as u32), + elapsed + > constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL + .saturating_mul((PEER_COUNT - 1) as u32), "elapsed only {elapsed:?}" ); @@ -1090,6 +1095,94 @@ async fn add_initial_peers_is_rate_limited() { ); } +/// Test that the number of nonces is limited when peers send an invalid response or +/// if handshakes time out and are dropped. +#[tokio::test] +async fn remnant_nonces_from_outbound_connections_are_limited() { + use tower::timeout::TimeoutLayer; + + let _init_guard = zebra_test::init(); + + // This test should not require network access. + + const TEST_PEERSET_INITIAL_TARGET_SIZE: usize = 3; + + // Create a test config that listens on an unused port. + let listen_addr = "127.0.0.1:0".parse().unwrap(); + let config = Config { + listen_addr, + peerset_initial_target_size: TEST_PEERSET_INITIAL_TARGET_SIZE, + ..Config::default() + }; + + let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT); + let nil_inbound_service = + tower::service_fn(|_req| async move { Ok::(Response::Nil) }); + + let hs = peer::Handshake::builder() + .with_config(config.clone()) + .with_inbound_service(nil_inbound_service) + .with_user_agent(crate::constants::USER_AGENT.to_string()) + .with_latest_chain_tip(NoChainTip) + .want_transactions(true) + .finish() + .expect("configured all required parameters"); + + let mut outbound_connector = hs_timeout.layer(peer::Connector::new(hs.clone())); + + let mut active_outbound_connections = ActiveConnectionCounter::new_counter(); + + let expected_max_nonces = config.peerset_total_connection_limit(); + let num_connection_attempts = 2 * expected_max_nonces; + + for i in 1..num_connection_attempts { + let expected_nonce_count = expected_max_nonces.min(i); + + let (tcp_listener, addr) = open_listener(&config.clone()).await; + + tokio::spawn(async move { + let (mut tcp_stream, _addr) = tcp_listener + .accept() + .await + .expect("client connection should succeed"); + + tcp_stream + .shutdown() + .await + .expect("shutdown should succeed"); + }); + + let outbound_connector = outbound_connector + .ready() + .await + .expect("outbound connector never errors"); + + let connection_tracker = active_outbound_connections.track_connection(); + + let req = OutboundConnectorRequest { + addr, + connection_tracker, + }; + + outbound_connector + .call(req) + .await + .expect_err("connection attempt should fail"); + + let nonce_count = hs.nonce_count().await; + + assert!( + expected_max_nonces >= nonce_count, + "number of nonces should be limited to `peerset_total_connection_limit`" + ); + + assert!( + expected_nonce_count == nonce_count, + "number of nonces should be the lesser of the number of closed connections and `peerset_total_connection_limit`" + ) + } +} + /// Test that [`init`] does not deadlock in `add_initial_peers`, /// even if the seeders return a lot of peers. #[tokio::test] @@ -1360,6 +1453,7 @@ where let listen_fut = accept_inbound_connections( config.clone(), tcp_listener, + MIN_INBOUND_PEER_CONNECTION_INTERVAL_FOR_TESTS, listen_handshaker, peerset_tx.clone(), );