diff --git a/Cargo.lock b/Cargo.lock index cee2bdb83..7accc50c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -308,7 +308,7 @@ dependencies = [ "cexpr", "clang-sys", "clap", - "env_logger", + "env_logger 0.8.3", "lazy_static", "lazycell", "log", @@ -1070,6 +1070,16 @@ dependencies = [ "syn 1.0.60", ] +[[package]] +name = "env_logger" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" +dependencies = [ + "log", + "regex", +] + [[package]] name = "env_logger" version = "0.8.3" @@ -2181,6 +2191,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-map" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ac8f4a4a06c811aa24b151dbb3fe19f687cb52e0d5cca0493671ed88f973970" +dependencies = [ + "quickcheck", + "quickcheck_macros", +] + [[package]] name = "owning_ref" version = "0.4.1" @@ -2498,6 +2518,29 @@ dependencies = [ "memchr", ] +[[package]] +name = "quickcheck" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44883e74aa97ad63db83c4bf8ca490f02b2fc02f92575e720c8551e843c945f" +dependencies = [ + "env_logger 0.7.1", + "log", + "rand 0.7.3", + "rand_core 0.5.1", +] + +[[package]] +name = "quickcheck_macros" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "608c156fd8e97febc07dc9c2e2c80bf74cfc6ef26893eae3daf8bc2bc94a4b7f" +dependencies = [ + "proc-macro2 1.0.24", + "quote 1.0.7", + "syn 1.0.60", +] + [[package]] name = "quote" version = "0.6.13" @@ -4396,6 +4439,7 @@ dependencies = [ "hex", "lazy_static", "metrics", + "ordered-map", "pin-project 1.0.7", "proptest", "proptest-derive", diff --git a/deny.toml b/deny.toml index af1077266..bf5088600 100644 --- a/deny.toml +++ b/deny.toml @@ -46,6 +46,10 @@ skip-tree = [ # ticket #3063: redjubjub dependencies { name = "redjubjub", version = "=0.4.0" }, + # ordered-map dependencies that should be dev-dependencies + # https://github.com/qwfy/ordered-map/pull/1 + { name = "env_logger", version = "=0.7.1" }, + # ticket #2984: owo-colors dependencies { name = "color-eyre", version = "=0.5.11" }, diff --git a/zebra-chain/src/serialization/date_time.rs b/zebra-chain/src/serialization/date_time.rs index bfd722820..3d903ab45 100644 --- a/zebra-chain/src/serialization/date_time.rs +++ b/zebra-chain/src/serialization/date_time.rs @@ -1,14 +1,14 @@ //! DateTime types with specific serialization invariants. -use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; -use chrono::{TimeZone, Utc}; - use std::{ convert::{TryFrom, TryInto}, fmt, num::TryFromIntError, }; +use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use chrono::{TimeZone, Utc}; + use super::{SerializationError, ZcashDeserialize, ZcashSerialize}; /// A date and time, represented by a 32-bit number of seconds since the UNIX epoch. @@ -50,7 +50,7 @@ impl DateTime32 { /// /// If the number of seconds since the UNIX epoch is greater than `u32::MAX`. pub fn now() -> DateTime32 { - Utc::now() + chrono::Utc::now() .try_into() .expect("unexpected out of range chrono::DateTime") } @@ -71,14 +71,18 @@ impl DateTime32 { /// Returns the duration elapsed since this time, /// or if this time is in the future, returns `None`. - pub fn checked_elapsed(&self) -> Option { - DateTime32::now().checked_duration_since(*self) + pub fn checked_elapsed(&self, now: chrono::DateTime) -> Option { + DateTime32::try_from(now) + .expect("unexpected out of range chrono::DateTime") + .checked_duration_since(*self) } /// Returns the duration elapsed since this time, /// or if this time is in the future, returns zero. - pub fn saturating_elapsed(&self) -> Duration32 { - DateTime32::now().saturating_duration_since(*self) + pub fn saturating_elapsed(&self, now: chrono::DateTime) -> Duration32 { + DateTime32::try_from(now) + .expect("unexpected out of range chrono::DateTime") + .saturating_duration_since(*self) } /// Returns the time that is `duration` after this time. diff --git a/zebra-network/Cargo.toml b/zebra-network/Cargo.toml index 1e264e816..cc964c577 100644 --- a/zebra-network/Cargo.toml +++ b/zebra-network/Cargo.toml @@ -14,6 +14,7 @@ bytes = "1.1.0" chrono = "0.4" hex = "0.4" lazy_static = "1.4.0" +ordered-map = "0.4.2" pin-project = "1.0.7" rand = "0.8" regex = "1" diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs index aac50f4fb..022ae1932 100644 --- a/zebra-network/src/address_book.rs +++ b/zebra-network/src/address_book.rs @@ -1,13 +1,10 @@ //! The `AddressBook` manages information about what peers exist, when they were //! seen, and what services they provide. -use std::{ - collections::{BTreeSet, HashMap}, - iter::Extend, - net::SocketAddr, - time::Instant, -}; +use std::{cmp::Reverse, iter::Extend, net::SocketAddr, time::Instant}; +use chrono::Utc; +use ordered_map::OrderedMap; use tracing::Span; use crate::{ @@ -53,8 +50,14 @@ mod tests; /// - the canonical address of any connection. #[derive(Clone, Debug)] pub struct AddressBook { - /// Each known peer address has a matching `MetaAddr`. - by_addr: HashMap, + /// Peer listener addresses, suitable for outbound connections, + /// in connection attempt order. + /// + /// Some peers in this list might have open outbound or inbound connections. + /// + /// We reverse the comparison order, because the standard library ([`BTreeMap`]) + /// sorts in ascending order, but [`OrderedMap`] sorts in descending order. + by_addr: OrderedMap>, /// The local listener address. local_listener: SocketAddr, @@ -99,20 +102,27 @@ impl AddressBook { let constructor_span = span.clone(); let _guard = constructor_span.enter(); + let instant_now = Instant::now(); + let chrono_now = Utc::now(); + let mut new_book = AddressBook { - by_addr: HashMap::default(), + by_addr: OrderedMap::new(|meta_addr| Reverse(*meta_addr)), local_listener: canonical_socket_addr(local_listener), span, last_address_log: None, }; - new_book.update_metrics(); + new_book.update_metrics(instant_now, chrono_now); new_book } /// Construct an [`AddressBook`] with the given `local_listener`, /// [`tracing::Span`], and addresses. /// + /// If there are multiple [`MetaAddr`]s with the same address, + /// an arbitrary address is inserted into the address book, + /// and the rest are dropped. + /// /// This constructor can be used to break address book invariants, /// so it should only be used in tests. #[cfg(any(test, feature = "proptest-impl"))] @@ -121,6 +131,12 @@ impl AddressBook { span: Span, addrs: impl IntoIterator, ) -> AddressBook { + let constructor_span = span.clone(); + let _guard = constructor_span.enter(); + + let instant_now = Instant::now(); + let chrono_now = Utc::now(); + let mut new_book = AddressBook::new(local_listener, span); let addrs = addrs @@ -131,9 +147,13 @@ impl AddressBook { }) .filter(MetaAddr::address_is_valid_for_outbound) .map(|meta_addr| (meta_addr.addr, meta_addr)); - new_book.by_addr.extend(addrs); - new_book.update_metrics(); + for (socket_addr, meta_addr) in addrs { + // overwrite any duplicate addresses + new_book.by_addr.insert(socket_addr, meta_addr); + } + + new_book.update_metrics(instant_now, chrono_now); new_book } @@ -147,7 +167,7 @@ impl AddressBook { } /// Get the contents of `self` in random order with sanitized timestamps. - pub fn sanitized(&self) -> Vec { + pub fn sanitized(&self, now: chrono::DateTime) -> Vec { use rand::seq::SliceRandom; let _guard = self.span.enter(); @@ -161,7 +181,7 @@ impl AddressBook { // Then sanitize and shuffle let mut peers = peers - .values() + .descending_values() .filter_map(MetaAddr::sanitize) // Security: remove peers that: // - last responded more than three hours ago, or @@ -170,12 +190,28 @@ impl AddressBook { // This prevents Zebra from gossiping nodes that are likely unreachable. Gossiping such // nodes impacts the network health, because connection attempts end up being wasted on // peers that are less likely to respond. - .filter(MetaAddr::is_active_for_gossip) + .filter(|addr| addr.is_active_for_gossip(now)) .collect::>(); peers.shuffle(&mut rand::thread_rng()); peers } + /// Look up `addr` in the address book, and return its [`MetaAddr`]. + /// + /// Converts `addr` to a canonical address before looking it up. + pub fn get(&mut self, addr: &SocketAddr) -> Option { + let addr = canonical_socket_addr(*addr); + + // Unfortunately, `OrderedMap` doesn't implement `get`. + let meta_addr = self.by_addr.remove(&addr); + + if let Some(meta_addr) = meta_addr { + self.by_addr.insert(addr, meta_addr); + } + + meta_addr + } + /// Apply `change` to the address book, returning the updated `MetaAddr`, /// if the change was valid. /// @@ -196,9 +232,13 @@ impl AddressBook { /// [`SocketAddr`]s. Ignored addresses will never be used to connect to /// peers. pub fn update(&mut self, change: MetaAddrChange) -> Option { + let previous = self.get(&change.addr()); + let _guard = self.span.enter(); - let previous = self.by_addr.get(&change.addr()).cloned(); + let instant_now = Instant::now(); + let chrono_now = Utc::now(); + let updated = change.apply_to_meta_addr(previous); trace!( @@ -206,7 +246,7 @@ impl AddressBook { ?updated, ?previous, total_peers = self.by_addr.len(), - recent_peers = self.recently_live_peers().count(), + recent_peers = self.recently_live_peers(chrono_now).count(), ); if let Some(updated) = updated { @@ -231,7 +271,7 @@ impl AddressBook { self.by_addr.insert(updated.addr, updated); std::mem::drop(_guard); - self.update_metrics(); + self.update_metrics(instant_now, chrono_now); } updated @@ -246,39 +286,32 @@ impl AddressBook { #[allow(dead_code)] fn take(&mut self, removed_addr: SocketAddr) -> Option { let _guard = self.span.enter(); + + let instant_now = Instant::now(); + let chrono_now = Utc::now(); + trace!( ?removed_addr, total_peers = self.by_addr.len(), - recent_peers = self.recently_live_peers().count(), + recent_peers = self.recently_live_peers(chrono_now).count(), ); if let Some(entry) = self.by_addr.remove(&removed_addr) { std::mem::drop(_guard); - self.update_metrics(); + self.update_metrics(instant_now, chrono_now); Some(entry) } else { None } } - /// Returns true if the given [`SocketAddr`] has recently sent us a message. - pub fn recently_live_addr(&self, addr: &SocketAddr) -> bool { - let _guard = self.span.enter(); - match self.by_addr.get(addr) { - None => false, - // NeverAttempted, Failed, and AttemptPending peers should never be live - Some(peer) => { - peer.last_connection_state == PeerAddrState::Responded - && peer.has_connection_recently_responded() - } - } - } - /// Returns true if the given [`SocketAddr`] is pending a reconnection /// attempt. - pub fn pending_reconnection_addr(&self, addr: &SocketAddr) -> bool { + pub fn pending_reconnection_addr(&mut self, addr: &SocketAddr) -> bool { + let meta_addr = self.get(addr); + let _guard = self.span.enter(); - match self.by_addr.get(addr) { + match meta_addr { None => false, Some(peer) => peer.last_connection_state == PeerAddrState::AttemptPending, } @@ -286,58 +319,66 @@ impl AddressBook { /// Return an iterator over all peers. /// - /// Returns peers in reconnection attempt order, then recently live peers in - /// an arbitrary order. + /// Returns peers in reconnection attempt order, including recently connected peers. pub fn peers(&'_ self) -> impl Iterator + '_ { let _guard = self.span.enter(); - self.reconnection_peers() - .chain(self.maybe_connected_peers()) + self.by_addr.descending_values().cloned() } /// Return an iterator over peers that are due for a reconnection attempt, /// in reconnection attempt order. - pub fn reconnection_peers(&'_ self) -> impl Iterator + '_ { + pub fn reconnection_peers( + &'_ self, + instant_now: Instant, + chrono_now: chrono::DateTime, + ) -> impl Iterator + '_ { let _guard = self.span.enter(); - // TODO: optimise, if needed, or get rid of older peers - - // Skip live peers, and peers pending a reconnect attempt, then sort using BTreeSet + // Skip live peers, and peers pending a reconnect attempt. + // The peers are already stored in sorted order. self.by_addr - .values() - .filter(|peer| peer.is_ready_for_connection_attempt()) - .collect::>() - .into_iter() + .descending_values() + .filter(move |peer| peer.is_ready_for_connection_attempt(instant_now, chrono_now)) .cloned() } - /// Return an iterator over all the peers in `state`, in arbitrary order. + /// Return an iterator over all the peers in `state`, + /// in reconnection attempt order, including recently connected peers. pub fn state_peers(&'_ self, state: PeerAddrState) -> impl Iterator + '_ { let _guard = self.span.enter(); self.by_addr - .values() + .descending_values() .filter(move |peer| peer.last_connection_state == state) .cloned() } - /// Return an iterator over peers that might be connected, in arbitrary - /// order. - pub fn maybe_connected_peers(&'_ self) -> impl Iterator + '_ { + /// Return an iterator over peers that might be connected, + /// in reconnection attempt order. + pub fn maybe_connected_peers( + &'_ self, + instant_now: Instant, + chrono_now: chrono::DateTime, + ) -> impl Iterator + '_ { let _guard = self.span.enter(); self.by_addr - .values() - .filter(|peer| !peer.is_ready_for_connection_attempt()) + .descending_values() + .filter(move |peer| !peer.is_ready_for_connection_attempt(instant_now, chrono_now)) .cloned() } - /// Return an iterator over peers we've seen recently, in arbitrary order. - pub fn recently_live_peers(&'_ self) -> impl Iterator + '_ { + /// Return an iterator over peers we've seen recently, + /// in reconnection attempt order. + pub fn recently_live_peers( + &'_ self, + now: chrono::DateTime, + ) -> impl Iterator + '_ { let _guard = self.span.enter(); self.by_addr - .values() - .filter(move |peer| self.recently_live_addr(&peer.addr)) + .descending_values() + .filter(move |peer| peer.was_recently_live(now)) .cloned() } @@ -347,7 +388,7 @@ impl AddressBook { } /// Returns metrics for the addresses in this address book. - pub fn address_metrics(&self) -> AddressMetrics { + pub fn address_metrics(&self, now: chrono::DateTime) -> AddressMetrics { let responded = self.state_peers(PeerAddrState::Responded).count(); let never_attempted_gossiped = self .state_peers(PeerAddrState::NeverAttemptedGossiped) @@ -358,7 +399,7 @@ impl AddressBook { let failed = self.state_peers(PeerAddrState::Failed).count(); let attempt_pending = self.state_peers(PeerAddrState::AttemptPending).count(); - let recently_live = self.recently_live_peers().count(); + let recently_live = self.recently_live_peers(now).count(); let recently_stopped_responding = responded .checked_sub(recently_live) .expect("all recently live peers must have responded"); @@ -375,10 +416,10 @@ impl AddressBook { } /// Update the metrics for this address book. - fn update_metrics(&mut self) { + fn update_metrics(&mut self, instant_now: Instant, chrono_now: chrono::DateTime) { let _guard = self.span.enter(); - let m = self.address_metrics(); + let m = self.address_metrics(chrono_now); // TODO: rename to address_book.[state_name] metrics::gauge!("candidate_set.responded", m.responded as f64); @@ -399,11 +440,11 @@ impl AddressBook { ); std::mem::drop(_guard); - self.log_metrics(&m); + self.log_metrics(&m, instant_now); } /// Log metrics for this address book - fn log_metrics(&mut self, m: &AddressMetrics) { + fn log_metrics(&mut self, m: &AddressMetrics, now: Instant) { let _guard = self.span.enter(); trace!( @@ -419,18 +460,18 @@ impl AddressBook { // every request, use the trace-level logs, or the metrics exporter. if let Some(last_address_log) = self.last_address_log { // Avoid duplicate address logs - if Instant::now().duration_since(last_address_log).as_secs() < 60 { + if now.saturating_duration_since(last_address_log).as_secs() < 60 { return; } } else { // Suppress initial logs until the peer set has started up. // There can be multiple address changes before the first peer has // responded. - self.last_address_log = Some(Instant::now()); + self.last_address_log = Some(now); return; } - self.last_address_log = Some(Instant::now()); + self.last_address_log = Some(now); // if all peers have failed if m.responded + m.attempt_pending diff --git a/zebra-network/src/address_book/tests.rs b/zebra-network/src/address_book/tests.rs index 2bf82ef4e..9cd2b81e2 100644 --- a/zebra-network/src/address_book/tests.rs +++ b/zebra-network/src/address_book/tests.rs @@ -1 +1,4 @@ +//! Tests for the address book. + mod prop; +mod vectors; diff --git a/zebra-network/src/address_book/tests/prop.rs b/zebra-network/src/address_book/tests/prop.rs index 8e0c2a008..048de0f43 100644 --- a/zebra-network/src/address_book/tests/prop.rs +++ b/zebra-network/src/address_book/tests/prop.rs @@ -1,14 +1,17 @@ -use std::net::SocketAddr; +//! Randomised property tests for the address book. +use std::{net::SocketAddr, time::Instant}; + +use chrono::Utc; use proptest::{collection::vec, prelude::*}; use tracing::Span; use zebra_chain::serialization::Duration32; -use super::super::AddressBook; use crate::{ constants::MAX_PEER_ACTIVE_FOR_GOSSIP, meta_addr::{arbitrary::MAX_META_ADDR, MetaAddr}, + AddressBook, }; const TIME_ERROR_MARGIN: Duration32 = Duration32::from_seconds(1); @@ -20,14 +23,15 @@ proptest! { addresses in vec(any::(), 0..MAX_META_ADDR), ) { zebra_test::init(); + let chrono_now = Utc::now(); let address_book = AddressBook::new_with_addrs(local_listener, Span::none(), addresses); - for gossiped_address in address_book.sanitized() { + for gossiped_address in address_book.sanitized(chrono_now) { let duration_since_last_seen = gossiped_address .last_seen() .expect("Peer that was never seen before is being gossiped") - .saturating_elapsed() + .saturating_elapsed(chrono_now) .saturating_sub(TIME_ERROR_MARGIN); prop_assert!(duration_since_last_seen <= MAX_PEER_ACTIVE_FOR_GOSSIP); @@ -41,11 +45,13 @@ proptest! { addresses in vec(any::(), 0..MAX_META_ADDR), ) { zebra_test::init(); + let instant_now = Instant::now(); + let chrono_now = Utc::now(); let address_book = AddressBook::new_with_addrs(local_listener, Span::none(), addresses); - for peer in address_book.reconnection_peers() { - prop_assert!(peer.is_probably_reachable(), "peer: {:?}", peer); + for peer in address_book.reconnection_peers(instant_now, chrono_now) { + prop_assert!(peer.is_probably_reachable(chrono_now), "peer: {:?}", peer); } } } diff --git a/zebra-network/src/address_book/tests/vectors.rs b/zebra-network/src/address_book/tests/vectors.rs new file mode 100644 index 000000000..cab6bf774 --- /dev/null +++ b/zebra-network/src/address_book/tests/vectors.rs @@ -0,0 +1,86 @@ +//! Fixed test vectors for the address book. + +use std::time::Instant; + +use chrono::Utc; +use tracing::Span; + +use zebra_chain::serialization::{DateTime32, Duration32}; + +use crate::{meta_addr::MetaAddr, protocol::external::types::PeerServices, AddressBook}; + +/// Make sure an empty address book is actually empty. +#[test] +fn address_book_empty() { + let address_book = AddressBook::new("0.0.0.0:0".parse().unwrap(), Span::current()); + + assert_eq!( + address_book + .reconnection_peers(Instant::now(), Utc::now()) + .next(), + None + ); + assert_eq!(address_book.len(), 0); +} + +/// Make sure peers are attempted in priority order. +#[test] +fn address_book_peer_order() { + let addr1 = "127.0.0.1:1".parse().unwrap(); + let addr2 = "127.0.0.2:2".parse().unwrap(); + + let mut meta_addr1 = + MetaAddr::new_gossiped_meta_addr(addr1, PeerServices::NODE_NETWORK, DateTime32::MIN); + let mut meta_addr2 = MetaAddr::new_gossiped_meta_addr( + addr2, + PeerServices::NODE_NETWORK, + DateTime32::MIN.saturating_add(Duration32::from_seconds(1)), + ); + + // Regardless of the order of insertion, the most recent address should be chosen first + let addrs = vec![meta_addr1, meta_addr2]; + let address_book = + AddressBook::new_with_addrs("0.0.0.0:0".parse().unwrap(), Span::current(), addrs); + assert_eq!( + address_book + .reconnection_peers(Instant::now(), Utc::now()) + .next(), + Some(meta_addr2), + ); + + // Reverse the order, check that we get the same result + let addrs = vec![meta_addr2, meta_addr1]; + let address_book = + AddressBook::new_with_addrs("0.0.0.0:0".parse().unwrap(), Span::current(), addrs); + assert_eq!( + address_book + .reconnection_peers(Instant::now(), Utc::now()) + .next(), + Some(meta_addr2), + ); + + // Now check that the order depends on the time, not the address + meta_addr1.addr = addr2; + meta_addr2.addr = addr1; + + let addrs = vec![meta_addr1, meta_addr2]; + let address_book = + AddressBook::new_with_addrs("0.0.0.0:0".parse().unwrap(), Span::current(), addrs); + assert_eq!( + address_book + .reconnection_peers(Instant::now(), Utc::now()) + .next(), + Some(meta_addr2), + ); + + // Reverse the order, check that we get the same result + let addrs = vec![meta_addr2, meta_addr1]; + let address_book = + AddressBook::new_with_addrs("0.0.0.0:0".parse().unwrap(), Span::current(), addrs); + assert_eq!( + address_book + .reconnection_peers(Instant::now(), Utc::now()) + .next(), + Some(meta_addr2), + ); +} diff --git a/zebra-network/src/meta_addr.rs b/zebra-network/src/meta_addr.rs index 0378869ee..48d053009 100644 --- a/zebra-network/src/meta_addr.rs +++ b/zebra-network/src/meta_addr.rs @@ -7,6 +7,7 @@ use std::{ time::Instant, }; +use chrono::Utc; use zebra_chain::serialization::DateTime32; use crate::{ @@ -434,10 +435,10 @@ impl MetaAddr { /// connection. Therefore, if the last-seen timestamp is older than /// [`constants::MIN_PEER_RECONNECTION_DELAY`] ago, we know we should have /// disconnected from it. Otherwise, we could potentially be connected to it. - pub fn has_connection_recently_responded(&self) -> bool { + pub fn has_connection_recently_responded(&self, now: chrono::DateTime) -> bool { if let Some(last_response) = self.last_response { // Recent times and future times are considered live - last_response.saturating_elapsed() + last_response.saturating_elapsed(now) <= constants::MIN_PEER_RECONNECTION_DELAY .try_into() .expect("unexpectedly large constant") @@ -451,12 +452,12 @@ impl MetaAddr { /// /// Returns `true` if this peer was recently attempted, or has a connection /// attempt in progress. - pub fn was_connection_recently_attempted(&self) -> bool { + pub fn was_connection_recently_attempted(&self, now: Instant) -> bool { if let Some(last_attempt) = self.last_attempt { // Recent times and future times are considered live. // Instants are monotonic, so `now` should always be later than `last_attempt`, // except for synthetic data in tests. - last_attempt.elapsed() <= constants::MIN_PEER_RECONNECTION_DELAY + now.saturating_duration_since(last_attempt) <= constants::MIN_PEER_RECONNECTION_DELAY } else { // If there has never been any attempt, it can't possibly be live false @@ -466,16 +467,23 @@ impl MetaAddr { /// Have we recently had a failed connection to this peer? /// /// Returns `true` if this peer has recently failed. - pub fn has_connection_recently_failed(&self) -> bool { + pub fn has_connection_recently_failed(&self, now: Instant) -> bool { if let Some(last_failure) = self.last_failure { // Recent times and future times are considered live - last_failure.elapsed() <= constants::MIN_PEER_RECONNECTION_DELAY + now.saturating_duration_since(last_failure) <= constants::MIN_PEER_RECONNECTION_DELAY } else { // If there has never been any failure, it can't possibly be recent false } } + /// Returns true if this peer has recently sent us a message. + pub fn was_recently_live(&self, now: chrono::DateTime) -> bool { + // NeverAttempted, Failed, and AttemptPending peers should never be live + self.last_connection_state == PeerAddrState::Responded + && self.has_connection_recently_responded(now) + } + /// Has this peer been seen recently? /// /// Returns `true` if this peer has responded recently or if the peer was gossiped with a @@ -483,12 +491,12 @@ impl MetaAddr { /// /// [`constants::MAX_PEER_ACTIVE_FOR_GOSSIP`] represents the maximum time since a peer was seen /// to still be considered reachable. - pub fn is_active_for_gossip(&self) -> bool { + pub fn is_active_for_gossip(&self, now: chrono::DateTime) -> bool { if let Some(last_seen) = self.last_seen() { // Correctness: `last_seen` shouldn't ever be in the future, either because we set the // time or because another peer's future time was sanitized when it was added to the // address book - last_seen.saturating_elapsed() <= constants::MAX_PEER_ACTIVE_FOR_GOSSIP + last_seen.saturating_elapsed(now) <= constants::MAX_PEER_ACTIVE_FOR_GOSSIP } else { // Peer has never responded and does not have a gossiped last seen time false @@ -496,12 +504,16 @@ impl MetaAddr { } /// Is this address ready for a new outbound connection attempt? - pub fn is_ready_for_connection_attempt(&self) -> bool { + pub fn is_ready_for_connection_attempt( + &self, + instant_now: Instant, + chrono_now: chrono::DateTime, + ) -> bool { self.last_known_info_is_valid_for_outbound() - && !self.has_connection_recently_responded() - && !self.was_connection_recently_attempted() - && !self.has_connection_recently_failed() - && self.is_probably_reachable() + && !self.has_connection_recently_responded(chrono_now) + && !self.was_connection_recently_attempted(instant_now) + && !self.has_connection_recently_failed(instant_now) + && self.is_probably_reachable(chrono_now) } /// Is the [`SocketAddr`] we have for this peer valid for outbound @@ -545,8 +557,8 @@ impl MetaAddr { /// itself seen the peer. If the reported last seen time is a long time ago or `None`, then the local /// node will attempt to connect the peer once, and if that attempt fails it won't /// try to connect ever again. (The state can't be `Failed` until after the first connection attempt.) - pub fn is_probably_reachable(&self) -> bool { - self.last_connection_state != PeerAddrState::Failed || self.last_seen_is_recent() + pub fn is_probably_reachable(&self, now: chrono::DateTime) -> bool { + self.last_connection_state != PeerAddrState::Failed || self.last_seen_is_recent(now) } /// Was this peer last seen recently? @@ -554,9 +566,9 @@ impl MetaAddr { /// Returns `true` if this peer was last seen at most /// [`MAX_RECENT_PEER_AGE`][constants::MAX_RECENT_PEER_AGE] ago. /// Returns false if the peer is outdated, or it has no last seen time. - pub fn last_seen_is_recent(&self) -> bool { + pub fn last_seen_is_recent(&self, now: chrono::DateTime) -> bool { match self.last_seen() { - Some(last_seen) => last_seen.saturating_elapsed() <= constants::MAX_RECENT_PEER_AGE, + Some(last_seen) => last_seen.saturating_elapsed(now) <= constants::MAX_RECENT_PEER_AGE, None => false, } } diff --git a/zebra-network/src/meta_addr/tests/prop.rs b/zebra-network/src/meta_addr/tests/prop.rs index bd9ad0809..757264aef 100644 --- a/zebra-network/src/meta_addr/tests/prop.rs +++ b/zebra-network/src/meta_addr/tests/prop.rs @@ -10,6 +10,7 @@ use std::{ time::Duration, }; +use chrono::Utc; use proptest::{collection::vec, prelude::*}; use tokio::{runtime, time::Instant}; use tower::service_fn; @@ -112,10 +113,13 @@ proptest! { ) { zebra_test::init(); + let instant_now = std::time::Instant::now(); + let chrono_now = Utc::now(); + let mut attempt_count: usize = 0; for change in changes { - while addr.is_ready_for_connection_attempt() { + while addr.is_ready_for_connection_attempt(instant_now, chrono_now) { attempt_count += 1; // Assume that this test doesn't last longer than MIN_PEER_RECONNECTION_DELAY prop_assert!(attempt_count <= 1); @@ -148,9 +152,11 @@ proptest! { ) { zebra_test::init(); + let chrono_now = Utc::now(); + let address_book = AddressBook::new_with_addrs(local_listener, Span::none(), address_book_addrs); - let sanitized_addrs = address_book.sanitized(); + let sanitized_addrs = address_book.sanitized(chrono_now); let expected_local_listener = address_book.local_listener_meta_addr(); let canonical_local_listener = canonical_socket_addr(local_listener); @@ -289,6 +295,9 @@ proptest! { ) { zebra_test::init(); + let instant_now = std::time::Instant::now(); + let chrono_now = Utc::now(); + // Run the test for this many simulated live peer durations const LIVE_PEER_INTERVALS: u32 = 3; // Run the test for this much simulated time @@ -322,7 +331,7 @@ proptest! { let addr = addrs.entry(addr.addr).or_insert(*addr); let change = changes.get(change_index); - while addr.is_ready_for_connection_attempt() { + while addr.is_ready_for_connection_attempt(instant_now, chrono_now) { *attempt_counts.entry(addr.addr).or_default() += 1; prop_assert!( *attempt_counts.get(&addr.addr).unwrap() <= LIVE_PEER_INTERVALS + 1 @@ -362,16 +371,18 @@ proptest! { /// Make sure check if a peer was recently seen is correct. #[test] fn last_seen_is_recent_is_correct(peer in any::()) { + let chrono_now = Utc::now(); + let time_since_last_seen = peer .last_seen() - .map(|last_seen| last_seen.saturating_elapsed()); + .map(|last_seen| last_seen.saturating_elapsed(chrono_now)); let recently_seen = time_since_last_seen .map(|elapsed| elapsed <= MAX_RECENT_PEER_AGE) .unwrap_or(false); prop_assert_eq!( - peer.last_seen_is_recent(), + peer.last_seen_is_recent(chrono_now), recently_seen, "last seen: {:?}, now: {:?}", peer.last_seen(), @@ -382,13 +393,16 @@ proptest! { /// Make sure a peer is correctly determined to be probably reachable. #[test] fn probably_rechable_is_determined_correctly(peer in any::()) { + + let chrono_now = Utc::now(); + let last_attempt_failed = peer.last_connection_state == Failed; - let not_recently_seen = !peer.last_seen_is_recent(); + let not_recently_seen = !peer.last_seen_is_recent(chrono_now); let probably_unreachable = last_attempt_failed && not_recently_seen; prop_assert_eq!( - peer.is_probably_reachable(), + peer.is_probably_reachable(chrono_now), !probably_unreachable, "last_connection_state: {:?}, last_seen: {:?}", peer.last_connection_state, diff --git a/zebra-network/src/meta_addr/tests/vectors.rs b/zebra-network/src/meta_addr/tests/vectors.rs index 59ba5050b..59630d17e 100644 --- a/zebra-network/src/meta_addr/tests/vectors.rs +++ b/zebra-network/src/meta_addr/tests/vectors.rs @@ -2,6 +2,7 @@ use std::net::SocketAddr; +use chrono::Utc; use zebra_chain::serialization::{DateTime32, Duration32}; use super::{super::MetaAddr, check}; @@ -53,12 +54,14 @@ fn sanitize_extremes() { fn new_local_listener_is_gossipable() { zebra_test::init(); + let chrono_now = Utc::now(); + let address = SocketAddr::from(([192, 168, 180, 9], 10_000)); let peer = MetaAddr::new_local_listener_change(&address) .into_new_meta_addr() .expect("MetaAddrChange can't create a new MetaAddr"); - assert!(peer.is_active_for_gossip()); + assert!(peer.is_active_for_gossip(chrono_now)); } /// Test if a recently received alternate peer address is not gossipable. @@ -69,12 +72,14 @@ fn new_local_listener_is_gossipable() { fn new_alternate_peer_address_is_not_gossipable() { zebra_test::init(); + let chrono_now = Utc::now(); + let address = SocketAddr::from(([192, 168, 180, 9], 10_000)); let peer = MetaAddr::new_alternate(&address, &PeerServices::NODE_NETWORK) .into_new_meta_addr() .expect("MetaAddrChange can't create a new MetaAddr"); - assert!(!peer.is_active_for_gossip()); + assert!(!peer.is_active_for_gossip(chrono_now)); } /// Test if recently received gossiped peer is gossipable. @@ -82,6 +87,8 @@ fn new_alternate_peer_address_is_not_gossipable() { fn gossiped_peer_reportedly_to_be_seen_recently_is_gossipable() { zebra_test::init(); + let chrono_now = Utc::now(); + let address = SocketAddr::from(([192, 168, 180, 9], 10_000)); // Report last seen within the reachable interval. @@ -94,7 +101,7 @@ fn gossiped_peer_reportedly_to_be_seen_recently_is_gossipable() { let peer = MetaAddr::new_gossiped_meta_addr(address, PeerServices::NODE_NETWORK, last_seen); - assert!(peer.is_active_for_gossip()); + assert!(peer.is_active_for_gossip(chrono_now)); } /// Test if received gossiped peer that was reportedly last seen in the future is gossipable. @@ -102,6 +109,8 @@ fn gossiped_peer_reportedly_to_be_seen_recently_is_gossipable() { fn gossiped_peer_reportedly_seen_in_the_future_is_gossipable() { zebra_test::init(); + let chrono_now = Utc::now(); + let address = SocketAddr::from(([192, 168, 180, 9], 10_000)); // Report last seen in the future @@ -111,7 +120,7 @@ fn gossiped_peer_reportedly_seen_in_the_future_is_gossipable() { let peer = MetaAddr::new_gossiped_meta_addr(address, PeerServices::NODE_NETWORK, last_seen); - assert!(peer.is_active_for_gossip()); + assert!(peer.is_active_for_gossip(chrono_now)); } /// Test if gossiped peer that was reported last seen a long time ago is not gossipable. @@ -119,6 +128,8 @@ fn gossiped_peer_reportedly_seen_in_the_future_is_gossipable() { fn gossiped_peer_reportedly_seen_long_ago_is_not_gossipable() { zebra_test::init(); + let chrono_now = Utc::now(); + let address = SocketAddr::from(([192, 168, 180, 9], 10_000)); // Report last seen just outside the reachable interval. @@ -131,7 +142,7 @@ fn gossiped_peer_reportedly_seen_long_ago_is_not_gossipable() { let peer = MetaAddr::new_gossiped_meta_addr(address, PeerServices::NODE_NETWORK, last_seen); - assert!(!peer.is_active_for_gossip()); + assert!(!peer.is_active_for_gossip(chrono_now)); } /// Test that peer that has just responded is gossipable. @@ -139,6 +150,8 @@ fn gossiped_peer_reportedly_seen_long_ago_is_not_gossipable() { fn recently_responded_peer_is_gossipable() { zebra_test::init(); + let chrono_now = Utc::now(); + let address = SocketAddr::from(([192, 168, 180, 9], 10_000)); let peer_seed = MetaAddr::new_alternate(&address, &PeerServices::NODE_NETWORK) .into_new_meta_addr() @@ -149,7 +162,7 @@ fn recently_responded_peer_is_gossipable() { .apply_to_meta_addr(peer_seed) .expect("Failed to create MetaAddr for responded peer"); - assert!(peer.is_active_for_gossip()); + assert!(peer.is_active_for_gossip(chrono_now)); } /// Test that peer that last responded in the reachable interval is gossipable. @@ -157,6 +170,8 @@ fn recently_responded_peer_is_gossipable() { fn not_so_recently_responded_peer_is_still_gossipable() { zebra_test::init(); + let chrono_now = Utc::now(); + let address = SocketAddr::from(([192, 168, 180, 9], 10_000)); let peer_seed = MetaAddr::new_alternate(&address, &PeerServices::NODE_NETWORK) .into_new_meta_addr() @@ -177,7 +192,7 @@ fn not_so_recently_responded_peer_is_still_gossipable() { peer.set_last_response(last_response); - assert!(peer.is_active_for_gossip()); + assert!(peer.is_active_for_gossip(chrono_now)); } /// Test that peer that responded long ago is not gossipable. @@ -185,6 +200,8 @@ fn not_so_recently_responded_peer_is_still_gossipable() { fn responded_long_ago_peer_is_not_gossipable() { zebra_test::init(); + let chrono_now = Utc::now(); + let address = SocketAddr::from(([192, 168, 180, 9], 10_000)); let peer_seed = MetaAddr::new_alternate(&address, &PeerServices::NODE_NETWORK) .into_new_meta_addr() @@ -205,5 +222,5 @@ fn responded_long_ago_peer_is_not_gossipable() { peer.set_last_response(last_response); - assert!(!peer.is_active_for_gossip()); + assert!(!peer.is_active_for_gossip(chrono_now)); } diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index 7a9951061..4bfeb49b4 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -1,5 +1,6 @@ use std::{cmp::min, sync::Arc}; +use chrono::Utc; use futures::stream::{FuturesUnordered, StreamExt}; use tokio::time::{sleep_until, timeout, Instant}; use tower::{Service, ServiceExt}; @@ -317,9 +318,14 @@ where // be kept to a minimum. let reconnect = { let mut guard = self.address_book.lock().unwrap(); + + // Now we have the lock, get the current time + let instant_now = std::time::Instant::now(); + let chrono_now = Utc::now(); + // It's okay to return without sleeping here, because we're returning // `None`. We only need to sleep before yielding an address. - let reconnect = guard.reconnection_peers().next()?; + let reconnect = guard.reconnection_peers(instant_now, chrono_now).next()?; let reconnect = MetaAddr::new_reconnect(&reconnect.addr); guard.update(reconnect)? diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index 0d09074ad..8af742980 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -20,6 +20,7 @@ use std::{ time::{Duration, Instant}, }; +use chrono::Utc; use futures::{ channel::{mpsc, oneshot}, FutureExt, StreamExt, @@ -184,7 +185,7 @@ async fn peer_limit_zero_mainnet() { address_book.lock().unwrap().peers().count(), 0, "expected no peers in Mainnet address book, but got: {:?}", - address_book.lock().unwrap().address_metrics() + address_book.lock().unwrap().address_metrics(Utc::now()) ); } @@ -205,7 +206,7 @@ async fn peer_limit_zero_testnet() { address_book.lock().unwrap().peers().count(), 0, "expected no peers in Testnet address book, but got: {:?}", - address_book.lock().unwrap().address_metrics() + address_book.lock().unwrap().address_metrics(Utc::now()) ); } @@ -1405,7 +1406,7 @@ where over_limit_peers, "expected {} peers in Mainnet address book, but got: {:?}", over_limit_peers, - address_book.lock().unwrap().address_metrics() + address_book.lock().unwrap().address_metrics(Utc::now()) ); (config, peerset_rx) diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 801b3ebec..b54d46d77 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -55,6 +55,7 @@ use std::{ time::Instant, }; +use chrono::Utc; use futures::{ channel::{mpsc, oneshot}, future::{FutureExt, TryFutureExt}, @@ -675,9 +676,15 @@ where // // Only log address metrics in exceptional circumstances, to avoid lock contention. // + // Get the current time after acquiring the address book lock. + // // TODO: replace with a watch channel that is updated in `AddressBook::update_metrics()`, // or turn the address book into a service (#1976) - let address_metrics = self.address_book.lock().unwrap().address_metrics(); + let address_metrics = self + .address_book + .lock() + .unwrap() + .address_metrics(Utc::now()); if unready_services_len == 0 { warn!( ?address_metrics, @@ -704,7 +711,12 @@ where // Security: make sure we haven't exceeded the connection limit if num_peers > self.peerset_total_connection_limit { - let address_metrics = self.address_book.lock().unwrap().address_metrics(); + // Correctness: Get the current time after acquiring the address book lock. + let address_metrics = self + .address_book + .lock() + .unwrap() + .address_metrics(Utc::now()); panic!( "unexpectedly exceeded configured peer set connection limit: \n\ peers: {:?}, ready: {:?}, unready: {:?}, \n\ diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 29b8609c6..19d3a4525 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -12,6 +12,7 @@ use std::{ task::{Context, Poll}, }; +use chrono::Utc; use futures::{ future::{FutureExt, TryFutureExt}, stream::Stream, @@ -305,8 +306,11 @@ impl Service for Inbound { // the lock. let peers = address_book.lock().unwrap().clone(); + // Correctness: get the current time after acquiring the address book lock. + let now = Utc::now(); + // Send a sanitized response - let mut peers = peers.sanitized(); + let mut peers = peers.sanitized(now); // Truncate the list let truncate_at = MAX_ADDRS_IN_MESSAGE