Prevent burst of reconnection attempts (#2251)

* Rate-limit new outbound peer connections

Set the rate-limiting sleep timer to use a delay added to the maximum
between the next peer connection instant and now. This ensures that the
timer always sleeps at least the time used for the delay.

This change fixes rate-limiting new outbound peer connections, since
before there could be a burst of attempts until the deadline progressed
to the current instant.

Fixes #2216

* Create `MetaAddr::alternate_node_strategy` helper

Creates arbitrary `MetaAddr`s as if they were network nodes that sent
their listening address.

* Test outbound peer connection rate limiting

Tests if connections are rate limited to 10 per second, and also tests
that sleeping before continuing with the attempts still respets the rate
limit and does not result in a burst of reconnection attempts.
This commit is contained in:
Janito Vaqueiro Ferreira Filho 2021-06-07 01:13:46 -03:00 committed by GitHub
parent 8b5b367c8a
commit aaef94c2bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 79 additions and 4 deletions

View File

@ -16,6 +16,12 @@ impl MetaAddr {
}) })
.boxed() .boxed()
} }
pub fn alternate_node_strategy() -> BoxedStrategy<Self> {
canonical_socket_addr()
.prop_map(|address| MetaAddr::new_alternate(&address, &PeerServices::NODE_NETWORK))
.boxed()
}
} }
impl Arbitrary for MetaAddr { impl Arbitrary for MetaAddr {

View File

@ -1,7 +1,7 @@
use std::{cmp::min, mem, sync::Arc, time::Duration}; use std::{cmp::min, mem, sync::Arc, time::Duration};
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep, sleep_until, timeout, Sleep}; use tokio::time::{sleep, sleep_until, timeout, Instant, Sleep};
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use zebra_chain::serialization::DateTime32; use zebra_chain::serialization::DateTime32;
@ -282,7 +282,7 @@ where
/// new peer connections are initiated at least /// new peer connections are initiated at least
/// `MIN_PEER_CONNECTION_INTERVAL` apart. /// `MIN_PEER_CONNECTION_INTERVAL` apart.
pub async fn next(&mut self) -> Option<MetaAddr> { pub async fn next(&mut self) -> Option<MetaAddr> {
let current_deadline = self.next_peer_min_wait.deadline(); let current_deadline = self.next_peer_min_wait.deadline().max(Instant::now());
let mut sleep = sleep_until(current_deadline + Self::MIN_PEER_CONNECTION_INTERVAL); let mut sleep = sleep_until(current_deadline + Self::MIN_PEER_CONNECTION_INTERVAL);
mem::swap(&mut self.next_peer_min_wait, &mut sleep); mem::swap(&mut self.next_peer_min_wait, &mut sleep);

View File

@ -1,9 +1,19 @@
use std::{
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use proptest::{collection::vec, prelude::*}; use proptest::{collection::vec, prelude::*};
use tokio::{
runtime::Runtime,
time::{sleep, timeout},
};
use tracing::Span;
use zebra_chain::serialization::DateTime32; use zebra_chain::serialization::DateTime32;
use super::super::validate_addrs; use super::super::{validate_addrs, CandidateSet};
use crate::types::MetaAddr; use crate::{types::MetaAddr, AddressBook, BoxError, Config, Request, Response};
proptest! { proptest! {
/// Test that validated gossiped peers never have a `last_seen` time that's in the future. /// Test that validated gossiped peers never have a `last_seen` time that's in the future.
@ -21,3 +31,62 @@ proptest! {
} }
} }
} }
proptest! {
#![proptest_config(ProptestConfig::with_cases(16))]
/// Test that new outbound peer connections are rate-limited.
#[test]
fn new_outbound_peer_connections_are_rate_limited(
peers in vec(MetaAddr::alternate_node_strategy(), 10),
initial_candidates in 0..4usize,
extra_candidates in 0..4usize,
) {
zebra_test::init();
let runtime = Runtime::new().expect("Failed to create Tokio runtime");
let _guard = runtime.enter();
let peer_service = tower::service_fn(|_| async {
unreachable!("Mock peer service is never used");
});
let mut address_book = AddressBook::new(&Config::default(), Span::none());
address_book.extend(peers);
let mut candidate_set = CandidateSet::new(Arc::new(Mutex::new(address_book)), peer_service);
let checks = async move {
// Check rate limiting for initial peers
check_candidates_rate_limiting(&mut candidate_set, initial_candidates).await;
// Sleep more than the rate limiting delay
sleep(Duration::from_millis(400)).await;
// Check that the next peers are still respecting the rate limiting, without causing a
// burst of reconnections
check_candidates_rate_limiting(&mut candidate_set, extra_candidates).await;
};
assert!(runtime.block_on(timeout(Duration::from_secs(10), checks)).is_ok());
}
}
/// Check if obtaining a certain number of reconnection peers is rate limited.
///
/// # Panics
///
/// Will panic if a reconnection peer is returned too quickly, or if no reconnection peer is
/// returned.
async fn check_candidates_rate_limiting<S>(candidate_set: &mut CandidateSet<S>, candidates: usize)
where
S: tower::Service<Request, Response = Response, Error = BoxError>,
S::Future: Send + 'static,
{
let mut minimum_reconnect_instant = Instant::now();
for _ in 0..candidates {
assert!(candidate_set.next().await.is_some());
assert!(Instant::now() >= minimum_reconnect_instant);
minimum_reconnect_instant += CandidateSet::<S>::MIN_PEER_CONNECTION_INTERVAL;
}
}