From 43e54d1cb2dab4238b5c0d6d6c912959e17ae103 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 6 Nov 2023 08:28:58 +1000 Subject: [PATCH] fix(net): Fix a potential hang caused by accessing the address book directly (#7902) * Fix a potential hang accessing the address book directly * Remove unused connection shutdown MetaAddr arguments * Add an UpdateConnected MetaAddrChange, that sends initial connection info * Fix some tests * Fix a panic with a zero channel size --- .../src/address_book/tests/vectors.rs | 4 +- zebra-network/src/address_book_updater.rs | 10 +- zebra-network/src/meta_addr.rs | 103 +++++++++++------- zebra-network/src/meta_addr/tests/vectors.rs | 14 +-- zebra-network/src/peer/handshake.rs | 44 ++------ zebra-network/src/peer_set/candidate_set.rs | 2 + zebra-network/src/peer_set/initialize.rs | 51 ++++----- .../src/peer_set/initialize/tests/vectors.rs | 18 ++- 8 files changed, 118 insertions(+), 128 deletions(-) diff --git a/zebra-network/src/address_book/tests/vectors.rs b/zebra-network/src/address_book/tests/vectors.rs index e401e6a5d..16a954442 100644 --- a/zebra-network/src/address_book/tests/vectors.rs +++ b/zebra-network/src/address_book/tests/vectors.rs @@ -128,9 +128,7 @@ fn address_book_peer_order() { fn reconnection_peers_skips_recently_updated_ip() { // tests that reconnection_peers() skips addresses where there's a connection at that IP with a recent: // - `last_response` - test_reconnection_peers_skips_recently_updated_ip(true, |addr| { - MetaAddr::new_responded(addr, &PeerServices::NODE_NETWORK) - }); + test_reconnection_peers_skips_recently_updated_ip(true, MetaAddr::new_responded); // tests that reconnection_peers() *does not* skip addresses where there's a connection at that IP with a recent: // - `last_attempt` diff --git a/zebra-network/src/address_book_updater.rs b/zebra-network/src/address_book_updater.rs index ef503bc8d..f0729b3ef 100644 --- a/zebra-network/src/address_book_updater.rs +++ b/zebra-network/src/address_book_updater.rs @@ -1,6 +1,6 @@ //! The timestamp collector collects liveness information from peers. -use std::{net::SocketAddr, sync::Arc}; +use std::{cmp::max, net::SocketAddr, sync::Arc}; use thiserror::Error; use tokio::{ @@ -13,6 +13,9 @@ use crate::{ address_book::AddressMetrics, meta_addr::MetaAddrChange, AddressBook, BoxError, Config, }; +/// The minimum size of the address book updater channel. +pub const MIN_CHANNEL_SIZE: usize = 10; + /// The `AddressBookUpdater` hooks into incoming message streams for each peer /// and lets the owner of the sender handle update the address book. For /// example, it can be used to record per-connection last-seen timestamps, or @@ -46,7 +49,10 @@ impl AddressBookUpdater { ) { // Create an mpsc channel for peerset address book updates, // based on the maximum number of inbound and outbound peers. - let (worker_tx, mut worker_rx) = mpsc::channel(config.peerset_total_connection_limit()); + let (worker_tx, mut worker_rx) = mpsc::channel(max( + config.peerset_total_connection_limit(), + MIN_CHANNEL_SIZE, + )); let address_book = AddressBook::new( local_listener, diff --git a/zebra-network/src/meta_addr.rs b/zebra-network/src/meta_addr.rs index 1f8572fd5..98d3f3222 100644 --- a/zebra-network/src/meta_addr.rs +++ b/zebra-network/src/meta_addr.rs @@ -280,6 +280,16 @@ pub enum MetaAddrChange { addr: PeerSocketAddr, }, + /// Updates an existing `MetaAddr` when we've made a successful connection with a peer. + UpdateConnected { + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "canonical_peer_addr_strategy()") + )] + addr: PeerSocketAddr, + services: PeerServices, + }, + /// Updates an existing `MetaAddr` when a peer responds with a message. UpdateResponded { #[cfg_attr( @@ -287,7 +297,6 @@ pub enum MetaAddrChange { proptest(strategy = "canonical_peer_addr_strategy()") )] addr: PeerSocketAddr, - services: PeerServices, }, /// Updates an existing `MetaAddr` when a peer fails. @@ -345,8 +354,8 @@ impl MetaAddr { }) } - /// Returns a [`MetaAddrChange::UpdateResponded`] for a peer that has just - /// sent us a message. + /// Returns a [`MetaAddrChange::UpdateConnected`] for a peer that has just successfully + /// connected. /// /// # Security /// @@ -354,16 +363,33 @@ impl MetaAddr { /// and the services must be the services from that peer's handshake. /// /// Otherwise: - /// - malicious peers could interfere with other peers' [`AddressBook`](crate::AddressBook) state, - /// or + /// - malicious peers could interfere with other peers' [`AddressBook`](crate::AddressBook) + /// state, or /// - Zebra could advertise unreachable addresses to its own peers. - pub fn new_responded(addr: PeerSocketAddr, services: &PeerServices) -> MetaAddrChange { - UpdateResponded { + pub fn new_connected(addr: PeerSocketAddr, services: &PeerServices) -> MetaAddrChange { + UpdateConnected { addr: canonical_peer_addr(*addr), services: *services, } } + /// Returns a [`MetaAddrChange::UpdateResponded`] for a peer that has just + /// sent us a message. + /// + /// # Security + /// + /// This address must be the remote address from an outbound connection. + /// + /// Otherwise: + /// - malicious peers could interfere with other peers' [`AddressBook`](crate::AddressBook) + /// state, or + /// - Zebra could advertise unreachable addresses to its own peers. + pub fn new_responded(addr: PeerSocketAddr) -> MetaAddrChange { + UpdateResponded { + addr: canonical_peer_addr(*addr), + } + } + /// Returns a [`MetaAddrChange::UpdateAttempt`] for a peer that we /// want to make an outbound connection to. pub fn new_reconnect(addr: PeerSocketAddr) -> MetaAddrChange { @@ -391,8 +417,7 @@ impl MetaAddr { } } - /// Returns a [`MetaAddrChange::UpdateFailed`] for a peer that has just had - /// an error. + /// Returns a [`MetaAddrChange::UpdateFailed`] for a peer that has just had an error. pub fn new_errored( addr: PeerSocketAddr, services: impl Into>, @@ -404,13 +429,10 @@ impl MetaAddr { } /// Create a new `MetaAddr` for a peer that has just shut down. - pub fn new_shutdown( - addr: PeerSocketAddr, - services: impl Into>, - ) -> MetaAddrChange { + pub fn new_shutdown(addr: PeerSocketAddr) -> MetaAddrChange { // TODO: if the peer shut down in the Responded state, preserve that // state. All other states should be treated as (timeout) errors. - MetaAddr::new_errored(addr, services.into()) + MetaAddr::new_errored(addr, None) } /// Return the address for this `MetaAddr`. @@ -696,6 +718,7 @@ impl MetaAddrChange { | NewAlternate { addr, .. } | NewLocal { addr, .. } | UpdateAttempt { addr } + | UpdateConnected { addr, .. } | UpdateResponded { addr, .. } | UpdateFailed { addr, .. } => *addr, } @@ -712,6 +735,7 @@ impl MetaAddrChange { | NewAlternate { addr, .. } | NewLocal { addr, .. } | UpdateAttempt { addr } + | UpdateConnected { addr, .. } | UpdateResponded { addr, .. } | UpdateFailed { addr, .. } => *addr = new_addr, } @@ -721,17 +745,18 @@ impl MetaAddrChange { pub fn untrusted_services(&self) -> Option { match self { NewInitial { .. } => None, + // TODO: split untrusted and direct services (#2324) NewGossiped { untrusted_services, .. - } => Some(*untrusted_services), - NewAlternate { + } + | NewAlternate { untrusted_services, .. } => Some(*untrusted_services), // TODO: create a "services implemented by Zebra" constant (#2324) NewLocal { .. } => Some(PeerServices::NODE_NETWORK), UpdateAttempt { .. } => None, - // TODO: split untrusted and direct services (#2324) - UpdateResponded { services, .. } => Some(*services), + UpdateConnected { services, .. } => Some(*services), + UpdateResponded { .. } => None, UpdateFailed { services, .. } => *services, } } @@ -747,9 +772,10 @@ impl MetaAddrChange { NewAlternate { .. } => None, // We know that our local listener is available NewLocal { .. } => Some(now), - UpdateAttempt { .. } => None, - UpdateResponded { .. } => None, - UpdateFailed { .. } => None, + UpdateAttempt { .. } + | UpdateConnected { .. } + | UpdateResponded { .. } + | UpdateFailed { .. } => None, } } @@ -775,33 +801,29 @@ impl MetaAddrChange { /// Return the last attempt for this change, if available. pub fn last_attempt(&self, now: Instant) -> Option { match self { - NewInitial { .. } => None, - NewGossiped { .. } => None, - NewAlternate { .. } => None, - NewLocal { .. } => None, + NewInitial { .. } | NewGossiped { .. } | NewAlternate { .. } | NewLocal { .. } => None, // Attempt changes are applied before we start the handshake to the // peer address. So the attempt time is a lower bound for the actual // handshake time. UpdateAttempt { .. } => Some(now), - UpdateResponded { .. } => None, - UpdateFailed { .. } => None, + UpdateConnected { .. } | UpdateResponded { .. } | UpdateFailed { .. } => None, } } /// Return the last response for this change, if available. pub fn last_response(&self, now: DateTime32) -> Option { match self { - NewInitial { .. } => None, - NewGossiped { .. } => None, - NewAlternate { .. } => None, - NewLocal { .. } => None, - UpdateAttempt { .. } => None, + NewInitial { .. } + | NewGossiped { .. } + | NewAlternate { .. } + | NewLocal { .. } + | UpdateAttempt { .. } => None, // If there is a large delay applying this change, then: // - the peer might stay in the `AttemptPending` state for longer, // - we might send outdated last seen times to our peers, and // - the peer will appear to be live for longer, delaying future // reconnection attempts. - UpdateResponded { .. } => Some(now), + UpdateConnected { .. } | UpdateResponded { .. } => Some(now), UpdateFailed { .. } => None, } } @@ -809,12 +831,13 @@ impl MetaAddrChange { /// Return the last failure for this change, if available. pub fn last_failure(&self, now: Instant) -> Option { match self { - NewInitial { .. } => None, - NewGossiped { .. } => None, - NewAlternate { .. } => None, - NewLocal { .. } => None, - UpdateAttempt { .. } => None, - UpdateResponded { .. } => None, + NewInitial { .. } + | NewGossiped { .. } + | NewAlternate { .. } + | NewLocal { .. } + | UpdateAttempt { .. } + | UpdateConnected { .. } + | UpdateResponded { .. } => None, // If there is a large delay applying this change, then: // - the peer might stay in the `AttemptPending` or `Responded` // states for longer, and @@ -833,7 +856,7 @@ impl MetaAddrChange { // local listeners get sanitized, so the state doesn't matter here NewLocal { .. } => NeverAttemptedGossiped, UpdateAttempt { .. } => AttemptPending, - UpdateResponded { .. } => Responded, + UpdateConnected { .. } | UpdateResponded { .. } => Responded, UpdateFailed { .. } => Failed, } } diff --git a/zebra-network/src/meta_addr/tests/vectors.rs b/zebra-network/src/meta_addr/tests/vectors.rs index 5b341901b..40e00b665 100644 --- a/zebra-network/src/meta_addr/tests/vectors.rs +++ b/zebra-network/src/meta_addr/tests/vectors.rs @@ -170,7 +170,7 @@ fn recently_responded_peer_is_gossipable() { .into_new_meta_addr(instant_now, local_now); // Create a peer that has responded - let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) + let peer = MetaAddr::new_responded(address) .apply_to_meta_addr(peer_seed, instant_now, chrono_now) .expect("Failed to create MetaAddr for responded peer"); @@ -191,7 +191,7 @@ fn not_so_recently_responded_peer_is_still_gossipable() { .into_new_meta_addr(instant_now, local_now); // Create a peer that has responded - let mut peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) + let mut peer = MetaAddr::new_responded(address) .apply_to_meta_addr(peer_seed, instant_now, chrono_now) .expect("Failed to create MetaAddr for responded peer"); @@ -222,7 +222,7 @@ fn responded_long_ago_peer_is_not_gossipable() { .into_new_meta_addr(instant_now, local_now); // Create a peer that has responded - let mut peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) + let mut peer = MetaAddr::new_responded(address) .apply_to_meta_addr(peer_seed, instant_now, chrono_now) .expect("Failed to create MetaAddr for responded peer"); @@ -253,7 +253,7 @@ fn long_delayed_change_is_not_applied() { .into_new_meta_addr(instant_now, local_now); // Create a peer that has responded - let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) + let peer = MetaAddr::new_responded(address) .apply_to_meta_addr(peer_seed, instant_now, chrono_now) .expect("Failed to create MetaAddr for responded peer"); @@ -297,7 +297,7 @@ fn later_revert_change_is_applied() { .into_new_meta_addr(instant_now, local_now); // Create a peer that has responded - let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) + let peer = MetaAddr::new_responded(address) .apply_to_meta_addr(peer_seed, instant_now, chrono_now) .expect("Failed to create MetaAddr for responded peer"); @@ -340,7 +340,7 @@ fn concurrent_state_revert_change_is_not_applied() { .into_new_meta_addr(instant_now, local_now); // Create a peer that has responded - let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) + let peer = MetaAddr::new_responded(address) .apply_to_meta_addr(peer_seed, instant_now, chrono_now) .expect("Failed to create MetaAddr for responded peer"); @@ -400,7 +400,7 @@ fn concurrent_state_progress_change_is_applied() { .into_new_meta_addr(instant_now, local_now); // Create a peer that has responded - let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) + let peer = MetaAddr::new_responded(address) .apply_to_meta_addr(peer_seed, instant_now, chrono_now) .expect("Failed to create MetaAddr for responded peer"); diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index e0d981951..e343e1c20 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -929,12 +929,13 @@ where let _ = address_book_updater.send(alt_addr).await; } - // The handshake succeeded: update the peer status from AttemptPending to Responded + // The handshake succeeded: update the peer status from AttemptPending to Responded, + // and send initial connection info. if let Some(book_addr) = connected_addr.get_address_book_addr() { // the collector doesn't depend on network activity, // so this await should not hang let _ = address_book_updater - .send(MetaAddr::new_responded(book_addr, &remote_services)) + .send(MetaAddr::new_connected(book_addr, &remote_services)) .await; } @@ -1075,7 +1076,6 @@ where let heartbeat_task = tokio::spawn( send_periodic_heartbeats_with_shutdown_handle( connected_addr, - remote_services, shutdown_rx, server_tx.clone(), address_book_updater.clone(), @@ -1213,7 +1213,6 @@ pub(crate) async fn register_inventory_status( /// Returning from this function terminates the connection's heartbeat task. async fn send_periodic_heartbeats_with_shutdown_handle( connected_addr: ConnectedAddr, - remote_services: PeerServices, shutdown_rx: oneshot::Receiver, server_tx: futures::channel::mpsc::Sender, heartbeat_ts_collector: tokio::sync::mpsc::Sender, @@ -1222,7 +1221,6 @@ async fn send_periodic_heartbeats_with_shutdown_handle( let heartbeat_run_loop = send_periodic_heartbeats_run_loop( connected_addr, - remote_services, server_tx, heartbeat_ts_collector.clone(), ); @@ -1246,7 +1244,6 @@ async fn send_periodic_heartbeats_with_shutdown_handle( PeerError::ClientCancelledHeartbeatTask, &heartbeat_ts_collector, &connected_addr, - &remote_services, ) .await } @@ -1256,7 +1253,6 @@ async fn send_periodic_heartbeats_with_shutdown_handle( PeerError::ClientDropped, &heartbeat_ts_collector, &connected_addr, - &remote_services, ) .await } @@ -1275,7 +1271,6 @@ async fn send_periodic_heartbeats_with_shutdown_handle( /// See `send_periodic_heartbeats_with_shutdown_handle` for details. async fn send_periodic_heartbeats_run_loop( connected_addr: ConnectedAddr, - remote_services: PeerServices, mut server_tx: futures::channel::mpsc::Sender, heartbeat_ts_collector: tokio::sync::mpsc::Sender, ) -> Result<(), BoxError> { @@ -1294,13 +1289,7 @@ async fn send_periodic_heartbeats_run_loop( // We've reached another heartbeat interval without // shutting down, so do a heartbeat request. let heartbeat = send_one_heartbeat(&mut server_tx); - heartbeat_timeout( - heartbeat, - &heartbeat_ts_collector, - &connected_addr, - &remote_services, - ) - .await?; + heartbeat_timeout(heartbeat, &heartbeat_ts_collector, &connected_addr).await?; // # Security // @@ -1312,7 +1301,7 @@ async fn send_periodic_heartbeats_run_loop( // the collector doesn't depend on network activity, // so this await should not hang let _ = heartbeat_ts_collector - .send(MetaAddr::new_responded(book_addr, &remote_services)) + .send(MetaAddr::new_responded(book_addr)) .await; } } @@ -1375,29 +1364,16 @@ async fn heartbeat_timeout( fut: F, address_book_updater: &tokio::sync::mpsc::Sender, connected_addr: &ConnectedAddr, - remote_services: &PeerServices, ) -> Result where F: Future>, { let t = match timeout(constants::HEARTBEAT_INTERVAL, fut).await { Ok(inner_result) => { - handle_heartbeat_error( - inner_result, - address_book_updater, - connected_addr, - remote_services, - ) - .await? + handle_heartbeat_error(inner_result, address_book_updater, connected_addr).await? } Err(elapsed) => { - handle_heartbeat_error( - Err(elapsed), - address_book_updater, - connected_addr, - remote_services, - ) - .await? + handle_heartbeat_error(Err(elapsed), address_book_updater, connected_addr).await? } }; @@ -1409,7 +1385,6 @@ async fn handle_heartbeat_error( result: Result, address_book_updater: &tokio::sync::mpsc::Sender, connected_addr: &ConnectedAddr, - remote_services: &PeerServices, ) -> Result where E: std::fmt::Debug, @@ -1427,7 +1402,7 @@ where // - after the first error or shutdown, the peer is disconnected if let Some(book_addr) = connected_addr.get_address_book_addr() { let _ = address_book_updater - .send(MetaAddr::new_errored(book_addr, *remote_services)) + .send(MetaAddr::new_errored(book_addr, None)) .await; } Err(err) @@ -1440,13 +1415,12 @@ async fn handle_heartbeat_shutdown( peer_error: PeerError, address_book_updater: &tokio::sync::mpsc::Sender, connected_addr: &ConnectedAddr, - remote_services: &PeerServices, ) -> Result<(), BoxError> { tracing::debug!(?peer_error, "client shutdown, shutting down heartbeat"); if let Some(book_addr) = connected_addr.get_address_book_addr() { let _ = address_book_updater - .send(MetaAddr::new_shutdown(book_addr, *remote_services)) + .send(MetaAddr::new_shutdown(book_addr)) .await; } diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index f951bda5b..b60dc7473 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -414,6 +414,8 @@ where } /// Returns the address book for this `CandidateSet`. + #[cfg(any(test, feature = "proptest-impl"))] + #[allow(dead_code)] pub async fn address_book(&self) -> Arc> { self.address_book.clone() } diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 9dc74d99f..30404ac6a 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -29,7 +29,6 @@ use tokio_stream::wrappers::IntervalStream; use tower::{ buffer::Buffer, discover::Change, layer::Layer, util::BoxService, Service, ServiceExt, }; -use tracing::Span; use tracing_futures::Instrument; use zebra_chain::{chain_tip::ChainTip, diagnostic::task::WaitForPanics}; @@ -197,7 +196,7 @@ where config.clone(), outbound_connector.clone(), peerset_tx.clone(), - address_book_updater, + address_book_updater.clone(), ); let initial_peers_join = tokio::spawn(initial_peers_fut.in_current_span()); @@ -242,6 +241,7 @@ where outbound_connector, peerset_tx, active_outbound_connections, + address_book_updater, ); let crawl_guard = tokio::spawn(crawl_fut.in_current_span()); @@ -740,6 +740,7 @@ enum CrawlerAction { /// /// Uses `active_outbound_connections` to limit the number of active outbound connections /// across both the initial peers and crawler. The limit is based on `config`. +#[allow(clippy::too_many_arguments)] #[instrument( skip( config, @@ -749,6 +750,7 @@ enum CrawlerAction { outbound_connector, peerset_tx, active_outbound_connections, + address_book_updater, ), fields( new_peer_interval = ?config.crawl_new_peer_interval, @@ -762,6 +764,7 @@ async fn crawl_and_dial( outbound_connector: C, peerset_tx: futures::channel::mpsc::Sender, mut active_outbound_connections: ActiveConnectionCounter, + address_book_updater: tokio::sync::mpsc::Sender, ) -> Result<(), BoxError> where C: Service< @@ -783,8 +786,6 @@ where "starting the peer address crawler", ); - let address_book = candidates.address_book().await; - // # Concurrency // // Allow tasks using the candidate set to be spawned, so they can run concurrently. @@ -857,7 +858,7 @@ where let candidates = candidates.clone(); let outbound_connector = outbound_connector.clone(); let peerset_tx = peerset_tx.clone(); - let address_book = address_book.clone(); + let address_book_updater = address_book_updater.clone(); let demand_tx = demand_tx.clone(); // Increment the connection count before we spawn the connection. @@ -886,7 +887,7 @@ where outbound_connector, outbound_connection_tracker, peerset_tx, - address_book, + address_book_updater, demand_tx, ) .await?; @@ -1020,7 +1021,7 @@ where outbound_connector, outbound_connection_tracker, peerset_tx, - address_book, + address_book_updater, demand_tx ))] async fn dial( @@ -1028,7 +1029,7 @@ async fn dial( mut outbound_connector: C, outbound_connection_tracker: ConnectionTracker, mut peerset_tx: futures::channel::mpsc::Sender, - address_book: Arc>, + address_book_updater: tokio::sync::mpsc::Sender, mut demand_tx: futures::channel::mpsc::Sender, ) -> Result<(), BoxError> where @@ -1069,8 +1070,8 @@ where } // The connection was never opened, or it failed the handshake and was dropped. Err(error) => { - debug!(?error, ?candidate.addr, "failed to make outbound connection to peer"); - report_failed(address_book.clone(), candidate).await; + info!(?error, ?candidate.addr, "failed to make outbound connection to peer"); + report_failed(address_book_updater.clone(), candidate).await; // The demand signal that was taken out of the queue to attempt to connect to the // failed candidate never turned into a connection, so add it back. @@ -1090,25 +1091,15 @@ where Ok(()) } -/// Mark `addr` as a failed peer in `address_book`. -#[instrument(skip(address_book))] -async fn report_failed(address_book: Arc>, addr: MetaAddr) { - let addr = MetaAddr::new_errored(addr.addr, addr.services); +/// Mark `addr` as a failed peer to `address_book_updater`. +#[instrument(skip(address_book_updater))] +async fn report_failed( + address_book_updater: tokio::sync::mpsc::Sender, + addr: MetaAddr, +) { + // The connection info is the same as what's already in the address book. + let addr = MetaAddr::new_errored(addr.addr, None); - // # Correctness - // - // Spawn address book accesses on a blocking thread, to avoid deadlocks (see #1976). - let span = Span::current(); - let updated_addr = tokio::task::spawn_blocking(move || { - span.in_scope(|| address_book.lock().unwrap().update(addr)) - }) - .wait_for_panics() - .await; - - assert_eq!( - updated_addr.map(|addr| addr.addr()), - Some(addr.addr()), - "incorrect address updated by address book: \ - original: {addr:?}, updated: {updated_addr:?}" - ); + // Ignore send errors on Zebra shutdown. + let _ = address_book_updater.send(addr).await; } diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index c871ab432..59bafdc77 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -24,7 +24,6 @@ use futures::{channel::mpsc, FutureExt, StreamExt}; use indexmap::IndexSet; use tokio::{io::AsyncWriteExt, net::TcpStream, task::JoinHandle}; use tower::{service_fn, Layer, Service, ServiceExt}; -use tracing::Span; use zebra_chain::{chain_tip::NoChainTip, parameters::Network, serialization::DateTime32}; use zebra_test::net::random_known_port; @@ -1517,13 +1516,8 @@ where config.peerset_initial_target_size = peerset_initial_target_size; } - // Manually initialize an address book without a timestamp tracker. - let mut address_book = AddressBook::new( - config.listen_addr, - config.network, - config.max_connections_per_ip, - Span::current(), - ); + let (address_book, address_book_updater, _address_metrics, _address_book_updater_guard) = + AddressBookUpdater::spawn(&config, config.listen_addr); // Add enough fake peers to go over the limit, even if the limit is zero. let over_limit_peers = config.peerset_outbound_connection_limit() * 2 + 1; @@ -1540,7 +1534,10 @@ where .new_gossiped_change() .expect("created MetaAddr contains enough information to represent a gossiped address"); - address_book.update(addr); + address_book + .lock() + .expect("panic in previous thread while accessing the address book") + .update(addr); } // Create a fake peer set. @@ -1555,8 +1552,6 @@ where Ok(rsp) }); - let address_book = Arc::new(std::sync::Mutex::new(address_book)); - // Make the channels large enough to hold all the peers. let (peerset_tx, peerset_rx) = mpsc::channel::(over_limit_peers); let (mut demand_tx, demand_rx) = mpsc::channel::(over_limit_peers); @@ -1581,6 +1576,7 @@ where outbound_connector, peerset_tx, active_outbound_connections, + address_book_updater, ); let crawl_task_handle = tokio::spawn(crawl_fut);