diff --git a/zebra-chain/src/serialization.rs b/zebra-chain/src/serialization.rs index f383a0641..e11bcaed2 100644 --- a/zebra-chain/src/serialization.rs +++ b/zebra-chain/src/serialization.rs @@ -24,7 +24,7 @@ pub mod arbitrary; pub use constraint::AtLeastOne; pub use date_time::DateTime32; pub use error::SerializationError; -pub use read_zcash::ReadZcashExt; +pub use read_zcash::{canonical_socket_addr, ReadZcashExt}; pub use write_zcash::WriteZcashExt; pub use zcash_deserialize::{ zcash_deserialize_bytes_external_count, zcash_deserialize_external_count, TrustedPreallocate, diff --git a/zebra-chain/src/serialization/arbitrary.rs b/zebra-chain/src/serialization/arbitrary.rs index 6cd6af92c..8e3962849 100644 --- a/zebra-chain/src/serialization/arbitrary.rs +++ b/zebra-chain/src/serialization/arbitrary.rs @@ -1,6 +1,6 @@ //! Arbitrary data generation for serialization proptests -use super::{read_zcash::canonical_ip_addr, DateTime32}; +use super::{read_zcash::canonical_socket_addr, DateTime32}; use chrono::{TimeZone, Utc, MAX_DATETIME, MIN_DATETIME}; use proptest::{arbitrary::any, prelude::*}; use std::net::SocketAddr; @@ -59,10 +59,6 @@ pub fn datetime_u32() -> impl Strategy> { /// Returns a random canonical Zebra `SocketAddr`. /// /// See [`canonical_ip_addr`] for details. -pub fn canonical_socket_addr() -> impl Strategy { - use SocketAddr::*; - any::().prop_map(|addr| match addr { - V4(_) => addr, - V6(v6_addr) => SocketAddr::new(canonical_ip_addr(v6_addr.ip()), v6_addr.port()), - }) +pub fn canonical_socket_addr_strategy() -> impl Strategy { + any::().prop_map(canonical_socket_addr) } diff --git a/zebra-chain/src/serialization/date_time.rs b/zebra-chain/src/serialization/date_time.rs index ce01035ae..7a4852028 100644 --- a/zebra-chain/src/serialization/date_time.rs +++ b/zebra-chain/src/serialization/date_time.rs @@ -44,40 +44,70 @@ impl DateTime32 { self.into() } - /// Returns the current time + /// Returns the current time. + /// + /// # Panics + /// + /// If the number of seconds since the UNIX epoch is greater than `u32::MAX`. pub fn now() -> DateTime32 { Utc::now() .try_into() .expect("unexpected out of range chrono::DateTime") } - /// Returns the number of seconds elapsed between `earlier` and this time, + /// Returns the duration elapsed between `earlier` and this time, /// or `None` if `earlier` is later than this time. pub fn checked_duration_since(&self, earlier: DateTime32) -> Option { self.timestamp .checked_sub(earlier.timestamp) - .map(|seconds| Duration32 { seconds }) + .map(Duration32::from) } - /// Returns the number of seconds elapsed between `earlier` and this time, + /// Returns duration elapsed between `earlier` and this time, /// or zero if `earlier` is later than this time. pub fn saturating_duration_since(&self, earlier: DateTime32) -> Duration32 { - Duration32 { - seconds: self.timestamp.saturating_sub(earlier.timestamp), - } + Duration32::from(self.timestamp.saturating_sub(earlier.timestamp)) } - /// Returns the number of seconds elapsed since this time, + /// Returns the duration elapsed since this time, /// or if this time is in the future, returns `None`. pub fn checked_elapsed(&self) -> Option { DateTime32::now().checked_duration_since(*self) } - /// Returns the number of seconds elapsed since this time, + /// Returns the duration elapsed since this time, /// or if this time is in the future, returns zero. pub fn saturating_elapsed(&self) -> Duration32 { DateTime32::now().saturating_duration_since(*self) } + + /// Returns the time that is `duration` after this time. + /// If the calculation overflows, returns `None`. + pub fn checked_add(&self, duration: Duration32) -> Option { + self.timestamp + .checked_add(duration.seconds) + .map(DateTime32::from) + } + + /// Returns the time that is `duration` after this time. + /// If the calculation overflows, returns `DateTime32::MAX`. + pub fn saturating_add(&self, duration: Duration32) -> DateTime32 { + DateTime32::from(self.timestamp.saturating_add(duration.seconds)) + } + + /// Returns the time that is `duration` before this time. + /// If the calculation underflows, returns `None`. + pub fn checked_sub(&self, duration: Duration32) -> Option { + self.timestamp + .checked_sub(duration.seconds) + .map(DateTime32::from) + } + + /// Returns the time that is `duration` before this time. + /// If the calculation underflows, returns `DateTime32::MIN`. + pub fn saturating_sub(&self, duration: Duration32) -> DateTime32 { + DateTime32::from(self.timestamp.saturating_sub(duration.seconds)) + } } impl Duration32 { @@ -87,7 +117,7 @@ impl Duration32 { /// The latest possible `Duration32` value. pub const MAX: Duration32 = Duration32 { seconds: u32::MAX }; - /// Returns the number of seconds. + /// Returns the number of seconds in this duration. pub fn seconds(&self) -> u32 { self.seconds } @@ -101,6 +131,34 @@ impl Duration32 { pub fn to_std(self) -> std::time::Duration { self.into() } + + /// Returns a duration that is `duration` longer than this duration. + /// If the calculation overflows, returns `None`. + pub fn checked_add(&self, duration: Duration32) -> Option { + self.seconds + .checked_add(duration.seconds) + .map(Duration32::from) + } + + /// Returns a duration that is `duration` longer than this duration. + /// If the calculation overflows, returns `Duration32::MAX`. + pub fn saturating_add(&self, duration: Duration32) -> Duration32 { + Duration32::from(self.seconds.saturating_add(duration.seconds)) + } + + /// Returns a duration that is `duration` shorter than this duration. + /// If the calculation underflows, returns `None`. + pub fn checked_sub(&self, duration: Duration32) -> Option { + self.seconds + .checked_sub(duration.seconds) + .map(Duration32::from) + } + + /// Returns a duration that is `duration` shorter than this duration. + /// If the calculation underflows, returns `Duration32::MIN`. + pub fn saturating_sub(&self, duration: Duration32) -> Duration32 { + Duration32::from(self.seconds.saturating_sub(duration.seconds)) + } } impl fmt::Debug for DateTime32 { @@ -189,7 +247,7 @@ impl TryFrom> for DateTime32 { /// Convert from a [`chrono::DateTime`] to a [`DateTime32`], discarding any nanoseconds. /// - /// Conversion fails if the number of seconds is outside the `u32` range. + /// Conversion fails if the number of seconds since the UNIX epoch is outside the `u32` range. fn try_from(value: chrono::DateTime) -> Result { Ok(Self { timestamp: value.timestamp().try_into()?, @@ -202,7 +260,7 @@ impl TryFrom<&chrono::DateTime> for DateTime32 { /// Convert from a [`chrono::DateTime`] to a [`DateTime32`], discarding any nanoseconds. /// - /// Conversion fails if the number of seconds is outside the `u32` range. + /// Conversion fails if the number of seconds since the UNIX epoch is outside the `u32` range. fn try_from(value: &chrono::DateTime) -> Result { (*value).try_into() } @@ -213,7 +271,7 @@ impl TryFrom for Duration32 { /// Convert from a [`chrono::Duration`] to a [`Duration32`], discarding any nanoseconds. /// - /// Conversion fails if the number of seconds is outside the `u32` range. + /// Conversion fails if the number of seconds since the UNIX epoch is outside the `u32` range. fn try_from(value: chrono::Duration) -> Result { Ok(Self { seconds: value.num_seconds().try_into()?, @@ -226,7 +284,7 @@ impl TryFrom<&chrono::Duration> for Duration32 { /// Convert from a [`chrono::Duration`] to a [`Duration32`], discarding any nanoseconds. /// - /// Conversion fails if the number of seconds is outside the `u32` range. + /// Conversion fails if the number of seconds in the duration is outside the `u32` range. fn try_from(value: &chrono::Duration) -> Result { (*value).try_into() } @@ -237,7 +295,7 @@ impl TryFrom for Duration32 { /// Convert from a [`std::time::Duration`] to a [`Duration32`], discarding any nanoseconds. /// - /// Conversion fails if the number of seconds is outside the `u32` range. + /// Conversion fails if the number of seconds in the duration is outside the `u32` range. fn try_from(value: std::time::Duration) -> Result { Ok(Self { seconds: value.as_secs().try_into()?, @@ -250,7 +308,7 @@ impl TryFrom<&std::time::Duration> for Duration32 { /// Convert from a [`std::time::Duration`] to a [`Duration32`], discarding any nanoseconds. /// - /// Conversion fails if the number of seconds is outside the `u32` range. + /// Conversion fails if the number of seconds in the duration is outside the `u32` range. fn try_from(value: &std::time::Duration) -> Result { (*value).try_into() } diff --git a/zebra-chain/src/serialization/read_zcash.rs b/zebra-chain/src/serialization/read_zcash.rs index 8f67730e7..23ba352c0 100644 --- a/zebra-chain/src/serialization/read_zcash.rs +++ b/zebra-chain/src/serialization/read_zcash.rs @@ -162,3 +162,20 @@ pub fn canonical_ip_addr(v6_addr: &Ipv6Addr) -> IpAddr { Some(_) | None => V6(*v6_addr), } } + +/// Transform a `SocketAddr` into a canonical Zebra `SocketAddr`, converting +/// IPv6-mapped IPv4 addresses, and removing IPv6 scope IDs and flow information. +/// +/// See [`canonical_ip_addr`] for detailed info on IPv6-mapped IPv4 addresses. +pub fn canonical_socket_addr(socket_addr: impl Into) -> SocketAddr { + use SocketAddr::*; + + let mut socket_addr = socket_addr.into(); + if let V6(v6_socket_addr) = socket_addr { + let canonical_ip = canonical_ip_addr(v6_socket_addr.ip()); + // creating a new SocketAddr removes scope IDs and flow information + socket_addr = SocketAddr::new(canonical_ip, socket_addr.port()); + } + + socket_addr +} diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs index decf9f060..ecc6dec7d 100644 --- a/zebra-network/src/address_book.rs +++ b/zebra-network/src/address_book.rs @@ -10,6 +10,8 @@ use std::{ use tracing::Span; +use zebra_chain::serialization::canonical_socket_addr; + use crate::{meta_addr::MetaAddrChange, types::MetaAddr, Config, PeerAddrState}; /// A database of peer listener addresses, their advertised services, and @@ -94,7 +96,7 @@ impl AddressBook { let mut new_book = AddressBook { by_addr: HashMap::default(), - local_listener: config.listen_addr, + local_listener: canonical_socket_addr(config.listen_addr), span, last_address_log: None, }; @@ -118,6 +120,11 @@ impl AddressBook { let addrs = addrs .into_iter() + .map(|mut meta_addr| { + meta_addr.addr = canonical_socket_addr(meta_addr.addr); + meta_addr + }) + .filter(MetaAddr::address_is_valid_for_outbound) .map(|meta_addr| (meta_addr.addr, meta_addr)); new_book.by_addr.extend(addrs); @@ -126,34 +133,36 @@ impl AddressBook { } /// Get the local listener address. - pub fn get_local_listener(&self) -> MetaAddrChange { - MetaAddr::new_local_listener(&self.local_listener) + /// + /// This address contains minimal state, but it is not sanitized. + pub fn get_local_listener(&self) -> MetaAddr { + MetaAddr::new_local_listener_change(&self.local_listener) + .into_new_meta_addr() + .expect("unexpected invalid new local listener addr") } /// Get the contents of `self` in random order with sanitized timestamps. pub fn sanitized(&self) -> Vec { use rand::seq::SliceRandom; let _guard = self.span.enter(); - let mut peers = self - .peers() - .filter_map(|a| MetaAddr::sanitize(&a)) + + let mut peers = self.by_addr.clone(); + + // Unconditionally add our local listener address to the advertised peers, + // to replace any self-connection failures. The address book and change + // constructors make sure that the SocketAddr is canonical. + let local_listener = self.get_local_listener(); + peers.insert(local_listener.addr, local_listener); + + // Then sanitize and shuffle + let mut peers = peers + .values() + .filter_map(MetaAddr::sanitize) .collect::>(); peers.shuffle(&mut rand::thread_rng()); peers } - /// Returns true if the address book has an entry for `addr`. - pub fn contains_addr(&self, addr: &SocketAddr) -> bool { - let _guard = self.span.enter(); - self.by_addr.contains_key(addr) - } - - /// Returns the entry corresponding to `addr`, or `None` if it does not exist. - pub fn get_by_addr(&self, addr: SocketAddr) -> Option { - let _guard = self.span.enter(); - self.by_addr.get(&addr).cloned() - } - /// Apply `change` to the address book, returning the updated `MetaAddr`, /// if the change was valid. /// @@ -162,6 +171,9 @@ impl AddressBook { /// All changes should go through `update`, so that the address book /// only contains valid outbound addresses. /// + /// Change addresses must be canonical `SocketAddr`s. This makes sure that + /// each address book entry has a unique IP address. + /// /// # Security /// /// This function must apply every attempted, responded, and failed change @@ -218,6 +230,7 @@ impl AddressBook { /// /// All address removals should go through `take`, so that the address /// book metrics are accurate. + #[allow(dead_code)] fn take(&mut self, removed_addr: SocketAddr) -> Option { let _guard = self.span.enter(); trace!( @@ -314,14 +327,6 @@ impl AddressBook { .cloned() } - /// Returns an iterator that drains entries from the address book. - /// - /// Removes entries in reconnection attempt then arbitrary order, - /// see [`peers`] for details. - pub fn drain(&'_ mut self) -> impl Iterator + '_ { - Drain { book: self } - } - /// Returns the number of entries in this address book. pub fn len(&self) -> usize { self.by_addr.len() @@ -442,16 +447,3 @@ impl Extend for AddressBook { } } } - -struct Drain<'a> { - book: &'a mut AddressBook, -} - -impl<'a> Iterator for Drain<'a> { - type Item = MetaAddr; - - fn next(&mut self) -> Option { - let next_item_addr = self.book.peers().next()?.addr; - self.book.take(next_item_addr) - } -} diff --git a/zebra-network/src/config.rs b/zebra-network/src/config.rs index 3c40801b2..e81d9c4d7 100644 --- a/zebra-network/src/config.rs +++ b/zebra-network/src/config.rs @@ -7,7 +7,7 @@ use std::{ use serde::{de, Deserialize, Deserializer}; -use zebra_chain::parameters::Network; +use zebra_chain::{parameters::Network, serialization::canonical_socket_addr}; use crate::BoxError; @@ -131,7 +131,7 @@ impl Config { let fut = tokio::time::timeout(crate::constants::DNS_LOOKUP_TIMEOUT, fut); match fut.await { - Ok(Ok(ips)) => Ok(ips.collect()), + Ok(Ok(ips)) => Ok(ips.map(canonical_socket_addr).collect()), Ok(Err(e)) => { tracing::info!(?host, ?e, "DNS error resolving peer IP address"); Err(e.into()) @@ -237,7 +237,7 @@ impl<'de> Deserialize<'de> for Config { }?; Ok(Config { - listen_addr, + listen_addr: canonical_socket_addr(listen_addr), network: config.network, initial_mainnet_peers: config.initial_mainnet_peers, initial_testnet_peers: config.initial_testnet_peers, diff --git a/zebra-network/src/meta_addr.rs b/zebra-network/src/meta_addr.rs index e565261a3..ae6a3063f 100644 --- a/zebra-network/src/meta_addr.rs +++ b/zebra-network/src/meta_addr.rs @@ -11,8 +11,8 @@ use std::{ use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use zebra_chain::serialization::{ - DateTime32, ReadZcashExt, SerializationError, TrustedPreallocate, WriteZcashExt, - ZcashDeserialize, ZcashDeserializeInto, ZcashSerialize, + canonical_socket_addr, DateTime32, ReadZcashExt, SerializationError, TrustedPreallocate, + WriteZcashExt, ZcashDeserialize, ZcashDeserializeInto, ZcashSerialize, }; use crate::{ @@ -26,7 +26,7 @@ use PeerAddrState::*; #[cfg(any(test, feature = "proptest-impl"))] use proptest_derive::Arbitrary; #[cfg(any(test, feature = "proptest-impl"))] -use zebra_chain::serialization::arbitrary::canonical_socket_addr; +use zebra_chain::serialization::arbitrary::canonical_socket_addr_strategy; #[cfg(any(test, feature = "proptest-impl"))] mod arbitrary; @@ -125,12 +125,15 @@ impl PartialOrd for PeerAddrState { #[derive(Copy, Clone, Debug)] #[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub struct MetaAddr { - /// The peer's address. + /// The peer's canonical socket address. #[cfg_attr( any(test, feature = "proptest-impl"), - proptest(strategy = "canonical_socket_addr()") + proptest(strategy = "canonical_socket_addr_strategy()") )] - pub addr: SocketAddr, + // + // TODO: make addr private, so the constructors can make sure it is a + // canonical SocketAddr (#2357) + pub(crate) addr: SocketAddr, /// The services advertised by the peer. /// @@ -148,8 +151,8 @@ pub struct MetaAddr { /// records, older peer versions, or buggy or malicious peers. // // TODO: make services private and optional - // split gossiped and handshake services? - pub services: PeerServices, + // split gossiped and handshake services? (#2234) + pub(crate) services: PeerServices, /// The unverified "last seen time" gossiped by the remote peer that sent us /// this address. @@ -173,10 +176,11 @@ pub struct MetaAddr { last_failure: Option, /// The outcome of our most recent communication attempt with this peer. - pub last_connection_state: PeerAddrState, // - // TODO: move the time and services fields into PeerAddrState? + // TODO: make services private and optional? + // move the time and services fields into PeerAddrState? // then some fields could be required in some states + pub(crate) last_connection_state: PeerAddrState, } /// A change to an existing `MetaAddr`. @@ -187,7 +191,7 @@ pub enum MetaAddrChange { NewGossiped { #[cfg_attr( any(test, feature = "proptest-impl"), - proptest(strategy = "canonical_socket_addr()") + proptest(strategy = "canonical_socket_addr_strategy()") )] addr: SocketAddr, untrusted_services: PeerServices, @@ -200,7 +204,7 @@ pub enum MetaAddrChange { NewAlternate { #[cfg_attr( any(test, feature = "proptest-impl"), - proptest(strategy = "canonical_socket_addr()") + proptest(strategy = "canonical_socket_addr_strategy()") )] addr: SocketAddr, untrusted_services: PeerServices, @@ -210,7 +214,7 @@ pub enum MetaAddrChange { NewLocal { #[cfg_attr( any(test, feature = "proptest-impl"), - proptest(strategy = "canonical_socket_addr()") + proptest(strategy = "canonical_socket_addr_strategy()") )] addr: SocketAddr, }, @@ -220,7 +224,7 @@ pub enum MetaAddrChange { UpdateAttempt { #[cfg_attr( any(test, feature = "proptest-impl"), - proptest(strategy = "canonical_socket_addr()") + proptest(strategy = "canonical_socket_addr_strategy()") )] addr: SocketAddr, }, @@ -229,7 +233,7 @@ pub enum MetaAddrChange { UpdateResponded { #[cfg_attr( any(test, feature = "proptest-impl"), - proptest(strategy = "canonical_socket_addr()") + proptest(strategy = "canonical_socket_addr_strategy()") )] addr: SocketAddr, services: PeerServices, @@ -239,7 +243,7 @@ pub enum MetaAddrChange { UpdateFailed { #[cfg_attr( any(test, feature = "proptest-impl"), - proptest(strategy = "canonical_socket_addr()") + proptest(strategy = "canonical_socket_addr_strategy()") )] addr: SocketAddr, services: Option, @@ -255,7 +259,7 @@ impl MetaAddr { untrusted_last_seen: DateTime32, ) -> MetaAddr { MetaAddr { - addr, + addr: canonical_socket_addr(addr), services: untrusted_services, untrusted_last_seen: Some(untrusted_last_seen), last_response: None, @@ -269,7 +273,7 @@ impl MetaAddr { /// `MetaAddr`. pub fn new_gossiped_change(self) -> MetaAddrChange { NewGossiped { - addr: self.addr, + addr: canonical_socket_addr(self.addr), untrusted_services: self.services, untrusted_last_seen: self .untrusted_last_seen @@ -291,7 +295,7 @@ impl MetaAddr { /// - Zebra could advertise unreachable addresses to its own peers. pub fn new_responded(addr: &SocketAddr, services: &PeerServices) -> MetaAddrChange { UpdateResponded { - addr: *addr, + addr: canonical_socket_addr(*addr), services: *services, } } @@ -299,21 +303,25 @@ impl MetaAddr { /// Returns a [`MetaAddrChange::UpdateAttempt`] for a peer that we /// want to make an outbound connection to. pub fn new_reconnect(addr: &SocketAddr) -> MetaAddrChange { - UpdateAttempt { addr: *addr } + UpdateAttempt { + addr: canonical_socket_addr(*addr), + } } /// Returns a [`MetaAddrChange::NewAlternate`] for a peer's alternate address, /// received via a `Version` message. pub fn new_alternate(addr: &SocketAddr, untrusted_services: &PeerServices) -> MetaAddrChange { NewAlternate { - addr: *addr, + addr: canonical_socket_addr(*addr), untrusted_services: *untrusted_services, } } /// Returns a [`MetaAddrChange::NewLocal`] for our own listener address. - pub fn new_local_listener(addr: &SocketAddr) -> MetaAddrChange { - NewLocal { addr: *addr } + pub fn new_local_listener_change(addr: &SocketAddr) -> MetaAddrChange { + NewLocal { + addr: canonical_socket_addr(*addr), + } } /// Returns a [`MetaAddrChange::UpdateFailed`] for a peer that has just had @@ -323,7 +331,7 @@ impl MetaAddr { services: impl Into>, ) -> MetaAddrChange { UpdateFailed { - addr: *addr, + addr: canonical_socket_addr(*addr), services: services.into(), } } @@ -482,15 +490,24 @@ impl MetaAddr { /// /// Returns `None` if this `MetaAddr` should not be sent to remote peers. pub fn sanitize(&self) -> Option { - let interval = crate::constants::TIMESTAMP_TRUNCATION_SECONDS; - let ts = self.last_seen()?.timestamp(); - // This can't underflow, because `0 <= rem_euclid < ts` - let last_seen = ts - ts.rem_euclid(interval); - let last_seen = DateTime32::from(last_seen); + if !self.last_known_info_is_valid_for_outbound() { + return None; + } + + // Sanitize time + let last_seen = self.last_seen()?; + let remainder = last_seen + .timestamp() + .rem_euclid(crate::constants::TIMESTAMP_TRUNCATION_SECONDS); + let last_seen = last_seen + .checked_sub(remainder.into()) + .expect("unexpected underflow: rem_euclid is strictly less than timestamp"); + Some(MetaAddr { - addr: self.addr, - // deserialization also sanitizes services to known flags - services: self.services & PeerServices::all(), + addr: canonical_socket_addr(self.addr), + // TODO: split untrusted and direct services + // sanitize untrusted services to NODE_NETWORK only? (#2234) + services: self.services, // only put the last seen time in the untrusted field, // this matches deserialization, and avoids leaking internal state untrusted_last_seen: Some(last_seen), @@ -540,9 +557,10 @@ impl MetaAddrChange { NewAlternate { untrusted_services, .. } => Some(*untrusted_services), - // TODO: create a "services implemented by Zebra" constant + // TODO: create a "services implemented by Zebra" constant (#2234) NewLocal { .. } => Some(PeerServices::NODE_NETWORK), UpdateAttempt { .. } => None, + // TODO: split untrusted and direct services (#2234) UpdateResponded { services, .. } => Some(*services), UpdateFailed { services, .. } => *services, } @@ -556,7 +574,8 @@ impl MetaAddrChange { .. } => Some(*untrusted_last_seen), NewAlternate { .. } => None, - NewLocal { .. } => None, + // We know that our local listener is available + NewLocal { .. } => Some(DateTime32::now()), UpdateAttempt { .. } => None, UpdateResponded { .. } => None, UpdateFailed { .. } => None, @@ -630,7 +649,7 @@ impl MetaAddrChange { match self { NewGossiped { .. } | NewAlternate { .. } | NewLocal { .. } => Some(MetaAddr { addr: self.addr(), - // TODO: make services optional when we add a DNS seeder change/state + // TODO: make services optional when we add a DNS seeder change and state services: self .untrusted_services() .expect("unexpected missing services"), @@ -679,7 +698,7 @@ impl MetaAddrChange { // so malicious peers can't keep changing our peer connection order. Some(MetaAddr { addr: self.addr(), - // TODO: or(self.untrusted_services()) when services become optional + // TODO: or(self.untrusted_services()) when services become optional (#2234) services: previous.services, untrusted_last_seen: previous .untrusted_last_seen @@ -702,8 +721,10 @@ impl MetaAddrChange { // connection timeout, even if changes are applied out of order. Some(MetaAddr { addr: self.addr(), + // We want up-to-date services, even if they have fewer bits, + // or they are applied out of order. services: self.untrusted_services().unwrap_or(previous.services), - // only NeverAttempted changes can modify the last seen field + // Only NeverAttempted changes can modify the last seen field untrusted_last_seen: previous.untrusted_last_seen, // Since Some(time) is always greater than None, `max` prefers: // - the latest time if both are Some @@ -781,7 +802,7 @@ impl Ord for MetaAddr { // So this comparison will have no impact until Zebra implements // more service features. // - // TODO: order services by usefulness, not bit pattern values + // TODO: order services by usefulness, not bit pattern values (#2234) // Security: split gossiped and direct services let larger_services = self.services.cmp(&other.services); diff --git a/zebra-network/src/meta_addr/arbitrary.rs b/zebra-network/src/meta_addr/arbitrary.rs index 90f6a9aae..82a10de86 100644 --- a/zebra-network/src/meta_addr/arbitrary.rs +++ b/zebra-network/src/meta_addr/arbitrary.rs @@ -4,20 +4,26 @@ use proptest::{arbitrary::any, collection::vec, prelude::*}; use super::{MetaAddr, MetaAddrChange, PeerServices}; -use zebra_chain::serialization::{arbitrary::canonical_socket_addr, DateTime32}; +use zebra_chain::serialization::{arbitrary::canonical_socket_addr_strategy, DateTime32}; -/// The largest number of random changes we want to apply to a MetaAddr +/// The largest number of random changes we want to apply to a [`MetaAddr`]. /// -/// This should be at least twice the number of [`PeerAddrState`]s, so -/// the tests can cover multiple transitions through every state. +/// This should be at least twice the number of [`PeerAddrState`]s, so the tests +/// can cover multiple transitions through every state. pub const MAX_ADDR_CHANGE: usize = 15; +/// The largest number of random addresses we want to add to an [`AddressBook`]. +/// +/// This should be at least the number of [`PeerAddrState`]s, so the tests can +/// cover interactions between addresses in different states. +pub const MAX_META_ADDR: usize = 8; + impl MetaAddr { /// Create a strategy that generates [`MetaAddr`]s in the /// [`PeerAddrState::NeverAttemptedGossiped`] state. pub fn gossiped_strategy() -> BoxedStrategy { ( - canonical_socket_addr(), + canonical_socket_addr_strategy(), any::(), any::(), ) @@ -30,7 +36,7 @@ impl MetaAddr { /// Create a strategy that generates [`MetaAddr`]s in the /// [`PeerAddrState::NeverAttemptedAlternate`] state. pub fn alternate_strategy() -> BoxedStrategy { - (canonical_socket_addr(), any::()) + (canonical_socket_addr_strategy(), any::()) .prop_map(|(socket_addr, untrusted_services)| { MetaAddr::new_alternate(&socket_addr, &untrusted_services) .into_new_meta_addr() @@ -78,7 +84,7 @@ impl MetaAddrChange { /// TODO: Generate all [`MetaAddrChange`] variants, and give them ready fields. /// (After PR #2276 merges.) pub fn ready_outbound_strategy() -> BoxedStrategy { - canonical_socket_addr() + canonical_socket_addr_strategy() .prop_filter_map("failed MetaAddr::is_valid_for_outbound", |addr| { // Alternate nodes use the current time, so they're always ready // diff --git a/zebra-network/src/meta_addr/tests/check.rs b/zebra-network/src/meta_addr/tests/check.rs index e90180ba2..565443d92 100644 --- a/zebra-network/src/meta_addr/tests/check.rs +++ b/zebra-network/src/meta_addr/tests/check.rs @@ -1,5 +1,7 @@ //! Shared test checks for MetaAddr +use std::net::SocketAddr; + use super::super::MetaAddr; use crate::{constants::TIMESTAMP_TRUNCATION_SECONDS, types::PeerServices}; @@ -68,8 +70,24 @@ pub(crate) fn sanitize_avoids_leaks(original: &MetaAddr, sanitized: &MetaAddr) { // Sanitize to the the default state, even though it's not serialized assert_eq!(sanitized.last_connection_state, Default::default()); // Sanitize to known flags - assert_eq!(sanitized.services, original.services & PeerServices::all()); + let sanitized_peer_services = original.services & PeerServices::all(); + assert_eq!(sanitized.services, sanitized_peer_services); - // We want the other fields to be unmodified - assert_eq!(sanitized.addr, original.addr); + // Remove IPv6 scope ID and flow information + let sanitized_socket_addr = SocketAddr::new(original.addr.ip(), original.addr.port()); + assert_eq!(sanitized.addr.ip(), original.addr.ip()); + assert_eq!(sanitized.addr.port(), original.addr.port()); + assert_eq!(sanitized.addr, sanitized_socket_addr); + + // We want any other fields to be their defaults + assert_eq!( + sanitized, + &MetaAddr::new_gossiped_meta_addr( + sanitized_socket_addr, + sanitized_peer_services, + sanitized + .last_seen() + .expect("unexpected missing last seen time in sanitized MetaAddr") + ), + ); } diff --git a/zebra-network/src/meta_addr/tests/prop.rs b/zebra-network/src/meta_addr/tests/prop.rs index efc57d115..b07d96258 100644 --- a/zebra-network/src/meta_addr/tests/prop.rs +++ b/zebra-network/src/meta_addr/tests/prop.rs @@ -14,13 +14,18 @@ use tokio::{runtime::Runtime, time::Instant}; use tower::service_fn; use tracing::Span; -use zebra_chain::serialization::{ZcashDeserialize, ZcashSerialize}; +use zebra_chain::serialization::{canonical_socket_addr, ZcashDeserialize, ZcashSerialize}; use super::check; use crate::{ constants::LIVE_PEER_DURATION, - meta_addr::{arbitrary::MAX_ADDR_CHANGE, MetaAddr, MetaAddrChange, PeerAddrState::*}, + meta_addr::{ + arbitrary::{MAX_ADDR_CHANGE, MAX_META_ADDR}, + MetaAddr, MetaAddrChange, + PeerAddrState::*, + }, peer_set::candidate_set::CandidateSet, + protocol::types::PeerServices, AddressBook, Config, }; @@ -32,12 +37,22 @@ const DEFAULT_VERBOSE_TEST_PROPTEST_CASES: u32 = 256; proptest! { /// Make sure that the sanitize function reduces time and state metadata - /// leaks. + /// leaks for valid addresses. + /// + /// Make sure that the sanitize function skips invalid IP addresses, ports, + /// and client services. #[test] fn sanitize_avoids_leaks(addr in MetaAddr::arbitrary()) { zebra_test::init(); if let Some(sanitized) = addr.sanitize() { + // check that all sanitized addresses are valid for outbound + prop_assert!(addr.last_known_info_is_valid_for_outbound()); + // also check the address, port, and services individually + prop_assert!(!addr.addr.ip().is_unspecified()); + prop_assert_ne!(addr.addr.port(), 0); + prop_assert!(addr.services.contains(PeerServices::NODE_NETWORK)); + check::sanitize_avoids_leaks(&addr, &sanitized); } } @@ -245,6 +260,40 @@ proptest! { } } } + + /// Make sure that a sanitized [`AddressBook`] contains the local listener + /// [`MetaAddr`], regardless of the previous contents of the address book. + /// + /// If Zebra gossips its own listener address to peers, and gets it back, + /// its address book will contain its local listener address. This address + /// will likely be in [`PeerAddrState::Failed`], due to failed + /// self-connection attempts. + #[test] + fn sanitized_address_book_contains_local_listener( + local_listener in any::(), + address_book_addrs in vec(any::(), 0..MAX_META_ADDR), + ) { + zebra_test::init(); + + let config = Config { listen_addr: local_listener, ..Config::default() }; + let address_book = AddressBook::new_with_addrs(&config, Span::none(), address_book_addrs); + let sanitized_addrs = address_book.sanitized(); + + let expected_local_listener = address_book.get_local_listener(); + let canonical_local_listener = canonical_socket_addr(local_listener); + let book_sanitized_local_listener = sanitized_addrs.iter().find(|meta_addr| meta_addr.addr == canonical_local_listener ); + + // invalid addresses should be removed by sanitization, + // regardless of where they have come from + prop_assert_eq!( + book_sanitized_local_listener.cloned(), + expected_local_listener.sanitize(), + "address book: {:?}, sanitized_addrs: {:?}, canonical_local_listener: {:?}", + address_book, + sanitized_addrs, + canonical_local_listener, + ); + } } proptest! { diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index cc4ac3ff4..0e5cdfd4c 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -379,33 +379,32 @@ fn validate_addrs( /// This will consider all addresses as invalid if trying to offset their /// `last_seen` times to be before the limit causes an underflow. fn limit_last_seen_times(addrs: &mut Vec, last_seen_limit: DateTime32) { - let (oldest_reported_seen_timestamp, newest_reported_seen_timestamp) = - addrs - .iter() - .fold((u32::MAX, u32::MIN), |(oldest, newest), addr| { - let last_seen = addr - .untrusted_last_seen() - .expect("unexpected missing last seen") - .timestamp(); - (oldest.min(last_seen), newest.max(last_seen)) - }); + let last_seen_times = addrs.iter().map(|meta_addr| { + meta_addr + .untrusted_last_seen() + .expect("unexpected missing last seen: should be provided by deserialization") + }); + let oldest_seen = last_seen_times.clone().min().unwrap_or(DateTime32::MIN); + let newest_seen = last_seen_times.max().unwrap_or(DateTime32::MAX); // If any time is in the future, adjust all times, to compensate for clock skew on honest peers - if newest_reported_seen_timestamp > last_seen_limit.timestamp() { - let offset = newest_reported_seen_timestamp - last_seen_limit.timestamp(); + if newest_seen > last_seen_limit { + let offset = newest_seen + .checked_duration_since(last_seen_limit) + .expect("unexpected underflow: just checked newest_seen is greater"); - // Apply offset to oldest timestamp to check for underflow - let oldest_resulting_timestamp = oldest_reported_seen_timestamp as i64 - offset as i64; - if oldest_resulting_timestamp >= 0 { + // Check for underflow + if oldest_seen.checked_sub(offset).is_some() { // No underflow is possible, so apply offset to all addresses for addr in addrs { - let old_last_seen = addr + let last_seen = addr .untrusted_last_seen() - .expect("unexpected missing last seen") - .timestamp(); - let new_last_seen = old_last_seen - offset; + .expect("unexpected missing last seen: should be provided by deserialization"); + let last_seen = last_seen + .checked_sub(offset) + .expect("unexpected underflow: just checked oldest_seen"); - addr.set_untrusted_last_seen(new_last_seen.into()); + addr.set_untrusted_last_seen(last_seen); } } else { // An underflow will occur, so reject all gossiped peers diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 26e9190eb..f5fd28af3 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -201,12 +201,16 @@ where S::Future: Send + 'static, { info!(?initial_peers, "connecting to initial peer set"); - // ## Correctness: + // # Security // - // Each `CallAll` can hold one `Buffer` or `Batch` reservation for - // an indefinite period. We can use `CallAllUnordered` without filling - // the underlying `Inbound` buffer, because we immediately drive this - // single `CallAll` to completion, and handshakes have a short timeout. + // TODO: rate-limit initial seed peer connections (#2326) + // + // # Correctness + // + // Each `FuturesUnordered` can hold one `Buffer` or `Batch` reservation for + // an indefinite period. We can use `FuturesUnordered` without filling + // the underlying network buffers, because we immediately drive this + // single `FuturesUnordered` to completion, and handshakes have a short timeout. let mut handshakes: FuturesUnordered<_> = initial_peers .into_iter() .map(|addr| { diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index e1e965617..e9b341eab 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -240,11 +240,7 @@ impl Service for Inbound { // Briefly hold the address book threaded mutex while // cloning the address book. Then sanitize after releasing // the lock. - let mut peers = address_book.lock().unwrap().clone(); - - // Add our local listener address to the advertised peers - let local_listener = address_book.lock().unwrap().get_local_listener(); - peers.update(local_listener); + let peers = address_book.lock().unwrap().clone(); // Send a sanitized response let mut peers = peers.sanitized();