Add unused seed peers to the AddressBook (#2974)

* Add unused seed peers to the AddressBook

* Document a new `await`

We added an extra await on the AddressBook thread mutex.

Co-authored-by: teor <teor@riseup.net>

* Fix a typo

* Refactor names

* Return early from `limit_initial_peers`

* Add `proptest`s regressions

* Return `MetaAddr` instead of `None`

* Test if `zebra_network::init()` deadlocks

* Remove unneeded regressions

* Rename `TimestampCollector` to `AddressBookUpdater` (#2992)

* Rename `TimestampCollector` to `AddressBookUpdater`

* Update comments

Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>

Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>

* Move `all_peers` instead of copying them

Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>

* Make `Duration` a const

Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>

* Use a timeout instead of measuring the elapsed time

Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>

* Copy `initial_peers` instead of moving them

* Refactor the position of `NewInitial` and `new_initial`

Co-authored-by: teor <teor@riseup.net>
Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>
This commit is contained in:
Marek 2021-11-04 12:34:00 +01:00 committed by GitHub
parent 59ee4168cb
commit d03161c63f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 152 additions and 67 deletions

View File

@ -6,12 +6,14 @@ use futures::{channel::mpsc, prelude::*};
use crate::{meta_addr::MetaAddrChange, AddressBook};
/// The timestamp collector hooks into incoming message streams for each peer and
/// records per-connection last-seen timestamps into an [`AddressBook`].
pub struct TimestampCollector {}
/// The `AddressBookUpdater` hooks into incoming message streams for each peer
/// and lets the owner of the sender handle update the address book. For
/// example, it can be used to record per-connection last-seen timestamps, or
/// add new initial peers to the address book.
pub struct AddressBookUpdater {}
impl TimestampCollector {
/// Spawn a new [`TimestampCollector`] task, updating a new [`AddressBook`]
impl AddressBookUpdater {
/// Spawn a new [`AddressBookUpdater`] task, updating a new [`AddressBook`]
/// configured with a `local_listener`.
///
/// Returns handles for the transmission channel for timestamp events, and

View File

@ -56,6 +56,7 @@ extern crate bitflags;
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
mod address_book;
mod address_book_updater;
mod config;
pub mod constants;
mod isolated;
@ -64,7 +65,6 @@ mod peer;
mod peer_set;
mod policies;
mod protocol;
mod timestamp_collector;
pub use crate::{
address_book::AddressBook,

View File

@ -186,6 +186,15 @@ pub struct MetaAddr {
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
pub enum MetaAddrChange {
/// Creates a `MetaAddr` for an initial peer.
NewInitial {
#[cfg_attr(
any(test, feature = "proptest-impl"),
proptest(strategy = "canonical_socket_addr_strategy()")
)]
addr: SocketAddr,
},
/// Creates a new gossiped `MetaAddr`.
NewGossiped {
#[cfg_attr(
@ -250,6 +259,14 @@ pub enum MetaAddrChange {
}
impl MetaAddr {
/// Returns a [`MetaAddrChange::NewInitial`] for a peer that was excluded from
/// the list of the initial peers.
pub fn new_initial_peer(addr: SocketAddr) -> MetaAddrChange {
NewInitial {
addr: canonical_socket_addr(addr),
}
}
/// Returns a new `MetaAddr`, based on the deserialized fields from a
/// gossiped peer [`Addr`][crate::protocol::external::Message::Addr] message.
pub fn new_gossiped_meta_addr(
@ -562,7 +579,8 @@ impl MetaAddrChange {
/// Return the address for this change.
pub fn addr(&self) -> SocketAddr {
match self {
NewGossiped { addr, .. }
NewInitial { addr }
| NewGossiped { addr, .. }
| NewAlternate { addr, .. }
| NewLocal { addr, .. }
| UpdateAttempt { addr }
@ -577,7 +595,8 @@ impl MetaAddrChange {
/// This method should only be used in tests.
pub fn set_addr(&mut self, new_addr: SocketAddr) {
match self {
NewGossiped { addr, .. }
NewInitial { addr }
| NewGossiped { addr, .. }
| NewAlternate { addr, .. }
| NewLocal { addr, .. }
| UpdateAttempt { addr }
@ -589,6 +608,7 @@ impl MetaAddrChange {
/// Return the untrusted services for this change, if available.
pub fn untrusted_services(&self) -> Option<PeerServices> {
match self {
NewInitial { .. } => None,
NewGossiped {
untrusted_services, ..
} => Some(*untrusted_services),
@ -607,6 +627,7 @@ impl MetaAddrChange {
/// Return the untrusted last seen time for this change, if available.
pub fn untrusted_last_seen(&self) -> Option<DateTime32> {
match self {
NewInitial { .. } => None,
NewGossiped {
untrusted_last_seen,
..
@ -623,6 +644,7 @@ impl MetaAddrChange {
/// Return the last attempt for this change, if available.
pub fn last_attempt(&self) -> Option<Instant> {
match self {
NewInitial { .. } => None,
NewGossiped { .. } => None,
NewAlternate { .. } => None,
NewLocal { .. } => None,
@ -638,6 +660,7 @@ impl MetaAddrChange {
/// Return the last response for this change, if available.
pub fn last_response(&self) -> Option<DateTime32> {
match self {
NewInitial { .. } => None,
NewGossiped { .. } => None,
NewAlternate { .. } => None,
NewLocal { .. } => None,
@ -652,9 +675,10 @@ impl MetaAddrChange {
}
}
/// Return the last attempt for this change, if available.
/// Return the last failure for this change, if available.
pub fn last_failure(&self) -> Option<Instant> {
match self {
NewInitial { .. } => None,
NewGossiped { .. } => None,
NewAlternate { .. } => None,
NewLocal { .. } => None,
@ -672,6 +696,7 @@ impl MetaAddrChange {
/// Return the peer connection state for this change.
pub fn peer_addr_state(&self) -> PeerAddrState {
match self {
NewInitial { .. } => NeverAttemptedGossiped,
NewGossiped { .. } => NeverAttemptedGossiped,
NewAlternate { .. } => NeverAttemptedAlternate,
// local listeners get sanitized, so the state doesn't matter here
@ -685,15 +710,18 @@ impl MetaAddrChange {
/// If this change can create a new `MetaAddr`, return that address.
pub fn into_new_meta_addr(self) -> Option<MetaAddr> {
match self {
NewGossiped { .. } | NewAlternate { .. } | NewLocal { .. } => Some(MetaAddr {
addr: self.addr(),
services: self.untrusted_services(),
untrusted_last_seen: self.untrusted_last_seen(),
last_response: None,
last_attempt: None,
last_failure: None,
last_connection_state: self.peer_addr_state(),
}),
NewInitial { .. } | NewGossiped { .. } | NewAlternate { .. } | NewLocal { .. } => {
Some(MetaAddr {
addr: self.addr(),
services: self.untrusted_services(),
untrusted_last_seen: self.untrusted_last_seen(),
last_response: None,
last_attempt: None,
last_failure: None,
last_connection_state: self.peer_addr_state(),
})
}
UpdateAttempt { .. } | UpdateResponded { .. } | UpdateFailed { .. } => None,
}
}

View File

@ -52,7 +52,7 @@ use crate::{
pub struct Handshake<S, C = NoChainTip> {
config: Config,
inbound_service: S,
timestamp_collector: mpsc::Sender<MetaAddrChange>,
address_book_updater: mpsc::Sender<MetaAddrChange>,
inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>,
nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
user_agent: String,
@ -304,7 +304,7 @@ impl fmt::Debug for ConnectedAddr {
pub struct Builder<S, C = NoChainTip> {
config: Option<Config>,
inbound_service: Option<S>,
timestamp_collector: Option<mpsc::Sender<MetaAddrChange>>,
address_book_updater: Option<mpsc::Sender<MetaAddrChange>>,
our_services: Option<PeerServices>,
user_agent: Option<String>,
relay: Option<bool>,
@ -346,11 +346,11 @@ where
///
/// This channel takes `MetaAddr`s, permanent addresses which can be used to
/// make outbound connections to peers.
pub fn with_timestamp_collector(
pub fn with_address_book_updater(
mut self,
timestamp_collector: mpsc::Sender<MetaAddrChange>,
address_book_updater: mpsc::Sender<MetaAddrChange>,
) -> Self {
self.timestamp_collector = Some(timestamp_collector);
self.address_book_updater = Some(address_book_updater);
self
}
@ -382,7 +382,7 @@ where
// TODO: Until Rust RFC 2528 reaches stable, we can't do `..self`
config: self.config,
inbound_service: self.inbound_service,
timestamp_collector: self.timestamp_collector,
address_book_updater: self.address_book_updater,
our_services: self.our_services,
user_agent: self.user_agent,
relay: self.relay,
@ -410,9 +410,9 @@ where
let (tx, _) = broadcast::channel(100);
tx
});
let timestamp_collector = self.timestamp_collector.unwrap_or_else(|| {
// No timestamp collector was passed, so create a stub channel.
// Dropping the receiver means sends will fail, but we don't care.
let address_book_updater = self.address_book_updater.unwrap_or_else(|| {
// No `AddressBookUpdater` for timestamp collection was passed, so create a stub
// channel. Dropping the receiver means sends will fail, but we don't care.
let (tx, _rx) = mpsc::channel(1);
tx
});
@ -425,7 +425,7 @@ where
config,
inbound_service,
inv_collector,
timestamp_collector,
address_book_updater,
nonces,
user_agent,
our_services,
@ -449,7 +449,7 @@ where
Builder {
config: None,
inbound_service: None,
timestamp_collector: None,
address_book_updater: None,
user_agent: None,
our_services: None,
relay: None,
@ -709,7 +709,7 @@ where
// Clone these upfront, so they can be moved into the future.
let nonces = self.nonces.clone();
let inbound_service = self.inbound_service.clone();
let mut timestamp_collector = self.timestamp_collector.clone();
let mut address_book_updater = self.address_book_updater.clone();
let inv_collector = self.inv_collector.clone();
let config = self.config.clone();
let user_agent = self.user_agent.clone();
@ -764,7 +764,7 @@ where
for alt_addr in alternate_addrs {
let alt_addr = MetaAddr::new_alternate(&alt_addr, &remote_services);
// awaiting a local task won't hang
let _ = timestamp_collector.send(alt_addr).await;
let _ = address_book_updater.send(alt_addr).await;
}
// Set the connection's version to the minimum of the received version or our own.
@ -818,7 +818,7 @@ where
//
// Every message and error must update the peer address state via
// the inbound_ts_collector.
let inbound_ts_collector = timestamp_collector.clone();
let inbound_ts_collector = address_book_updater.clone();
let inv_collector = inv_collector.clone();
let ts_inner_conn_span = connection_span.clone();
let inv_inner_conn_span = connection_span.clone();
@ -939,14 +939,14 @@ where
//
// Returning from the spawned closure terminates the connection's heartbeat task.
let heartbeat_span = tracing::debug_span!(parent: connection_span, "heartbeat");
let heartbeat_ts_collector = timestamp_collector.clone();
let heartbeat_ts_collector = address_book_updater.clone();
tokio::spawn(
async move {
use futures::future::Either;
let mut shutdown_rx = shutdown_rx;
let mut server_tx = server_tx;
let mut timestamp_collector = heartbeat_ts_collector.clone();
let mut heartbeat_ts_collector = heartbeat_ts_collector.clone();
let mut interval_stream =
IntervalStream::new(tokio::time::interval(constants::HEARTBEAT_INTERVAL));
@ -969,7 +969,7 @@ where
tracing::trace!("shutting down due to Client shut down");
if let Some(book_addr) = connected_addr.get_address_book_addr() {
// awaiting a local task won't hang
let _ = timestamp_collector
let _ = heartbeat_ts_collector
.send(MetaAddr::new_shutdown(&book_addr, remote_services))
.await;
}
@ -985,7 +985,7 @@ where
let heartbeat = send_one_heartbeat(&mut server_tx);
if heartbeat_timeout(
heartbeat,
&mut timestamp_collector,
&mut heartbeat_ts_collector,
&connected_addr,
&remote_services,
)
@ -1058,7 +1058,7 @@ async fn send_one_heartbeat(server_tx: &mut mpsc::Sender<ClientRequest>) -> Resu
/// `handle_heartbeat_error`.
async fn heartbeat_timeout<F, T>(
fut: F,
timestamp_collector: &mut mpsc::Sender<MetaAddrChange>,
address_book_updater: &mut mpsc::Sender<MetaAddrChange>,
connected_addr: &ConnectedAddr,
remote_services: &PeerServices,
) -> Result<T, BoxError>
@ -1069,7 +1069,7 @@ where
Ok(inner_result) => {
handle_heartbeat_error(
inner_result,
timestamp_collector,
address_book_updater,
connected_addr,
remote_services,
)
@ -1078,7 +1078,7 @@ where
Err(elapsed) => {
handle_heartbeat_error(
Err(elapsed),
timestamp_collector,
address_book_updater,
connected_addr,
remote_services,
)
@ -1089,10 +1089,10 @@ where
Ok(t)
}
/// If `result.is_err()`, mark `connected_addr` as failed using `timestamp_collector`.
/// If `result.is_err()`, mark `connected_addr` as failed using `address_book_updater`.
async fn handle_heartbeat_error<T, E>(
result: Result<T, E>,
timestamp_collector: &mut mpsc::Sender<MetaAddrChange>,
address_book_updater: &mut mpsc::Sender<MetaAddrChange>,
connected_addr: &ConnectedAddr,
remote_services: &PeerServices,
) -> Result<T, E>
@ -1105,7 +1105,7 @@ where
tracing::debug!(?err, "heartbeat error, shutting down");
if let Some(book_addr) = connected_addr.get_address_book_addr() {
let _ = timestamp_collector
let _ = address_book_updater
.send(MetaAddr::new_errored(&book_addr, *remote_services))
.await;
}

View File

@ -29,11 +29,11 @@ use tracing_futures::Instrument;
use zebra_chain::{chain_tip::ChainTip, parameters::Network};
use crate::{
address_book_updater::AddressBookUpdater,
constants,
meta_addr::MetaAddr,
meta_addr::{MetaAddr, MetaAddrChange},
peer::{self, HandshakeRequest, OutboundConnectorRequest},
peer_set::{set::MorePeers, ActiveConnectionCounter, CandidateSet, ConnectionTracker, PeerSet},
timestamp_collector::TimestampCollector,
AddressBook, BoxError, Config, Request, Response,
};
@ -95,7 +95,7 @@ where
let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;
let (address_book, timestamp_collector) = TimestampCollector::spawn(listen_addr);
let (address_book, address_book_updater) = AddressBookUpdater::spawn(listen_addr);
// Create a broadcast channel for peer inventory advertisements.
// If it reaches capacity, this channel drops older inventory advertisements.
@ -118,7 +118,7 @@ where
.with_config(config.clone())
.with_inbound_service(inbound_service)
.with_inventory_collector(inv_sender)
.with_timestamp_collector(timestamp_collector)
.with_address_book_updater(address_book_updater.clone())
.with_advertised_services(PeerServices::NODE_NETWORK)
.with_user_agent(crate::constants::USER_AGENT.to_string())
.with_latest_chain_tip(latest_chain_tip)
@ -177,6 +177,7 @@ where
config.clone(),
outbound_connector.clone(),
peerset_tx.clone(),
address_book_updater,
);
let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current()));
@ -232,6 +233,7 @@ async fn add_initial_peers<S>(
config: Config,
outbound_connector: S,
mut peerset_tx: mpsc::Sender<PeerChange>,
address_book_updater: mpsc::Sender<MetaAddrChange>,
) -> Result<ActiveConnectionCounter, BoxError>
where
S: Service<
@ -241,7 +243,7 @@ where
> + Clone,
S::Future: Send + 'static,
{
let initial_peers = limit_initial_peers(&config).await;
let initial_peers = limit_initial_peers(&config, address_book_updater).await;
let mut handshake_success_total: usize = 0;
let mut handshake_error_total: usize = 0;
@ -359,26 +361,38 @@ where
/// `peerset_initial_target_size`.
///
/// The result is randomly chosen entries from the provided set of addresses.
async fn limit_initial_peers(config: &Config) -> HashSet<SocketAddr> {
let initial_peers = config.initial_peers().await;
let initial_peer_count = initial_peers.len();
async fn limit_initial_peers(
config: &Config,
mut address_book_updater: mpsc::Sender<MetaAddrChange>,
) -> HashSet<SocketAddr> {
let all_peers = config.initial_peers().await;
let peers_count = all_peers.len();
// Limit the number of initial peers to `config.peerset_initial_target_size`
if initial_peer_count > config.peerset_initial_target_size {
info!(
"Limiting the initial peers list from {} to {}",
initial_peer_count, config.peerset_initial_target_size
);
if peers_count <= config.peerset_initial_target_size {
return all_peers;
}
let initial_peers_vect: Vec<SocketAddr> = initial_peers.iter().copied().collect();
// Limit the number of initial peers to `config.peerset_initial_target_size`
info!(
"Limiting the initial peers list from {} to {}",
peers_count, config.peerset_initial_target_size
);
// TODO: add unused peers to the AddressBook (#2931)
// https://docs.rs/rand/0.8.4/rand/seq/trait.SliceRandom.html#tymethod.partial_shuffle
initial_peers_vect
.choose_multiple(&mut rand::thread_rng(), config.peerset_initial_target_size)
.copied()
.collect()
// Split all the peers into the `initial_peers` that will be returned and
// `unused_peers` that will be sent to the address book.
let mut all_peers: Vec<SocketAddr> = all_peers.into_iter().collect();
let (initial_peers, unused_peers) =
all_peers.partial_shuffle(&mut rand::thread_rng(), config.peerset_initial_target_size);
// Send the unused peers to the address book.
for peer in unused_peers {
let peer_addr = MetaAddr::new_initial_peer(*peer);
// `send` only waits when the channel is full.
// The address book updater is a separate task, so we will only wait for a short time.
let _ = address_book_updater.send(peer_addr).await;
}
initial_peers.iter().copied().collect()
}
/// Open a peer connection listener on `config.listen_addr`,
@ -391,7 +405,7 @@ async fn limit_initial_peers(config: &Config) -> HashSet<SocketAddr> {
///
/// If opening the listener fails.
#[instrument(skip(config), fields(addr = ?config.listen_addr))]
async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) {
pub(crate) async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) {
// Warn if we're configured using the wrong network port.
use Network::*;
let wrong_net = match config.network {

View File

@ -32,6 +32,7 @@ use zebra_chain::{chain_tip::NoChainTip, parameters::Network, serialization::Dat
use zebra_test::net::random_known_port;
use crate::{
address_book_updater::AddressBookUpdater,
constants, init,
meta_addr::MetaAddr,
peer::{self, ErrorSlot, HandshakeRequest, OutboundConnectorRequest},
@ -1140,7 +1141,7 @@ async fn add_initial_peers_is_rate_limited() {
let before = Instant::now();
let (initial_peers_task_handle, peerset_rx) =
spawn_add_initial_peers(PEER_COUNT, outbound_connector);
spawn_add_initial_peers(PEER_COUNT, outbound_connector).await;
let connections = peerset_rx.take(PEER_COUNT).collect::<Vec<_>>().await;
let elapsed = Instant::now() - before;
@ -1161,6 +1162,43 @@ async fn add_initial_peers_is_rate_limited() {
);
}
/// Test that [`init`] does not deadlock.
#[tokio::test]
async fn network_init_deadlock() {
// The `PEER_COUNT` is the amount of initial seed peers. The value is set so
// that the peers fill up `PEERSET_INITIAL_TARGET_SIZE`, fill up the channel
// for sending unused peers to the `AddressBook`, and so that there are
// still some extra peers left.
const PEER_COUNT: usize = 200;
const PEERSET_INITIAL_TARGET_SIZE: usize = 2;
const TIME_LIMIT: Duration = Duration::from_secs(10);
zebra_test::init();
// Create a list of dummy IPs, and initialize a config using them as the
// initial peers. The amount of these peers will overflow
// `PEERSET_INITIAL_TARGET_SIZE`.
let mut peers = HashSet::new();
for address_number in 0..PEER_COUNT {
peers.insert(
SocketAddr::new(Ipv4Addr::new(127, 1, 1, address_number as _).into(), 1).to_string(),
);
}
let config = Config {
initial_mainnet_peers: peers,
peerset_initial_target_size: PEERSET_INITIAL_TARGET_SIZE,
network: Network::Mainnet,
..Config::default()
};
let nil_inbound_service = service_fn(|_| async { Ok(Response::Nil) });
let init_future = init(config, nil_inbound_service, NoChainTip);
assert!(tokio::time::timeout(TIME_LIMIT, init_future).await.is_ok());
}
/// Open a local listener on `listen_addr` for `network`.
/// Asserts that the local listener address works as expected.
async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) {
@ -1442,7 +1480,7 @@ where
/// Dummy IPs are used.
///
/// Returns the task [`JoinHandle`], and the peer set receiver.
fn spawn_add_initial_peers<C>(
async fn spawn_add_initial_peers<C>(
peer_count: usize,
outbound_connector: C,
) -> (
@ -1475,7 +1513,10 @@ where
let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(peer_count + 1);
let add_fut = add_initial_peers(config, outbound_connector, peerset_tx);
let (_tcp_listener, listen_addr) = open_listener(&config.clone()).await;
let (_address_book, address_book_updater) = AddressBookUpdater::spawn(listen_addr);
let add_fut = add_initial_peers(config, outbound_connector, peerset_tx, address_book_updater);
let add_task_handle = tokio::spawn(add_fut);
(add_task_handle, peerset_rx)