From d0e6de8040cd5c252caacc65153384d5c03309cf Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 20 Dec 2021 10:44:43 +1000 Subject: [PATCH] Avoid deadlocks in the address book mutex (#3244) * Tweak crawler timings so peers are more likely to be available * Tweak min peer connection interval so we try all peers * Let other tasks run between fanouts, so we're more likely to choose different peers * Let other tasks run between retries, so we're more likely to choose different peers * Let other tasks run after peer crawler DemandDrop This makes it more likely that peers will become ready. * Spawn the address book updater on a blocking thread * Spawn CandidateSet address book operations on blocking threads * Replace the PeerSet address book with a metrics watch channel * Fix comment * Await spawned address book tasks * Run the address book update tasks concurrently (except for the mutex) * Explain an internal-only method better * Fix a typo Co-authored-by: Alfredo Garcia Co-authored-by: Alfredo Garcia --- zebra-network/src/address_book.rs | 70 +++++++++++++- zebra-network/src/address_book_updater.rs | 43 ++++++--- zebra-network/src/peer/handshake.rs | 27 +++--- zebra-network/src/peer_set/candidate_set.rs | 92 ++++++++++++------- .../src/peer_set/candidate_set/tests/prop.rs | 9 +- zebra-network/src/peer_set/initialize.rs | 29 +++--- .../src/peer_set/initialize/tests/vectors.rs | 2 +- zebra-network/src/peer_set/set.rs | 36 ++------ zebra-network/src/peer_set/set/tests.rs | 24 +++-- 9 files changed, 214 insertions(+), 118 deletions(-) diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs index d1a4e82f2..7986b1915 100644 --- a/zebra-network/src/address_book.rs +++ b/zebra-network/src/address_book.rs @@ -5,6 +5,7 @@ use std::{cmp::Reverse, iter::Extend, net::SocketAddr, time::Instant}; use chrono::Utc; use ordered_map::OrderedMap; +use tokio::sync::watch; use tracing::Span; use crate::{ @@ -48,7 +49,7 @@ mod tests; /// Updates must not be based on: /// - the remote addresses of inbound connections, or /// - the canonical address of any connection. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct AddressBook { /// Peer listener addresses, suitable for outbound connections, /// in connection attempt order. @@ -71,12 +72,15 @@ pub struct AddressBook { /// The span for operations on this address book. span: Span, + /// A channel used to send the latest address book metrics. + address_metrics_tx: watch::Sender, + /// The last time we logged a message about the address metrics. last_address_log: Option, } /// Metrics about the states of the addresses in an [`AddressBook`]. -#[derive(Debug)] +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)] pub struct AddressMetrics { /// The number of addresses in the `Responded` state. responded: usize, @@ -111,11 +115,16 @@ impl AddressBook { let instant_now = Instant::now(); let chrono_now = Utc::now(); + // The default value is correct for an empty address book, + // and it gets replaced by `update_metrics` anyway. + let (address_metrics_tx, _address_metrics_rx) = watch::channel(AddressMetrics::default()); + let mut new_book = AddressBook { by_addr: OrderedMap::new(|meta_addr| Reverse(*meta_addr)), addr_limit: constants::MAX_ADDRS_IN_ADDRESS_BOOK, local_listener: canonical_socket_addr(local_listener), span, + address_metrics_tx, last_address_log: None, }; @@ -169,6 +178,17 @@ impl AddressBook { new_book } + /// Return a watch channel for the address book metrics. 
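A minimal sketch, outside this patch, of how a consumer could use the new watcher: read the latest metrics with `borrow` and wait for the next update with `changed`, without ever touching the address book mutex. The trimmed `AddressMetrics` fields and the `log_next_metrics_update` helper are illustrative only.

    use tokio::sync::watch;

    // Trimmed-down stand-in for the `AddressMetrics` struct in this patch.
    #[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
    struct AddressMetrics {
        responded: usize,
        failed: usize,
    }

    // Hypothetical consumer: `borrow` reads the last value sent by
    // `update_metrics`, and `changed` resolves after the next update.
    // Neither call locks the address book mutex.
    async fn log_next_metrics_update(mut rx: watch::Receiver<AddressMetrics>) {
        let before = *rx.borrow();
        if rx.changed().await.is_ok() {
            let after = *rx.borrow();
            tracing::info!(?before, ?after, "address book metrics updated");
        }
    }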
+ /// + /// The metrics in the watch channel are only updated when the address book updates, + /// so they can be significantly outdated if Zebra is disconnected or hung. + /// + /// The current metrics value is marked as seen. + /// So `Receiver::changed` will only return after the next address book update. + pub fn address_metrics_watcher(&self) -> watch::Receiver { + self.address_metrics_tx.subscribe() + } + /// Get the local listener address. /// /// This address contains minimal state, but it is not sanitized. @@ -422,7 +442,25 @@ impl AddressBook { } /// Returns metrics for the addresses in this address book. + /// Only for use in tests. + /// + /// # Correctness + /// + /// Use [`AddressBook::address_metrics_watcher().borrow()`] in production code, + /// to avoid deadlocks. + #[cfg(test)] pub fn address_metrics(&self, now: chrono::DateTime) -> AddressMetrics { + self.address_metrics_internal(now) + } + + /// Returns metrics for the addresses in this address book. + /// + /// # Correctness + /// + /// External callers should use [`AddressBook::address_metrics_watcher().borrow()`] + /// in production code, to avoid deadlocks. + /// (Using the watch channel receiver does not lock the address book mutex.) + fn address_metrics_internal(&self, now: chrono::DateTime) -> AddressMetrics { let responded = self.state_peers(PeerAddrState::Responded).count(); let never_attempted_gossiped = self .state_peers(PeerAddrState::NeverAttemptedGossiped) @@ -453,7 +491,10 @@ impl AddressBook { fn update_metrics(&mut self, instant_now: Instant, chrono_now: chrono::DateTime) { let _guard = self.span.enter(); - let m = self.address_metrics(chrono_now); + let m = self.address_metrics_internal(chrono_now); + + // Ignore errors: we don't care if any receivers are listening. + let _ = self.address_metrics_tx.send(m); // TODO: rename to address_book.[state_name] metrics::gauge!("candidate_set.responded", m.responded as f64); @@ -536,3 +577,26 @@ impl Extend for AddressBook { } } } + +impl Clone for AddressBook { + /// Clone the addresses, address limit, local listener address, and span. + /// + /// Cloned address books have a separate metrics struct watch channel, and an empty last address log. + /// + /// All address books update the same prometheus metrics. + fn clone(&self) -> AddressBook { + // The existing metrics might be outdated, but we avoid calling `update_metrics`, + // so we don't overwrite the prometheus metrics from the main address book. + let (address_metrics_tx, _address_metrics_rx) = + watch::channel(*self.address_metrics_tx.borrow()); + + AddressBook { + by_addr: self.by_addr.clone(), + addr_limit: self.addr_limit, + local_listener: self.local_listener, + span: self.span.clone(), + address_metrics_tx, + last_address_log: None, + } + } +} diff --git a/zebra-network/src/address_book_updater.rs b/zebra-network/src/address_book_updater.rs index b09d19b2a..294f5d7d3 100644 --- a/zebra-network/src/address_book_updater.rs +++ b/zebra-network/src/address_book_updater.rs @@ -2,11 +2,15 @@ use std::{net::SocketAddr, sync::Arc}; -use futures::{channel::mpsc, prelude::*}; use thiserror::Error; -use tokio::task::JoinHandle; +use tokio::{ + sync::{mpsc, watch}, + task::JoinHandle, +}; -use crate::{meta_addr::MetaAddrChange, AddressBook, BoxError, Config}; +use crate::{ + address_book::AddressMetrics, meta_addr::MetaAddrChange, AddressBook, BoxError, Config, +}; /// The `AddressBookUpdater` hooks into incoming message streams for each peer /// and lets the owner of the sender handle update the address book. 
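A sketch test, not taken from the patch, of the "marked as seen" behaviour the doc comment above relies on, using a plain `u32` channel in place of `AddressMetrics`:

    #[tokio::test]
    async fn subscribed_receiver_only_sees_later_updates() {
        let (tx, _keep_alive) = tokio::sync::watch::channel(0u32);

        // Like `address_metrics_watcher`, `subscribe` marks the value that is
        // already in the channel as seen...
        let mut rx = tx.subscribe();

        // ...so `changed` only resolves once a newer value is sent.
        tx.send(1).expect("a receiver is still alive");
        rx.changed().await.expect("the sender is still alive");
        assert_eq!(*rx.borrow(), 1);
    }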
For @@ -24,15 +28,19 @@ impl AddressBookUpdater { /// configured with Zebra's actual `local_listener` address. /// /// Returns handles for: + /// - the address book, /// - the transmission channel for address book update events, - /// - the address book, and - /// - the address book updater task. + /// - a watch channel for address book metrics, and + /// - the address book updater task join handle. + /// + /// The task exits with an error when the returned [`mpsc::Sender`] is closed. pub fn spawn( config: &Config, local_listener: SocketAddr, ) -> ( Arc>, mpsc::Sender, + watch::Receiver, JoinHandle>, ) { use tracing::Level; @@ -41,14 +49,14 @@ impl AddressBookUpdater { // based on the maximum number of inbound and outbound peers. let (worker_tx, mut worker_rx) = mpsc::channel(config.peerset_total_connection_limit()); - let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new( - local_listener, - span!(Level::TRACE, "address book updater"), - ))); - let worker_address_book = address_book.clone(); + let address_book = + AddressBook::new(local_listener, span!(Level::TRACE, "address book updater")); + let address_metrics = address_book.address_metrics_watcher(); + let address_book = Arc::new(std::sync::Mutex::new(address_book)); - let worker = async move { - while let Some(event) = worker_rx.next().await { + let worker_address_book = address_book.clone(); + let worker = move || { + while let Some(event) = worker_rx.blocking_recv() { // # Correctness // // Briefly hold the address book threaded mutex, to update the @@ -62,8 +70,15 @@ impl AddressBookUpdater { Err(AllAddressBookUpdaterSendersClosed.into()) }; - let address_book_updater_task_handle = tokio::spawn(worker.boxed()); + // Correctness: spawn address book accesses on a blocking thread, + // to avoid deadlocks (see #1976) + let address_book_updater_task_handle = tokio::task::spawn_blocking(worker); - (address_book, worker_tx, address_book_updater_task_handle) + ( + address_book, + worker_tx, + address_metrics, + address_book_updater_task_handle, + ) } } diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 337b918aa..f633314e2 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -10,10 +10,7 @@ use std::{ }; use chrono::{TimeZone, Utc}; -use futures::{ - channel::{mpsc, oneshot}, - future, FutureExt, SinkExt, StreamExt, -}; +use futures::{channel::oneshot, future, FutureExt, SinkExt, StreamExt}; use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout}; use tokio_stream::wrappers::IntervalStream; use tokio_util::codec::Framed; @@ -54,7 +51,7 @@ use crate::{ pub struct Handshake { config: Config, inbound_service: S, - address_book_updater: mpsc::Sender, + address_book_updater: tokio::sync::mpsc::Sender, inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>, nonces: Arc>>, user_agent: String, @@ -306,7 +303,7 @@ impl fmt::Debug for ConnectedAddr { pub struct Builder { config: Option, inbound_service: Option, - address_book_updater: Option>, + address_book_updater: Option>, our_services: Option, user_agent: Option, relay: Option, @@ -350,7 +347,7 @@ where /// make outbound connections to peers. 
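A stripped-down sketch, assuming nothing beyond what the diff above shows, of the updater pattern `spawn` now uses: drain a tokio mpsc channel on a dedicated blocking thread, so holding the std mutex can never stall the async executor the senders run on. The `spawn_updater` name and the `Vec<String>` state are placeholders.

    use std::sync::{Arc, Mutex};
    use tokio::sync::mpsc;

    fn spawn_updater(
        shared: Arc<Mutex<Vec<String>>>,
    ) -> (mpsc::Sender<String>, tokio::task::JoinHandle<()>) {
        let (tx, mut rx) = mpsc::channel(100);

        // Correctness: like the address book updater, all mutex accesses
        // happen on a blocking thread, never on an async task.
        let handle = tokio::task::spawn_blocking(move || {
            // `blocking_recv` parks this thread, not an async worker.
            while let Some(event) = rx.blocking_recv() {
                // Hold the mutex just long enough to apply one event.
                shared.lock().expect("mutex poisoned").push(event);
            }
            // All senders dropped: the updater exits.
        });

        (tx, handle)
    }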
pub fn with_address_book_updater( mut self, - address_book_updater: mpsc::Sender, + address_book_updater: tokio::sync::mpsc::Sender, ) -> Self { self.address_book_updater = Some(address_book_updater); self @@ -415,7 +412,7 @@ where let address_book_updater = self.address_book_updater.unwrap_or_else(|| { // No `AddressBookUpdater` for timestamp collection was passed, so create a stub // channel. Dropping the receiver means sends will fail, but we don't care. - let (tx, _rx) = mpsc::channel(1); + let (tx, _rx) = tokio::sync::mpsc::channel(1); tx }); let nonces = Arc::new(futures::lock::Mutex::new(HashSet::new())); @@ -713,7 +710,7 @@ where // Clone these upfront, so they can be moved into the future. let nonces = self.nonces.clone(); let inbound_service = self.inbound_service.clone(); - let mut address_book_updater = self.address_book_updater.clone(); + let address_book_updater = self.address_book_updater.clone(); let inv_collector = self.inv_collector.clone(); let config = self.config.clone(); let user_agent = self.user_agent.clone(); @@ -787,7 +784,7 @@ where // These channels should not be cloned more than they are // in this block, see constants.rs for more. - let (server_tx, server_rx) = mpsc::channel(0); + let (server_tx, server_rx) = futures::channel::mpsc::channel(0); let (shutdown_tx, shutdown_rx) = oneshot::channel(); let error_slot = ErrorSlot::default(); @@ -831,7 +828,7 @@ where .then(move |msg| { // Add a metric for inbound messages and errors. // Fire a timestamp or failure event. - let mut inbound_ts_collector = inbound_ts_collector.clone(); + let inbound_ts_collector = inbound_ts_collector.clone(); let span = debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector"); async move { @@ -1018,7 +1015,9 @@ where } /// Send one heartbeat using `server_tx`. -async fn send_one_heartbeat(server_tx: &mut mpsc::Sender) -> Result<(), BoxError> { +async fn send_one_heartbeat( + server_tx: &mut futures::channel::mpsc::Sender, +) -> Result<(), BoxError> { // We just reached a heartbeat interval, so start sending // a heartbeat. let (tx, rx) = oneshot::channel(); @@ -1065,7 +1064,7 @@ async fn send_one_heartbeat(server_tx: &mut mpsc::Sender) -> Resu /// `handle_heartbeat_error`. async fn heartbeat_timeout( fut: F, - address_book_updater: &mut mpsc::Sender, + address_book_updater: &mut tokio::sync::mpsc::Sender, connected_addr: &ConnectedAddr, remote_services: &PeerServices, ) -> Result @@ -1099,7 +1098,7 @@ where /// If `result.is_err()`, mark `connected_addr` as failed using `address_book_updater`. 
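The stub updater channel above works because a tokio mpsc sender whose receiver has been dropped fails immediately instead of waiting for capacity. A small, hypothetical test of that behaviour:

    #[tokio::test]
    async fn stub_channel_sends_fail_without_blocking() {
        let (tx, rx) = tokio::sync::mpsc::channel::<&str>(1);

        // Dropping the receiver turns every send into an immediate error,
        // which is all the stub address book updater channel needs.
        drop(rx);
        assert!(tx.send("timestamp update").await.is_err());
    }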
async fn handle_heartbeat_error( result: Result, - address_book_updater: &mut mpsc::Sender, + address_book_updater: &mut tokio::sync::mpsc::Sender, connected_addr: &ConnectedAddr, remote_services: &PeerServices, ) -> Result diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index fb54ff5cd..212aa35b3 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -8,7 +8,8 @@ use tower::{Service, ServiceExt}; use zebra_chain::serialization::DateTime32; use crate::{ - constants, peer_set::set::MorePeers, types::MetaAddr, AddressBook, BoxError, Request, Response, + constants, meta_addr::MetaAddrChange, peer_set::set::MorePeers, types::MetaAddr, AddressBook, + BoxError, Request, Response, }; #[cfg(test)] @@ -115,8 +116,10 @@ mod tests; // * show that seed peers that transition to other never attempted // states are already in the address book pub(crate) struct CandidateSet { - pub(super) address_book: Arc>, - pub(super) peer_service: S, + // Correctness: the address book must be private, + // so all operations are performed on a blocking thread (see #1976). + address_book: Arc>, + peer_service: S, min_next_handshake: Instant, min_next_crawl: Instant, } @@ -271,6 +274,8 @@ where responses.push(peer_service.call(Request::Peers)); } + let mut address_book_updates = FuturesUnordered::new(); + // Process responses while let Some(rsp) = responses.next().await { match rsp { @@ -281,7 +286,7 @@ where "got response to GetPeers" ); let addrs = validate_addrs(addrs, DateTime32::now()); - self.send_addrs(addrs); + address_book_updates.push(self.send_addrs(addrs)); more_peers = Some(MorePeers); } Err(e) => { @@ -293,25 +298,37 @@ where } } + // Wait until all the address book updates have finished + while let Some(()) = address_book_updates.next().await {} + Ok(more_peers) } /// Add new `addrs` to the address book. - fn send_addrs(&self, addrs: impl IntoIterator) { - let addrs = addrs + async fn send_addrs(&self, addrs: impl IntoIterator) { + let addrs: Vec = addrs .into_iter() .map(MetaAddr::new_gossiped_change) - .map(|maybe_addr| { - maybe_addr.expect("Received gossiped peers always have services set") - }); + .map(|maybe_addr| maybe_addr.expect("Received gossiped peers always have services set")) + .collect(); + + debug!(count = ?addrs.len(), "sending gossiped addresses to the address book"); + + // Don't bother spawning a task if there are no addresses left. + if addrs.is_empty() { + return; + } // # Correctness // - // Briefly hold the address book threaded mutex, to extend - // the address list. + // Spawn address book accesses on a blocking thread, + // to avoid deadlocks (see #1976). // // Extend handles duplicate addresses internally. - self.address_book.lock().unwrap().extend(addrs); + let address_book = self.address_book.clone(); + tokio::task::spawn_blocking(move || address_book.lock().unwrap().extend(addrs)) + .await + .expect("panic in new peers address book update task"); } /// Returns the next candidate for a connection attempt, if any are available. @@ -335,19 +352,10 @@ where /// new peer connections are initiated at least /// [`MIN_PEER_CONNECTION_INTERVAL`][constants::MIN_PEER_CONNECTION_INTERVAL] apart. pub async fn next(&mut self) -> Option { - // # Correctness - // - // In this critical section, we hold the address mutex, blocking the - // current thread, and all async tasks scheduled on that thread. 
- // - // To avoid deadlocks, the critical section: - // - must not acquire any other locks - // - must not await any futures - // - // To avoid hangs, any computation in the critical section should - // be kept to a minimum. - let reconnect = { - let mut guard = self.address_book.lock().unwrap(); + // Correctness: To avoid hangs, computation in the critical section should be kept to a minimum. + let address_book = self.address_book.clone(); + let next_peer = move || -> Option { + let mut guard = address_book.lock().unwrap(); // Now we have the lock, get the current time let instant_now = std::time::Instant::now(); @@ -355,27 +363,43 @@ where // It's okay to return without sleeping here, because we're returning // `None`. We only need to sleep before yielding an address. - let reconnect = guard.reconnection_peers(instant_now, chrono_now).next()?; + let next_peer = guard.reconnection_peers(instant_now, chrono_now).next()?; - let reconnect = MetaAddr::new_reconnect(&reconnect.addr); - guard.update(reconnect)? + // TODO: only mark the peer as AttemptPending when it is actually used (#1976) + // + // If the future is dropped before `next` returns, the peer will be marked as AttemptPending, + // even if its address is not actually used for a connection. + // + // We could send a reconnect change to the AddressBookUpdater when the peer is actually used, + // but channel order is not guaranteed, so we could accidentally re-use the same peer. + let next_peer = MetaAddr::new_reconnect(&next_peer.addr); + guard.update(next_peer) }; - // SECURITY: rate-limit new outbound peer connections + // Correctness: Spawn address book accesses on a blocking thread, to avoid deadlocks (see #1976). + let next_peer = tokio::task::spawn_blocking(next_peer) + .await + .expect("panic in next peer address book task")?; + + // Security: rate-limit new outbound peer connections sleep_until(self.min_next_handshake).await; self.min_next_handshake = Instant::now() + constants::MIN_PEER_CONNECTION_INTERVAL; - Some(reconnect) + Some(next_peer) } /// Mark `addr` as a failed peer. - pub fn report_failed(&mut self, addr: &MetaAddr) { + pub async fn report_failed(&mut self, addr: &MetaAddr) { let addr = MetaAddr::new_errored(&addr.addr, addr.services); + // # Correctness // - // Briefly hold the address book threaded mutex, to update the state for - // a single address. - self.address_book.lock().unwrap().update(addr); + // Spawn address book accesses on a blocking thread, + // to avoid deadlocks (see #1976). + let address_book = self.address_book.clone(); + tokio::task::spawn_blocking(move || address_book.lock().unwrap().update(addr)) + .await + .expect("panic in peer failure address book update task"); } } diff --git a/zebra-network/src/peer_set/candidate_set/tests/prop.rs b/zebra-network/src/peer_set/candidate_set/tests/prop.rs index 2edc92d2f..8c50e08d9 100644 --- a/zebra-network/src/peer_set/candidate_set/tests/prop.rs +++ b/zebra-network/src/peer_set/candidate_set/tests/prop.rs @@ -6,7 +6,6 @@ use std::{ time::{Duration, Instant}, }; -use futures::FutureExt; use proptest::{collection::vec, prelude::*}; use tokio::time::{sleep, timeout}; use tracing::Span; @@ -71,8 +70,12 @@ proptest! { // Make sure that the rate-limit is never triggered, even after multiple calls for _ in 0..next_peer_attempts { - // An empty address book immediately returns "no next peer" - assert!(matches!(candidate_set.next().now_or_never(), Some(None))); + // An empty address book immediately returns "no next peer". 
+ // + // Check that it takes less than the peer set candidate delay, + // and hope that is enough time for test machines with high CPU load. + let less_than_min_interval = MIN_PEER_CONNECTION_INTERVAL - Duration::from_millis(1); + assert_eq!(runtime.block_on(timeout(less_than_min_interval, candidate_set.next())), Ok(None)); } } } diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 5fd8ecf5a..04b44211c 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -6,7 +6,6 @@ use std::{collections::HashSet, net::SocketAddr, sync::Arc}; use futures::{ - channel::mpsc, future::{self, FutureExt}, sink::SinkExt, stream::{FuturesUnordered, StreamExt, TryStreamExt}, @@ -66,7 +65,9 @@ type DiscoveredPeer = Result<(SocketAddr, peer::Client), BoxError>; /// /// In addition to returning a service for outbound requests, this method /// returns a shared [`AddressBook`] updated with last-seen timestamps for -/// connected peers. +/// connected peers. The shared address book should be accessed using a +/// [blocking thread](https://docs.rs/tokio/1.15.0/tokio/task/index.html#blocking-and-yielding), +/// to avoid async task deadlocks. /// /// # Panics /// @@ -94,7 +95,7 @@ where let (tcp_listener, listen_addr) = open_listener(&config.clone()).await; - let (address_book, address_book_updater, address_book_updater_guard) = + let (address_book, address_book_updater, address_metrics, address_book_updater_guard) = AddressBookUpdater::spawn(&config, listen_addr); // Create a broadcast channel for peer inventory advertisements. @@ -134,7 +135,7 @@ where // Create an mpsc channel for peer changes, // based on the maximum number of inbound and outbound peers. let (peerset_tx, peerset_rx) = - mpsc::channel::(config.peerset_total_connection_limit()); + futures::channel::mpsc::channel::(config.peerset_total_connection_limit()); let discovered_peers = peerset_rx // Discover interprets an error as stream termination, @@ -145,7 +146,7 @@ where // Create an mpsc channel for peerset demand signaling, // based on the maximum number of outbound peers. let (mut demand_tx, demand_rx) = - mpsc::channel::(config.peerset_outbound_connection_limit()); + futures::channel::mpsc::channel::(config.peerset_outbound_connection_limit()); // Create a oneshot to send background task JoinHandles to the peer set let (handle_tx, handle_rx) = tokio::sync::oneshot::channel(); @@ -157,7 +158,7 @@ where demand_tx.clone(), handle_rx, inv_receiver, - address_book.clone(), + address_metrics, MinimumPeerVersion::new(latest_chain_tip, config.network), ); let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE); @@ -241,8 +242,8 @@ where async fn add_initial_peers( config: Config, outbound_connector: S, - mut peerset_tx: mpsc::Sender, - address_book_updater: mpsc::Sender, + mut peerset_tx: futures::channel::mpsc::Sender, + address_book_updater: tokio::sync::mpsc::Sender, ) -> Result where S: Service @@ -379,7 +380,7 @@ where /// Sends any unused entries to the `address_book_updater`. 
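A hedged sketch of the common shape behind the CandidateSet changes above (`send_addrs`, `next`, and `report_failed`): clone the `Arc`, move it into a closure, do all the mutex work on a blocking thread, and await the join handle. The `Vec<u32>` stand-in for the address book is illustrative.

    use std::sync::{Arc, Mutex};

    async fn update_on_blocking_thread(book: Arc<Mutex<Vec<u32>>>, value: u32) {
        // Correctness: the std mutex is only ever locked on a blocking
        // thread, so it cannot deadlock with tasks on the async executor
        // (see #1976).
        tokio::task::spawn_blocking(move || {
            book.lock().expect("mutex poisoned").push(value);
        })
        .await
        .expect("panic in address book update task");
    }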
async fn limit_initial_peers( config: &Config, - mut address_book_updater: mpsc::Sender, + address_book_updater: tokio::sync::mpsc::Sender, ) -> HashSet { let all_peers = config.initial_peers().await; let peers_count = all_peers.len(); @@ -475,7 +476,7 @@ async fn accept_inbound_connections( config: Config, listener: TcpListener, mut handshaker: S, - peerset_tx: mpsc::Sender, + peerset_tx: futures::channel::mpsc::Sender, ) -> Result<(), BoxError> where S: Service + Clone, @@ -623,11 +624,11 @@ enum CrawlerAction { ))] async fn crawl_and_dial( config: Config, - mut demand_tx: mpsc::Sender, - mut demand_rx: mpsc::Receiver, + mut demand_tx: futures::channel::mpsc::Sender, + mut demand_rx: futures::channel::mpsc::Receiver, mut candidates: CandidateSet, outbound_connector: C, - mut peerset_tx: mpsc::Sender, + mut peerset_tx: futures::channel::mpsc::Sender, mut active_outbound_connections: ActiveConnectionCounter, ) -> Result<(), BoxError> where @@ -765,7 +766,7 @@ where // The connection was never opened, or it failed the handshake and was dropped. debug!(?failed_addr.addr, "marking candidate as failed"); - candidates.report_failed(&failed_addr); + candidates.report_failed(&failed_addr).await; // The demand signal that was taken out of the queue // to attempt to connect to the failed candidate never // turned into a connection, so add it back: diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index faa27090e..de5ca0104 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -1560,7 +1560,7 @@ where let (peerset_tx, peerset_rx) = mpsc::channel::(peer_count + 1); - let (_address_book, address_book_updater, address_book_updater_guard) = + let (_address_book, address_book_updater, _address_metrics, address_book_updater_guard) = AddressBookUpdater::spawn(&config, unused_v4); let add_fut = add_initial_peers(config, outbound_connector, peerset_tx, address_book_updater); diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 6f6641a28..8d0d9287a 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -97,12 +97,10 @@ use std::{ marker::PhantomData, net::SocketAddr, pin::Pin, - sync::Arc, task::{Context, Poll}, time::Instant, }; -use chrono::Utc; use futures::{ channel::{mpsc, oneshot}, future::{FutureExt, TryFutureExt}, @@ -110,7 +108,7 @@ use futures::{ stream::FuturesUnordered, }; use tokio::{ - sync::{broadcast, oneshot::error::TryRecvError}, + sync::{broadcast, oneshot::error::TryRecvError, watch}, task::JoinHandle, }; use tower::{ @@ -122,6 +120,7 @@ use tower::{ use zebra_chain::chain_tip::ChainTip; use crate::{ + address_book::AddressMetrics, peer::{LoadTrackedClient, MinimumPeerVersion}, peer_set::{ unready_service::{Error as UnreadyError, UnreadyService}, @@ -131,7 +130,7 @@ use crate::{ external::InventoryHash, internal::{Request, Response}, }, - AddressBook, BoxError, Config, + BoxError, Config, }; #[cfg(test)] @@ -210,10 +209,10 @@ where /// the `PeerSet` propagate errors from background tasks back to the user guards: futures::stream::FuturesUnordered>>, - /// A shared list of peer addresses. + /// Address book metrics watch channel. /// /// Used for logging diagnostics. 
- address_book: Arc>, + address_metrics: watch::Receiver, /// The last time we logged a message about the peer set size last_peer_log: Option, @@ -266,7 +265,7 @@ where demand_signal: mpsc::Sender, handle_rx: tokio::sync::oneshot::Receiver>>>, inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>, - address_book: Arc>, + address_metrics: watch::Receiver, minimum_peer_version: MinimumPeerVersion, ) -> Self { Self { @@ -287,7 +286,7 @@ where // Metrics last_peer_log: None, - address_book, + address_metrics, peerset_total_connection_limit: config.peerset_total_connection_limit(), // Real-time parameters @@ -770,19 +769,7 @@ where self.last_peer_log = Some(Instant::now()); - // # Correctness - // - // Only log address metrics in exceptional circumstances, to avoid lock contention. - // - // Get the current time after acquiring the address book lock. - // - // TODO: replace with a watch channel that is updated in `AddressBook::update_metrics()`, - // or turn the address book into a service (#1976) - let address_metrics = self - .address_book - .lock() - .unwrap() - .address_metrics(Utc::now()); + let address_metrics = self.address_metrics.borrow(); if unready_services_len == 0 { warn!( ?address_metrics, @@ -809,12 +796,7 @@ where // Security: make sure we haven't exceeded the connection limit if num_peers > self.peerset_total_connection_limit { - // Correctness: Get the current time after acquiring the address book lock. - let address_metrics = self - .address_book - .lock() - .unwrap() - .address_metrics(Utc::now()); + let address_metrics = self.address_metrics.borrow(); panic!( "unexpectedly exceeded configured peer set connection limit: \n\ peers: {:?}, ready: {:?}, unready: {:?}, \n\ diff --git a/zebra-network/src/peer_set/set/tests.rs b/zebra-network/src/peer_set/set/tests.rs index e48c89244..f0696570f 100644 --- a/zebra-network/src/peer_set/set/tests.rs +++ b/zebra-network/src/peer_set/set/tests.rs @@ -6,7 +6,10 @@ use futures::{ }; use proptest::{collection::vec, prelude::*}; use proptest_derive::Arbitrary; -use tokio::{sync::broadcast, task::JoinHandle}; +use tokio::{ + sync::{broadcast, watch}, + task::JoinHandle, +}; use tower::{ discover::{Change, Discover}, BoxError, @@ -21,6 +24,7 @@ use zebra_chain::{ use super::MorePeers; use crate::{ + address_book::AddressMetrics, peer::{ CancelHeartbeatTask, Client, ClientRequest, ErrorSlot, LoadTrackedClient, MinimumPeerVersion, @@ -224,7 +228,7 @@ where .inv_stream .unwrap_or_else(|| guard.create_inventory_receiver()); - let address_book = guard.prepare_address_book(self.address_book); + let address_metrics = guard.prepare_address_book(self.address_book); let peer_set = PeerSet::new( &config, @@ -232,7 +236,7 @@ where demand_signal, handle_rx, inv_stream, - address_book, + address_metrics, minimum_peer_version, ); @@ -302,17 +306,21 @@ impl PeerSetGuard { /// inside the [`PeerSetGuard`] to keep track of it. Otherwise, a new instance is created with /// the [`Self::fallback_address_book`] method. /// - /// A reference to the [`AddressBook`] instance tracked by the [`PeerSetGuard`] is returned to - /// be passed to the [`PeerSet`] constructor. + /// Returns a metrics watch channel for the [`AddressBook`] instance tracked by the [`PeerSetGuard`], + /// so it can be passed to the [`PeerSet`] constructor. 
pub fn prepare_address_book( &mut self, maybe_address_book: Option>>, - ) -> Arc> { + ) -> watch::Receiver { let address_book = maybe_address_book.unwrap_or_else(Self::fallback_address_book); + let metrics_watcher = address_book + .lock() + .expect("unexpected panic in previous address book mutex guard") + .address_metrics_watcher(); - self.address_book = Some(address_book.clone()); + self.address_book = Some(address_book); - address_book + metrics_watcher } /// Create an empty [`AddressBook`] instance using a dummy local listener address.
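One property the test helper above depends on, sketched with a plain watch channel rather than the real `AddressBook`: the metrics receiver only stays usable while the book that owns the sender is alive, so keeping the `Arc` in the guard, as `prepare_address_book` does, also keeps the channel open.

    #[tokio::test]
    async fn watcher_fails_once_the_sender_is_dropped() {
        let (tx, mut rx) = tokio::sync::watch::channel(0u32);

        // Dropping the sender (in the real code, the address book that owns
        // `address_metrics_tx`) closes the channel, so waiting for the next
        // update returns an error instead of hanging.
        drop(tx);
        assert!(rx.changed().await.is_err());
    }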