Rate limit `GetAddr` messages to any peer, Credit: Equilibrium (#2254)

* Rename field to `wait_next_handshake`

Make the name a bit more clear regarding to the field's purpose.

* Move `MIN_PEER_CONNECTION_INTERVAL` to `constants`

Move it to the `constants` module so that it is placed closer to other
constants for consistency and to make it easier to see any relationships
when changing them.

* Rate limit calls to `CandidateSet::update()`

This effectively rate limits requests asking for more peer addresses
sent to the same peer. A new `min_next_crawl` field was added to
`CandidateSet`, and `update` only sends requests for more peer addresses
if the call happens after the instant specified by that field. After
sending the requests, the field value is updated so that there is a
`MIN_PEER_GET_ADDR_INTERVAL` wait time until the next `update` call
sends requests again.

* Include `update_initial` in rate limiting

Move the rate limiting code from `update` to `update_timeout`, so that
both `update` and `update_initial` get rate limited.

* Test `CandidateSet::update` rate limiting

Create a `CandidateSet` that uses a mocked `PeerService`. The mocked
service always returns an empty list of peers, but it also checks that
the requests only happen after expected instants, determined by the
fanout amount and the rate limiting interval.

* Refactor to create a `mock_peer_service` helper

Move the code from the test to a utility function so that another test
will be able to use it as well.

* Check number of times service was called

Use an `AtomicUsize` shared between the service and the test body that
the service increments on every call. The test can then verify if the
service was called the number of times it expected.

* Test calling `update` after `update_initial`

The call to `update` should be skipped because the call to
`update_initial` should also be considered in the rate limiting.

* Mention that call to `update` may be skipped

Make it clearer that in this case the rate limiting causes calls to be
skipped, and not that there's an internal sleep that happens.

Also remove "to the same peers", because it's more general than that.

Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Janito Vaqueiro Ferreira Filho 2021-06-08 20:42:45 -03:00 committed by GitHub
parent b07b825286
commit e8d5f6978d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 209 additions and 31 deletions

View File

@ -38,6 +38,7 @@ zebra-chain = { path = "../zebra-chain" }
[dev-dependencies]
proptest = "0.10"
proptest-derive = "0.3"
tokio = { version = "0.3.6", features = ["test-util"] }
toml = "0.5"
zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] }

View File

@ -49,6 +49,22 @@ pub const LIVE_PEER_DURATION: Duration = Duration::from_secs(60 + 20 + 20 + 20);
/// connected peer.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60);
/// The minimum time between successive calls to [`CandidateSet::next()`][Self::next].
///
/// ## Security
///
/// Zebra resists distributed denial of service attacks by making sure that new peer connections
/// are initiated at least `MIN_PEER_CONNECTION_INTERVAL` apart.
pub const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100);
/// The minimum time between successive calls to [`CandidateSet::update()`][Self::update].
///
/// ## Security
///
/// Zebra resists distributed denial of service attacks by making sure that requests for more
/// peer addresses are sent at least `MIN_PEER_GET_ADDR_INTERVAL` apart.
pub const MIN_PEER_GET_ADDR_INTERVAL: Duration = Duration::from_secs(10);
/// The number of GetAddr requests sent when crawling for new peers.
///
/// ## SECURITY

View File

@ -108,7 +108,8 @@ mod tests;
pub(super) struct CandidateSet<S> {
pub(super) address_book: Arc<std::sync::Mutex<AddressBook>>,
pub(super) peer_service: S,
next_peer_min_wait: Sleep,
wait_next_handshake: Sleep,
min_next_crawl: Instant,
}
impl<S> CandidateSet<S>
@ -116,14 +117,6 @@ where
S: Service<Request, Response = Response, Error = BoxError>,
S::Future: Send + 'static,
{
/// The minimum time between successive calls to `CandidateSet::next()`.
///
/// ## Security
///
/// Zebra resists distributed denial of service attacks by making sure that new peer connections
/// are initiated at least `MIN_PEER_CONNECTION_INTERVAL` apart.
const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100);
/// Uses `address_book` and `peer_service` to manage a [`CandidateSet`] of peers.
pub fn new(
address_book: Arc<std::sync::Mutex<AddressBook>>,
@ -132,7 +125,8 @@ where
CandidateSet {
address_book,
peer_service,
next_peer_min_wait: sleep(Duration::from_secs(0)),
wait_next_handshake: sleep(Duration::from_secs(0)),
min_next_crawl: Instant::now(),
}
}
@ -165,6 +159,13 @@ where
///
/// [`next`][Self::next] puts peers into the [`AttemptPending`] state.
///
/// ## Security
///
/// This call is rate-limited to prevent sending a burst of repeated requests for new peer
/// addresses. Each call will only update the [`CandidateSet`] if more time
/// than [`MIN_PEER_GET_ADDR_INTERVAL`][constants::MIN_PEER_GET_ADDR_INTERVAL] has passed since
/// the last call. Otherwise, the update is skipped.
///
/// [`Responded`]: crate::PeerAddrState::Responded
/// [`NeverAttemptedGossiped`]: crate::PeerAddrState::NeverAttemptedGossiped
/// [`Failed`]: crate::PeerAddrState::Failed
@ -178,20 +179,27 @@ where
///
/// See [`update_initial`][Self::update_initial] for details.
async fn update_timeout(&mut self, fanout_limit: Option<usize>) -> Result<(), BoxError> {
// CORRECTNESS
// SECURITY
//
// Use a timeout to avoid deadlocks when there are no connected
// peers, and:
// - we're waiting on a handshake to complete so there are peers, or
// - another task that handles or adds peers is waiting on this task
// to complete.
if let Ok(fanout_result) =
timeout(constants::REQUEST_TIMEOUT, self.update_fanout(fanout_limit)).await
{
fanout_result?;
} else {
// update must only return an error for permanent failures
info!("timeout waiting for the peer service to become ready");
// Rate limit sending `GetAddr` messages to peers.
if self.min_next_crawl <= Instant::now() {
// CORRECTNESS
//
// Use a timeout to avoid deadlocks when there are no connected
// peers, and:
// - we're waiting on a handshake to complete so there are peers, or
// - another task that handles or adds peers is waiting on this task
// to complete.
if let Ok(fanout_result) =
timeout(constants::REQUEST_TIMEOUT, self.update_fanout(fanout_limit)).await
{
fanout_result?;
} else {
// update must only return an error for permanent failures
info!("timeout waiting for the peer service to become ready");
}
self.min_next_crawl = Instant::now() + constants::MIN_PEER_GET_ADDR_INTERVAL;
}
Ok(())
@ -280,11 +288,11 @@ where
///
/// Zebra resists distributed denial of service attacks by making sure that
/// new peer connections are initiated at least
/// `MIN_PEER_CONNECTION_INTERVAL` apart.
/// [`MIN_PEER_CONNECTION_INTERVAL`][constants::MIN_PEER_CONNECTION_INTERVAL] apart.
pub async fn next(&mut self) -> Option<MetaAddr> {
let current_deadline = self.next_peer_min_wait.deadline().max(Instant::now());
let mut sleep = sleep_until(current_deadline + Self::MIN_PEER_CONNECTION_INTERVAL);
mem::swap(&mut self.next_peer_min_wait, &mut sleep);
let current_deadline = self.wait_next_handshake.deadline().max(Instant::now());
let mut sleep = sleep_until(current_deadline + constants::MIN_PEER_CONNECTION_INTERVAL);
mem::swap(&mut self.wait_next_handshake, &mut sleep);
// # Correctness
//

View File

@ -13,7 +13,10 @@ use tracing::Span;
use zebra_chain::serialization::DateTime32;
use super::super::{validate_addrs, CandidateSet};
use crate::{types::MetaAddr, AddressBook, BoxError, Config, Request, Response};
use crate::{
constants::MIN_PEER_CONNECTION_INTERVAL, types::MetaAddr, AddressBook, BoxError, Config,
Request, Response,
};
proptest! {
/// Test that validated gossiped peers never have a `last_seen` time that's in the future.
@ -87,6 +90,6 @@ where
assert!(candidate_set.next().await.is_some());
assert!(Instant::now() >= minimum_reconnect_instant);
minimum_reconnect_instant += CandidateSet::<S>::MIN_PEER_CONNECTION_INTERVAL;
minimum_reconnect_instant += MIN_PEER_CONNECTION_INTERVAL;
}
}

View File

@ -1,14 +1,32 @@
use std::{
collections::VecDeque,
convert::TryInto,
iter,
net::{IpAddr, SocketAddr},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
time::Duration as StdDuration,
};
use chrono::{DateTime, Duration, Utc};
use futures::future;
use tokio::{
runtime::Runtime,
time::{self, Instant},
};
use tower::Service;
use tracing::Span;
use zebra_chain::serialization::DateTime32;
use super::super::validate_addrs;
use crate::types::{MetaAddr, PeerServices};
use super::super::{validate_addrs, CandidateSet};
use crate::{
constants::{GET_ADDR_FANOUT, MIN_PEER_GET_ADDR_INTERVAL},
types::{MetaAddr, PeerServices},
AddressBook, Config, Request, Response,
};
/// Test that offset is applied when all addresses have `last_seen` times in the future.
#[test]
@ -115,6 +133,92 @@ fn rejects_all_addresses_if_applying_offset_causes_an_underflow() {
assert!(validated_peers.next().is_none());
}
/// Test that calls to [`CandidateSet::update`] are rate limited.
#[test]
fn candidate_set_updates_are_rate_limited() {
// Run the test for enough time for `update` to actually run three times
const INTERVALS_TO_RUN: u32 = 3;
// How many times should `update` be called in each rate limit interval
const POLL_FREQUENCY_FACTOR: u32 = 3;
let runtime = Runtime::new().expect("Failed to create Tokio runtime");
let _guard = runtime.enter();
let address_book = AddressBook::new(&Config::default(), Span::none());
let (peer_service, call_count) = mock_peer_service();
let mut candidate_set = CandidateSet::new(Arc::new(Mutex::new(address_book)), peer_service);
runtime.block_on(async move {
time::pause();
let time_limit = Instant::now()
+ INTERVALS_TO_RUN * MIN_PEER_GET_ADDR_INTERVAL
+ StdDuration::from_secs(1);
while Instant::now() <= time_limit {
candidate_set
.update()
.await
.expect("Call to CandidateSet::update should not fail");
time::advance(MIN_PEER_GET_ADDR_INTERVAL / POLL_FREQUENCY_FACTOR).await;
}
assert_eq!(
call_count.load(Ordering::SeqCst),
INTERVALS_TO_RUN as usize * GET_ADDR_FANOUT
);
});
}
/// Test that a call to [`CandidateSet::update`] after a call to [`CandidateSet::update_inital`] is
/// rate limited.
#[test]
fn candidate_set_update_after_update_initial_is_rate_limited() {
let runtime = Runtime::new().expect("Failed to create Tokio runtime");
let _guard = runtime.enter();
let address_book = AddressBook::new(&Config::default(), Span::none());
let (peer_service, call_count) = mock_peer_service();
let mut candidate_set = CandidateSet::new(Arc::new(Mutex::new(address_book)), peer_service);
runtime.block_on(async move {
time::pause();
// Call `update_initial` first
candidate_set
.update_initial(GET_ADDR_FANOUT)
.await
.expect("Call to CandidateSet::update should not fail");
assert_eq!(call_count.load(Ordering::SeqCst), GET_ADDR_FANOUT);
// The following two calls to `update` should be skipped
candidate_set
.update()
.await
.expect("Call to CandidateSet::update should not fail");
time::advance(MIN_PEER_GET_ADDR_INTERVAL / 2).await;
candidate_set
.update()
.await
.expect("Call to CandidateSet::update should not fail");
assert_eq!(call_count.load(Ordering::SeqCst), GET_ADDR_FANOUT);
// After waiting for at least the minimum interval the call to `update` should succeed
time::advance(MIN_PEER_GET_ADDR_INTERVAL).await;
candidate_set
.update()
.await
.expect("Call to CandidateSet::update should not fail");
assert_eq!(call_count.load(Ordering::SeqCst), 2 * GET_ADDR_FANOUT);
});
}
// Utility functions
/// Create a mock list of gossiped [`MetaAddr`]s with the specified `last_seen_times`.
///
/// The IP address and port of the generated ports should not matter for the test.
@ -135,3 +239,49 @@ fn mock_gossiped_peers(last_seen_times: impl IntoIterator<Item = DateTime<Utc>>)
})
.collect()
}
/// Create a mock `PeerSet` service that checks that requests to it are rate limited.
///
/// The function also returns an atomic counter, that can be used for checking how many times the
/// service was called.
fn mock_peer_service<E>() -> (
impl Service<
Request,
Response = Response,
Future = future::Ready<Result<Response, E>>,
Error = E,
> + 'static,
Arc<AtomicUsize>,
) {
let rate_limit_interval = MIN_PEER_GET_ADDR_INTERVAL;
let call_counter = Arc::new(AtomicUsize::new(0));
let call_counter_to_return = call_counter.clone();
let mut peer_request_tracker: VecDeque<_> =
iter::repeat(Instant::now()).take(GET_ADDR_FANOUT).collect();
let service = tower::service_fn(move |request| {
match request {
Request::Peers => {
// Get time from queue that the request is authorized to be sent
let authorized_request_time = peer_request_tracker
.pop_front()
.expect("peer_request_tracker should always have GET_ADDR_FANOUT elements");
// Check that the request was rate limited
assert!(Instant::now() >= authorized_request_time);
// Push a new authorization, updated by the rate limit interval
peer_request_tracker.push_back(Instant::now() + rate_limit_interval);
// Increment count of calls
call_counter.fetch_add(1, Ordering::SeqCst);
// Return an empty list of peer addresses
future::ok(Response::Peers(vec![]))
}
_ => unreachable!("Received an unexpected internal message: {:?}", request),
}
});
(service, call_counter_to_return)
}