Avoid deadlocks in the address book mutex (#3244)

* Tweak crawler timings so peers are more likely to be available

* Tweak min peer connection interval so we try all peers

* Let other tasks run between fanouts, so we're more likely to choose different peers

* Let other tasks run between retries, so we're more likely to choose different peers

* Let other tasks run after peer crawler DemandDrop

This makes it more likely that peers will become ready.

* Spawn the address book updater on a blocking thread

* Spawn CandidateSet address book operations on blocking threads

* Replace the PeerSet address book with a metrics watch channel

* Fix comment

* Await spawned address book tasks

* Run the address book update tasks concurrently (except for the mutex)

* Explain an internal-only method better

* Fix a typo

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
This commit is contained in:
teor 2021-12-20 10:44:43 +10:00 committed by GitHub
parent 410133435e
commit d0e6de8040
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 214 additions and 118 deletions

View File

@ -5,6 +5,7 @@ use std::{cmp::Reverse, iter::Extend, net::SocketAddr, time::Instant};
use chrono::Utc; use chrono::Utc;
use ordered_map::OrderedMap; use ordered_map::OrderedMap;
use tokio::sync::watch;
use tracing::Span; use tracing::Span;
use crate::{ use crate::{
@ -48,7 +49,7 @@ mod tests;
/// Updates must not be based on: /// Updates must not be based on:
/// - the remote addresses of inbound connections, or /// - the remote addresses of inbound connections, or
/// - the canonical address of any connection. /// - the canonical address of any connection.
#[derive(Clone, Debug)] #[derive(Debug)]
pub struct AddressBook { pub struct AddressBook {
/// Peer listener addresses, suitable for outbound connections, /// Peer listener addresses, suitable for outbound connections,
/// in connection attempt order. /// in connection attempt order.
@ -71,12 +72,15 @@ pub struct AddressBook {
/// The span for operations on this address book. /// The span for operations on this address book.
span: Span, span: Span,
/// A channel used to send the latest address book metrics.
address_metrics_tx: watch::Sender<AddressMetrics>,
/// The last time we logged a message about the address metrics. /// The last time we logged a message about the address metrics.
last_address_log: Option<Instant>, last_address_log: Option<Instant>,
} }
/// Metrics about the states of the addresses in an [`AddressBook`]. /// Metrics about the states of the addresses in an [`AddressBook`].
#[derive(Debug)] #[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct AddressMetrics { pub struct AddressMetrics {
/// The number of addresses in the `Responded` state. /// The number of addresses in the `Responded` state.
responded: usize, responded: usize,
@ -111,11 +115,16 @@ impl AddressBook {
let instant_now = Instant::now(); let instant_now = Instant::now();
let chrono_now = Utc::now(); let chrono_now = Utc::now();
// The default value is correct for an empty address book,
// and it gets replaced by `update_metrics` anyway.
let (address_metrics_tx, _address_metrics_rx) = watch::channel(AddressMetrics::default());
let mut new_book = AddressBook { let mut new_book = AddressBook {
by_addr: OrderedMap::new(|meta_addr| Reverse(*meta_addr)), by_addr: OrderedMap::new(|meta_addr| Reverse(*meta_addr)),
addr_limit: constants::MAX_ADDRS_IN_ADDRESS_BOOK, addr_limit: constants::MAX_ADDRS_IN_ADDRESS_BOOK,
local_listener: canonical_socket_addr(local_listener), local_listener: canonical_socket_addr(local_listener),
span, span,
address_metrics_tx,
last_address_log: None, last_address_log: None,
}; };
@ -169,6 +178,17 @@ impl AddressBook {
new_book new_book
} }
/// Returns a new receiver for this address book's metrics watch channel.
///
/// The channel only gets a new value when the address book itself is updated,
/// so the metrics can be significantly outdated if Zebra is disconnected or hung.
///
/// The new receiver treats the current metrics value as already seen,
/// so `Receiver::changed` only returns after the next address book update.
pub fn address_metrics_watcher(&self) -> watch::Receiver<AddressMetrics> {
    let metrics_sender = &self.address_metrics_tx;

    metrics_sender.subscribe()
}
/// Get the local listener address. /// Get the local listener address.
/// ///
/// This address contains minimal state, but it is not sanitized. /// This address contains minimal state, but it is not sanitized.
@ -422,7 +442,25 @@ impl AddressBook {
} }
/// Returns metrics for the addresses in this address book. /// Returns metrics for the addresses in this address book.
/// Only for use in tests.
///
/// # Correctness
///
/// Use [`AddressBook::address_metrics_watcher()`] and call `borrow()` on the
/// returned receiver in production code, to avoid deadlocks.
#[cfg(test)]
pub fn address_metrics(&self, now: chrono::DateTime<Utc>) -> AddressMetrics { pub fn address_metrics(&self, now: chrono::DateTime<Utc>) -> AddressMetrics {
self.address_metrics_internal(now)
}
/// Returns metrics for the addresses in this address book.
///
/// # Correctness
///
/// External callers should use [`AddressBook::address_metrics_watcher()`]
/// and call `borrow()` on the returned receiver in production code,
/// to avoid deadlocks.
/// (Using the watch channel receiver does not lock the address book mutex.)
fn address_metrics_internal(&self, now: chrono::DateTime<Utc>) -> AddressMetrics {
let responded = self.state_peers(PeerAddrState::Responded).count(); let responded = self.state_peers(PeerAddrState::Responded).count();
let never_attempted_gossiped = self let never_attempted_gossiped = self
.state_peers(PeerAddrState::NeverAttemptedGossiped) .state_peers(PeerAddrState::NeverAttemptedGossiped)
@ -453,7 +491,10 @@ impl AddressBook {
fn update_metrics(&mut self, instant_now: Instant, chrono_now: chrono::DateTime<Utc>) { fn update_metrics(&mut self, instant_now: Instant, chrono_now: chrono::DateTime<Utc>) {
let _guard = self.span.enter(); let _guard = self.span.enter();
let m = self.address_metrics(chrono_now); let m = self.address_metrics_internal(chrono_now);
// Ignore errors: we don't care if any receivers are listening.
let _ = self.address_metrics_tx.send(m);
// TODO: rename to address_book.[state_name] // TODO: rename to address_book.[state_name]
metrics::gauge!("candidate_set.responded", m.responded as f64); metrics::gauge!("candidate_set.responded", m.responded as f64);
@ -536,3 +577,26 @@ impl Extend<MetaAddrChange> for AddressBook {
} }
} }
} }
impl Clone for AddressBook {
    /// Duplicates the peer addresses, the address limit, the local listener
    /// address, and the tracing span of this address book.
    ///
    /// The copy gets its own metrics watch channel, seeded with the metrics value
    /// currently held in this book's channel, and its last address log time is reset.
    ///
    /// Every address book updates the same prometheus metrics.
    fn clone(&self) -> Self {
        // Seed the new channel with the most recently published metrics.
        // They might be outdated, but calling `update_metrics` here would overwrite
        // the prometheus metrics from the main address book.
        let latest_metrics = *self.address_metrics_tx.borrow();
        let (metrics_sender, _metrics_receiver) = watch::channel(latest_metrics);

        Self {
            by_addr: self.by_addr.clone(),
            addr_limit: self.addr_limit,
            local_listener: self.local_listener,
            span: self.span.clone(),
            address_metrics_tx: metrics_sender,
            last_address_log: None,
        }
    }
}

View File

@ -2,11 +2,15 @@
use std::{net::SocketAddr, sync::Arc}; use std::{net::SocketAddr, sync::Arc};
use futures::{channel::mpsc, prelude::*};
use thiserror::Error; use thiserror::Error;
use tokio::task::JoinHandle; use tokio::{
sync::{mpsc, watch},
task::JoinHandle,
};
use crate::{meta_addr::MetaAddrChange, AddressBook, BoxError, Config}; use crate::{
address_book::AddressMetrics, meta_addr::MetaAddrChange, AddressBook, BoxError, Config,
};
/// The `AddressBookUpdater` hooks into incoming message streams for each peer /// The `AddressBookUpdater` hooks into incoming message streams for each peer
/// and lets the owner of the sender handle update the address book. For /// and lets the owner of the sender handle update the address book. For
@ -24,15 +28,19 @@ impl AddressBookUpdater {
/// configured with Zebra's actual `local_listener` address. /// configured with Zebra's actual `local_listener` address.
/// ///
/// Returns handles for: /// Returns handles for:
/// - the address book,
/// - the transmission channel for address book update events, /// - the transmission channel for address book update events,
/// - the address book, and /// - a watch channel for address book metrics, and
/// - the address book updater task. /// - the address book updater task join handle.
///
/// The task exits with an error when the returned [`mpsc::Sender`] is closed.
pub fn spawn( pub fn spawn(
config: &Config, config: &Config,
local_listener: SocketAddr, local_listener: SocketAddr,
) -> ( ) -> (
Arc<std::sync::Mutex<AddressBook>>, Arc<std::sync::Mutex<AddressBook>>,
mpsc::Sender<MetaAddrChange>, mpsc::Sender<MetaAddrChange>,
watch::Receiver<AddressMetrics>,
JoinHandle<Result<(), BoxError>>, JoinHandle<Result<(), BoxError>>,
) { ) {
use tracing::Level; use tracing::Level;
@ -41,14 +49,14 @@ impl AddressBookUpdater {
// based on the maximum number of inbound and outbound peers. // based on the maximum number of inbound and outbound peers.
let (worker_tx, mut worker_rx) = mpsc::channel(config.peerset_total_connection_limit()); let (worker_tx, mut worker_rx) = mpsc::channel(config.peerset_total_connection_limit());
let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new( let address_book =
local_listener, AddressBook::new(local_listener, span!(Level::TRACE, "address book updater"));
span!(Level::TRACE, "address book updater"), let address_metrics = address_book.address_metrics_watcher();
))); let address_book = Arc::new(std::sync::Mutex::new(address_book));
let worker_address_book = address_book.clone();
let worker = async move { let worker_address_book = address_book.clone();
while let Some(event) = worker_rx.next().await { let worker = move || {
while let Some(event) = worker_rx.blocking_recv() {
// # Correctness // # Correctness
// //
// Briefly hold the address book threaded mutex, to update the // Briefly hold the address book threaded mutex, to update the
@ -62,8 +70,15 @@ impl AddressBookUpdater {
Err(AllAddressBookUpdaterSendersClosed.into()) Err(AllAddressBookUpdaterSendersClosed.into())
}; };
let address_book_updater_task_handle = tokio::spawn(worker.boxed()); // Correctness: spawn address book accesses on a blocking thread,
// to avoid deadlocks (see #1976)
let address_book_updater_task_handle = tokio::task::spawn_blocking(worker);
(address_book, worker_tx, address_book_updater_task_handle) (
address_book,
worker_tx,
address_metrics,
address_book_updater_task_handle,
)
} }
} }

View File

@ -10,10 +10,7 @@ use std::{
}; };
use chrono::{TimeZone, Utc}; use chrono::{TimeZone, Utc};
use futures::{ use futures::{channel::oneshot, future, FutureExt, SinkExt, StreamExt};
channel::{mpsc, oneshot},
future, FutureExt, SinkExt, StreamExt,
};
use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout}; use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout};
use tokio_stream::wrappers::IntervalStream; use tokio_stream::wrappers::IntervalStream;
use tokio_util::codec::Framed; use tokio_util::codec::Framed;
@ -54,7 +51,7 @@ use crate::{
pub struct Handshake<S, C = NoChainTip> { pub struct Handshake<S, C = NoChainTip> {
config: Config, config: Config,
inbound_service: S, inbound_service: S,
address_book_updater: mpsc::Sender<MetaAddrChange>, address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>, inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>,
nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>, nonces: Arc<futures::lock::Mutex<HashSet<Nonce>>>,
user_agent: String, user_agent: String,
@ -306,7 +303,7 @@ impl fmt::Debug for ConnectedAddr {
pub struct Builder<S, C = NoChainTip> { pub struct Builder<S, C = NoChainTip> {
config: Option<Config>, config: Option<Config>,
inbound_service: Option<S>, inbound_service: Option<S>,
address_book_updater: Option<mpsc::Sender<MetaAddrChange>>, address_book_updater: Option<tokio::sync::mpsc::Sender<MetaAddrChange>>,
our_services: Option<PeerServices>, our_services: Option<PeerServices>,
user_agent: Option<String>, user_agent: Option<String>,
relay: Option<bool>, relay: Option<bool>,
@ -350,7 +347,7 @@ where
/// make outbound connections to peers. /// make outbound connections to peers.
pub fn with_address_book_updater( pub fn with_address_book_updater(
mut self, mut self,
address_book_updater: mpsc::Sender<MetaAddrChange>, address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Self { ) -> Self {
self.address_book_updater = Some(address_book_updater); self.address_book_updater = Some(address_book_updater);
self self
@ -415,7 +412,7 @@ where
let address_book_updater = self.address_book_updater.unwrap_or_else(|| { let address_book_updater = self.address_book_updater.unwrap_or_else(|| {
// No `AddressBookUpdater` for timestamp collection was passed, so create a stub // No `AddressBookUpdater` for timestamp collection was passed, so create a stub
// channel. Dropping the receiver means sends will fail, but we don't care. // channel. Dropping the receiver means sends will fail, but we don't care.
let (tx, _rx) = mpsc::channel(1); let (tx, _rx) = tokio::sync::mpsc::channel(1);
tx tx
}); });
let nonces = Arc::new(futures::lock::Mutex::new(HashSet::new())); let nonces = Arc::new(futures::lock::Mutex::new(HashSet::new()));
@ -713,7 +710,7 @@ where
// Clone these upfront, so they can be moved into the future. // Clone these upfront, so they can be moved into the future.
let nonces = self.nonces.clone(); let nonces = self.nonces.clone();
let inbound_service = self.inbound_service.clone(); let inbound_service = self.inbound_service.clone();
let mut address_book_updater = self.address_book_updater.clone(); let address_book_updater = self.address_book_updater.clone();
let inv_collector = self.inv_collector.clone(); let inv_collector = self.inv_collector.clone();
let config = self.config.clone(); let config = self.config.clone();
let user_agent = self.user_agent.clone(); let user_agent = self.user_agent.clone();
@ -787,7 +784,7 @@ where
// These channels should not be cloned more than they are // These channels should not be cloned more than they are
// in this block, see constants.rs for more. // in this block, see constants.rs for more.
let (server_tx, server_rx) = mpsc::channel(0); let (server_tx, server_rx) = futures::channel::mpsc::channel(0);
let (shutdown_tx, shutdown_rx) = oneshot::channel(); let (shutdown_tx, shutdown_rx) = oneshot::channel();
let error_slot = ErrorSlot::default(); let error_slot = ErrorSlot::default();
@ -831,7 +828,7 @@ where
.then(move |msg| { .then(move |msg| {
// Add a metric for inbound messages and errors. // Add a metric for inbound messages and errors.
// Fire a timestamp or failure event. // Fire a timestamp or failure event.
let mut inbound_ts_collector = inbound_ts_collector.clone(); let inbound_ts_collector = inbound_ts_collector.clone();
let span = let span =
debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector"); debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector");
async move { async move {
@ -1018,7 +1015,9 @@ where
} }
/// Send one heartbeat using `server_tx`. /// Send one heartbeat using `server_tx`.
async fn send_one_heartbeat(server_tx: &mut mpsc::Sender<ClientRequest>) -> Result<(), BoxError> { async fn send_one_heartbeat(
server_tx: &mut futures::channel::mpsc::Sender<ClientRequest>,
) -> Result<(), BoxError> {
// We just reached a heartbeat interval, so start sending // We just reached a heartbeat interval, so start sending
// a heartbeat. // a heartbeat.
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
@ -1065,7 +1064,7 @@ async fn send_one_heartbeat(server_tx: &mut mpsc::Sender<ClientRequest>) -> Resu
/// `handle_heartbeat_error`. /// `handle_heartbeat_error`.
async fn heartbeat_timeout<F, T>( async fn heartbeat_timeout<F, T>(
fut: F, fut: F,
address_book_updater: &mut mpsc::Sender<MetaAddrChange>, address_book_updater: &mut tokio::sync::mpsc::Sender<MetaAddrChange>,
connected_addr: &ConnectedAddr, connected_addr: &ConnectedAddr,
remote_services: &PeerServices, remote_services: &PeerServices,
) -> Result<T, BoxError> ) -> Result<T, BoxError>
@ -1099,7 +1098,7 @@ where
/// If `result.is_err()`, mark `connected_addr` as failed using `address_book_updater`. /// If `result.is_err()`, mark `connected_addr` as failed using `address_book_updater`.
async fn handle_heartbeat_error<T, E>( async fn handle_heartbeat_error<T, E>(
result: Result<T, E>, result: Result<T, E>,
address_book_updater: &mut mpsc::Sender<MetaAddrChange>, address_book_updater: &mut tokio::sync::mpsc::Sender<MetaAddrChange>,
connected_addr: &ConnectedAddr, connected_addr: &ConnectedAddr,
remote_services: &PeerServices, remote_services: &PeerServices,
) -> Result<T, E> ) -> Result<T, E>

View File

@ -8,7 +8,8 @@ use tower::{Service, ServiceExt};
use zebra_chain::serialization::DateTime32; use zebra_chain::serialization::DateTime32;
use crate::{ use crate::{
constants, peer_set::set::MorePeers, types::MetaAddr, AddressBook, BoxError, Request, Response, constants, meta_addr::MetaAddrChange, peer_set::set::MorePeers, types::MetaAddr, AddressBook,
BoxError, Request, Response,
}; };
#[cfg(test)] #[cfg(test)]
@ -115,8 +116,10 @@ mod tests;
// * show that seed peers that transition to other never attempted // * show that seed peers that transition to other never attempted
// states are already in the address book // states are already in the address book
pub(crate) struct CandidateSet<S> { pub(crate) struct CandidateSet<S> {
pub(super) address_book: Arc<std::sync::Mutex<AddressBook>>, // Correctness: the address book must be private,
pub(super) peer_service: S, // so all operations are performed on a blocking thread (see #1976).
address_book: Arc<std::sync::Mutex<AddressBook>>,
peer_service: S,
min_next_handshake: Instant, min_next_handshake: Instant,
min_next_crawl: Instant, min_next_crawl: Instant,
} }
@ -271,6 +274,8 @@ where
responses.push(peer_service.call(Request::Peers)); responses.push(peer_service.call(Request::Peers));
} }
let mut address_book_updates = FuturesUnordered::new();
// Process responses // Process responses
while let Some(rsp) = responses.next().await { while let Some(rsp) = responses.next().await {
match rsp { match rsp {
@ -281,7 +286,7 @@ where
"got response to GetPeers" "got response to GetPeers"
); );
let addrs = validate_addrs(addrs, DateTime32::now()); let addrs = validate_addrs(addrs, DateTime32::now());
self.send_addrs(addrs); address_book_updates.push(self.send_addrs(addrs));
more_peers = Some(MorePeers); more_peers = Some(MorePeers);
} }
Err(e) => { Err(e) => {
@ -293,25 +298,37 @@ where
} }
} }
// Wait until all the address book updates have finished
while let Some(()) = address_book_updates.next().await {}
Ok(more_peers) Ok(more_peers)
} }
/// Add new `addrs` to the address book. /// Add new `addrs` to the address book.
fn send_addrs(&self, addrs: impl IntoIterator<Item = MetaAddr>) { async fn send_addrs(&self, addrs: impl IntoIterator<Item = MetaAddr>) {
let addrs = addrs let addrs: Vec<MetaAddrChange> = addrs
.into_iter() .into_iter()
.map(MetaAddr::new_gossiped_change) .map(MetaAddr::new_gossiped_change)
.map(|maybe_addr| { .map(|maybe_addr| maybe_addr.expect("Received gossiped peers always have services set"))
maybe_addr.expect("Received gossiped peers always have services set") .collect();
});
debug!(count = ?addrs.len(), "sending gossiped addresses to the address book");
// Don't bother spawning a task if there are no addresses left.
if addrs.is_empty() {
return;
}
// # Correctness // # Correctness
// //
// Briefly hold the address book threaded mutex, to extend // Spawn address book accesses on a blocking thread,
// the address list. // to avoid deadlocks (see #1976).
// //
// Extend handles duplicate addresses internally. // Extend handles duplicate addresses internally.
self.address_book.lock().unwrap().extend(addrs); let address_book = self.address_book.clone();
tokio::task::spawn_blocking(move || address_book.lock().unwrap().extend(addrs))
.await
.expect("panic in new peers address book update task");
} }
/// Returns the next candidate for a connection attempt, if any are available. /// Returns the next candidate for a connection attempt, if any are available.
@ -335,19 +352,10 @@ where
/// new peer connections are initiated at least /// new peer connections are initiated at least
/// [`MIN_PEER_CONNECTION_INTERVAL`][constants::MIN_PEER_CONNECTION_INTERVAL] apart. /// [`MIN_PEER_CONNECTION_INTERVAL`][constants::MIN_PEER_CONNECTION_INTERVAL] apart.
pub async fn next(&mut self) -> Option<MetaAddr> { pub async fn next(&mut self) -> Option<MetaAddr> {
// # Correctness // Correctness: To avoid hangs, computation in the critical section should be kept to a minimum.
// let address_book = self.address_book.clone();
// In this critical section, we hold the address mutex, blocking the let next_peer = move || -> Option<MetaAddr> {
// current thread, and all async tasks scheduled on that thread. let mut guard = address_book.lock().unwrap();
//
// To avoid deadlocks, the critical section:
// - must not acquire any other locks
// - must not await any futures
//
// To avoid hangs, any computation in the critical section should
// be kept to a minimum.
let reconnect = {
let mut guard = self.address_book.lock().unwrap();
// Now we have the lock, get the current time // Now we have the lock, get the current time
let instant_now = std::time::Instant::now(); let instant_now = std::time::Instant::now();
@ -355,27 +363,43 @@ where
// It's okay to return without sleeping here, because we're returning // It's okay to return without sleeping here, because we're returning
// `None`. We only need to sleep before yielding an address. // `None`. We only need to sleep before yielding an address.
let reconnect = guard.reconnection_peers(instant_now, chrono_now).next()?; let next_peer = guard.reconnection_peers(instant_now, chrono_now).next()?;
let reconnect = MetaAddr::new_reconnect(&reconnect.addr); // TODO: only mark the peer as AttemptPending when it is actually used (#1976)
guard.update(reconnect)? //
// If the future is dropped before `next` returns, the peer will be marked as AttemptPending,
// even if its address is not actually used for a connection.
//
// We could send a reconnect change to the AddressBookUpdater when the peer is actually used,
// but channel order is not guaranteed, so we could accidentally re-use the same peer.
let next_peer = MetaAddr::new_reconnect(&next_peer.addr);
guard.update(next_peer)
}; };
// SECURITY: rate-limit new outbound peer connections // Correctness: Spawn address book accesses on a blocking thread, to avoid deadlocks (see #1976).
let next_peer = tokio::task::spawn_blocking(next_peer)
.await
.expect("panic in next peer address book task")?;
// Security: rate-limit new outbound peer connections
sleep_until(self.min_next_handshake).await; sleep_until(self.min_next_handshake).await;
self.min_next_handshake = Instant::now() + constants::MIN_PEER_CONNECTION_INTERVAL; self.min_next_handshake = Instant::now() + constants::MIN_PEER_CONNECTION_INTERVAL;
Some(reconnect) Some(next_peer)
} }
/// Mark `addr` as a failed peer. /// Mark `addr` as a failed peer.
pub fn report_failed(&mut self, addr: &MetaAddr) { pub async fn report_failed(&mut self, addr: &MetaAddr) {
let addr = MetaAddr::new_errored(&addr.addr, addr.services); let addr = MetaAddr::new_errored(&addr.addr, addr.services);
// # Correctness // # Correctness
// //
// Briefly hold the address book threaded mutex, to update the state for // Spawn address book accesses on a blocking thread,
// a single address. // to avoid deadlocks (see #1976).
self.address_book.lock().unwrap().update(addr); let address_book = self.address_book.clone();
tokio::task::spawn_blocking(move || address_book.lock().unwrap().update(addr))
.await
.expect("panic in peer failure address book update task");
} }
} }

View File

@ -6,7 +6,6 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use futures::FutureExt;
use proptest::{collection::vec, prelude::*}; use proptest::{collection::vec, prelude::*};
use tokio::time::{sleep, timeout}; use tokio::time::{sleep, timeout};
use tracing::Span; use tracing::Span;
@ -71,8 +70,12 @@ proptest! {
// Make sure that the rate-limit is never triggered, even after multiple calls // Make sure that the rate-limit is never triggered, even after multiple calls
for _ in 0..next_peer_attempts { for _ in 0..next_peer_attempts {
// An empty address book immediately returns "no next peer" // An empty address book immediately returns "no next peer".
assert!(matches!(candidate_set.next().now_or_never(), Some(None))); //
// Check that it takes less than the peer set candidate delay,
// and hope that is enough time for test machines with high CPU load.
let less_than_min_interval = MIN_PEER_CONNECTION_INTERVAL - Duration::from_millis(1);
assert_eq!(runtime.block_on(timeout(less_than_min_interval, candidate_set.next())), Ok(None));
} }
} }
} }

View File

@ -6,7 +6,6 @@
use std::{collections::HashSet, net::SocketAddr, sync::Arc}; use std::{collections::HashSet, net::SocketAddr, sync::Arc};
use futures::{ use futures::{
channel::mpsc,
future::{self, FutureExt}, future::{self, FutureExt},
sink::SinkExt, sink::SinkExt,
stream::{FuturesUnordered, StreamExt, TryStreamExt}, stream::{FuturesUnordered, StreamExt, TryStreamExt},
@ -66,7 +65,9 @@ type DiscoveredPeer = Result<(SocketAddr, peer::Client), BoxError>;
/// ///
/// In addition to returning a service for outbound requests, this method /// In addition to returning a service for outbound requests, this method
/// returns a shared [`AddressBook`] updated with last-seen timestamps for /// returns a shared [`AddressBook`] updated with last-seen timestamps for
/// connected peers. /// connected peers. The shared address book should be accessed using a
/// [blocking thread](https://docs.rs/tokio/1.15.0/tokio/task/index.html#blocking-and-yielding),
/// to avoid async task deadlocks.
/// ///
/// # Panics /// # Panics
/// ///
@ -94,7 +95,7 @@ where
let (tcp_listener, listen_addr) = open_listener(&config.clone()).await; let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;
let (address_book, address_book_updater, address_book_updater_guard) = let (address_book, address_book_updater, address_metrics, address_book_updater_guard) =
AddressBookUpdater::spawn(&config, listen_addr); AddressBookUpdater::spawn(&config, listen_addr);
// Create a broadcast channel for peer inventory advertisements. // Create a broadcast channel for peer inventory advertisements.
@ -134,7 +135,7 @@ where
// Create an mpsc channel for peer changes, // Create an mpsc channel for peer changes,
// based on the maximum number of inbound and outbound peers. // based on the maximum number of inbound and outbound peers.
let (peerset_tx, peerset_rx) = let (peerset_tx, peerset_rx) =
mpsc::channel::<DiscoveredPeer>(config.peerset_total_connection_limit()); futures::channel::mpsc::channel::<DiscoveredPeer>(config.peerset_total_connection_limit());
let discovered_peers = peerset_rx let discovered_peers = peerset_rx
// Discover interprets an error as stream termination, // Discover interprets an error as stream termination,
@ -145,7 +146,7 @@ where
// Create an mpsc channel for peerset demand signaling, // Create an mpsc channel for peerset demand signaling,
// based on the maximum number of outbound peers. // based on the maximum number of outbound peers.
let (mut demand_tx, demand_rx) = let (mut demand_tx, demand_rx) =
mpsc::channel::<MorePeers>(config.peerset_outbound_connection_limit()); futures::channel::mpsc::channel::<MorePeers>(config.peerset_outbound_connection_limit());
// Create a oneshot to send background task JoinHandles to the peer set // Create a oneshot to send background task JoinHandles to the peer set
let (handle_tx, handle_rx) = tokio::sync::oneshot::channel(); let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
@ -157,7 +158,7 @@ where
demand_tx.clone(), demand_tx.clone(),
handle_rx, handle_rx,
inv_receiver, inv_receiver,
address_book.clone(), address_metrics,
MinimumPeerVersion::new(latest_chain_tip, config.network), MinimumPeerVersion::new(latest_chain_tip, config.network),
); );
let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE); let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);
@ -241,8 +242,8 @@ where
async fn add_initial_peers<S>( async fn add_initial_peers<S>(
config: Config, config: Config,
outbound_connector: S, outbound_connector: S,
mut peerset_tx: mpsc::Sender<DiscoveredPeer>, mut peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
address_book_updater: mpsc::Sender<MetaAddrChange>, address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Result<ActiveConnectionCounter, BoxError> ) -> Result<ActiveConnectionCounter, BoxError>
where where
S: Service<OutboundConnectorRequest, Response = (SocketAddr, peer::Client), Error = BoxError> S: Service<OutboundConnectorRequest, Response = (SocketAddr, peer::Client), Error = BoxError>
@ -379,7 +380,7 @@ where
/// Sends any unused entries to the `address_book_updater`. /// Sends any unused entries to the `address_book_updater`.
async fn limit_initial_peers( async fn limit_initial_peers(
config: &Config, config: &Config,
mut address_book_updater: mpsc::Sender<MetaAddrChange>, address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> HashSet<SocketAddr> { ) -> HashSet<SocketAddr> {
let all_peers = config.initial_peers().await; let all_peers = config.initial_peers().await;
let peers_count = all_peers.len(); let peers_count = all_peers.len();
@ -475,7 +476,7 @@ async fn accept_inbound_connections<S>(
config: Config, config: Config,
listener: TcpListener, listener: TcpListener,
mut handshaker: S, mut handshaker: S,
peerset_tx: mpsc::Sender<DiscoveredPeer>, peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
) -> Result<(), BoxError> ) -> Result<(), BoxError>
where where
S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError> + Clone, S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError> + Clone,
@ -623,11 +624,11 @@ enum CrawlerAction {
))] ))]
async fn crawl_and_dial<C, S>( async fn crawl_and_dial<C, S>(
config: Config, config: Config,
mut demand_tx: mpsc::Sender<MorePeers>, mut demand_tx: futures::channel::mpsc::Sender<MorePeers>,
mut demand_rx: mpsc::Receiver<MorePeers>, mut demand_rx: futures::channel::mpsc::Receiver<MorePeers>,
mut candidates: CandidateSet<S>, mut candidates: CandidateSet<S>,
outbound_connector: C, outbound_connector: C,
mut peerset_tx: mpsc::Sender<DiscoveredPeer>, mut peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
mut active_outbound_connections: ActiveConnectionCounter, mut active_outbound_connections: ActiveConnectionCounter,
) -> Result<(), BoxError> ) -> Result<(), BoxError>
where where
@ -765,7 +766,7 @@ where
// The connection was never opened, or it failed the handshake and was dropped. // The connection was never opened, or it failed the handshake and was dropped.
debug!(?failed_addr.addr, "marking candidate as failed"); debug!(?failed_addr.addr, "marking candidate as failed");
candidates.report_failed(&failed_addr); candidates.report_failed(&failed_addr).await;
// The demand signal that was taken out of the queue // The demand signal that was taken out of the queue
// to attempt to connect to the failed candidate never // to attempt to connect to the failed candidate never
// turned into a connection, so add it back: // turned into a connection, so add it back:

View File

@ -1560,7 +1560,7 @@ where
let (peerset_tx, peerset_rx) = mpsc::channel::<DiscoveredPeer>(peer_count + 1); let (peerset_tx, peerset_rx) = mpsc::channel::<DiscoveredPeer>(peer_count + 1);
let (_address_book, address_book_updater, address_book_updater_guard) = let (_address_book, address_book_updater, _address_metrics, address_book_updater_guard) =
AddressBookUpdater::spawn(&config, unused_v4); AddressBookUpdater::spawn(&config, unused_v4);
let add_fut = add_initial_peers(config, outbound_connector, peerset_tx, address_book_updater); let add_fut = add_initial_peers(config, outbound_connector, peerset_tx, address_book_updater);

View File

@ -97,12 +97,10 @@ use std::{
marker::PhantomData, marker::PhantomData,
net::SocketAddr, net::SocketAddr,
pin::Pin, pin::Pin,
sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
time::Instant, time::Instant,
}; };
use chrono::Utc;
use futures::{ use futures::{
channel::{mpsc, oneshot}, channel::{mpsc, oneshot},
future::{FutureExt, TryFutureExt}, future::{FutureExt, TryFutureExt},
@ -110,7 +108,7 @@ use futures::{
stream::FuturesUnordered, stream::FuturesUnordered,
}; };
use tokio::{ use tokio::{
sync::{broadcast, oneshot::error::TryRecvError}, sync::{broadcast, oneshot::error::TryRecvError, watch},
task::JoinHandle, task::JoinHandle,
}; };
use tower::{ use tower::{
@ -122,6 +120,7 @@ use tower::{
use zebra_chain::chain_tip::ChainTip; use zebra_chain::chain_tip::ChainTip;
use crate::{ use crate::{
address_book::AddressMetrics,
peer::{LoadTrackedClient, MinimumPeerVersion}, peer::{LoadTrackedClient, MinimumPeerVersion},
peer_set::{ peer_set::{
unready_service::{Error as UnreadyError, UnreadyService}, unready_service::{Error as UnreadyError, UnreadyService},
@ -131,7 +130,7 @@ use crate::{
external::InventoryHash, external::InventoryHash,
internal::{Request, Response}, internal::{Request, Response},
}, },
AddressBook, BoxError, Config, BoxError, Config,
}; };
#[cfg(test)] #[cfg(test)]
@ -210,10 +209,10 @@ where
/// the `PeerSet` propagate errors from background tasks back to the user /// the `PeerSet` propagate errors from background tasks back to the user
guards: futures::stream::FuturesUnordered<JoinHandle<Result<(), BoxError>>>, guards: futures::stream::FuturesUnordered<JoinHandle<Result<(), BoxError>>>,
/// A shared list of peer addresses. /// Address book metrics watch channel.
/// ///
/// Used for logging diagnostics. /// Used for logging diagnostics.
address_book: Arc<std::sync::Mutex<AddressBook>>, address_metrics: watch::Receiver<AddressMetrics>,
/// The last time we logged a message about the peer set size /// The last time we logged a message about the peer set size
last_peer_log: Option<Instant>, last_peer_log: Option<Instant>,
@ -266,7 +265,7 @@ where
demand_signal: mpsc::Sender<MorePeers>, demand_signal: mpsc::Sender<MorePeers>,
handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>, handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>, inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>,
address_book: Arc<std::sync::Mutex<AddressBook>>, address_metrics: watch::Receiver<AddressMetrics>,
minimum_peer_version: MinimumPeerVersion<C>, minimum_peer_version: MinimumPeerVersion<C>,
) -> Self { ) -> Self {
Self { Self {
@ -287,7 +286,7 @@ where
// Metrics // Metrics
last_peer_log: None, last_peer_log: None,
address_book, address_metrics,
peerset_total_connection_limit: config.peerset_total_connection_limit(), peerset_total_connection_limit: config.peerset_total_connection_limit(),
// Real-time parameters // Real-time parameters
@ -770,19 +769,7 @@ where
self.last_peer_log = Some(Instant::now()); self.last_peer_log = Some(Instant::now());
// # Correctness let address_metrics = self.address_metrics.borrow();
//
// Only log address metrics in exceptional circumstances, to avoid lock contention.
//
// Get the current time after acquiring the address book lock.
//
// TODO: replace with a watch channel that is updated in `AddressBook::update_metrics()`,
// or turn the address book into a service (#1976)
let address_metrics = self
.address_book
.lock()
.unwrap()
.address_metrics(Utc::now());
if unready_services_len == 0 { if unready_services_len == 0 {
warn!( warn!(
?address_metrics, ?address_metrics,
@ -809,12 +796,7 @@ where
// Security: make sure we haven't exceeded the connection limit // Security: make sure we haven't exceeded the connection limit
if num_peers > self.peerset_total_connection_limit { if num_peers > self.peerset_total_connection_limit {
// Correctness: Get the current time after acquiring the address book lock. let address_metrics = self.address_metrics.borrow();
let address_metrics = self
.address_book
.lock()
.unwrap()
.address_metrics(Utc::now());
panic!( panic!(
"unexpectedly exceeded configured peer set connection limit: \n\ "unexpectedly exceeded configured peer set connection limit: \n\
peers: {:?}, ready: {:?}, unready: {:?}, \n\ peers: {:?}, ready: {:?}, unready: {:?}, \n\

View File

@ -6,7 +6,10 @@ use futures::{
}; };
use proptest::{collection::vec, prelude::*}; use proptest::{collection::vec, prelude::*};
use proptest_derive::Arbitrary; use proptest_derive::Arbitrary;
use tokio::{sync::broadcast, task::JoinHandle}; use tokio::{
sync::{broadcast, watch},
task::JoinHandle,
};
use tower::{ use tower::{
discover::{Change, Discover}, discover::{Change, Discover},
BoxError, BoxError,
@ -21,6 +24,7 @@ use zebra_chain::{
use super::MorePeers; use super::MorePeers;
use crate::{ use crate::{
address_book::AddressMetrics,
peer::{ peer::{
CancelHeartbeatTask, Client, ClientRequest, ErrorSlot, LoadTrackedClient, CancelHeartbeatTask, Client, ClientRequest, ErrorSlot, LoadTrackedClient,
MinimumPeerVersion, MinimumPeerVersion,
@ -224,7 +228,7 @@ where
.inv_stream .inv_stream
.unwrap_or_else(|| guard.create_inventory_receiver()); .unwrap_or_else(|| guard.create_inventory_receiver());
let address_book = guard.prepare_address_book(self.address_book); let address_metrics = guard.prepare_address_book(self.address_book);
let peer_set = PeerSet::new( let peer_set = PeerSet::new(
&config, &config,
@ -232,7 +236,7 @@ where
demand_signal, demand_signal,
handle_rx, handle_rx,
inv_stream, inv_stream,
address_book, address_metrics,
minimum_peer_version, minimum_peer_version,
); );
@ -302,17 +306,21 @@ impl PeerSetGuard {
/// inside the [`PeerSetGuard`] to keep track of it. Otherwise, a new instance is created with /// inside the [`PeerSetGuard`] to keep track of it. Otherwise, a new instance is created with
/// the [`Self::fallback_address_book`] method. /// the [`Self::fallback_address_book`] method.
/// ///
/// A reference to the [`AddressBook`] instance tracked by the [`PeerSetGuard`] is returned to /// Returns a metrics watch channel for the [`AddressBook`] instance tracked by the [`PeerSetGuard`],
/// be passed to the [`PeerSet`] constructor. /// so it can be passed to the [`PeerSet`] constructor.
pub fn prepare_address_book( pub fn prepare_address_book(
&mut self, &mut self,
maybe_address_book: Option<Arc<std::sync::Mutex<AddressBook>>>, maybe_address_book: Option<Arc<std::sync::Mutex<AddressBook>>>,
) -> Arc<std::sync::Mutex<AddressBook>> { ) -> watch::Receiver<AddressMetrics> {
let address_book = maybe_address_book.unwrap_or_else(Self::fallback_address_book); let address_book = maybe_address_book.unwrap_or_else(Self::fallback_address_book);
let metrics_watcher = address_book
.lock()
.expect("unexpected panic in previous address book mutex guard")
.address_metrics_watcher();
self.address_book = Some(address_book.clone()); self.address_book = Some(address_book);
address_book metrics_watcher
} }
/// Create an empty [`AddressBook`] instance using a dummy local listener address. /// Create an empty [`AddressBook`] instance using a dummy local listener address.