diff --git a/zebra-network/src/timestamp_collector.rs b/zebra-network/src/address_book_updater.rs similarity index 78% rename from zebra-network/src/timestamp_collector.rs rename to zebra-network/src/address_book_updater.rs index 96b657f15..d49ca57f9 100644 --- a/zebra-network/src/timestamp_collector.rs +++ b/zebra-network/src/address_book_updater.rs @@ -6,12 +6,14 @@ use futures::{channel::mpsc, prelude::*}; use crate::{meta_addr::MetaAddrChange, AddressBook}; -/// The timestamp collector hooks into incoming message streams for each peer and -/// records per-connection last-seen timestamps into an [`AddressBook`]. -pub struct TimestampCollector {} +/// The `AddressBookUpdater` hooks into incoming message streams for each peer +/// and lets the owner of the sender handle update the address book. For +/// example, it can be used to record per-connection last-seen timestamps, or +/// add new initial peers to the address book. +pub struct AddressBookUpdater {} -impl TimestampCollector { - /// Spawn a new [`TimestampCollector`] task, updating a new [`AddressBook`] +impl AddressBookUpdater { + /// Spawn a new [`AddressBookUpdater`] task, updating a new [`AddressBook`] /// configured with a `local_listener`. /// /// Returns handles for the transmission channel for timestamp events, and diff --git a/zebra-network/src/lib.rs b/zebra-network/src/lib.rs index 9e7bfec31..045e60947 100644 --- a/zebra-network/src/lib.rs +++ b/zebra-network/src/lib.rs @@ -56,6 +56,7 @@ extern crate bitflags; pub type BoxError = Box; mod address_book; +mod address_book_updater; mod config; pub mod constants; mod isolated; @@ -64,7 +65,6 @@ mod peer; mod peer_set; mod policies; mod protocol; -mod timestamp_collector; pub use crate::{ address_book::AddressBook, diff --git a/zebra-network/src/meta_addr.rs b/zebra-network/src/meta_addr.rs index e63394455..ff0aaae1f 100644 --- a/zebra-network/src/meta_addr.rs +++ b/zebra-network/src/meta_addr.rs @@ -186,6 +186,15 @@ pub struct MetaAddr { #[derive(Copy, Clone, Debug, Eq, PartialEq)] #[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub enum MetaAddrChange { + /// Creates a `MetaAddr` for an initial peer. + NewInitial { + #[cfg_attr( + any(test, feature = "proptest-impl"), + proptest(strategy = "canonical_socket_addr_strategy()") + )] + addr: SocketAddr, + }, + /// Creates a new gossiped `MetaAddr`. NewGossiped { #[cfg_attr( @@ -250,6 +259,14 @@ pub enum MetaAddrChange { } impl MetaAddr { + /// Returns a [`MetaAddrChange::NewInitial`] for a peer that was excluded from + /// the list of the initial peers. + pub fn new_initial_peer(addr: SocketAddr) -> MetaAddrChange { + NewInitial { + addr: canonical_socket_addr(addr), + } + } + /// Returns a new `MetaAddr`, based on the deserialized fields from a /// gossiped peer [`Addr`][crate::protocol::external::Message::Addr] message. pub fn new_gossiped_meta_addr( @@ -562,7 +579,8 @@ impl MetaAddrChange { /// Return the address for this change. pub fn addr(&self) -> SocketAddr { match self { - NewGossiped { addr, .. } + NewInitial { addr } + | NewGossiped { addr, .. } | NewAlternate { addr, .. } | NewLocal { addr, .. } | UpdateAttempt { addr } @@ -577,7 +595,8 @@ impl MetaAddrChange { /// This method should only be used in tests. pub fn set_addr(&mut self, new_addr: SocketAddr) { match self { - NewGossiped { addr, .. } + NewInitial { addr } + | NewGossiped { addr, .. } | NewAlternate { addr, .. } | NewLocal { addr, .. } | UpdateAttempt { addr } @@ -589,6 +608,7 @@ impl MetaAddrChange { /// Return the untrusted services for this change, if available. pub fn untrusted_services(&self) -> Option { match self { + NewInitial { .. } => None, NewGossiped { untrusted_services, .. } => Some(*untrusted_services), @@ -607,6 +627,7 @@ impl MetaAddrChange { /// Return the untrusted last seen time for this change, if available. pub fn untrusted_last_seen(&self) -> Option { match self { + NewInitial { .. } => None, NewGossiped { untrusted_last_seen, .. @@ -623,6 +644,7 @@ impl MetaAddrChange { /// Return the last attempt for this change, if available. pub fn last_attempt(&self) -> Option { match self { + NewInitial { .. } => None, NewGossiped { .. } => None, NewAlternate { .. } => None, NewLocal { .. } => None, @@ -638,6 +660,7 @@ impl MetaAddrChange { /// Return the last response for this change, if available. pub fn last_response(&self) -> Option { match self { + NewInitial { .. } => None, NewGossiped { .. } => None, NewAlternate { .. } => None, NewLocal { .. } => None, @@ -652,9 +675,10 @@ impl MetaAddrChange { } } - /// Return the last attempt for this change, if available. + /// Return the last failure for this change, if available. pub fn last_failure(&self) -> Option { match self { + NewInitial { .. } => None, NewGossiped { .. } => None, NewAlternate { .. } => None, NewLocal { .. } => None, @@ -672,6 +696,7 @@ impl MetaAddrChange { /// Return the peer connection state for this change. pub fn peer_addr_state(&self) -> PeerAddrState { match self { + NewInitial { .. } => NeverAttemptedGossiped, NewGossiped { .. } => NeverAttemptedGossiped, NewAlternate { .. } => NeverAttemptedAlternate, // local listeners get sanitized, so the state doesn't matter here @@ -685,15 +710,18 @@ impl MetaAddrChange { /// If this change can create a new `MetaAddr`, return that address. pub fn into_new_meta_addr(self) -> Option { match self { - NewGossiped { .. } | NewAlternate { .. } | NewLocal { .. } => Some(MetaAddr { - addr: self.addr(), - services: self.untrusted_services(), - untrusted_last_seen: self.untrusted_last_seen(), - last_response: None, - last_attempt: None, - last_failure: None, - last_connection_state: self.peer_addr_state(), - }), + NewInitial { .. } | NewGossiped { .. } | NewAlternate { .. } | NewLocal { .. } => { + Some(MetaAddr { + addr: self.addr(), + services: self.untrusted_services(), + untrusted_last_seen: self.untrusted_last_seen(), + last_response: None, + last_attempt: None, + last_failure: None, + last_connection_state: self.peer_addr_state(), + }) + } + UpdateAttempt { .. } | UpdateResponded { .. } | UpdateFailed { .. } => None, } } diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 22b5d6826..d9d4e88e8 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -52,7 +52,7 @@ use crate::{ pub struct Handshake { config: Config, inbound_service: S, - timestamp_collector: mpsc::Sender, + address_book_updater: mpsc::Sender, inv_collector: broadcast::Sender<(InventoryHash, SocketAddr)>, nonces: Arc>>, user_agent: String, @@ -304,7 +304,7 @@ impl fmt::Debug for ConnectedAddr { pub struct Builder { config: Option, inbound_service: Option, - timestamp_collector: Option>, + address_book_updater: Option>, our_services: Option, user_agent: Option, relay: Option, @@ -346,11 +346,11 @@ where /// /// This channel takes `MetaAddr`s, permanent addresses which can be used to /// make outbound connections to peers. - pub fn with_timestamp_collector( + pub fn with_address_book_updater( mut self, - timestamp_collector: mpsc::Sender, + address_book_updater: mpsc::Sender, ) -> Self { - self.timestamp_collector = Some(timestamp_collector); + self.address_book_updater = Some(address_book_updater); self } @@ -382,7 +382,7 @@ where // TODO: Until Rust RFC 2528 reaches stable, we can't do `..self` config: self.config, inbound_service: self.inbound_service, - timestamp_collector: self.timestamp_collector, + address_book_updater: self.address_book_updater, our_services: self.our_services, user_agent: self.user_agent, relay: self.relay, @@ -410,9 +410,9 @@ where let (tx, _) = broadcast::channel(100); tx }); - let timestamp_collector = self.timestamp_collector.unwrap_or_else(|| { - // No timestamp collector was passed, so create a stub channel. - // Dropping the receiver means sends will fail, but we don't care. + let address_book_updater = self.address_book_updater.unwrap_or_else(|| { + // No `AddressBookUpdater` for timestamp collection was passed, so create a stub + // channel. Dropping the receiver means sends will fail, but we don't care. let (tx, _rx) = mpsc::channel(1); tx }); @@ -425,7 +425,7 @@ where config, inbound_service, inv_collector, - timestamp_collector, + address_book_updater, nonces, user_agent, our_services, @@ -449,7 +449,7 @@ where Builder { config: None, inbound_service: None, - timestamp_collector: None, + address_book_updater: None, user_agent: None, our_services: None, relay: None, @@ -709,7 +709,7 @@ where // Clone these upfront, so they can be moved into the future. let nonces = self.nonces.clone(); let inbound_service = self.inbound_service.clone(); - let mut timestamp_collector = self.timestamp_collector.clone(); + let mut address_book_updater = self.address_book_updater.clone(); let inv_collector = self.inv_collector.clone(); let config = self.config.clone(); let user_agent = self.user_agent.clone(); @@ -764,7 +764,7 @@ where for alt_addr in alternate_addrs { let alt_addr = MetaAddr::new_alternate(&alt_addr, &remote_services); // awaiting a local task won't hang - let _ = timestamp_collector.send(alt_addr).await; + let _ = address_book_updater.send(alt_addr).await; } // Set the connection's version to the minimum of the received version or our own. @@ -818,7 +818,7 @@ where // // Every message and error must update the peer address state via // the inbound_ts_collector. - let inbound_ts_collector = timestamp_collector.clone(); + let inbound_ts_collector = address_book_updater.clone(); let inv_collector = inv_collector.clone(); let ts_inner_conn_span = connection_span.clone(); let inv_inner_conn_span = connection_span.clone(); @@ -939,14 +939,14 @@ where // // Returning from the spawned closure terminates the connection's heartbeat task. let heartbeat_span = tracing::debug_span!(parent: connection_span, "heartbeat"); - let heartbeat_ts_collector = timestamp_collector.clone(); + let heartbeat_ts_collector = address_book_updater.clone(); tokio::spawn( async move { use futures::future::Either; let mut shutdown_rx = shutdown_rx; let mut server_tx = server_tx; - let mut timestamp_collector = heartbeat_ts_collector.clone(); + let mut heartbeat_ts_collector = heartbeat_ts_collector.clone(); let mut interval_stream = IntervalStream::new(tokio::time::interval(constants::HEARTBEAT_INTERVAL)); @@ -969,7 +969,7 @@ where tracing::trace!("shutting down due to Client shut down"); if let Some(book_addr) = connected_addr.get_address_book_addr() { // awaiting a local task won't hang - let _ = timestamp_collector + let _ = heartbeat_ts_collector .send(MetaAddr::new_shutdown(&book_addr, remote_services)) .await; } @@ -985,7 +985,7 @@ where let heartbeat = send_one_heartbeat(&mut server_tx); if heartbeat_timeout( heartbeat, - &mut timestamp_collector, + &mut heartbeat_ts_collector, &connected_addr, &remote_services, ) @@ -1058,7 +1058,7 @@ async fn send_one_heartbeat(server_tx: &mut mpsc::Sender) -> Resu /// `handle_heartbeat_error`. async fn heartbeat_timeout( fut: F, - timestamp_collector: &mut mpsc::Sender, + address_book_updater: &mut mpsc::Sender, connected_addr: &ConnectedAddr, remote_services: &PeerServices, ) -> Result @@ -1069,7 +1069,7 @@ where Ok(inner_result) => { handle_heartbeat_error( inner_result, - timestamp_collector, + address_book_updater, connected_addr, remote_services, ) @@ -1078,7 +1078,7 @@ where Err(elapsed) => { handle_heartbeat_error( Err(elapsed), - timestamp_collector, + address_book_updater, connected_addr, remote_services, ) @@ -1089,10 +1089,10 @@ where Ok(t) } -/// If `result.is_err()`, mark `connected_addr` as failed using `timestamp_collector`. +/// If `result.is_err()`, mark `connected_addr` as failed using `address_book_updater`. async fn handle_heartbeat_error( result: Result, - timestamp_collector: &mut mpsc::Sender, + address_book_updater: &mut mpsc::Sender, connected_addr: &ConnectedAddr, remote_services: &PeerServices, ) -> Result @@ -1105,7 +1105,7 @@ where tracing::debug!(?err, "heartbeat error, shutting down"); if let Some(book_addr) = connected_addr.get_address_book_addr() { - let _ = timestamp_collector + let _ = address_book_updater .send(MetaAddr::new_errored(&book_addr, *remote_services)) .await; } diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index daeb92ac0..0c9bf3b1b 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -29,11 +29,11 @@ use tracing_futures::Instrument; use zebra_chain::{chain_tip::ChainTip, parameters::Network}; use crate::{ + address_book_updater::AddressBookUpdater, constants, - meta_addr::MetaAddr, + meta_addr::{MetaAddr, MetaAddrChange}, peer::{self, HandshakeRequest, OutboundConnectorRequest}, peer_set::{set::MorePeers, ActiveConnectionCounter, CandidateSet, ConnectionTracker, PeerSet}, - timestamp_collector::TimestampCollector, AddressBook, BoxError, Config, Request, Response, }; @@ -95,7 +95,7 @@ where let (tcp_listener, listen_addr) = open_listener(&config.clone()).await; - let (address_book, timestamp_collector) = TimestampCollector::spawn(listen_addr); + let (address_book, address_book_updater) = AddressBookUpdater::spawn(listen_addr); // Create a broadcast channel for peer inventory advertisements. // If it reaches capacity, this channel drops older inventory advertisements. @@ -118,7 +118,7 @@ where .with_config(config.clone()) .with_inbound_service(inbound_service) .with_inventory_collector(inv_sender) - .with_timestamp_collector(timestamp_collector) + .with_address_book_updater(address_book_updater.clone()) .with_advertised_services(PeerServices::NODE_NETWORK) .with_user_agent(crate::constants::USER_AGENT.to_string()) .with_latest_chain_tip(latest_chain_tip) @@ -177,6 +177,7 @@ where config.clone(), outbound_connector.clone(), peerset_tx.clone(), + address_book_updater, ); let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current())); @@ -232,6 +233,7 @@ async fn add_initial_peers( config: Config, outbound_connector: S, mut peerset_tx: mpsc::Sender, + address_book_updater: mpsc::Sender, ) -> Result where S: Service< @@ -241,7 +243,7 @@ where > + Clone, S::Future: Send + 'static, { - let initial_peers = limit_initial_peers(&config).await; + let initial_peers = limit_initial_peers(&config, address_book_updater).await; let mut handshake_success_total: usize = 0; let mut handshake_error_total: usize = 0; @@ -359,26 +361,38 @@ where /// `peerset_initial_target_size`. /// /// The result is randomly chosen entries from the provided set of addresses. -async fn limit_initial_peers(config: &Config) -> HashSet { - let initial_peers = config.initial_peers().await; - let initial_peer_count = initial_peers.len(); +async fn limit_initial_peers( + config: &Config, + mut address_book_updater: mpsc::Sender, +) -> HashSet { + let all_peers = config.initial_peers().await; + let peers_count = all_peers.len(); - // Limit the number of initial peers to `config.peerset_initial_target_size` - if initial_peer_count > config.peerset_initial_target_size { - info!( - "Limiting the initial peers list from {} to {}", - initial_peer_count, config.peerset_initial_target_size - ); + if peers_count <= config.peerset_initial_target_size { + return all_peers; } - let initial_peers_vect: Vec = initial_peers.iter().copied().collect(); + // Limit the number of initial peers to `config.peerset_initial_target_size` + info!( + "Limiting the initial peers list from {} to {}", + peers_count, config.peerset_initial_target_size + ); - // TODO: add unused peers to the AddressBook (#2931) - // https://docs.rs/rand/0.8.4/rand/seq/trait.SliceRandom.html#tymethod.partial_shuffle - initial_peers_vect - .choose_multiple(&mut rand::thread_rng(), config.peerset_initial_target_size) - .copied() - .collect() + // Split all the peers into the `initial_peers` that will be returned and + // `unused_peers` that will be sent to the address book. + let mut all_peers: Vec = all_peers.into_iter().collect(); + let (initial_peers, unused_peers) = + all_peers.partial_shuffle(&mut rand::thread_rng(), config.peerset_initial_target_size); + + // Send the unused peers to the address book. + for peer in unused_peers { + let peer_addr = MetaAddr::new_initial_peer(*peer); + // `send` only waits when the channel is full. + // The address book updater is a separate task, so we will only wait for a short time. + let _ = address_book_updater.send(peer_addr).await; + } + + initial_peers.iter().copied().collect() } /// Open a peer connection listener on `config.listen_addr`, @@ -391,7 +405,7 @@ async fn limit_initial_peers(config: &Config) -> HashSet { /// /// If opening the listener fails. #[instrument(skip(config), fields(addr = ?config.listen_addr))] -async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) { +pub(crate) async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) { // Warn if we're configured using the wrong network port. use Network::*; let wrong_net = match config.network { diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index d7ef0bd8a..d4bdb1a7c 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -32,6 +32,7 @@ use zebra_chain::{chain_tip::NoChainTip, parameters::Network, serialization::Dat use zebra_test::net::random_known_port; use crate::{ + address_book_updater::AddressBookUpdater, constants, init, meta_addr::MetaAddr, peer::{self, ErrorSlot, HandshakeRequest, OutboundConnectorRequest}, @@ -1140,7 +1141,7 @@ async fn add_initial_peers_is_rate_limited() { let before = Instant::now(); let (initial_peers_task_handle, peerset_rx) = - spawn_add_initial_peers(PEER_COUNT, outbound_connector); + spawn_add_initial_peers(PEER_COUNT, outbound_connector).await; let connections = peerset_rx.take(PEER_COUNT).collect::>().await; let elapsed = Instant::now() - before; @@ -1161,6 +1162,43 @@ async fn add_initial_peers_is_rate_limited() { ); } +/// Test that [`init`] does not deadlock. +#[tokio::test] +async fn network_init_deadlock() { + // The `PEER_COUNT` is the amount of initial seed peers. The value is set so + // that the peers fill up `PEERSET_INITIAL_TARGET_SIZE`, fill up the channel + // for sending unused peers to the `AddressBook`, and so that there are + // still some extra peers left. + const PEER_COUNT: usize = 200; + const PEERSET_INITIAL_TARGET_SIZE: usize = 2; + const TIME_LIMIT: Duration = Duration::from_secs(10); + + zebra_test::init(); + + // Create a list of dummy IPs, and initialize a config using them as the + // initial peers. The amount of these peers will overflow + // `PEERSET_INITIAL_TARGET_SIZE`. + let mut peers = HashSet::new(); + for address_number in 0..PEER_COUNT { + peers.insert( + SocketAddr::new(Ipv4Addr::new(127, 1, 1, address_number as _).into(), 1).to_string(), + ); + } + + let config = Config { + initial_mainnet_peers: peers, + peerset_initial_target_size: PEERSET_INITIAL_TARGET_SIZE, + network: Network::Mainnet, + ..Config::default() + }; + + let nil_inbound_service = service_fn(|_| async { Ok(Response::Nil) }); + + let init_future = init(config, nil_inbound_service, NoChainTip); + + assert!(tokio::time::timeout(TIME_LIMIT, init_future).await.is_ok()); +} + /// Open a local listener on `listen_addr` for `network`. /// Asserts that the local listener address works as expected. async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) { @@ -1442,7 +1480,7 @@ where /// Dummy IPs are used. /// /// Returns the task [`JoinHandle`], and the peer set receiver. -fn spawn_add_initial_peers( +async fn spawn_add_initial_peers( peer_count: usize, outbound_connector: C, ) -> ( @@ -1475,7 +1513,10 @@ where let (peerset_tx, peerset_rx) = mpsc::channel::(peer_count + 1); - let add_fut = add_initial_peers(config, outbound_connector, peerset_tx); + let (_tcp_listener, listen_addr) = open_listener(&config.clone()).await; + let (_address_book, address_book_updater) = AddressBookUpdater::spawn(listen_addr); + + let add_fut = add_initial_peers(config, outbound_connector, peerset_tx, address_book_updater); let add_task_handle = tokio::spawn(add_fut); (add_task_handle, peerset_rx)