fix(net): Fix a potential hang caused by accessing the address book directly (#7902)

* Fix a potential hang accessing the address book directly

* Remove unused connection shutdown MetaAddr arguments

* Add an UpdateConnected MetaAddrChange, that sends initial connection info

* Fix some tests

* Fix a panic with a zero channel size
This commit is contained in:
teor 2023-11-06 08:28:58 +10:00 committed by GitHub
parent f836f7f849
commit 43e54d1cb2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 118 additions and 128 deletions

View File

@ -128,9 +128,7 @@ fn address_book_peer_order() {
fn reconnection_peers_skips_recently_updated_ip() { fn reconnection_peers_skips_recently_updated_ip() {
// tests that reconnection_peers() skips addresses where there's a connection at that IP with a recent: // tests that reconnection_peers() skips addresses where there's a connection at that IP with a recent:
// - `last_response` // - `last_response`
test_reconnection_peers_skips_recently_updated_ip(true, |addr| { test_reconnection_peers_skips_recently_updated_ip(true, MetaAddr::new_responded);
MetaAddr::new_responded(addr, &PeerServices::NODE_NETWORK)
});
// tests that reconnection_peers() *does not* skip addresses where there's a connection at that IP with a recent: // tests that reconnection_peers() *does not* skip addresses where there's a connection at that IP with a recent:
// - `last_attempt` // - `last_attempt`

View File

@ -1,6 +1,6 @@
//! The timestamp collector collects liveness information from peers. //! The timestamp collector collects liveness information from peers.
use std::{net::SocketAddr, sync::Arc}; use std::{cmp::max, net::SocketAddr, sync::Arc};
use thiserror::Error; use thiserror::Error;
use tokio::{ use tokio::{
@ -13,6 +13,9 @@ use crate::{
address_book::AddressMetrics, meta_addr::MetaAddrChange, AddressBook, BoxError, Config, address_book::AddressMetrics, meta_addr::MetaAddrChange, AddressBook, BoxError, Config,
}; };
/// The minimum size of the address book updater channel.
pub const MIN_CHANNEL_SIZE: usize = 10;
/// The `AddressBookUpdater` hooks into incoming message streams for each peer /// The `AddressBookUpdater` hooks into incoming message streams for each peer
/// and lets the owner of the sender handle update the address book. For /// and lets the owner of the sender handle update the address book. For
/// example, it can be used to record per-connection last-seen timestamps, or /// example, it can be used to record per-connection last-seen timestamps, or
@ -46,7 +49,10 @@ impl AddressBookUpdater {
) { ) {
// Create an mpsc channel for peerset address book updates, // Create an mpsc channel for peerset address book updates,
// based on the maximum number of inbound and outbound peers. // based on the maximum number of inbound and outbound peers.
let (worker_tx, mut worker_rx) = mpsc::channel(config.peerset_total_connection_limit()); let (worker_tx, mut worker_rx) = mpsc::channel(max(
config.peerset_total_connection_limit(),
MIN_CHANNEL_SIZE,
));
let address_book = AddressBook::new( let address_book = AddressBook::new(
local_listener, local_listener,

View File

@ -280,6 +280,16 @@ pub enum MetaAddrChange {
addr: PeerSocketAddr, addr: PeerSocketAddr,
}, },
/// Updates an existing `MetaAddr` when we've made a successful connection with a peer.
UpdateConnected {
#[cfg_attr(
any(test, feature = "proptest-impl"),
proptest(strategy = "canonical_peer_addr_strategy()")
)]
addr: PeerSocketAddr,
services: PeerServices,
},
/// Updates an existing `MetaAddr` when a peer responds with a message. /// Updates an existing `MetaAddr` when a peer responds with a message.
UpdateResponded { UpdateResponded {
#[cfg_attr( #[cfg_attr(
@ -287,7 +297,6 @@ pub enum MetaAddrChange {
proptest(strategy = "canonical_peer_addr_strategy()") proptest(strategy = "canonical_peer_addr_strategy()")
)] )]
addr: PeerSocketAddr, addr: PeerSocketAddr,
services: PeerServices,
}, },
/// Updates an existing `MetaAddr` when a peer fails. /// Updates an existing `MetaAddr` when a peer fails.
@ -345,8 +354,8 @@ impl MetaAddr {
}) })
} }
/// Returns a [`MetaAddrChange::UpdateResponded`] for a peer that has just /// Returns a [`MetaAddrChange::UpdateConnected`] for a peer that has just successfully
/// sent us a message. /// connected.
/// ///
/// # Security /// # Security
/// ///
@ -354,16 +363,33 @@ impl MetaAddr {
/// and the services must be the services from that peer's handshake. /// and the services must be the services from that peer's handshake.
/// ///
/// Otherwise: /// Otherwise:
/// - malicious peers could interfere with other peers' [`AddressBook`](crate::AddressBook) state, /// - malicious peers could interfere with other peers' [`AddressBook`](crate::AddressBook)
/// or /// state, or
/// - Zebra could advertise unreachable addresses to its own peers. /// - Zebra could advertise unreachable addresses to its own peers.
pub fn new_responded(addr: PeerSocketAddr, services: &PeerServices) -> MetaAddrChange { pub fn new_connected(addr: PeerSocketAddr, services: &PeerServices) -> MetaAddrChange {
UpdateResponded { UpdateConnected {
addr: canonical_peer_addr(*addr), addr: canonical_peer_addr(*addr),
services: *services, services: *services,
} }
} }
/// Returns a [`MetaAddrChange::UpdateResponded`] for a peer that has just
/// sent us a message.
///
/// # Security
///
/// This address must be the remote address from an outbound connection.
///
/// Otherwise:
/// - malicious peers could interfere with other peers' [`AddressBook`](crate::AddressBook)
/// state, or
/// - Zebra could advertise unreachable addresses to its own peers.
pub fn new_responded(addr: PeerSocketAddr) -> MetaAddrChange {
UpdateResponded {
addr: canonical_peer_addr(*addr),
}
}
/// Returns a [`MetaAddrChange::UpdateAttempt`] for a peer that we /// Returns a [`MetaAddrChange::UpdateAttempt`] for a peer that we
/// want to make an outbound connection to. /// want to make an outbound connection to.
pub fn new_reconnect(addr: PeerSocketAddr) -> MetaAddrChange { pub fn new_reconnect(addr: PeerSocketAddr) -> MetaAddrChange {
@ -391,8 +417,7 @@ impl MetaAddr {
} }
} }
/// Returns a [`MetaAddrChange::UpdateFailed`] for a peer that has just had /// Returns a [`MetaAddrChange::UpdateFailed`] for a peer that has just had an error.
/// an error.
pub fn new_errored( pub fn new_errored(
addr: PeerSocketAddr, addr: PeerSocketAddr,
services: impl Into<Option<PeerServices>>, services: impl Into<Option<PeerServices>>,
@ -404,13 +429,10 @@ impl MetaAddr {
} }
/// Create a new `MetaAddr` for a peer that has just shut down. /// Create a new `MetaAddr` for a peer that has just shut down.
pub fn new_shutdown( pub fn new_shutdown(addr: PeerSocketAddr) -> MetaAddrChange {
addr: PeerSocketAddr,
services: impl Into<Option<PeerServices>>,
) -> MetaAddrChange {
// TODO: if the peer shut down in the Responded state, preserve that // TODO: if the peer shut down in the Responded state, preserve that
// state. All other states should be treated as (timeout) errors. // state. All other states should be treated as (timeout) errors.
MetaAddr::new_errored(addr, services.into()) MetaAddr::new_errored(addr, None)
} }
/// Return the address for this `MetaAddr`. /// Return the address for this `MetaAddr`.
@ -696,6 +718,7 @@ impl MetaAddrChange {
| NewAlternate { addr, .. } | NewAlternate { addr, .. }
| NewLocal { addr, .. } | NewLocal { addr, .. }
| UpdateAttempt { addr } | UpdateAttempt { addr }
| UpdateConnected { addr, .. }
| UpdateResponded { addr, .. } | UpdateResponded { addr, .. }
| UpdateFailed { addr, .. } => *addr, | UpdateFailed { addr, .. } => *addr,
} }
@ -712,6 +735,7 @@ impl MetaAddrChange {
| NewAlternate { addr, .. } | NewAlternate { addr, .. }
| NewLocal { addr, .. } | NewLocal { addr, .. }
| UpdateAttempt { addr } | UpdateAttempt { addr }
| UpdateConnected { addr, .. }
| UpdateResponded { addr, .. } | UpdateResponded { addr, .. }
| UpdateFailed { addr, .. } => *addr = new_addr, | UpdateFailed { addr, .. } => *addr = new_addr,
} }
@ -721,17 +745,18 @@ impl MetaAddrChange {
pub fn untrusted_services(&self) -> Option<PeerServices> { pub fn untrusted_services(&self) -> Option<PeerServices> {
match self { match self {
NewInitial { .. } => None, NewInitial { .. } => None,
// TODO: split untrusted and direct services (#2324)
NewGossiped { NewGossiped {
untrusted_services, .. untrusted_services, ..
} => Some(*untrusted_services), }
NewAlternate { | NewAlternate {
untrusted_services, .. untrusted_services, ..
} => Some(*untrusted_services), } => Some(*untrusted_services),
// TODO: create a "services implemented by Zebra" constant (#2324) // TODO: create a "services implemented by Zebra" constant (#2324)
NewLocal { .. } => Some(PeerServices::NODE_NETWORK), NewLocal { .. } => Some(PeerServices::NODE_NETWORK),
UpdateAttempt { .. } => None, UpdateAttempt { .. } => None,
// TODO: split untrusted and direct services (#2324) UpdateConnected { services, .. } => Some(*services),
UpdateResponded { services, .. } => Some(*services), UpdateResponded { .. } => None,
UpdateFailed { services, .. } => *services, UpdateFailed { services, .. } => *services,
} }
} }
@ -747,9 +772,10 @@ impl MetaAddrChange {
NewAlternate { .. } => None, NewAlternate { .. } => None,
// We know that our local listener is available // We know that our local listener is available
NewLocal { .. } => Some(now), NewLocal { .. } => Some(now),
UpdateAttempt { .. } => None, UpdateAttempt { .. }
UpdateResponded { .. } => None, | UpdateConnected { .. }
UpdateFailed { .. } => None, | UpdateResponded { .. }
| UpdateFailed { .. } => None,
} }
} }
@ -775,33 +801,29 @@ impl MetaAddrChange {
/// Return the last attempt for this change, if available. /// Return the last attempt for this change, if available.
pub fn last_attempt(&self, now: Instant) -> Option<Instant> { pub fn last_attempt(&self, now: Instant) -> Option<Instant> {
match self { match self {
NewInitial { .. } => None, NewInitial { .. } | NewGossiped { .. } | NewAlternate { .. } | NewLocal { .. } => None,
NewGossiped { .. } => None,
NewAlternate { .. } => None,
NewLocal { .. } => None,
// Attempt changes are applied before we start the handshake to the // Attempt changes are applied before we start the handshake to the
// peer address. So the attempt time is a lower bound for the actual // peer address. So the attempt time is a lower bound for the actual
// handshake time. // handshake time.
UpdateAttempt { .. } => Some(now), UpdateAttempt { .. } => Some(now),
UpdateResponded { .. } => None, UpdateConnected { .. } | UpdateResponded { .. } | UpdateFailed { .. } => None,
UpdateFailed { .. } => None,
} }
} }
/// Return the last response for this change, if available. /// Return the last response for this change, if available.
pub fn last_response(&self, now: DateTime32) -> Option<DateTime32> { pub fn last_response(&self, now: DateTime32) -> Option<DateTime32> {
match self { match self {
NewInitial { .. } => None, NewInitial { .. }
NewGossiped { .. } => None, | NewGossiped { .. }
NewAlternate { .. } => None, | NewAlternate { .. }
NewLocal { .. } => None, | NewLocal { .. }
UpdateAttempt { .. } => None, | UpdateAttempt { .. } => None,
// If there is a large delay applying this change, then: // If there is a large delay applying this change, then:
// - the peer might stay in the `AttemptPending` state for longer, // - the peer might stay in the `AttemptPending` state for longer,
// - we might send outdated last seen times to our peers, and // - we might send outdated last seen times to our peers, and
// - the peer will appear to be live for longer, delaying future // - the peer will appear to be live for longer, delaying future
// reconnection attempts. // reconnection attempts.
UpdateResponded { .. } => Some(now), UpdateConnected { .. } | UpdateResponded { .. } => Some(now),
UpdateFailed { .. } => None, UpdateFailed { .. } => None,
} }
} }
@ -809,12 +831,13 @@ impl MetaAddrChange {
/// Return the last failure for this change, if available. /// Return the last failure for this change, if available.
pub fn last_failure(&self, now: Instant) -> Option<Instant> { pub fn last_failure(&self, now: Instant) -> Option<Instant> {
match self { match self {
NewInitial { .. } => None, NewInitial { .. }
NewGossiped { .. } => None, | NewGossiped { .. }
NewAlternate { .. } => None, | NewAlternate { .. }
NewLocal { .. } => None, | NewLocal { .. }
UpdateAttempt { .. } => None, | UpdateAttempt { .. }
UpdateResponded { .. } => None, | UpdateConnected { .. }
| UpdateResponded { .. } => None,
// If there is a large delay applying this change, then: // If there is a large delay applying this change, then:
// - the peer might stay in the `AttemptPending` or `Responded` // - the peer might stay in the `AttemptPending` or `Responded`
// states for longer, and // states for longer, and
@ -833,7 +856,7 @@ impl MetaAddrChange {
// local listeners get sanitized, so the state doesn't matter here // local listeners get sanitized, so the state doesn't matter here
NewLocal { .. } => NeverAttemptedGossiped, NewLocal { .. } => NeverAttemptedGossiped,
UpdateAttempt { .. } => AttemptPending, UpdateAttempt { .. } => AttemptPending,
UpdateResponded { .. } => Responded, UpdateConnected { .. } | UpdateResponded { .. } => Responded,
UpdateFailed { .. } => Failed, UpdateFailed { .. } => Failed,
} }
} }

View File

@ -170,7 +170,7 @@ fn recently_responded_peer_is_gossipable() {
.into_new_meta_addr(instant_now, local_now); .into_new_meta_addr(instant_now, local_now);
// Create a peer that has responded // Create a peer that has responded
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) let peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now) .apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer"); .expect("Failed to create MetaAddr for responded peer");
@ -191,7 +191,7 @@ fn not_so_recently_responded_peer_is_still_gossipable() {
.into_new_meta_addr(instant_now, local_now); .into_new_meta_addr(instant_now, local_now);
// Create a peer that has responded // Create a peer that has responded
let mut peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) let mut peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now) .apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer"); .expect("Failed to create MetaAddr for responded peer");
@ -222,7 +222,7 @@ fn responded_long_ago_peer_is_not_gossipable() {
.into_new_meta_addr(instant_now, local_now); .into_new_meta_addr(instant_now, local_now);
// Create a peer that has responded // Create a peer that has responded
let mut peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) let mut peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now) .apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer"); .expect("Failed to create MetaAddr for responded peer");
@ -253,7 +253,7 @@ fn long_delayed_change_is_not_applied() {
.into_new_meta_addr(instant_now, local_now); .into_new_meta_addr(instant_now, local_now);
// Create a peer that has responded // Create a peer that has responded
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) let peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now) .apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer"); .expect("Failed to create MetaAddr for responded peer");
@ -297,7 +297,7 @@ fn later_revert_change_is_applied() {
.into_new_meta_addr(instant_now, local_now); .into_new_meta_addr(instant_now, local_now);
// Create a peer that has responded // Create a peer that has responded
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) let peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now) .apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer"); .expect("Failed to create MetaAddr for responded peer");
@ -340,7 +340,7 @@ fn concurrent_state_revert_change_is_not_applied() {
.into_new_meta_addr(instant_now, local_now); .into_new_meta_addr(instant_now, local_now);
// Create a peer that has responded // Create a peer that has responded
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) let peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now) .apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer"); .expect("Failed to create MetaAddr for responded peer");
@ -400,7 +400,7 @@ fn concurrent_state_progress_change_is_applied() {
.into_new_meta_addr(instant_now, local_now); .into_new_meta_addr(instant_now, local_now);
// Create a peer that has responded // Create a peer that has responded
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK) let peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now) .apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer"); .expect("Failed to create MetaAddr for responded peer");

View File

@ -929,12 +929,13 @@ where
let _ = address_book_updater.send(alt_addr).await; let _ = address_book_updater.send(alt_addr).await;
} }
// The handshake succeeded: update the peer status from AttemptPending to Responded // The handshake succeeded: update the peer status from AttemptPending to Responded,
// and send initial connection info.
if let Some(book_addr) = connected_addr.get_address_book_addr() { if let Some(book_addr) = connected_addr.get_address_book_addr() {
// the collector doesn't depend on network activity, // the collector doesn't depend on network activity,
// so this await should not hang // so this await should not hang
let _ = address_book_updater let _ = address_book_updater
.send(MetaAddr::new_responded(book_addr, &remote_services)) .send(MetaAddr::new_connected(book_addr, &remote_services))
.await; .await;
} }
@ -1075,7 +1076,6 @@ where
let heartbeat_task = tokio::spawn( let heartbeat_task = tokio::spawn(
send_periodic_heartbeats_with_shutdown_handle( send_periodic_heartbeats_with_shutdown_handle(
connected_addr, connected_addr,
remote_services,
shutdown_rx, shutdown_rx,
server_tx.clone(), server_tx.clone(),
address_book_updater.clone(), address_book_updater.clone(),
@ -1213,7 +1213,6 @@ pub(crate) async fn register_inventory_status(
/// Returning from this function terminates the connection's heartbeat task. /// Returning from this function terminates the connection's heartbeat task.
async fn send_periodic_heartbeats_with_shutdown_handle( async fn send_periodic_heartbeats_with_shutdown_handle(
connected_addr: ConnectedAddr, connected_addr: ConnectedAddr,
remote_services: PeerServices,
shutdown_rx: oneshot::Receiver<CancelHeartbeatTask>, shutdown_rx: oneshot::Receiver<CancelHeartbeatTask>,
server_tx: futures::channel::mpsc::Sender<ClientRequest>, server_tx: futures::channel::mpsc::Sender<ClientRequest>,
heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>, heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
@ -1222,7 +1221,6 @@ async fn send_periodic_heartbeats_with_shutdown_handle(
let heartbeat_run_loop = send_periodic_heartbeats_run_loop( let heartbeat_run_loop = send_periodic_heartbeats_run_loop(
connected_addr, connected_addr,
remote_services,
server_tx, server_tx,
heartbeat_ts_collector.clone(), heartbeat_ts_collector.clone(),
); );
@ -1246,7 +1244,6 @@ async fn send_periodic_heartbeats_with_shutdown_handle(
PeerError::ClientCancelledHeartbeatTask, PeerError::ClientCancelledHeartbeatTask,
&heartbeat_ts_collector, &heartbeat_ts_collector,
&connected_addr, &connected_addr,
&remote_services,
) )
.await .await
} }
@ -1256,7 +1253,6 @@ async fn send_periodic_heartbeats_with_shutdown_handle(
PeerError::ClientDropped, PeerError::ClientDropped,
&heartbeat_ts_collector, &heartbeat_ts_collector,
&connected_addr, &connected_addr,
&remote_services,
) )
.await .await
} }
@ -1275,7 +1271,6 @@ async fn send_periodic_heartbeats_with_shutdown_handle(
/// See `send_periodic_heartbeats_with_shutdown_handle` for details. /// See `send_periodic_heartbeats_with_shutdown_handle` for details.
async fn send_periodic_heartbeats_run_loop( async fn send_periodic_heartbeats_run_loop(
connected_addr: ConnectedAddr, connected_addr: ConnectedAddr,
remote_services: PeerServices,
mut server_tx: futures::channel::mpsc::Sender<ClientRequest>, mut server_tx: futures::channel::mpsc::Sender<ClientRequest>,
heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>, heartbeat_ts_collector: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Result<(), BoxError> { ) -> Result<(), BoxError> {
@ -1294,13 +1289,7 @@ async fn send_periodic_heartbeats_run_loop(
// We've reached another heartbeat interval without // We've reached another heartbeat interval without
// shutting down, so do a heartbeat request. // shutting down, so do a heartbeat request.
let heartbeat = send_one_heartbeat(&mut server_tx); let heartbeat = send_one_heartbeat(&mut server_tx);
heartbeat_timeout( heartbeat_timeout(heartbeat, &heartbeat_ts_collector, &connected_addr).await?;
heartbeat,
&heartbeat_ts_collector,
&connected_addr,
&remote_services,
)
.await?;
// # Security // # Security
// //
@ -1312,7 +1301,7 @@ async fn send_periodic_heartbeats_run_loop(
// the collector doesn't depend on network activity, // the collector doesn't depend on network activity,
// so this await should not hang // so this await should not hang
let _ = heartbeat_ts_collector let _ = heartbeat_ts_collector
.send(MetaAddr::new_responded(book_addr, &remote_services)) .send(MetaAddr::new_responded(book_addr))
.await; .await;
} }
} }
@ -1375,29 +1364,16 @@ async fn heartbeat_timeout<F, T>(
fut: F, fut: F,
address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>, address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
connected_addr: &ConnectedAddr, connected_addr: &ConnectedAddr,
remote_services: &PeerServices,
) -> Result<T, BoxError> ) -> Result<T, BoxError>
where where
F: Future<Output = Result<T, BoxError>>, F: Future<Output = Result<T, BoxError>>,
{ {
let t = match timeout(constants::HEARTBEAT_INTERVAL, fut).await { let t = match timeout(constants::HEARTBEAT_INTERVAL, fut).await {
Ok(inner_result) => { Ok(inner_result) => {
handle_heartbeat_error( handle_heartbeat_error(inner_result, address_book_updater, connected_addr).await?
inner_result,
address_book_updater,
connected_addr,
remote_services,
)
.await?
} }
Err(elapsed) => { Err(elapsed) => {
handle_heartbeat_error( handle_heartbeat_error(Err(elapsed), address_book_updater, connected_addr).await?
Err(elapsed),
address_book_updater,
connected_addr,
remote_services,
)
.await?
} }
}; };
@ -1409,7 +1385,6 @@ async fn handle_heartbeat_error<T, E>(
result: Result<T, E>, result: Result<T, E>,
address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>, address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
connected_addr: &ConnectedAddr, connected_addr: &ConnectedAddr,
remote_services: &PeerServices,
) -> Result<T, E> ) -> Result<T, E>
where where
E: std::fmt::Debug, E: std::fmt::Debug,
@ -1427,7 +1402,7 @@ where
// - after the first error or shutdown, the peer is disconnected // - after the first error or shutdown, the peer is disconnected
if let Some(book_addr) = connected_addr.get_address_book_addr() { if let Some(book_addr) = connected_addr.get_address_book_addr() {
let _ = address_book_updater let _ = address_book_updater
.send(MetaAddr::new_errored(book_addr, *remote_services)) .send(MetaAddr::new_errored(book_addr, None))
.await; .await;
} }
Err(err) Err(err)
@ -1440,13 +1415,12 @@ async fn handle_heartbeat_shutdown(
peer_error: PeerError, peer_error: PeerError,
address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>, address_book_updater: &tokio::sync::mpsc::Sender<MetaAddrChange>,
connected_addr: &ConnectedAddr, connected_addr: &ConnectedAddr,
remote_services: &PeerServices,
) -> Result<(), BoxError> { ) -> Result<(), BoxError> {
tracing::debug!(?peer_error, "client shutdown, shutting down heartbeat"); tracing::debug!(?peer_error, "client shutdown, shutting down heartbeat");
if let Some(book_addr) = connected_addr.get_address_book_addr() { if let Some(book_addr) = connected_addr.get_address_book_addr() {
let _ = address_book_updater let _ = address_book_updater
.send(MetaAddr::new_shutdown(book_addr, *remote_services)) .send(MetaAddr::new_shutdown(book_addr))
.await; .await;
} }

View File

@ -414,6 +414,8 @@ where
} }
/// Returns the address book for this `CandidateSet`. /// Returns the address book for this `CandidateSet`.
#[cfg(any(test, feature = "proptest-impl"))]
#[allow(dead_code)]
pub async fn address_book(&self) -> Arc<std::sync::Mutex<AddressBook>> { pub async fn address_book(&self) -> Arc<std::sync::Mutex<AddressBook>> {
self.address_book.clone() self.address_book.clone()
} }

View File

@ -29,7 +29,6 @@ use tokio_stream::wrappers::IntervalStream;
use tower::{ use tower::{
buffer::Buffer, discover::Change, layer::Layer, util::BoxService, Service, ServiceExt, buffer::Buffer, discover::Change, layer::Layer, util::BoxService, Service, ServiceExt,
}; };
use tracing::Span;
use tracing_futures::Instrument; use tracing_futures::Instrument;
use zebra_chain::{chain_tip::ChainTip, diagnostic::task::WaitForPanics}; use zebra_chain::{chain_tip::ChainTip, diagnostic::task::WaitForPanics};
@ -197,7 +196,7 @@ where
config.clone(), config.clone(),
outbound_connector.clone(), outbound_connector.clone(),
peerset_tx.clone(), peerset_tx.clone(),
address_book_updater, address_book_updater.clone(),
); );
let initial_peers_join = tokio::spawn(initial_peers_fut.in_current_span()); let initial_peers_join = tokio::spawn(initial_peers_fut.in_current_span());
@ -242,6 +241,7 @@ where
outbound_connector, outbound_connector,
peerset_tx, peerset_tx,
active_outbound_connections, active_outbound_connections,
address_book_updater,
); );
let crawl_guard = tokio::spawn(crawl_fut.in_current_span()); let crawl_guard = tokio::spawn(crawl_fut.in_current_span());
@ -740,6 +740,7 @@ enum CrawlerAction {
/// ///
/// Uses `active_outbound_connections` to limit the number of active outbound connections /// Uses `active_outbound_connections` to limit the number of active outbound connections
/// across both the initial peers and crawler. The limit is based on `config`. /// across both the initial peers and crawler. The limit is based on `config`.
#[allow(clippy::too_many_arguments)]
#[instrument( #[instrument(
skip( skip(
config, config,
@ -749,6 +750,7 @@ enum CrawlerAction {
outbound_connector, outbound_connector,
peerset_tx, peerset_tx,
active_outbound_connections, active_outbound_connections,
address_book_updater,
), ),
fields( fields(
new_peer_interval = ?config.crawl_new_peer_interval, new_peer_interval = ?config.crawl_new_peer_interval,
@ -762,6 +764,7 @@ async fn crawl_and_dial<C, S>(
outbound_connector: C, outbound_connector: C,
peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>, peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
mut active_outbound_connections: ActiveConnectionCounter, mut active_outbound_connections: ActiveConnectionCounter,
address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> Result<(), BoxError> ) -> Result<(), BoxError>
where where
C: Service< C: Service<
@ -783,8 +786,6 @@ where
"starting the peer address crawler", "starting the peer address crawler",
); );
let address_book = candidates.address_book().await;
// # Concurrency // # Concurrency
// //
// Allow tasks using the candidate set to be spawned, so they can run concurrently. // Allow tasks using the candidate set to be spawned, so they can run concurrently.
@ -857,7 +858,7 @@ where
let candidates = candidates.clone(); let candidates = candidates.clone();
let outbound_connector = outbound_connector.clone(); let outbound_connector = outbound_connector.clone();
let peerset_tx = peerset_tx.clone(); let peerset_tx = peerset_tx.clone();
let address_book = address_book.clone(); let address_book_updater = address_book_updater.clone();
let demand_tx = demand_tx.clone(); let demand_tx = demand_tx.clone();
// Increment the connection count before we spawn the connection. // Increment the connection count before we spawn the connection.
@ -886,7 +887,7 @@ where
outbound_connector, outbound_connector,
outbound_connection_tracker, outbound_connection_tracker,
peerset_tx, peerset_tx,
address_book, address_book_updater,
demand_tx, demand_tx,
) )
.await?; .await?;
@ -1020,7 +1021,7 @@ where
outbound_connector, outbound_connector,
outbound_connection_tracker, outbound_connection_tracker,
peerset_tx, peerset_tx,
address_book, address_book_updater,
demand_tx demand_tx
))] ))]
async fn dial<C>( async fn dial<C>(
@ -1028,7 +1029,7 @@ async fn dial<C>(
mut outbound_connector: C, mut outbound_connector: C,
outbound_connection_tracker: ConnectionTracker, outbound_connection_tracker: ConnectionTracker,
mut peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>, mut peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
address_book: Arc<std::sync::Mutex<AddressBook>>, address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
mut demand_tx: futures::channel::mpsc::Sender<MorePeers>, mut demand_tx: futures::channel::mpsc::Sender<MorePeers>,
) -> Result<(), BoxError> ) -> Result<(), BoxError>
where where
@ -1069,8 +1070,8 @@ where
} }
// The connection was never opened, or it failed the handshake and was dropped. // The connection was never opened, or it failed the handshake and was dropped.
Err(error) => { Err(error) => {
debug!(?error, ?candidate.addr, "failed to make outbound connection to peer"); info!(?error, ?candidate.addr, "failed to make outbound connection to peer");
report_failed(address_book.clone(), candidate).await; report_failed(address_book_updater.clone(), candidate).await;
// The demand signal that was taken out of the queue to attempt to connect to the // The demand signal that was taken out of the queue to attempt to connect to the
// failed candidate never turned into a connection, so add it back. // failed candidate never turned into a connection, so add it back.
@ -1090,25 +1091,15 @@ where
Ok(()) Ok(())
} }
/// Mark `addr` as a failed peer in `address_book`. /// Mark `addr` as a failed peer to `address_book_updater`.
#[instrument(skip(address_book))] #[instrument(skip(address_book_updater))]
async fn report_failed(address_book: Arc<std::sync::Mutex<AddressBook>>, addr: MetaAddr) { async fn report_failed(
let addr = MetaAddr::new_errored(addr.addr, addr.services); address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
addr: MetaAddr,
) {
// The connection info is the same as what's already in the address book.
let addr = MetaAddr::new_errored(addr.addr, None);
// # Correctness // Ignore send errors on Zebra shutdown.
// let _ = address_book_updater.send(addr).await;
// Spawn address book accesses on a blocking thread, to avoid deadlocks (see #1976).
let span = Span::current();
let updated_addr = tokio::task::spawn_blocking(move || {
span.in_scope(|| address_book.lock().unwrap().update(addr))
})
.wait_for_panics()
.await;
assert_eq!(
updated_addr.map(|addr| addr.addr()),
Some(addr.addr()),
"incorrect address updated by address book: \
original: {addr:?}, updated: {updated_addr:?}"
);
} }

View File

@ -24,7 +24,6 @@ use futures::{channel::mpsc, FutureExt, StreamExt};
use indexmap::IndexSet; use indexmap::IndexSet;
use tokio::{io::AsyncWriteExt, net::TcpStream, task::JoinHandle}; use tokio::{io::AsyncWriteExt, net::TcpStream, task::JoinHandle};
use tower::{service_fn, Layer, Service, ServiceExt}; use tower::{service_fn, Layer, Service, ServiceExt};
use tracing::Span;
use zebra_chain::{chain_tip::NoChainTip, parameters::Network, serialization::DateTime32}; use zebra_chain::{chain_tip::NoChainTip, parameters::Network, serialization::DateTime32};
use zebra_test::net::random_known_port; use zebra_test::net::random_known_port;
@ -1517,13 +1516,8 @@ where
config.peerset_initial_target_size = peerset_initial_target_size; config.peerset_initial_target_size = peerset_initial_target_size;
} }
// Manually initialize an address book without a timestamp tracker. let (address_book, address_book_updater, _address_metrics, _address_book_updater_guard) =
let mut address_book = AddressBook::new( AddressBookUpdater::spawn(&config, config.listen_addr);
config.listen_addr,
config.network,
config.max_connections_per_ip,
Span::current(),
);
// Add enough fake peers to go over the limit, even if the limit is zero. // Add enough fake peers to go over the limit, even if the limit is zero.
let over_limit_peers = config.peerset_outbound_connection_limit() * 2 + 1; let over_limit_peers = config.peerset_outbound_connection_limit() * 2 + 1;
@ -1540,7 +1534,10 @@ where
.new_gossiped_change() .new_gossiped_change()
.expect("created MetaAddr contains enough information to represent a gossiped address"); .expect("created MetaAddr contains enough information to represent a gossiped address");
address_book.update(addr); address_book
.lock()
.expect("panic in previous thread while accessing the address book")
.update(addr);
} }
// Create a fake peer set. // Create a fake peer set.
@ -1555,8 +1552,6 @@ where
Ok(rsp) Ok(rsp)
}); });
let address_book = Arc::new(std::sync::Mutex::new(address_book));
// Make the channels large enough to hold all the peers. // Make the channels large enough to hold all the peers.
let (peerset_tx, peerset_rx) = mpsc::channel::<DiscoveredPeer>(over_limit_peers); let (peerset_tx, peerset_rx) = mpsc::channel::<DiscoveredPeer>(over_limit_peers);
let (mut demand_tx, demand_rx) = mpsc::channel::<MorePeers>(over_limit_peers); let (mut demand_tx, demand_rx) = mpsc::channel::<MorePeers>(over_limit_peers);
@ -1581,6 +1576,7 @@ where
outbound_connector, outbound_connector,
peerset_tx, peerset_tx,
active_outbound_connections, active_outbound_connections,
address_book_updater,
); );
let crawl_task_handle = tokio::spawn(crawl_fut); let crawl_task_handle = tokio::spawn(crawl_fut);