Security: Use canonical SocketAddrs to avoid duplicate peer connections, Feature: Send local listener to peers (#2276)
* Always send our local listener with the latest time Previously, whenever there was an inbound request for peers, we would clone the address book and update it with the local listener. This had two impacts: - the listener could conflict with an existing entry, rather than unconditionally replacing it, and - the listener was briefly included in the address book metrics. As a side-effect, this change also makes sanitization slightly faster, because it avoids some useless peer filtering and sorting. * Skip listeners that are not valid for outbound connections * Filter sanitized addresses Zebra based on address state This fix correctly prevents Zebra gossiping client addresses to peers, but still keeps the client in the address book to avoid reconnections. * Add a full set of DateTime32 and Duration32 calculation methods * Refactor sanitize to use the new DateTime32/Duration32 methods * Security: Use canonical SocketAddrs to avoid duplicate connections If we allow multiple variants for each peer address, we can make multiple connections to that peer. Also make sure sanitized MetaAddrs are valid for outbound connections. * Test that address books contain the local listener address Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>
This commit is contained in:
parent
7638c43a7c
commit
1a57023eac
|
@ -24,7 +24,7 @@ pub mod arbitrary;
|
|||
pub use constraint::AtLeastOne;
|
||||
pub use date_time::DateTime32;
|
||||
pub use error::SerializationError;
|
||||
pub use read_zcash::ReadZcashExt;
|
||||
pub use read_zcash::{canonical_socket_addr, ReadZcashExt};
|
||||
pub use write_zcash::WriteZcashExt;
|
||||
pub use zcash_deserialize::{
|
||||
zcash_deserialize_bytes_external_count, zcash_deserialize_external_count, TrustedPreallocate,
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! Arbitrary data generation for serialization proptests
|
||||
|
||||
use super::{read_zcash::canonical_ip_addr, DateTime32};
|
||||
use super::{read_zcash::canonical_socket_addr, DateTime32};
|
||||
use chrono::{TimeZone, Utc, MAX_DATETIME, MIN_DATETIME};
|
||||
use proptest::{arbitrary::any, prelude::*};
|
||||
use std::net::SocketAddr;
|
||||
|
@ -59,10 +59,6 @@ pub fn datetime_u32() -> impl Strategy<Value = chrono::DateTime<Utc>> {
|
|||
/// Returns a random canonical Zebra `SocketAddr`.
|
||||
///
|
||||
/// See [`canonical_ip_addr`] for details.
|
||||
pub fn canonical_socket_addr() -> impl Strategy<Value = SocketAddr> {
|
||||
use SocketAddr::*;
|
||||
any::<SocketAddr>().prop_map(|addr| match addr {
|
||||
V4(_) => addr,
|
||||
V6(v6_addr) => SocketAddr::new(canonical_ip_addr(v6_addr.ip()), v6_addr.port()),
|
||||
})
|
||||
pub fn canonical_socket_addr_strategy() -> impl Strategy<Value = SocketAddr> {
|
||||
any::<SocketAddr>().prop_map(canonical_socket_addr)
|
||||
}
|
||||
|
|
|
@ -44,40 +44,70 @@ impl DateTime32 {
|
|||
self.into()
|
||||
}
|
||||
|
||||
/// Returns the current time
|
||||
/// Returns the current time.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// If the number of seconds since the UNIX epoch is greater than `u32::MAX`.
|
||||
pub fn now() -> DateTime32 {
|
||||
Utc::now()
|
||||
.try_into()
|
||||
.expect("unexpected out of range chrono::DateTime")
|
||||
}
|
||||
|
||||
/// Returns the number of seconds elapsed between `earlier` and this time,
|
||||
/// Returns the duration elapsed between `earlier` and this time,
|
||||
/// or `None` if `earlier` is later than this time.
|
||||
pub fn checked_duration_since(&self, earlier: DateTime32) -> Option<Duration32> {
|
||||
self.timestamp
|
||||
.checked_sub(earlier.timestamp)
|
||||
.map(|seconds| Duration32 { seconds })
|
||||
.map(Duration32::from)
|
||||
}
|
||||
|
||||
/// Returns the number of seconds elapsed between `earlier` and this time,
|
||||
/// Returns duration elapsed between `earlier` and this time,
|
||||
/// or zero if `earlier` is later than this time.
|
||||
pub fn saturating_duration_since(&self, earlier: DateTime32) -> Duration32 {
|
||||
Duration32 {
|
||||
seconds: self.timestamp.saturating_sub(earlier.timestamp),
|
||||
}
|
||||
Duration32::from(self.timestamp.saturating_sub(earlier.timestamp))
|
||||
}
|
||||
|
||||
/// Returns the number of seconds elapsed since this time,
|
||||
/// Returns the duration elapsed since this time,
|
||||
/// or if this time is in the future, returns `None`.
|
||||
pub fn checked_elapsed(&self) -> Option<Duration32> {
|
||||
DateTime32::now().checked_duration_since(*self)
|
||||
}
|
||||
|
||||
/// Returns the number of seconds elapsed since this time,
|
||||
/// Returns the duration elapsed since this time,
|
||||
/// or if this time is in the future, returns zero.
|
||||
pub fn saturating_elapsed(&self) -> Duration32 {
|
||||
DateTime32::now().saturating_duration_since(*self)
|
||||
}
|
||||
|
||||
/// Returns the time that is `duration` after this time.
|
||||
/// If the calculation overflows, returns `None`.
|
||||
pub fn checked_add(&self, duration: Duration32) -> Option<DateTime32> {
|
||||
self.timestamp
|
||||
.checked_add(duration.seconds)
|
||||
.map(DateTime32::from)
|
||||
}
|
||||
|
||||
/// Returns the time that is `duration` after this time.
|
||||
/// If the calculation overflows, returns `DateTime32::MAX`.
|
||||
pub fn saturating_add(&self, duration: Duration32) -> DateTime32 {
|
||||
DateTime32::from(self.timestamp.saturating_add(duration.seconds))
|
||||
}
|
||||
|
||||
/// Returns the time that is `duration` before this time.
|
||||
/// If the calculation underflows, returns `None`.
|
||||
pub fn checked_sub(&self, duration: Duration32) -> Option<DateTime32> {
|
||||
self.timestamp
|
||||
.checked_sub(duration.seconds)
|
||||
.map(DateTime32::from)
|
||||
}
|
||||
|
||||
/// Returns the time that is `duration` before this time.
|
||||
/// If the calculation underflows, returns `DateTime32::MIN`.
|
||||
pub fn saturating_sub(&self, duration: Duration32) -> DateTime32 {
|
||||
DateTime32::from(self.timestamp.saturating_sub(duration.seconds))
|
||||
}
|
||||
}
|
||||
|
||||
impl Duration32 {
|
||||
|
@ -87,7 +117,7 @@ impl Duration32 {
|
|||
/// The latest possible `Duration32` value.
|
||||
pub const MAX: Duration32 = Duration32 { seconds: u32::MAX };
|
||||
|
||||
/// Returns the number of seconds.
|
||||
/// Returns the number of seconds in this duration.
|
||||
pub fn seconds(&self) -> u32 {
|
||||
self.seconds
|
||||
}
|
||||
|
@ -101,6 +131,34 @@ impl Duration32 {
|
|||
pub fn to_std(self) -> std::time::Duration {
|
||||
self.into()
|
||||
}
|
||||
|
||||
/// Returns a duration that is `duration` longer than this duration.
|
||||
/// If the calculation overflows, returns `None`.
|
||||
pub fn checked_add(&self, duration: Duration32) -> Option<Duration32> {
|
||||
self.seconds
|
||||
.checked_add(duration.seconds)
|
||||
.map(Duration32::from)
|
||||
}
|
||||
|
||||
/// Returns a duration that is `duration` longer than this duration.
|
||||
/// If the calculation overflows, returns `Duration32::MAX`.
|
||||
pub fn saturating_add(&self, duration: Duration32) -> Duration32 {
|
||||
Duration32::from(self.seconds.saturating_add(duration.seconds))
|
||||
}
|
||||
|
||||
/// Returns a duration that is `duration` shorter than this duration.
|
||||
/// If the calculation underflows, returns `None`.
|
||||
pub fn checked_sub(&self, duration: Duration32) -> Option<Duration32> {
|
||||
self.seconds
|
||||
.checked_sub(duration.seconds)
|
||||
.map(Duration32::from)
|
||||
}
|
||||
|
||||
/// Returns a duration that is `duration` shorter than this duration.
|
||||
/// If the calculation underflows, returns `Duration32::MIN`.
|
||||
pub fn saturating_sub(&self, duration: Duration32) -> Duration32 {
|
||||
Duration32::from(self.seconds.saturating_sub(duration.seconds))
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for DateTime32 {
|
||||
|
@ -189,7 +247,7 @@ impl TryFrom<chrono::DateTime<Utc>> for DateTime32 {
|
|||
|
||||
/// Convert from a [`chrono::DateTime`] to a [`DateTime32`], discarding any nanoseconds.
|
||||
///
|
||||
/// Conversion fails if the number of seconds is outside the `u32` range.
|
||||
/// Conversion fails if the number of seconds since the UNIX epoch is outside the `u32` range.
|
||||
fn try_from(value: chrono::DateTime<Utc>) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
timestamp: value.timestamp().try_into()?,
|
||||
|
@ -202,7 +260,7 @@ impl TryFrom<&chrono::DateTime<Utc>> for DateTime32 {
|
|||
|
||||
/// Convert from a [`chrono::DateTime`] to a [`DateTime32`], discarding any nanoseconds.
|
||||
///
|
||||
/// Conversion fails if the number of seconds is outside the `u32` range.
|
||||
/// Conversion fails if the number of seconds since the UNIX epoch is outside the `u32` range.
|
||||
fn try_from(value: &chrono::DateTime<Utc>) -> Result<Self, Self::Error> {
|
||||
(*value).try_into()
|
||||
}
|
||||
|
@ -213,7 +271,7 @@ impl TryFrom<chrono::Duration> for Duration32 {
|
|||
|
||||
/// Convert from a [`chrono::Duration`] to a [`Duration32`], discarding any nanoseconds.
|
||||
///
|
||||
/// Conversion fails if the number of seconds is outside the `u32` range.
|
||||
/// Conversion fails if the number of seconds since the UNIX epoch is outside the `u32` range.
|
||||
fn try_from(value: chrono::Duration) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
seconds: value.num_seconds().try_into()?,
|
||||
|
@ -226,7 +284,7 @@ impl TryFrom<&chrono::Duration> for Duration32 {
|
|||
|
||||
/// Convert from a [`chrono::Duration`] to a [`Duration32`], discarding any nanoseconds.
|
||||
///
|
||||
/// Conversion fails if the number of seconds is outside the `u32` range.
|
||||
/// Conversion fails if the number of seconds in the duration is outside the `u32` range.
|
||||
fn try_from(value: &chrono::Duration) -> Result<Self, Self::Error> {
|
||||
(*value).try_into()
|
||||
}
|
||||
|
@ -237,7 +295,7 @@ impl TryFrom<std::time::Duration> for Duration32 {
|
|||
|
||||
/// Convert from a [`std::time::Duration`] to a [`Duration32`], discarding any nanoseconds.
|
||||
///
|
||||
/// Conversion fails if the number of seconds is outside the `u32` range.
|
||||
/// Conversion fails if the number of seconds in the duration is outside the `u32` range.
|
||||
fn try_from(value: std::time::Duration) -> Result<Self, Self::Error> {
|
||||
Ok(Self {
|
||||
seconds: value.as_secs().try_into()?,
|
||||
|
@ -250,7 +308,7 @@ impl TryFrom<&std::time::Duration> for Duration32 {
|
|||
|
||||
/// Convert from a [`std::time::Duration`] to a [`Duration32`], discarding any nanoseconds.
|
||||
///
|
||||
/// Conversion fails if the number of seconds is outside the `u32` range.
|
||||
/// Conversion fails if the number of seconds in the duration is outside the `u32` range.
|
||||
fn try_from(value: &std::time::Duration) -> Result<Self, Self::Error> {
|
||||
(*value).try_into()
|
||||
}
|
||||
|
|
|
@ -162,3 +162,20 @@ pub fn canonical_ip_addr(v6_addr: &Ipv6Addr) -> IpAddr {
|
|||
Some(_) | None => V6(*v6_addr),
|
||||
}
|
||||
}
|
||||
|
||||
/// Transform a `SocketAddr` into a canonical Zebra `SocketAddr`, converting
|
||||
/// IPv6-mapped IPv4 addresses, and removing IPv6 scope IDs and flow information.
|
||||
///
|
||||
/// See [`canonical_ip_addr`] for detailed info on IPv6-mapped IPv4 addresses.
|
||||
pub fn canonical_socket_addr(socket_addr: impl Into<SocketAddr>) -> SocketAddr {
|
||||
use SocketAddr::*;
|
||||
|
||||
let mut socket_addr = socket_addr.into();
|
||||
if let V6(v6_socket_addr) = socket_addr {
|
||||
let canonical_ip = canonical_ip_addr(v6_socket_addr.ip());
|
||||
// creating a new SocketAddr removes scope IDs and flow information
|
||||
socket_addr = SocketAddr::new(canonical_ip, socket_addr.port());
|
||||
}
|
||||
|
||||
socket_addr
|
||||
}
|
||||
|
|
|
@ -10,6 +10,8 @@ use std::{
|
|||
|
||||
use tracing::Span;
|
||||
|
||||
use zebra_chain::serialization::canonical_socket_addr;
|
||||
|
||||
use crate::{meta_addr::MetaAddrChange, types::MetaAddr, Config, PeerAddrState};
|
||||
|
||||
/// A database of peer listener addresses, their advertised services, and
|
||||
|
@ -94,7 +96,7 @@ impl AddressBook {
|
|||
|
||||
let mut new_book = AddressBook {
|
||||
by_addr: HashMap::default(),
|
||||
local_listener: config.listen_addr,
|
||||
local_listener: canonical_socket_addr(config.listen_addr),
|
||||
span,
|
||||
last_address_log: None,
|
||||
};
|
||||
|
@ -118,6 +120,11 @@ impl AddressBook {
|
|||
|
||||
let addrs = addrs
|
||||
.into_iter()
|
||||
.map(|mut meta_addr| {
|
||||
meta_addr.addr = canonical_socket_addr(meta_addr.addr);
|
||||
meta_addr
|
||||
})
|
||||
.filter(MetaAddr::address_is_valid_for_outbound)
|
||||
.map(|meta_addr| (meta_addr.addr, meta_addr));
|
||||
new_book.by_addr.extend(addrs);
|
||||
|
||||
|
@ -126,34 +133,36 @@ impl AddressBook {
|
|||
}
|
||||
|
||||
/// Get the local listener address.
|
||||
pub fn get_local_listener(&self) -> MetaAddrChange {
|
||||
MetaAddr::new_local_listener(&self.local_listener)
|
||||
///
|
||||
/// This address contains minimal state, but it is not sanitized.
|
||||
pub fn get_local_listener(&self) -> MetaAddr {
|
||||
MetaAddr::new_local_listener_change(&self.local_listener)
|
||||
.into_new_meta_addr()
|
||||
.expect("unexpected invalid new local listener addr")
|
||||
}
|
||||
|
||||
/// Get the contents of `self` in random order with sanitized timestamps.
|
||||
pub fn sanitized(&self) -> Vec<MetaAddr> {
|
||||
use rand::seq::SliceRandom;
|
||||
let _guard = self.span.enter();
|
||||
let mut peers = self
|
||||
.peers()
|
||||
.filter_map(|a| MetaAddr::sanitize(&a))
|
||||
|
||||
let mut peers = self.by_addr.clone();
|
||||
|
||||
// Unconditionally add our local listener address to the advertised peers,
|
||||
// to replace any self-connection failures. The address book and change
|
||||
// constructors make sure that the SocketAddr is canonical.
|
||||
let local_listener = self.get_local_listener();
|
||||
peers.insert(local_listener.addr, local_listener);
|
||||
|
||||
// Then sanitize and shuffle
|
||||
let mut peers = peers
|
||||
.values()
|
||||
.filter_map(MetaAddr::sanitize)
|
||||
.collect::<Vec<_>>();
|
||||
peers.shuffle(&mut rand::thread_rng());
|
||||
peers
|
||||
}
|
||||
|
||||
/// Returns true if the address book has an entry for `addr`.
|
||||
pub fn contains_addr(&self, addr: &SocketAddr) -> bool {
|
||||
let _guard = self.span.enter();
|
||||
self.by_addr.contains_key(addr)
|
||||
}
|
||||
|
||||
/// Returns the entry corresponding to `addr`, or `None` if it does not exist.
|
||||
pub fn get_by_addr(&self, addr: SocketAddr) -> Option<MetaAddr> {
|
||||
let _guard = self.span.enter();
|
||||
self.by_addr.get(&addr).cloned()
|
||||
}
|
||||
|
||||
/// Apply `change` to the address book, returning the updated `MetaAddr`,
|
||||
/// if the change was valid.
|
||||
///
|
||||
|
@ -162,6 +171,9 @@ impl AddressBook {
|
|||
/// All changes should go through `update`, so that the address book
|
||||
/// only contains valid outbound addresses.
|
||||
///
|
||||
/// Change addresses must be canonical `SocketAddr`s. This makes sure that
|
||||
/// each address book entry has a unique IP address.
|
||||
///
|
||||
/// # Security
|
||||
///
|
||||
/// This function must apply every attempted, responded, and failed change
|
||||
|
@ -218,6 +230,7 @@ impl AddressBook {
|
|||
///
|
||||
/// All address removals should go through `take`, so that the address
|
||||
/// book metrics are accurate.
|
||||
#[allow(dead_code)]
|
||||
fn take(&mut self, removed_addr: SocketAddr) -> Option<MetaAddr> {
|
||||
let _guard = self.span.enter();
|
||||
trace!(
|
||||
|
@ -314,14 +327,6 @@ impl AddressBook {
|
|||
.cloned()
|
||||
}
|
||||
|
||||
/// Returns an iterator that drains entries from the address book.
|
||||
///
|
||||
/// Removes entries in reconnection attempt then arbitrary order,
|
||||
/// see [`peers`] for details.
|
||||
pub fn drain(&'_ mut self) -> impl Iterator<Item = MetaAddr> + '_ {
|
||||
Drain { book: self }
|
||||
}
|
||||
|
||||
/// Returns the number of entries in this address book.
|
||||
pub fn len(&self) -> usize {
|
||||
self.by_addr.len()
|
||||
|
@ -442,16 +447,3 @@ impl Extend<MetaAddrChange> for AddressBook {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Drain<'a> {
|
||||
book: &'a mut AddressBook,
|
||||
}
|
||||
|
||||
impl<'a> Iterator for Drain<'a> {
|
||||
type Item = MetaAddr;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let next_item_addr = self.book.peers().next()?.addr;
|
||||
self.book.take(next_item_addr)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ use std::{
|
|||
|
||||
use serde::{de, Deserialize, Deserializer};
|
||||
|
||||
use zebra_chain::parameters::Network;
|
||||
use zebra_chain::{parameters::Network, serialization::canonical_socket_addr};
|
||||
|
||||
use crate::BoxError;
|
||||
|
||||
|
@ -131,7 +131,7 @@ impl Config {
|
|||
let fut = tokio::time::timeout(crate::constants::DNS_LOOKUP_TIMEOUT, fut);
|
||||
|
||||
match fut.await {
|
||||
Ok(Ok(ips)) => Ok(ips.collect()),
|
||||
Ok(Ok(ips)) => Ok(ips.map(canonical_socket_addr).collect()),
|
||||
Ok(Err(e)) => {
|
||||
tracing::info!(?host, ?e, "DNS error resolving peer IP address");
|
||||
Err(e.into())
|
||||
|
@ -237,7 +237,7 @@ impl<'de> Deserialize<'de> for Config {
|
|||
}?;
|
||||
|
||||
Ok(Config {
|
||||
listen_addr,
|
||||
listen_addr: canonical_socket_addr(listen_addr),
|
||||
network: config.network,
|
||||
initial_mainnet_peers: config.initial_mainnet_peers,
|
||||
initial_testnet_peers: config.initial_testnet_peers,
|
||||
|
|
|
@ -11,8 +11,8 @@ use std::{
|
|||
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
||||
|
||||
use zebra_chain::serialization::{
|
||||
DateTime32, ReadZcashExt, SerializationError, TrustedPreallocate, WriteZcashExt,
|
||||
ZcashDeserialize, ZcashDeserializeInto, ZcashSerialize,
|
||||
canonical_socket_addr, DateTime32, ReadZcashExt, SerializationError, TrustedPreallocate,
|
||||
WriteZcashExt, ZcashDeserialize, ZcashDeserializeInto, ZcashSerialize,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
|
@ -26,7 +26,7 @@ use PeerAddrState::*;
|
|||
#[cfg(any(test, feature = "proptest-impl"))]
|
||||
use proptest_derive::Arbitrary;
|
||||
#[cfg(any(test, feature = "proptest-impl"))]
|
||||
use zebra_chain::serialization::arbitrary::canonical_socket_addr;
|
||||
use zebra_chain::serialization::arbitrary::canonical_socket_addr_strategy;
|
||||
#[cfg(any(test, feature = "proptest-impl"))]
|
||||
mod arbitrary;
|
||||
|
||||
|
@ -125,12 +125,15 @@ impl PartialOrd for PeerAddrState {
|
|||
#[derive(Copy, Clone, Debug)]
|
||||
#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))]
|
||||
pub struct MetaAddr {
|
||||
/// The peer's address.
|
||||
/// The peer's canonical socket address.
|
||||
#[cfg_attr(
|
||||
any(test, feature = "proptest-impl"),
|
||||
proptest(strategy = "canonical_socket_addr()")
|
||||
proptest(strategy = "canonical_socket_addr_strategy()")
|
||||
)]
|
||||
pub addr: SocketAddr,
|
||||
//
|
||||
// TODO: make addr private, so the constructors can make sure it is a
|
||||
// canonical SocketAddr (#2357)
|
||||
pub(crate) addr: SocketAddr,
|
||||
|
||||
/// The services advertised by the peer.
|
||||
///
|
||||
|
@ -148,8 +151,8 @@ pub struct MetaAddr {
|
|||
/// records, older peer versions, or buggy or malicious peers.
|
||||
//
|
||||
// TODO: make services private and optional
|
||||
// split gossiped and handshake services?
|
||||
pub services: PeerServices,
|
||||
// split gossiped and handshake services? (#2234)
|
||||
pub(crate) services: PeerServices,
|
||||
|
||||
/// The unverified "last seen time" gossiped by the remote peer that sent us
|
||||
/// this address.
|
||||
|
@ -173,10 +176,11 @@ pub struct MetaAddr {
|
|||
last_failure: Option<Instant>,
|
||||
|
||||
/// The outcome of our most recent communication attempt with this peer.
|
||||
pub last_connection_state: PeerAddrState,
|
||||
//
|
||||
// TODO: move the time and services fields into PeerAddrState?
|
||||
// TODO: make services private and optional?
|
||||
// move the time and services fields into PeerAddrState?
|
||||
// then some fields could be required in some states
|
||||
pub(crate) last_connection_state: PeerAddrState,
|
||||
}
|
||||
|
||||
/// A change to an existing `MetaAddr`.
|
||||
|
@ -187,7 +191,7 @@ pub enum MetaAddrChange {
|
|||
NewGossiped {
|
||||
#[cfg_attr(
|
||||
any(test, feature = "proptest-impl"),
|
||||
proptest(strategy = "canonical_socket_addr()")
|
||||
proptest(strategy = "canonical_socket_addr_strategy()")
|
||||
)]
|
||||
addr: SocketAddr,
|
||||
untrusted_services: PeerServices,
|
||||
|
@ -200,7 +204,7 @@ pub enum MetaAddrChange {
|
|||
NewAlternate {
|
||||
#[cfg_attr(
|
||||
any(test, feature = "proptest-impl"),
|
||||
proptest(strategy = "canonical_socket_addr()")
|
||||
proptest(strategy = "canonical_socket_addr_strategy()")
|
||||
)]
|
||||
addr: SocketAddr,
|
||||
untrusted_services: PeerServices,
|
||||
|
@ -210,7 +214,7 @@ pub enum MetaAddrChange {
|
|||
NewLocal {
|
||||
#[cfg_attr(
|
||||
any(test, feature = "proptest-impl"),
|
||||
proptest(strategy = "canonical_socket_addr()")
|
||||
proptest(strategy = "canonical_socket_addr_strategy()")
|
||||
)]
|
||||
addr: SocketAddr,
|
||||
},
|
||||
|
@ -220,7 +224,7 @@ pub enum MetaAddrChange {
|
|||
UpdateAttempt {
|
||||
#[cfg_attr(
|
||||
any(test, feature = "proptest-impl"),
|
||||
proptest(strategy = "canonical_socket_addr()")
|
||||
proptest(strategy = "canonical_socket_addr_strategy()")
|
||||
)]
|
||||
addr: SocketAddr,
|
||||
},
|
||||
|
@ -229,7 +233,7 @@ pub enum MetaAddrChange {
|
|||
UpdateResponded {
|
||||
#[cfg_attr(
|
||||
any(test, feature = "proptest-impl"),
|
||||
proptest(strategy = "canonical_socket_addr()")
|
||||
proptest(strategy = "canonical_socket_addr_strategy()")
|
||||
)]
|
||||
addr: SocketAddr,
|
||||
services: PeerServices,
|
||||
|
@ -239,7 +243,7 @@ pub enum MetaAddrChange {
|
|||
UpdateFailed {
|
||||
#[cfg_attr(
|
||||
any(test, feature = "proptest-impl"),
|
||||
proptest(strategy = "canonical_socket_addr()")
|
||||
proptest(strategy = "canonical_socket_addr_strategy()")
|
||||
)]
|
||||
addr: SocketAddr,
|
||||
services: Option<PeerServices>,
|
||||
|
@ -255,7 +259,7 @@ impl MetaAddr {
|
|||
untrusted_last_seen: DateTime32,
|
||||
) -> MetaAddr {
|
||||
MetaAddr {
|
||||
addr,
|
||||
addr: canonical_socket_addr(addr),
|
||||
services: untrusted_services,
|
||||
untrusted_last_seen: Some(untrusted_last_seen),
|
||||
last_response: None,
|
||||
|
@ -269,7 +273,7 @@ impl MetaAddr {
|
|||
/// `MetaAddr`.
|
||||
pub fn new_gossiped_change(self) -> MetaAddrChange {
|
||||
NewGossiped {
|
||||
addr: self.addr,
|
||||
addr: canonical_socket_addr(self.addr),
|
||||
untrusted_services: self.services,
|
||||
untrusted_last_seen: self
|
||||
.untrusted_last_seen
|
||||
|
@ -291,7 +295,7 @@ impl MetaAddr {
|
|||
/// - Zebra could advertise unreachable addresses to its own peers.
|
||||
pub fn new_responded(addr: &SocketAddr, services: &PeerServices) -> MetaAddrChange {
|
||||
UpdateResponded {
|
||||
addr: *addr,
|
||||
addr: canonical_socket_addr(*addr),
|
||||
services: *services,
|
||||
}
|
||||
}
|
||||
|
@ -299,21 +303,25 @@ impl MetaAddr {
|
|||
/// Returns a [`MetaAddrChange::UpdateAttempt`] for a peer that we
|
||||
/// want to make an outbound connection to.
|
||||
pub fn new_reconnect(addr: &SocketAddr) -> MetaAddrChange {
|
||||
UpdateAttempt { addr: *addr }
|
||||
UpdateAttempt {
|
||||
addr: canonical_socket_addr(*addr),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a [`MetaAddrChange::NewAlternate`] for a peer's alternate address,
|
||||
/// received via a `Version` message.
|
||||
pub fn new_alternate(addr: &SocketAddr, untrusted_services: &PeerServices) -> MetaAddrChange {
|
||||
NewAlternate {
|
||||
addr: *addr,
|
||||
addr: canonical_socket_addr(*addr),
|
||||
untrusted_services: *untrusted_services,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a [`MetaAddrChange::NewLocal`] for our own listener address.
|
||||
pub fn new_local_listener(addr: &SocketAddr) -> MetaAddrChange {
|
||||
NewLocal { addr: *addr }
|
||||
pub fn new_local_listener_change(addr: &SocketAddr) -> MetaAddrChange {
|
||||
NewLocal {
|
||||
addr: canonical_socket_addr(*addr),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a [`MetaAddrChange::UpdateFailed`] for a peer that has just had
|
||||
|
@ -323,7 +331,7 @@ impl MetaAddr {
|
|||
services: impl Into<Option<PeerServices>>,
|
||||
) -> MetaAddrChange {
|
||||
UpdateFailed {
|
||||
addr: *addr,
|
||||
addr: canonical_socket_addr(*addr),
|
||||
services: services.into(),
|
||||
}
|
||||
}
|
||||
|
@ -482,15 +490,24 @@ impl MetaAddr {
|
|||
///
|
||||
/// Returns `None` if this `MetaAddr` should not be sent to remote peers.
|
||||
pub fn sanitize(&self) -> Option<MetaAddr> {
|
||||
let interval = crate::constants::TIMESTAMP_TRUNCATION_SECONDS;
|
||||
let ts = self.last_seen()?.timestamp();
|
||||
// This can't underflow, because `0 <= rem_euclid < ts`
|
||||
let last_seen = ts - ts.rem_euclid(interval);
|
||||
let last_seen = DateTime32::from(last_seen);
|
||||
if !self.last_known_info_is_valid_for_outbound() {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Sanitize time
|
||||
let last_seen = self.last_seen()?;
|
||||
let remainder = last_seen
|
||||
.timestamp()
|
||||
.rem_euclid(crate::constants::TIMESTAMP_TRUNCATION_SECONDS);
|
||||
let last_seen = last_seen
|
||||
.checked_sub(remainder.into())
|
||||
.expect("unexpected underflow: rem_euclid is strictly less than timestamp");
|
||||
|
||||
Some(MetaAddr {
|
||||
addr: self.addr,
|
||||
// deserialization also sanitizes services to known flags
|
||||
services: self.services & PeerServices::all(),
|
||||
addr: canonical_socket_addr(self.addr),
|
||||
// TODO: split untrusted and direct services
|
||||
// sanitize untrusted services to NODE_NETWORK only? (#2234)
|
||||
services: self.services,
|
||||
// only put the last seen time in the untrusted field,
|
||||
// this matches deserialization, and avoids leaking internal state
|
||||
untrusted_last_seen: Some(last_seen),
|
||||
|
@ -540,9 +557,10 @@ impl MetaAddrChange {
|
|||
NewAlternate {
|
||||
untrusted_services, ..
|
||||
} => Some(*untrusted_services),
|
||||
// TODO: create a "services implemented by Zebra" constant
|
||||
// TODO: create a "services implemented by Zebra" constant (#2234)
|
||||
NewLocal { .. } => Some(PeerServices::NODE_NETWORK),
|
||||
UpdateAttempt { .. } => None,
|
||||
// TODO: split untrusted and direct services (#2234)
|
||||
UpdateResponded { services, .. } => Some(*services),
|
||||
UpdateFailed { services, .. } => *services,
|
||||
}
|
||||
|
@ -556,7 +574,8 @@ impl MetaAddrChange {
|
|||
..
|
||||
} => Some(*untrusted_last_seen),
|
||||
NewAlternate { .. } => None,
|
||||
NewLocal { .. } => None,
|
||||
// We know that our local listener is available
|
||||
NewLocal { .. } => Some(DateTime32::now()),
|
||||
UpdateAttempt { .. } => None,
|
||||
UpdateResponded { .. } => None,
|
||||
UpdateFailed { .. } => None,
|
||||
|
@ -630,7 +649,7 @@ impl MetaAddrChange {
|
|||
match self {
|
||||
NewGossiped { .. } | NewAlternate { .. } | NewLocal { .. } => Some(MetaAddr {
|
||||
addr: self.addr(),
|
||||
// TODO: make services optional when we add a DNS seeder change/state
|
||||
// TODO: make services optional when we add a DNS seeder change and state
|
||||
services: self
|
||||
.untrusted_services()
|
||||
.expect("unexpected missing services"),
|
||||
|
@ -679,7 +698,7 @@ impl MetaAddrChange {
|
|||
// so malicious peers can't keep changing our peer connection order.
|
||||
Some(MetaAddr {
|
||||
addr: self.addr(),
|
||||
// TODO: or(self.untrusted_services()) when services become optional
|
||||
// TODO: or(self.untrusted_services()) when services become optional (#2234)
|
||||
services: previous.services,
|
||||
untrusted_last_seen: previous
|
||||
.untrusted_last_seen
|
||||
|
@ -702,8 +721,10 @@ impl MetaAddrChange {
|
|||
// connection timeout, even if changes are applied out of order.
|
||||
Some(MetaAddr {
|
||||
addr: self.addr(),
|
||||
// We want up-to-date services, even if they have fewer bits,
|
||||
// or they are applied out of order.
|
||||
services: self.untrusted_services().unwrap_or(previous.services),
|
||||
// only NeverAttempted changes can modify the last seen field
|
||||
// Only NeverAttempted changes can modify the last seen field
|
||||
untrusted_last_seen: previous.untrusted_last_seen,
|
||||
// Since Some(time) is always greater than None, `max` prefers:
|
||||
// - the latest time if both are Some
|
||||
|
@ -781,7 +802,7 @@ impl Ord for MetaAddr {
|
|||
// So this comparison will have no impact until Zebra implements
|
||||
// more service features.
|
||||
//
|
||||
// TODO: order services by usefulness, not bit pattern values
|
||||
// TODO: order services by usefulness, not bit pattern values (#2234)
|
||||
// Security: split gossiped and direct services
|
||||
let larger_services = self.services.cmp(&other.services);
|
||||
|
||||
|
|
|
@ -4,20 +4,26 @@ use proptest::{arbitrary::any, collection::vec, prelude::*};
|
|||
|
||||
use super::{MetaAddr, MetaAddrChange, PeerServices};
|
||||
|
||||
use zebra_chain::serialization::{arbitrary::canonical_socket_addr, DateTime32};
|
||||
use zebra_chain::serialization::{arbitrary::canonical_socket_addr_strategy, DateTime32};
|
||||
|
||||
/// The largest number of random changes we want to apply to a MetaAddr
|
||||
/// The largest number of random changes we want to apply to a [`MetaAddr`].
|
||||
///
|
||||
/// This should be at least twice the number of [`PeerAddrState`]s, so
|
||||
/// the tests can cover multiple transitions through every state.
|
||||
/// This should be at least twice the number of [`PeerAddrState`]s, so the tests
|
||||
/// can cover multiple transitions through every state.
|
||||
pub const MAX_ADDR_CHANGE: usize = 15;
|
||||
|
||||
/// The largest number of random addresses we want to add to an [`AddressBook`].
|
||||
///
|
||||
/// This should be at least the number of [`PeerAddrState`]s, so the tests can
|
||||
/// cover interactions between addresses in different states.
|
||||
pub const MAX_META_ADDR: usize = 8;
|
||||
|
||||
impl MetaAddr {
|
||||
/// Create a strategy that generates [`MetaAddr`]s in the
|
||||
/// [`PeerAddrState::NeverAttemptedGossiped`] state.
|
||||
pub fn gossiped_strategy() -> BoxedStrategy<Self> {
|
||||
(
|
||||
canonical_socket_addr(),
|
||||
canonical_socket_addr_strategy(),
|
||||
any::<PeerServices>(),
|
||||
any::<DateTime32>(),
|
||||
)
|
||||
|
@ -30,7 +36,7 @@ impl MetaAddr {
|
|||
/// Create a strategy that generates [`MetaAddr`]s in the
|
||||
/// [`PeerAddrState::NeverAttemptedAlternate`] state.
|
||||
pub fn alternate_strategy() -> BoxedStrategy<Self> {
|
||||
(canonical_socket_addr(), any::<PeerServices>())
|
||||
(canonical_socket_addr_strategy(), any::<PeerServices>())
|
||||
.prop_map(|(socket_addr, untrusted_services)| {
|
||||
MetaAddr::new_alternate(&socket_addr, &untrusted_services)
|
||||
.into_new_meta_addr()
|
||||
|
@ -78,7 +84,7 @@ impl MetaAddrChange {
|
|||
/// TODO: Generate all [`MetaAddrChange`] variants, and give them ready fields.
|
||||
/// (After PR #2276 merges.)
|
||||
pub fn ready_outbound_strategy() -> BoxedStrategy<Self> {
|
||||
canonical_socket_addr()
|
||||
canonical_socket_addr_strategy()
|
||||
.prop_filter_map("failed MetaAddr::is_valid_for_outbound", |addr| {
|
||||
// Alternate nodes use the current time, so they're always ready
|
||||
//
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
//! Shared test checks for MetaAddr
|
||||
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use super::super::MetaAddr;
|
||||
|
||||
use crate::{constants::TIMESTAMP_TRUNCATION_SECONDS, types::PeerServices};
|
||||
|
@ -68,8 +70,24 @@ pub(crate) fn sanitize_avoids_leaks(original: &MetaAddr, sanitized: &MetaAddr) {
|
|||
// Sanitize to the the default state, even though it's not serialized
|
||||
assert_eq!(sanitized.last_connection_state, Default::default());
|
||||
// Sanitize to known flags
|
||||
assert_eq!(sanitized.services, original.services & PeerServices::all());
|
||||
let sanitized_peer_services = original.services & PeerServices::all();
|
||||
assert_eq!(sanitized.services, sanitized_peer_services);
|
||||
|
||||
// We want the other fields to be unmodified
|
||||
assert_eq!(sanitized.addr, original.addr);
|
||||
// Remove IPv6 scope ID and flow information
|
||||
let sanitized_socket_addr = SocketAddr::new(original.addr.ip(), original.addr.port());
|
||||
assert_eq!(sanitized.addr.ip(), original.addr.ip());
|
||||
assert_eq!(sanitized.addr.port(), original.addr.port());
|
||||
assert_eq!(sanitized.addr, sanitized_socket_addr);
|
||||
|
||||
// We want any other fields to be their defaults
|
||||
assert_eq!(
|
||||
sanitized,
|
||||
&MetaAddr::new_gossiped_meta_addr(
|
||||
sanitized_socket_addr,
|
||||
sanitized_peer_services,
|
||||
sanitized
|
||||
.last_seen()
|
||||
.expect("unexpected missing last seen time in sanitized MetaAddr")
|
||||
),
|
||||
);
|
||||
}
|
||||
|
|
|
@ -14,13 +14,18 @@ use tokio::{runtime::Runtime, time::Instant};
|
|||
use tower::service_fn;
|
||||
use tracing::Span;
|
||||
|
||||
use zebra_chain::serialization::{ZcashDeserialize, ZcashSerialize};
|
||||
use zebra_chain::serialization::{canonical_socket_addr, ZcashDeserialize, ZcashSerialize};
|
||||
|
||||
use super::check;
|
||||
use crate::{
|
||||
constants::LIVE_PEER_DURATION,
|
||||
meta_addr::{arbitrary::MAX_ADDR_CHANGE, MetaAddr, MetaAddrChange, PeerAddrState::*},
|
||||
meta_addr::{
|
||||
arbitrary::{MAX_ADDR_CHANGE, MAX_META_ADDR},
|
||||
MetaAddr, MetaAddrChange,
|
||||
PeerAddrState::*,
|
||||
},
|
||||
peer_set::candidate_set::CandidateSet,
|
||||
protocol::types::PeerServices,
|
||||
AddressBook, Config,
|
||||
};
|
||||
|
||||
|
@ -32,12 +37,22 @@ const DEFAULT_VERBOSE_TEST_PROPTEST_CASES: u32 = 256;
|
|||
|
||||
proptest! {
|
||||
/// Make sure that the sanitize function reduces time and state metadata
|
||||
/// leaks.
|
||||
/// leaks for valid addresses.
|
||||
///
|
||||
/// Make sure that the sanitize function skips invalid IP addresses, ports,
|
||||
/// and client services.
|
||||
#[test]
|
||||
fn sanitize_avoids_leaks(addr in MetaAddr::arbitrary()) {
|
||||
zebra_test::init();
|
||||
|
||||
if let Some(sanitized) = addr.sanitize() {
|
||||
// check that all sanitized addresses are valid for outbound
|
||||
prop_assert!(addr.last_known_info_is_valid_for_outbound());
|
||||
// also check the address, port, and services individually
|
||||
prop_assert!(!addr.addr.ip().is_unspecified());
|
||||
prop_assert_ne!(addr.addr.port(), 0);
|
||||
prop_assert!(addr.services.contains(PeerServices::NODE_NETWORK));
|
||||
|
||||
check::sanitize_avoids_leaks(&addr, &sanitized);
|
||||
}
|
||||
}
|
||||
|
@ -245,6 +260,40 @@ proptest! {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Make sure that a sanitized [`AddressBook`] contains the local listener
|
||||
/// [`MetaAddr`], regardless of the previous contents of the address book.
|
||||
///
|
||||
/// If Zebra gossips its own listener address to peers, and gets it back,
|
||||
/// its address book will contain its local listener address. This address
|
||||
/// will likely be in [`PeerAddrState::Failed`], due to failed
|
||||
/// self-connection attempts.
|
||||
#[test]
|
||||
fn sanitized_address_book_contains_local_listener(
|
||||
local_listener in any::<SocketAddr>(),
|
||||
address_book_addrs in vec(any::<MetaAddr>(), 0..MAX_META_ADDR),
|
||||
) {
|
||||
zebra_test::init();
|
||||
|
||||
let config = Config { listen_addr: local_listener, ..Config::default() };
|
||||
let address_book = AddressBook::new_with_addrs(&config, Span::none(), address_book_addrs);
|
||||
let sanitized_addrs = address_book.sanitized();
|
||||
|
||||
let expected_local_listener = address_book.get_local_listener();
|
||||
let canonical_local_listener = canonical_socket_addr(local_listener);
|
||||
let book_sanitized_local_listener = sanitized_addrs.iter().find(|meta_addr| meta_addr.addr == canonical_local_listener );
|
||||
|
||||
// invalid addresses should be removed by sanitization,
|
||||
// regardless of where they have come from
|
||||
prop_assert_eq!(
|
||||
book_sanitized_local_listener.cloned(),
|
||||
expected_local_listener.sanitize(),
|
||||
"address book: {:?}, sanitized_addrs: {:?}, canonical_local_listener: {:?}",
|
||||
address_book,
|
||||
sanitized_addrs,
|
||||
canonical_local_listener,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
proptest! {
|
||||
|
|
|
@ -379,33 +379,32 @@ fn validate_addrs(
|
|||
/// This will consider all addresses as invalid if trying to offset their
|
||||
/// `last_seen` times to be before the limit causes an underflow.
|
||||
fn limit_last_seen_times(addrs: &mut Vec<MetaAddr>, last_seen_limit: DateTime32) {
|
||||
let (oldest_reported_seen_timestamp, newest_reported_seen_timestamp) =
|
||||
addrs
|
||||
.iter()
|
||||
.fold((u32::MAX, u32::MIN), |(oldest, newest), addr| {
|
||||
let last_seen = addr
|
||||
.untrusted_last_seen()
|
||||
.expect("unexpected missing last seen")
|
||||
.timestamp();
|
||||
(oldest.min(last_seen), newest.max(last_seen))
|
||||
});
|
||||
let last_seen_times = addrs.iter().map(|meta_addr| {
|
||||
meta_addr
|
||||
.untrusted_last_seen()
|
||||
.expect("unexpected missing last seen: should be provided by deserialization")
|
||||
});
|
||||
let oldest_seen = last_seen_times.clone().min().unwrap_or(DateTime32::MIN);
|
||||
let newest_seen = last_seen_times.max().unwrap_or(DateTime32::MAX);
|
||||
|
||||
// If any time is in the future, adjust all times, to compensate for clock skew on honest peers
|
||||
if newest_reported_seen_timestamp > last_seen_limit.timestamp() {
|
||||
let offset = newest_reported_seen_timestamp - last_seen_limit.timestamp();
|
||||
if newest_seen > last_seen_limit {
|
||||
let offset = newest_seen
|
||||
.checked_duration_since(last_seen_limit)
|
||||
.expect("unexpected underflow: just checked newest_seen is greater");
|
||||
|
||||
// Apply offset to oldest timestamp to check for underflow
|
||||
let oldest_resulting_timestamp = oldest_reported_seen_timestamp as i64 - offset as i64;
|
||||
if oldest_resulting_timestamp >= 0 {
|
||||
// Check for underflow
|
||||
if oldest_seen.checked_sub(offset).is_some() {
|
||||
// No underflow is possible, so apply offset to all addresses
|
||||
for addr in addrs {
|
||||
let old_last_seen = addr
|
||||
let last_seen = addr
|
||||
.untrusted_last_seen()
|
||||
.expect("unexpected missing last seen")
|
||||
.timestamp();
|
||||
let new_last_seen = old_last_seen - offset;
|
||||
.expect("unexpected missing last seen: should be provided by deserialization");
|
||||
let last_seen = last_seen
|
||||
.checked_sub(offset)
|
||||
.expect("unexpected underflow: just checked oldest_seen");
|
||||
|
||||
addr.set_untrusted_last_seen(new_last_seen.into());
|
||||
addr.set_untrusted_last_seen(last_seen);
|
||||
}
|
||||
} else {
|
||||
// An underflow will occur, so reject all gossiped peers
|
||||
|
|
|
@ -201,12 +201,16 @@ where
|
|||
S::Future: Send + 'static,
|
||||
{
|
||||
info!(?initial_peers, "connecting to initial peer set");
|
||||
// ## Correctness:
|
||||
// # Security
|
||||
//
|
||||
// Each `CallAll` can hold one `Buffer` or `Batch` reservation for
|
||||
// an indefinite period. We can use `CallAllUnordered` without filling
|
||||
// the underlying `Inbound` buffer, because we immediately drive this
|
||||
// single `CallAll` to completion, and handshakes have a short timeout.
|
||||
// TODO: rate-limit initial seed peer connections (#2326)
|
||||
//
|
||||
// # Correctness
|
||||
//
|
||||
// Each `FuturesUnordered` can hold one `Buffer` or `Batch` reservation for
|
||||
// an indefinite period. We can use `FuturesUnordered` without filling
|
||||
// the underlying network buffers, because we immediately drive this
|
||||
// single `FuturesUnordered` to completion, and handshakes have a short timeout.
|
||||
let mut handshakes: FuturesUnordered<_> = initial_peers
|
||||
.into_iter()
|
||||
.map(|addr| {
|
||||
|
|
|
@ -240,11 +240,7 @@ impl Service<zn::Request> for Inbound {
|
|||
// Briefly hold the address book threaded mutex while
|
||||
// cloning the address book. Then sanitize after releasing
|
||||
// the lock.
|
||||
let mut peers = address_book.lock().unwrap().clone();
|
||||
|
||||
// Add our local listener address to the advertised peers
|
||||
let local_listener = address_book.lock().unwrap().get_local_listener();
|
||||
peers.update(local_listener);
|
||||
let peers = address_book.lock().unwrap().clone();
|
||||
|
||||
// Send a sanitized response
|
||||
let mut peers = peers.sanitized();
|
||||
|
|
Loading…
Reference in New Issue