fix(net): Limit the number of leftover nonces in the self-connection nonce set (#6534)
* Use a stricter connection rate limit for successful inbound peer connections

* Limit the number of nonces in the self-connection nonce set

* Rate-limit failed inbound connections as well

* Justify the sleep and the yield_now

* Use the configured connection limit rather than a constant

* Tests that the number of nonces is limited (#37)

* Tests that the number of nonces is limited

* removes unused constant

* test that it reaches the nonce limit

---------

Co-authored-by: Arya <aryasolhi@gmail.com>
This commit is contained in:
parent 3343c8494d
commit 0d50d973d2
@@ -149,14 +149,48 @@ pub const MAX_RECENT_PEER_AGE: Duration32 = Duration32::from_days(3);
 /// Using a prime number makes sure that heartbeats don't synchronise with crawls.
 pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(59);
 
-/// The minimum time between successive calls to
+/// The minimum time between outbound peer connections, implemented by
 /// [`CandidateSet::next`][crate::peer_set::CandidateSet::next].
 ///
 /// ## Security
 ///
-/// Zebra resists distributed denial of service attacks by making sure that new peer connections
-/// are initiated at least [`MIN_PEER_CONNECTION_INTERVAL`] apart.
-pub const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(25);
+/// Zebra resists distributed denial of service attacks by making sure that new outbound peer
+/// connections are only initiated after this minimum time has elapsed.
+///
+/// It also enforces a minimum per-peer reconnection interval, and filters failed outbound peers.
+pub const MIN_OUTBOUND_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(50);
 
+/// The minimum time between _successful_ inbound peer connections, implemented by
+/// `peer_set::initialize::accept_inbound_connections`.
+///
+/// To support multiple peers connecting simultaneously, this is less than the
+/// [`HANDSHAKE_TIMEOUT`].
+///
+/// ## Security
+///
+/// Zebra resists distributed denial of service attacks by limiting the inbound connection rate.
+/// After a _successful_ inbound connection, new inbound peer connections are only accepted,
+/// and our side of the handshake initiated, after this minimum time has elapsed.
+///
+/// The inbound interval is much longer than the outbound interval, because Zebra does not
+/// control the selection or reconnections of inbound peers.
+pub const MIN_INBOUND_PEER_CONNECTION_INTERVAL: Duration = Duration::from_secs(1);
+
+/// The minimum time between _failed_ inbound peer connections, implemented by
+/// `peer_set::initialize::accept_inbound_connections`.
+///
+/// This is a tradeoff between:
+/// - the memory, CPU, and network usage of each new connection attempt, and
+/// - denying service to honest peers due to an attack which makes many inbound connections.
+///
+/// Attacks that reach this limit should be managed using a firewall or intrusion prevention system.
+///
+/// ## Security
+///
+/// Zebra resists distributed denial of service attacks by limiting the inbound connection rate.
+/// After a _failed_ inbound connection, new inbound peer connections are only accepted,
+/// and our side of the handshake initiated, after this minimum time has elapsed.
+pub const MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL: Duration = Duration::from_millis(10);
+
 /// The minimum time between successive calls to
 /// [`CandidateSet::update`][crate::peer_set::CandidateSet::update].
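The three intervals above encode this commit's core rate limits. As a quick sanity check (a sketch, not code from this commit; only the constant values are copied from the diff above), the worst-case handshake rates they allow are:

```rust
use std::time::Duration;

// Values copied from the constants in the diff above.
const MIN_OUTBOUND_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(50);
const MIN_INBOUND_PEER_CONNECTION_INTERVAL: Duration = Duration::from_secs(1);
const MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL: Duration = Duration::from_millis(10);

/// Maximum connection attempts per second implied by a minimum interval.
fn max_attempts_per_second(interval: Duration) -> u128 {
    Duration::from_secs(1).as_millis() / interval.as_millis()
}

fn main() {
    // Up to 20 outbound handshakes per second.
    assert_eq!(max_attempts_per_second(MIN_OUTBOUND_PEER_CONNECTION_INTERVAL), 20);

    // At most 1 successful inbound handshake per second, because Zebra
    // doesn't control which inbound peers connect or reconnect.
    assert_eq!(max_attempts_per_second(MIN_INBOUND_PEER_CONNECTION_INTERVAL), 1);

    // Failed inbound connections clear at up to 100 per second, matching the
    // "100 failed inbound connections takes 1 second" note later in the diff.
    assert_eq!(max_attempts_per_second(MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL), 100);
}
```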
@@ -324,9 +358,6 @@ pub mod magics {
 
 #[cfg(test)]
 mod tests {
-    use std::convert::TryFrom;
-
     use zebra_chain::parameters::POST_BLOSSOM_POW_TARGET_SPACING;
 
     use super::*;
@@ -363,18 +394,20 @@ mod tests {
         "The EWMA decay time should be higher than the request timeout, so timed out peers are penalised by the EWMA.");
 
         assert!(
-            u32::try_from(MAX_ADDRS_IN_ADDRESS_BOOK).expect("fits in u32")
-                * MIN_PEER_CONNECTION_INTERVAL
-                < MIN_PEER_RECONNECTION_DELAY,
-            "each peer should get at least one connection attempt in each connection interval",
+            MIN_PEER_RECONNECTION_DELAY.as_secs() as f32
+                / (u32::try_from(MAX_ADDRS_IN_ADDRESS_BOOK).expect("fits in u32")
+                    * MIN_OUTBOUND_PEER_CONNECTION_INTERVAL)
+                    .as_secs() as f32
+                >= 0.5,
+            "most peers should get a connection attempt in each connection interval",
         );
 
         assert!(
-            MIN_PEER_RECONNECTION_DELAY.as_secs()
+            MIN_PEER_RECONNECTION_DELAY.as_secs() as f32
                 / (u32::try_from(MAX_ADDRS_IN_ADDRESS_BOOK).expect("fits in u32")
-                    * MIN_PEER_CONNECTION_INTERVAL)
-                    .as_secs()
-                <= 2,
+                    * MIN_OUTBOUND_PEER_CONNECTION_INTERVAL)
+                    .as_secs() as f32
+                <= 2.0,
             "each peer should only have a few connection attempts in each connection interval",
         );
     }
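The two rewritten asserts pin the ratio `MIN_PEER_RECONNECTION_DELAY / (MAX_ADDRS_IN_ADDRESS_BOOK * MIN_OUTBOUND_PEER_CONNECTION_INTERVAL)` between 0.5 and 2.0. A worked example of the same bound, using hypothetical values for the two constants defined elsewhere in `constants.rs`:

```rust
use std::time::Duration;

fn main() {
    // Hypothetical stand-ins; the real values live elsewhere in constants.rs.
    let min_peer_reconnection_delay = Duration::from_secs(60);
    let max_addrs_in_address_book: u32 = 1200;
    let min_outbound_peer_connection_interval = Duration::from_millis(50);

    // Time needed to give every address book entry one outbound attempt:
    // 1200 peers x 50 ms = 60 s.
    let full_sweep = max_addrs_in_address_book * min_outbound_peer_connection_interval;

    let ratio = min_peer_reconnection_delay.as_secs_f64() / full_sweep.as_secs_f64();

    // The asserts above accept any ratio in [0.5, 2.0]: most peers get a
    // connection attempt in each reconnection delay, but not many more.
    assert!((0.5..=2.0).contains(&ratio));
}
```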
@@ -2,7 +2,6 @@
 
 use std::{
     cmp::min,
-    collections::HashSet,
    fmt,
     future::Future,
     net::{IpAddr, Ipv4Addr, SocketAddr},
@@ -14,6 +13,7 @@ use std::{
 use chrono::{TimeZone, Utc};
 use futures::{channel::oneshot, future, pin_mut, FutureExt, SinkExt, StreamExt};
+use indexmap::IndexSet;
 use tokio::{
     io::{AsyncRead, AsyncWrite},
     sync::broadcast,
@@ -48,6 +48,9 @@ use crate::{
     BoxError, Config, VersionMessage,
 };
 
+#[cfg(test)]
+mod tests;
+
 /// A [`Service`] that handshakes with a remote peer and constructs a
 /// client/server pair.
 ///
@@ -71,7 +74,7 @@ where
     address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
     inv_collector: broadcast::Sender<InventoryChange>,
     minimum_peer_version: MinimumPeerVersion<C>,
-    nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
+    nonces: Arc<futures::lock::Mutex<IndexSet<Nonce>>>,
 
     parent_span: Span,
 }
@@ -515,7 +518,7 @@ where
             let (tx, _rx) = tokio::sync::mpsc::channel(1);
             tx
         });
-        let nonces = Arc::new(futures::lock::Mutex::new(HashSet::new()));
+        let nonces = Arc::new(futures::lock::Mutex::new(IndexSet::new()));
         let user_agent = self.user_agent.unwrap_or_default();
         let our_services = self.our_services.unwrap_or_else(PeerServices::empty);
         let relay = self.relay.unwrap_or(false);
@@ -572,7 +575,7 @@ pub async fn negotiate_version<PeerTransport>(
     peer_conn: &mut Framed<PeerTransport, Codec>,
     connected_addr: &ConnectedAddr,
     config: Config,
-    nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
+    nonces: Arc<futures::lock::Mutex<IndexSet<Nonce>>>,
     user_agent: String,
     our_services: PeerServices,
     relay: bool,
@@ -583,12 +586,43 @@ where
 {
     // Create a random nonce for this connection
     let local_nonce = Nonce::default();
 
+    // Insert the nonce for this handshake into the shared nonce set.
+    // Each connection has its own connection state, and handshakes execute concurrently.
+    //
     // # Correctness
     //
     // It is ok to wait for the lock here, because handshakes have a short
     // timeout, and the async mutex will be released when the task times
     // out.
-    nonces.lock().await.insert(local_nonce);
+    //
+    // Duplicate nonces don't matter here, because 64-bit random collisions are very rare.
+    // If they happen, we're probably replacing a leftover nonce from a failed connection,
+    // which wasn't cleaned up when it closed.
+    {
+        let mut locked_nonces = nonces.lock().await;
+        locked_nonces.insert(local_nonce);
+
+        // # Security
+        //
+        // Limit the amount of memory used for nonces.
+        // Nonces can be left in the set if the connection fails or times out between
+        // the nonce being inserted, and it being removed.
+        //
+        // Zebra has strict connection limits, so we limit the number of nonces to
+        // the configured connection limit.
+        // This is a tradeoff between:
+        // - avoiding memory denial of service attacks which make large numbers of connections,
+        //   for example, 100 failed inbound connections takes 1 second.
+        // - memory usage: 16 bytes per `Nonce`, 3.2 kB for 200 nonces
+        // - collision probability: 2^32 has ~50% collision probability, so we use a lower limit
+        //   <https://en.wikipedia.org/wiki/Birthday_problem#Probability_of_a_shared_birthday_(collision)>
+        while locked_nonces.len() > config.peerset_total_connection_limit() {
+            locked_nonces.shift_remove_index(0);
+        }
+
+        std::mem::drop(locked_nonces);
+    };
+
     // Don't leak our exact clock skew to our peers. On the other hand,
     // we can't deviate too much, or zcashd will get confused.
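The eviction loop above is the heart of the fix: `IndexSet` keeps insertion order, so index 0 is always the oldest, most likely leftover, nonce. A self-contained sketch of the same pattern (not the Zebra code itself; a hypothetical limit stands in for `config.peerset_total_connection_limit()`), plus the birthday-problem estimate the comment cites:

```rust
use indexmap::IndexSet;

/// Hypothetical stand-in for `config.peerset_total_connection_limit()`.
const CONNECTION_LIMIT: usize = 200;

/// Insert a nonce, then evict the oldest entries until the set is bounded.
fn insert_bounded(nonces: &mut IndexSet<u64>, nonce: u64) {
    nonces.insert(nonce);
    while nonces.len() > CONNECTION_LIMIT {
        // IndexSet preserves insertion order, so index 0 is the oldest nonce.
        nonces.shift_remove_index(0);
    }
}

fn main() {
    let mut nonces = IndexSet::new();
    for nonce in 0..1_000_u64 {
        insert_bounded(&mut nonces, nonce);
    }
    assert_eq!(nonces.len(), CONNECTION_LIMIT);
    // Only the most recent nonces survive; stale ones were evicted first.
    assert!(nonces.contains(&999) && !nonces.contains(&0));

    // Birthday-problem estimate from the comment: for n random 64-bit nonces,
    // P(collision) is roughly n^2 / 2^65, so 200 nonces give about 1e-15.
    let n = CONNECTION_LIMIT as f64;
    let p_collision = n * n / 2_f64.powi(65);
    assert!(p_collision < 1e-14);
}
```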
@@ -684,10 +718,15 @@ where
     // We must wait for the lock before we continue with the connection, to avoid
     // self-connection. If the connection times out, the async lock will be
     // released.
+    //
+    // # Security
+    //
+    // Connections that get a `Version` message will remove their nonces here,
+    // but connections that fail before this point can leave their nonces in the nonce set.
     let nonce_reuse = {
         let mut locked_nonces = nonces.lock().await;
         let nonce_reuse = locked_nonces.contains(&remote.nonce);
-        // Regardless of whether we observed nonce reuse, clean up the nonce set.
+        // Regardless of whether we observed nonce reuse, remove our own nonce from the nonce set.
         locked_nonces.remove(&local_nonce);
         nonce_reuse
     };
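For context, the `nonce_reuse` block above is the self-connection detector: if the remote `Version` nonce is one we generated, we are connected to ourselves. A minimal sketch of that check, under the assumption of plain `u64` nonces and illustrative names:

```rust
use indexmap::IndexSet;

/// Returns true if the remote peer echoed one of our own nonces,
/// i.e. the connection loops back to this node.
fn check_and_clean_nonce(
    local_nonces: &mut IndexSet<u64>,
    remote_nonce: u64,
    local_nonce: u64,
) -> bool {
    let nonce_reuse = local_nonces.contains(&remote_nonce);
    // Remove our own nonce whether or not reuse was detected, so successful
    // handshakes never leave entries behind; failed handshakes are cleaned
    // up by the size limit instead.
    local_nonces.shift_remove(&local_nonce);
    nonce_reuse
}

fn main() {
    let mut nonces: IndexSet<u64> = [42_u64].into_iter().collect();
    // A self-connection echoes our nonce back to us.
    assert!(check_and_clean_nonce(&mut nonces, 42, 42));
    assert!(nonces.is_empty());
}
```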
@@ -0,0 +1,15 @@
+//! Implements methods for testing [`Handshake`]
+
+use super::*;
+
+impl<S, C> Handshake<S, C>
+where
+    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
+    S::Future: Send,
+    C: ChainTip + Clone + Send + 'static,
+{
+    /// Returns a count of how many connection nonces are stored in this [`Handshake`]
+    pub async fn nonce_count(&self) -> usize {
+        self.nonces.lock().await.len()
+    }
+}
@@ -1,3 +1,5 @@
+//! Candidate peer selection for outbound connections using the [`CandidateSet`].
+
 use std::{cmp::min, sync::Arc};
 
 use chrono::Utc;
@@ -361,7 +363,8 @@ where
     ///
     /// Zebra resists distributed denial of service attacks by making sure that
     /// new peer connections are initiated at least
-    /// [`MIN_PEER_CONNECTION_INTERVAL`][constants::MIN_PEER_CONNECTION_INTERVAL] apart.
+    /// [`MIN_OUTBOUND_PEER_CONNECTION_INTERVAL`][constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL]
+    /// apart.
     ///
     /// [`Responded`]: crate::PeerAddrState::Responded
     pub async fn next(&mut self) -> Option<MetaAddr> {
@@ -397,7 +400,7 @@ where
 
         // Security: rate-limit new outbound peer connections
         sleep_until(self.min_next_handshake).await;
-        self.min_next_handshake = Instant::now() + constants::MIN_PEER_CONNECTION_INTERVAL;
+        self.min_next_handshake = Instant::now() + constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL;
 
         Some(next_peer)
     }
@@ -1,3 +1,5 @@
+//! Randomised property tests for candidate peer selection.
+
 use std::{
     env,
     net::SocketAddr,
@@ -13,7 +15,7 @@ use tracing::Span;
 use zebra_chain::{parameters::Network::*, serialization::DateTime32};
 
 use crate::{
-    constants::MIN_PEER_CONNECTION_INTERVAL,
+    constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL,
     meta_addr::{MetaAddr, MetaAddrChange},
     AddressBook, BoxError, Request, Response,
 };
@@ -75,7 +77,7 @@ proptest! {
        //
        // Check that it takes less than the peer set candidate delay,
        // and hope that is enough time for test machines with high CPU load.
-       let less_than_min_interval = MIN_PEER_CONNECTION_INTERVAL - Duration::from_millis(1);
+       let less_than_min_interval = MIN_OUTBOUND_PEER_CONNECTION_INTERVAL - Duration::from_millis(1);
        assert_eq!(runtime.block_on(timeout(less_than_min_interval, candidate_set.next())), Ok(None));
    }
 }
@@ -112,7 +114,7 @@ proptest! {
            // Check rate limiting for initial peers
            check_candidates_rate_limiting(&mut candidate_set, initial_candidates).await;
            // Sleep more than the rate limiting delay
-           sleep(MAX_TEST_CANDIDATES * MIN_PEER_CONNECTION_INTERVAL).await;
+           sleep(MAX_TEST_CANDIDATES * MIN_OUTBOUND_PEER_CONNECTION_INTERVAL).await;
            // Check that the next peers are still respecting the rate limiting, without causing a
            // burst of reconnections
            check_candidates_rate_limiting(&mut candidate_set, extra_candidates).await;
@@ -121,7 +123,7 @@
        // Allow enough time for the maximum number of candidates,
        // plus some extra time for test machines with high CPU load.
        // (The max delay asserts usually fail before hitting this timeout.)
-       let max_rate_limit_sleep = 3 * MAX_TEST_CANDIDATES * MIN_PEER_CONNECTION_INTERVAL;
+       let max_rate_limit_sleep = 3 * MAX_TEST_CANDIDATES * MIN_OUTBOUND_PEER_CONNECTION_INTERVAL;
        let max_extra_delay = (2 * MAX_TEST_CANDIDATES + 1) * MAX_SLEEP_EXTRA_DELAY;
        assert!(runtime.block_on(timeout(max_rate_limit_sleep + max_extra_delay, checks)).is_ok());
    }
@@ -161,7 +163,8 @@ where
            "rate-limited candidates should not be delayed too long: now: {now:?} max: {maximum_reconnect_instant:?}. Hint: is the test machine overloaded?",
         );
 
-        minimum_reconnect_instant = now + MIN_PEER_CONNECTION_INTERVAL;
-        maximum_reconnect_instant = now + MIN_PEER_CONNECTION_INTERVAL + MAX_SLEEP_EXTRA_DELAY;
+        minimum_reconnect_instant = now + MIN_OUTBOUND_PEER_CONNECTION_INTERVAL;
+        maximum_reconnect_instant =
+            now + MIN_OUTBOUND_PEER_CONNECTION_INTERVAL + MAX_SLEEP_EXTRA_DELAY;
     }
 }
@@ -7,6 +7,7 @@ use std::{
     collections::{BTreeMap, HashSet},
     net::SocketAddr,
     sync::Arc,
+    time::Duration,
 };
 
 use futures::{
@@ -175,6 +176,7 @@ where
     let listen_fut = accept_inbound_connections(
         config.clone(),
         tcp_listener,
+        constants::MIN_INBOUND_PEER_CONNECTION_INTERVAL,
         listen_handshaker,
         peerset_tx.clone(),
     );
@@ -273,9 +275,7 @@ where
     // # Security
     //
     // Resists distributed denial of service attacks by making sure that
-    // new peer connections are initiated at least
-    // [`MIN_PEER_CONNECTION_INTERVAL`][constants::MIN_PEER_CONNECTION_INTERVAL]
-    // apart.
+    // new peer connections are initiated at least `MIN_OUTBOUND_PEER_CONNECTION_INTERVAL` apart.
     //
     // # Correctness
     //
@@ -297,9 +297,13 @@ where
         // Spawn a new task to make the outbound connection.
         tokio::spawn(
             async move {
-                // Only spawn one outbound connector per `MIN_PEER_CONNECTION_INTERVAL`,
-                // sleeping for an interval according to its index in the list.
-                sleep(constants::MIN_PEER_CONNECTION_INTERVAL.saturating_mul(i as u32)).await;
+                // Only spawn one outbound connector per
+                // `MIN_OUTBOUND_PEER_CONNECTION_INTERVAL`,
+                // by sleeping for the interval multiplied by the peer's index in the list.
+                sleep(
+                    constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL.saturating_mul(i as u32),
+                )
+                .await;
 
                 // As soon as we create the connector future,
                 // the handshake starts running as a spawned task.
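The rewritten sleep staggers the initial outbound connectors: peer `i` waits `i * MIN_OUTBOUND_PEER_CONNECTION_INTERVAL` before dialing, so the whole batch respects the rate limit without a central scheduler. A condensed sketch of the pattern, with illustrative names:

```rust
use std::time::Duration;

/// Stand-in for `constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL`.
const INTERVAL: Duration = Duration::from_millis(50);

async fn spawn_staggered_connectors(peers: Vec<String>) {
    for (i, peer) in peers.into_iter().enumerate() {
        tokio::spawn(async move {
            // Peer i sleeps i * INTERVAL, so connection attempts are spread
            // INTERVAL apart; saturating_mul avoids overflow on huge lists.
            tokio::time::sleep(INTERVAL.saturating_mul(i as u32)).await;
            // ... dial `peer` and run the handshake here ...
            let _ = peer;
        });
    }
}
```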
@@ -507,11 +511,13 @@ pub(crate) async fn open_listener(config: &Config) -> (TcpListener, SocketAddr)
 /// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
 /// the [`peer::Client`] result over `peerset_tx`.
 ///
-/// Limit the number of active inbound connections based on `config`.
+/// Limits the number of active inbound connections based on `config`,
+/// and waits `min_inbound_peer_connection_interval` between connections.
 #[instrument(skip(config, listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))]
 async fn accept_inbound_connections<S>(
     config: Config,
     listener: TcpListener,
+    min_inbound_peer_connection_interval: Duration,
     mut handshaker: S,
     peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
 ) -> Result<(), BoxError>
@@ -596,28 +602,35 @@ where
                 handshakes.push(Box::pin(handshake_task));
             }
 
-            // Only spawn one inbound connection handshake per `MIN_PEER_CONNECTION_INTERVAL`.
-            // But clear out failed connections as fast as possible.
+            // Rate-limit inbound connection handshakes.
+            // But sleep longer after a successful connection,
+            // so we can clear out failed connections at a higher rate.
             //
             // If there is a flood of connections,
             // this stops Zebra overloading the network with handshake data.
             //
             // Zebra can't control how many queued connections are waiting,
             // but most OSes also limit the number of queued inbound connections on a listener port.
-            tokio::time::sleep(constants::MIN_PEER_CONNECTION_INTERVAL).await;
+            tokio::time::sleep(min_inbound_peer_connection_interval).await;
         } else {
+            // Allow invalid connections to be cleared quickly,
+            // but still put a limit on our CPU and network usage from failed connections.
             debug!(?inbound_result, "error accepting inbound connection");
+            tokio::time::sleep(constants::MIN_INBOUND_PEER_FAILED_CONNECTION_INTERVAL).await;
         }
 
         // Security: Let other tasks run after each connection is processed.
         //
-        // Avoids remote peers starving other Zebra tasks using inbound connection successes or errors.
+        // Avoids remote peers starving other Zebra tasks using inbound connection successes or
+        // errors.
+        //
+        // Preventing a denial of service is important in this code, so we want to sleep *and* make
+        // the next connection after other tasks have run. (Sleeps are not guaranteed to do that.)
         tokio::task::yield_now().await;
     }
 }
 
 /// An action that the peer crawler can take.
-#[allow(dead_code)]
 enum CrawlerAction {
     /// Drop the demand signal because there are too many pending handshakes.
     DemandDrop,
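Taken together, the accept loop now sleeps on two different schedules and always yields, so a flood of inbound connections can neither exhaust handshake resources nor starve other tasks. A condensed sketch of the control flow (illustrative types; the real loop accepts TCP connections and spawns handshake tasks):

```rust
use std::time::Duration;

/// Stand-ins for the intervals used above.
const SUCCESS_INTERVAL: Duration = Duration::from_secs(1);
const FAILED_INTERVAL: Duration = Duration::from_millis(10);

async fn accept_loop(mut accept_next: impl FnMut() -> Result<(), ()>) {
    loop {
        if accept_next().is_ok() {
            // Throttle successful handshakes hard: Zebra doesn't control
            // which inbound peers connect, so this is the main DoS limit.
            tokio::time::sleep(SUCCESS_INTERVAL).await;
        } else {
            // Clear failed connections quickly, but still bound the CPU and
            // network cost of an attacker's connection flood.
            tokio::time::sleep(FAILED_INTERVAL).await;
        }

        // Sleeping alone doesn't guarantee other tasks get polled; yielding
        // explicitly stops inbound results from starving the runtime.
        tokio::task::yield_now().await;
    }
}
```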
@@ -22,8 +22,8 @@ use std::{
 use chrono::Utc;
 use futures::{channel::mpsc, FutureExt, StreamExt};
 use indexmap::IndexSet;
-use tokio::{net::TcpStream, task::JoinHandle};
-use tower::{service_fn, Service};
+use tokio::{io::AsyncWriteExt, net::TcpStream, task::JoinHandle};
+use tower::{service_fn, Layer, Service, ServiceExt};
 use tracing::Span;
 
 use zebra_chain::{chain_tip::NoChainTip, parameters::Network, serialization::DateTime32};
@@ -58,6 +58,9 @@ const CRAWLER_TEST_DURATION: Duration = Duration::from_secs(10);
 /// Using a very short time can make the listener not run at all.
 const LISTENER_TEST_DURATION: Duration = Duration::from_secs(10);
 
+/// The amount of time to make the inbound connection acceptor wait between peer connections.
+const MIN_INBOUND_PEER_CONNECTION_INTERVAL_FOR_TESTS: Duration = Duration::from_millis(25);
+
 /// Test that zebra-network discovers dynamic bind-to-all-interfaces listener ports,
 /// and sends them to the `AddressBook`.
 ///
@@ -1067,7 +1070,9 @@ async fn add_initial_peers_is_rate_limited() {
     assert_eq!(connections.len(), PEER_COUNT);
 
     // Make sure the rate limiting worked by checking if it took long enough
     assert!(
-        elapsed > constants::MIN_PEER_CONNECTION_INTERVAL.saturating_mul((PEER_COUNT - 1) as u32),
+        elapsed
+            > constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL
+                .saturating_mul((PEER_COUNT - 1) as u32),
         "elapsed only {elapsed:?}"
     );
 
@@ -1090,6 +1095,94 @@ async fn add_initial_peers_is_rate_limited() {
     );
 }
 
+/// Test that the number of nonces is limited when peers send an invalid response or
+/// if handshakes time out and are dropped.
+#[tokio::test]
+async fn remnant_nonces_from_outbound_connections_are_limited() {
+    use tower::timeout::TimeoutLayer;
+
+    let _init_guard = zebra_test::init();
+
+    // This test should not require network access.
+
+    const TEST_PEERSET_INITIAL_TARGET_SIZE: usize = 3;
+
+    // Create a test config that listens on an unused port.
+    let listen_addr = "127.0.0.1:0".parse().unwrap();
+    let config = Config {
+        listen_addr,
+        peerset_initial_target_size: TEST_PEERSET_INITIAL_TARGET_SIZE,
+        ..Config::default()
+    };
+
+    let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT);
+    let nil_inbound_service =
+        tower::service_fn(|_req| async move { Ok::<Response, BoxError>(Response::Nil) });
+
+    let hs = peer::Handshake::builder()
+        .with_config(config.clone())
+        .with_inbound_service(nil_inbound_service)
+        .with_user_agent(crate::constants::USER_AGENT.to_string())
+        .with_latest_chain_tip(NoChainTip)
+        .want_transactions(true)
+        .finish()
+        .expect("configured all required parameters");
+
+    let mut outbound_connector = hs_timeout.layer(peer::Connector::new(hs.clone()));
+
+    let mut active_outbound_connections = ActiveConnectionCounter::new_counter();
+
+    let expected_max_nonces = config.peerset_total_connection_limit();
+    let num_connection_attempts = 2 * expected_max_nonces;
+
+    for i in 1..num_connection_attempts {
+        let expected_nonce_count = expected_max_nonces.min(i);
+
+        let (tcp_listener, addr) = open_listener(&config.clone()).await;
+
+        tokio::spawn(async move {
+            let (mut tcp_stream, _addr) = tcp_listener
+                .accept()
+                .await
+                .expect("client connection should succeed");
+
+            tcp_stream
+                .shutdown()
+                .await
+                .expect("shutdown should succeed");
+        });
+
+        let outbound_connector = outbound_connector
+            .ready()
+            .await
+            .expect("outbound connector never errors");
+
+        let connection_tracker = active_outbound_connections.track_connection();
+
+        let req = OutboundConnectorRequest {
+            addr,
+            connection_tracker,
+        };
+
+        outbound_connector
+            .call(req)
+            .await
+            .expect_err("connection attempt should fail");
+
+        let nonce_count = hs.nonce_count().await;
+
+        assert!(
+            expected_max_nonces >= nonce_count,
+            "number of nonces should be limited to `peerset_total_connection_limit`"
+        );
+
+        assert!(
+            expected_nonce_count == nonce_count,
+            "number of nonces should be the lesser of the number of closed connections and `peerset_total_connection_limit`"
+        )
+    }
+}
+
 /// Test that [`init`] does not deadlock in `add_initial_peers`,
 /// even if the seeders return a lot of peers.
 #[tokio::test]
@@ -1360,6 +1453,7 @@ where
     let listen_fut = accept_inbound_connections(
         config.clone(),
         tcp_listener,
+        MIN_INBOUND_PEER_CONNECTION_INTERVAL_FOR_TESTS,
         listen_handshaker,
         peerset_tx.clone(),
     );