Limit the number of initial peers (#2913)

* limit the number of initial peers

* Move more code out of zebra_network::initialize

* Always limit the number of initial peers in the Config

This way, we can never get the unused peers out.

* Revert "Always limit the number of initial peers in the Config"

This reverts commit 81ede597c8.

Actually, this doesn't work, because we want those extra peers.

* Minor tweaks

Co-authored-by: Deirdre Connolly <deirdre@zfnd.org>
Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Alfredo Garcia 2021-10-21 20:04:46 -03:00 committed by GitHub
parent 4cdd12e2c4
commit 2de93bba8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 36 additions and 12 deletions

View File

@ -3,7 +3,7 @@
// Portions of this submodule were adapted from tower-balance, // Portions of this submodule were adapted from tower-balance,
// which is (c) 2019 Tower Contributors (MIT licensed). // which is (c) 2019 Tower Contributors (MIT licensed).
use std::{net::SocketAddr, sync::Arc}; use std::{collections::HashSet, net::SocketAddr, sync::Arc};
use futures::{ use futures::{
channel::mpsc, channel::mpsc,
@ -12,6 +12,7 @@ use futures::{
stream::{FuturesUnordered, StreamExt}, stream::{FuturesUnordered, StreamExt},
TryFutureExt, TryFutureExt,
}; };
use rand::seq::SliceRandom;
use tokio::{net::TcpListener, sync::broadcast, time::Instant}; use tokio::{net::TcpListener, sync::broadcast, time::Instant};
use tower::{ use tower::{
buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover, buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover,
@ -141,11 +142,7 @@ where
let config = config.clone(); let config = config.clone();
let outbound_connector = outbound_connector.clone(); let outbound_connector = outbound_connector.clone();
let peerset_tx = peerset_tx.clone(); let peerset_tx = peerset_tx.clone();
async move { async move { add_initial_peers(&config, outbound_connector, peerset_tx).await }.boxed()
let initial_peers = config.initial_peers().await;
add_initial_peers(initial_peers, outbound_connector, peerset_tx).await
}
.boxed()
}; };
let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current())); let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current()));
@ -192,11 +189,11 @@ where
(peer_set, address_book) (peer_set, address_book)
} }
/// Use the provided `handshaker` to connect to `initial_peers`, then send /// Use the provided `outbound_connector` to connect to the configured initial peers,
/// the results over `peerset_tx`. /// then send the resulting peer connections over `peerset_tx`.
#[instrument(skip(initial_peers, outbound_connector, peerset_tx))] #[instrument(skip(config, outbound_connector, peerset_tx))]
async fn add_initial_peers<S>( async fn add_initial_peers<S>(
initial_peers: std::collections::HashSet<SocketAddr>, config: &Config,
outbound_connector: S, outbound_connector: S,
mut peerset_tx: mpsc::Sender<PeerChange>, mut peerset_tx: mpsc::Sender<PeerChange>,
) -> Result<ActiveConnectionCounter, BoxError> ) -> Result<ActiveConnectionCounter, BoxError>
@ -208,14 +205,15 @@ where
> + Clone, > + Clone,
S::Future: Send + 'static, S::Future: Send + 'static,
{ {
let initial_peer_count = initial_peers.len(); let initial_peers = limit_initial_peers(config).await;
let mut handshake_success_total: usize = 0; let mut handshake_success_total: usize = 0;
let mut handshake_error_total: usize = 0; let mut handshake_error_total: usize = 0;
let mut active_outbound_connections = ActiveConnectionCounter::new_counter(); let mut active_outbound_connections = ActiveConnectionCounter::new_counter();
info!( info!(
?initial_peer_count, initial_peer_count = ?initial_peers.len(),
?initial_peers, ?initial_peers,
"connecting to initial peer set" "connecting to initial peer set"
); );
@ -286,6 +284,32 @@ where
Ok(active_outbound_connections) Ok(active_outbound_connections)
} }
/// Limit the number of `initial_peers` addresses entries to the configured
/// `peerset_initial_target_size`.
///
/// The result is randomly chosen entries from the provided set of addresses.
async fn limit_initial_peers(config: &Config) -> HashSet<SocketAddr> {
let initial_peers = config.initial_peers().await;
let initial_peer_count = initial_peers.len();
// Limit the number of initial peers to `config.peerset_initial_target_size`
if initial_peer_count > config.peerset_initial_target_size {
info!(
"Limiting the initial peers list from {} to {}",
initial_peer_count, config.peerset_initial_target_size
);
}
let initial_peers_vect: Vec<SocketAddr> = initial_peers.iter().copied().collect();
// TODO: add unused peers to the AddressBook (#2931)
// https://docs.rs/rand/0.8.4/rand/seq/trait.SliceRandom.html#tymethod.partial_shuffle
initial_peers_vect
.choose_multiple(&mut rand::thread_rng(), config.peerset_initial_target_size)
.copied()
.collect()
}
/// Open a peer connection listener on `config.listen_addr`, /// Open a peer connection listener on `config.listen_addr`,
/// returning the opened [`TcpListener`], and the address it is bound to. /// returning the opened [`TcpListener`], and the address it is bound to.
/// ///