Implement outbound connection rate limiting - includes config rename with alias (#1855)

* Implement outbound connection rate limiting
* Fix the breaking config change with a serde alias for the old field name

Co-authored-by: teor <teor@riseup.net>
Jane Lusby 2021-03-09 17:36:05 -08:00 committed by GitHub
parent d1ab8e9cf4
commit 03aa6f671f
3 changed files with 54 additions and 15 deletions

View File

@@ -33,8 +33,14 @@ pub struct Config {
     /// set size to reduce Zebra's bandwidth usage.
     pub peerset_initial_target_size: usize,

-    /// How frequently we attempt to connect to a new peer.
-    pub new_peer_interval: Duration,
+    /// How frequently we attempt to crawl the network to discover new peer
+    /// connections.
+    ///
+    /// This duration only pertains to the rate at which Zebra crawls for new
+    /// peers, not the rate at which Zebra connects to new peers, which is
+    /// limited by `CandidateSet::MIN_PEER_CONNECTION_INTERVAL`.
+    #[serde(alias = "new_peer_interval")]
+    pub crawl_new_peer_interval: Duration,
 }

 impl Config {
@@ -146,7 +152,7 @@ impl Default for Config {
             network: Network::Mainnet,
             initial_mainnet_peers: mainnet_peers,
             initial_testnet_peers: testnet_peers,
-            new_peer_interval: Duration::from_secs(60),
+            crawl_new_peer_interval: Duration::from_secs(60),
             // The default peerset target size should be large enough to ensure
             // nodes have a reliable set of peers. But it should also be limited
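The `#[serde(alias = "new_peer_interval")]` attribute is what makes the rename non-breaking: serde accepts the old key as well as the new one when deserializing. Here is a minimal sketch of the mechanism, using a stand-in struct rather than Zebra's actual `Config` (whose field is a `Duration`; plain seconds are used here to keep the example dependency-light):

```rust
// Sketch only: demonstrates the serde alias, not Zebra's real Config.
use serde::Deserialize;

#[derive(Debug, Deserialize, PartialEq)]
struct Config {
    // serde matches either `crawl_new_peer_interval` (the new name)
    // or `new_peer_interval` (the pre-rename alias) when deserializing.
    #[serde(alias = "new_peer_interval")]
    crawl_new_peer_interval: u64,
}

fn main() -> Result<(), toml::de::Error> {
    // An old config file keeps parsing after the rename...
    let old: Config = toml::from_str("new_peer_interval = 60")?;
    // ...and produces the same value as the new key.
    let new: Config = toml::from_str("crawl_new_peer_interval = 60")?;
    assert_eq!(old, new);
    Ok(())
}
```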

View File

@@ -1,7 +1,12 @@
-use std::sync::{Arc, Mutex};
+use std::{
+    mem,
+    sync::{Arc, Mutex},
+    time::Duration,
+};

 use chrono::Utc;
 use futures::stream::{FuturesUnordered, StreamExt};
+use tokio::time::{sleep, sleep_until, Sleep};
 use tower::{Service, ServiceExt};

 use crate::{types::MetaAddr, AddressBook, BoxError, PeerAddrState, Request, Response};
@@ -102,6 +107,7 @@ use crate::{types::MetaAddr, AddressBook, BoxError, PeerAddrState, Request, Response};
 pub(super) struct CandidateSet<S> {
     pub(super) peer_set: Arc<Mutex<AddressBook>>,
     pub(super) peer_service: S,
+    next_peer_min_wait: Sleep,
 }

 impl<S> CandidateSet<S>
@@ -109,11 +115,20 @@ where
     S: Service<Request, Response = Response, Error = BoxError>,
     S::Future: Send + 'static,
 {
+    /// The minimum time between successive calls to `CandidateSet::next()`.
+    ///
+    /// ## Security
+    ///
+    /// Zebra resists distributed denial of service attacks by making sure
+    /// that new peer connections are initiated at least
+    /// `MIN_PEER_CONNECTION_INTERVAL` apart.
+    const MIN_PEER_CONNECTION_INTERVAL: Duration = Duration::from_millis(100);
+
     /// Uses `peer_set` and `peer_service` to manage a [`CandidateSet`] of peers.
     pub fn new(peer_set: Arc<Mutex<AddressBook>>, peer_service: S) -> CandidateSet<S> {
         CandidateSet {
             peer_set,
             peer_service,
+            next_peer_min_wait: sleep(Duration::from_secs(0)),
         }
     }
@@ -188,13 +203,31 @@ where
     ///
     /// Live `Responded` peers will stay live if they keep responding, or
     /// become a reconnection candidate if they stop responding.
-    pub fn next(&mut self) -> Option<MetaAddr> {
-        let mut peer_set_guard = self.peer_set.lock().unwrap();
-        let mut reconnect = peer_set_guard.reconnection_peers().next()?;
+    ///
+    /// ## Security
+    ///
+    /// Zebra resists distributed denial of service attacks by making sure
+    /// that new peer connections are initiated at least
+    /// `MIN_PEER_CONNECTION_INTERVAL` apart.
+    pub async fn next(&mut self) -> Option<MetaAddr> {
+        let current_deadline = self.next_peer_min_wait.deadline();
+        let mut sleep = sleep_until(current_deadline + Self::MIN_PEER_CONNECTION_INTERVAL);
+        mem::swap(&mut self.next_peer_min_wait, &mut sleep);

-        reconnect.last_seen = Utc::now();
-        reconnect.last_connection_state = PeerAddrState::AttemptPending;
-        peer_set_guard.update(reconnect);
+        let reconnect = {
+            let mut peer_set_guard = self.peer_set.lock().unwrap();
+            // It's okay to return early here: on `None` there is no new
+            // connection, so there is nothing to rate-limit.
+            let mut reconnect = peer_set_guard.reconnection_peers().next()?;
+            reconnect.last_seen = Utc::now();
+            reconnect.last_connection_state = PeerAddrState::AttemptPending;
+            peer_set_guard.update(reconnect);
+            reconnect
+        };

+        // SECURITY: rate-limit new outbound peer connections, as described
+        // in the `## Security` section above.
+        sleep.await;

         Some(reconnect)
     }
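The rate limiting above works by deadline chaining: each call builds a new `Sleep` whose deadline is the previous deadline plus `MIN_PEER_CONNECTION_INTERVAL`, stores it with `mem::swap`, and awaits the old one. Below is a self-contained sketch of the same pattern, with illustrative names (`RateLimited`, `MIN_INTERVAL`, `wait`) that are not Zebra's:

```rust
// Sketch of the deadline-chaining rate limiter; not Zebra code.
use std::{mem, time::Duration};
use tokio::time::{sleep, sleep_until, Sleep};

const MIN_INTERVAL: Duration = Duration::from_millis(100);

struct RateLimited {
    next_min_wait: Sleep,
}

impl RateLimited {
    fn new() -> Self {
        // A zero-length sleep has already elapsed, so the first call
        // to `wait()` returns immediately.
        RateLimited {
            next_min_wait: sleep(Duration::from_secs(0)),
        }
    }

    async fn wait(&mut self) {
        // Chain the next deadline off the stored one, keep the fresh
        // `Sleep` in `self`, and await the old one. Back-to-back calls
        // therefore complete at least MIN_INTERVAL apart.
        let current_deadline = self.next_min_wait.deadline();
        let mut sleep = sleep_until(current_deadline + MIN_INTERVAL);
        mem::swap(&mut self.next_min_wait, &mut sleep);
        sleep.await;
    }
}

#[tokio::main]
async fn main() {
    let mut limiter = RateLimited::new();
    let start = tokio::time::Instant::now();
    for _ in 0..3 {
        limiter.wait().await;
    }
    // First call is immediate, the next two are spaced by MIN_INTERVAL.
    println!("elapsed: {:?}", start.elapsed()); // ~200ms
}
```

Note that the chain advances from the stored deadline rather than from `Instant::now()`: after an idle stretch the stored deadline lags the clock, so a burst of calls can proceed without waiting until the chain catches back up.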

View File

@@ -171,7 +171,7 @@ where
     let crawl_guard = tokio::spawn(
         crawl_and_dial(
-            config.new_peer_interval,
+            config.crawl_new_peer_interval,
             demand_tx,
             demand_rx,
             candidates,
@@ -269,7 +269,7 @@ where
 /// Given a channel that signals a need for new peers, try to connect to a peer
 /// and send the resulting `peer::Client` through a channel.
 #[instrument(skip(
-    new_peer_interval,
+    crawl_new_peer_interval,
     demand_tx,
     demand_rx,
     candidates,
@@ -277,7 +277,7 @@ where
     success_tx
 ))]
 async fn crawl_and_dial<C, S>(
-    new_peer_interval: std::time::Duration,
+    crawl_new_peer_interval: std::time::Duration,
     mut demand_tx: mpsc::Sender<()>,
     mut demand_rx: mpsc::Receiver<()>,
     mut candidates: CandidateSet<S>,
@@ -304,7 +304,7 @@ where
     // never terminates.
     handshakes.push(future::pending().boxed());

-    let mut crawl_timer = tokio::time::interval(new_peer_interval);
+    let mut crawl_timer = tokio::time::interval(crawl_new_peer_interval);

     loop {
         metrics::gauge!(
@@ -328,7 +328,7 @@ where
                 trace!("too many in-flight handshakes, dropping demand signal");
                 continue;
             }
-            if let Some(candidate) = candidates.next() {
+            if let Some(candidate) = candidates.next().await {
                 debug!(?candidate.addr, "attempting outbound connection in response to demand");
                 connector.ready_and().await?;
                 handshakes.push(
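With these changes there are two independent rates: `crawl_new_peer_interval` bounds how often the crawler looks for new candidate addresses, while `MIN_PEER_CONNECTION_INTERVAL` spaces the dials themselves. A rough, illustration-only sketch of that shape (the real `crawl_and_dial` also juggles handshakes, demand signals, and metrics):

```rust
// Illustrative shape of the crawl/dial loop; not Zebra's real code.
use std::time::Duration;
use tokio::{sync::mpsc, time::interval};

async fn crawl_loop(crawl_new_peer_interval: Duration, mut demand_rx: mpsc::Receiver<()>) {
    // Fires at the (slower) crawl rate taken from the config.
    let mut crawl_timer = interval(crawl_new_peer_interval);
    loop {
        tokio::select! {
            _ = crawl_timer.tick() => {
                // Ask existing peers for more candidate addresses.
            }
            Some(()) = demand_rx.recv() => {
                // Dial the next candidate. `CandidateSet::next().await`
                // internally enforces MIN_PEER_CONNECTION_INTERVAL, so a
                // flood of demand signals cannot open outbound connections
                // faster than one per 100 ms.
            }
        }
    }
}
```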