diff --git a/zebra-network/src/meta_addr/arbitrary.rs b/zebra-network/src/meta_addr/arbitrary.rs index 5c4d28723..8c85c95a0 100644 --- a/zebra-network/src/meta_addr/arbitrary.rs +++ b/zebra-network/src/meta_addr/arbitrary.rs @@ -16,6 +16,12 @@ impl MetaAddr { }) .boxed() } + + pub fn alternate_node_strategy() -> BoxedStrategy { + canonical_socket_addr() + .prop_map(|address| MetaAddr::new_alternate(&address, &PeerServices::NODE_NETWORK)) + .boxed() + } } impl Arbitrary for MetaAddr { diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index ef84b3409..2197ca149 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -1,7 +1,7 @@ use std::{cmp::min, mem, sync::Arc, time::Duration}; use futures::stream::{FuturesUnordered, StreamExt}; -use tokio::time::{sleep, sleep_until, timeout, Sleep}; +use tokio::time::{sleep, sleep_until, timeout, Instant, Sleep}; use tower::{Service, ServiceExt}; use zebra_chain::serialization::DateTime32; @@ -282,7 +282,7 @@ where /// new peer connections are initiated at least /// `MIN_PEER_CONNECTION_INTERVAL` apart. pub async fn next(&mut self) -> Option { - let current_deadline = self.next_peer_min_wait.deadline(); + let current_deadline = self.next_peer_min_wait.deadline().max(Instant::now()); let mut sleep = sleep_until(current_deadline + Self::MIN_PEER_CONNECTION_INTERVAL); mem::swap(&mut self.next_peer_min_wait, &mut sleep); diff --git a/zebra-network/src/peer_set/candidate_set/tests/prop.rs b/zebra-network/src/peer_set/candidate_set/tests/prop.rs index e35139cc0..da6c12ea9 100644 --- a/zebra-network/src/peer_set/candidate_set/tests/prop.rs +++ b/zebra-network/src/peer_set/candidate_set/tests/prop.rs @@ -1,9 +1,19 @@ +use std::{ + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; + use proptest::{collection::vec, prelude::*}; +use tokio::{ + runtime::Runtime, + time::{sleep, timeout}, +}; +use tracing::Span; use zebra_chain::serialization::DateTime32; -use super::super::validate_addrs; -use crate::types::MetaAddr; +use super::super::{validate_addrs, CandidateSet}; +use crate::{types::MetaAddr, AddressBook, BoxError, Config, Request, Response}; proptest! { /// Test that validated gossiped peers never have a `last_seen` time that's in the future. @@ -21,3 +31,62 @@ proptest! { } } } + +proptest! { + #![proptest_config(ProptestConfig::with_cases(16))] + + /// Test that new outbound peer connections are rate-limited. + #[test] + fn new_outbound_peer_connections_are_rate_limited( + peers in vec(MetaAddr::alternate_node_strategy(), 10), + initial_candidates in 0..4usize, + extra_candidates in 0..4usize, + ) { + zebra_test::init(); + + let runtime = Runtime::new().expect("Failed to create Tokio runtime"); + let _guard = runtime.enter(); + + let peer_service = tower::service_fn(|_| async { + unreachable!("Mock peer service is never used"); + }); + + let mut address_book = AddressBook::new(&Config::default(), Span::none()); + address_book.extend(peers); + + let mut candidate_set = CandidateSet::new(Arc::new(Mutex::new(address_book)), peer_service); + + let checks = async move { + // Check rate limiting for initial peers + check_candidates_rate_limiting(&mut candidate_set, initial_candidates).await; + // Sleep more than the rate limiting delay + sleep(Duration::from_millis(400)).await; + // Check that the next peers are still respecting the rate limiting, without causing a + // burst of reconnections + check_candidates_rate_limiting(&mut candidate_set, extra_candidates).await; + }; + + assert!(runtime.block_on(timeout(Duration::from_secs(10), checks)).is_ok()); + } +} + +/// Check if obtaining a certain number of reconnection peers is rate limited. +/// +/// # Panics +/// +/// Will panic if a reconnection peer is returned too quickly, or if no reconnection peer is +/// returned. +async fn check_candidates_rate_limiting(candidate_set: &mut CandidateSet, candidates: usize) +where + S: tower::Service, + S::Future: Send + 'static, +{ + let mut minimum_reconnect_instant = Instant::now(); + + for _ in 0..candidates { + assert!(candidate_set.next().await.is_some()); + assert!(Instant::now() >= minimum_reconnect_instant); + + minimum_reconnect_instant += CandidateSet::::MIN_PEER_CONNECTION_INTERVAL; + } +}