fix(network): when connecting to peers, ignore invalid ports, and prefer canonical ports (#4564)

* Move peer address validation into its own module

* Add a network parameter to AddressBook and some MetaAddr methods

* Reject invalid initial peers, and connect to them in preferred order

* Reject Flux/ZelCash and misconfigured Zcash peers

* Prefer canonical Zcash ports

* Make peer_preference into a struct method

* Prefer peer addresses with canonical ports for outbound connections

* Also ignore the Zcash regtest port

* Document where field and variant order is required for correctness

* Use the correct peer count

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
teor 2022-06-14 14:58:37 +10:00 committed by GitHub
parent 2e50ccc8f3
commit 92fd11f9ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 325 additions and 97 deletions

View File

@ -8,6 +8,8 @@ use ordered_map::OrderedMap;
use tokio::sync::watch;
use tracing::Span;
use zebra_chain::parameters::Network;
use crate::{
constants, meta_addr::MetaAddrChange, protocol::external::canonical_socket_addr,
types::MetaAddr, PeerAddrState,
@ -61,15 +63,18 @@ pub struct AddressBook {
/// [`OrderedMap`] sorts in descending order.
by_addr: OrderedMap<SocketAddr, MetaAddr, Reverse<MetaAddr>>,
/// The local listener address.
local_listener: SocketAddr,
/// The configured Zcash network.
network: Network,
/// The maximum number of addresses in the address book.
///
/// Always set to [`MAX_ADDRS_IN_ADDRESS_BOOK`](constants::MAX_ADDRS_IN_ADDRESS_BOOK),
/// in release builds. Lower values are used during testing.
addr_limit: usize,
/// The local listener address.
local_listener: SocketAddr,
/// The span for operations on this address book.
span: Span,
@ -107,9 +112,10 @@ pub struct AddressMetrics {
#[allow(clippy::len_without_is_empty)]
impl AddressBook {
/// Construct an [`AddressBook`] with the given `local_listener` and
/// [`tracing::Span`].
pub fn new(local_listener: SocketAddr, span: Span) -> AddressBook {
/// Construct an [`AddressBook`] with the given `local_listener` on `network`.
///
/// Uses the supplied [`tracing::Span`] for address book operations.
pub fn new(local_listener: SocketAddr, network: Network, span: Span) -> AddressBook {
let constructor_span = span.clone();
let _guard = constructor_span.enter();
@ -122,8 +128,9 @@ impl AddressBook {
let mut new_book = AddressBook {
by_addr: OrderedMap::new(|meta_addr| Reverse(*meta_addr)),
addr_limit: constants::MAX_ADDRS_IN_ADDRESS_BOOK,
local_listener: canonical_socket_addr(local_listener),
network,
addr_limit: constants::MAX_ADDRS_IN_ADDRESS_BOOK,
span,
address_metrics_tx,
last_address_log: None,
@ -133,7 +140,7 @@ impl AddressBook {
new_book
}
/// Construct an [`AddressBook`] with the given `local_listener`,
/// Construct an [`AddressBook`] with the given `local_listener`, `network`,
/// `addr_limit`, [`tracing::Span`], and addresses.
///
/// `addr_limit` is enforced by this method, and by [`AddressBook::update`].
@ -147,6 +154,7 @@ impl AddressBook {
#[cfg(any(test, feature = "proptest-impl"))]
pub fn new_with_addrs(
local_listener: SocketAddr,
network: Network,
addr_limit: usize,
span: Span,
addrs: impl IntoIterator<Item = MetaAddr>,
@ -157,7 +165,7 @@ impl AddressBook {
let instant_now = Instant::now();
let chrono_now = Utc::now();
let mut new_book = AddressBook::new(local_listener, span);
let mut new_book = AddressBook::new(local_listener, network, span);
new_book.addr_limit = addr_limit;
let addrs = addrs
@ -166,7 +174,7 @@ impl AddressBook {
meta_addr.addr = canonical_socket_addr(meta_addr.addr);
meta_addr
})
.filter(MetaAddr::address_is_valid_for_outbound)
.filter(|meta_addr| meta_addr.address_is_valid_for_outbound(network))
.take(addr_limit)
.map(|meta_addr| (meta_addr.addr, meta_addr));
@ -215,7 +223,7 @@ impl AddressBook {
// Then sanitize and shuffle
let mut peers = peers
.descending_values()
.filter_map(MetaAddr::sanitize)
.filter_map(|meta_addr| meta_addr.sanitize(self.network))
// Security: remove peers that:
// - last responded more than three hours ago, or
// - haven't responded yet but were reported last seen more than three hours ago
@ -286,7 +294,7 @@ impl AddressBook {
if let Some(updated) = updated {
// Ignore invalid outbound addresses.
// (Inbound connections can be monitored via Zebra's metrics.)
if !updated.address_is_valid_for_outbound() {
if !updated.address_is_valid_for_outbound(self.network) {
return None;
}
@ -295,9 +303,7 @@ impl AddressBook {
//
// Otherwise, if we got the info directly from the peer,
// store it in the address book, so we know not to reconnect.
//
// TODO: delete peers with invalid info when they get too old (#1873)
if !updated.last_known_info_is_valid_for_outbound()
if !updated.last_known_info_is_valid_for_outbound(self.network)
&& updated.last_connection_state.is_never_attempted()
{
return None;
@ -408,7 +414,9 @@ impl AddressBook {
// The peers are already stored in sorted order.
self.by_addr
.descending_values()
.filter(move |peer| peer.is_ready_for_connection_attempt(instant_now, chrono_now))
.filter(move |peer| {
peer.is_ready_for_connection_attempt(instant_now, chrono_now, self.network)
})
.cloned()
}
@ -437,7 +445,9 @@ impl AddressBook {
self.by_addr
.descending_values()
.filter(move |peer| !peer.is_ready_for_connection_attempt(instant_now, chrono_now))
.filter(move |peer| {
!peer.is_ready_for_connection_attempt(instant_now, chrono_now, self.network)
})
.cloned()
}
@ -611,8 +621,9 @@ impl Clone for AddressBook {
AddressBook {
by_addr: self.by_addr.clone(),
addr_limit: self.addr_limit,
local_listener: self.local_listener,
network: self.network,
addr_limit: self.addr_limit,
span: self.span.clone(),
address_metrics_tx,
last_address_log: None,

View File

@ -6,7 +6,7 @@ use chrono::Utc;
use proptest::{collection::vec, prelude::*};
use tracing::Span;
use zebra_chain::serialization::Duration32;
use zebra_chain::{parameters::Network::*, serialization::Duration32};
use crate::{
constants::{MAX_ADDRS_IN_ADDRESS_BOOK, MAX_PEER_ACTIVE_FOR_GOSSIP},
@ -29,6 +29,7 @@ proptest! {
let address_book = AddressBook::new_with_addrs(
local_listener,
Mainnet,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::none(),
addresses
@ -57,6 +58,7 @@ proptest! {
let address_book = AddressBook::new_with_addrs(
local_listener,
Mainnet,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::none(),
addresses
@ -94,6 +96,7 @@ proptest! {
let mut address_book = AddressBook::new_with_addrs(
local_listener,
Mainnet,
addr_limit,
Span::none(),
initial_addrs.clone(),
@ -115,6 +118,7 @@ proptest! {
let mut address_book = AddressBook::new_with_addrs(
local_listener,
Mainnet,
addr_limit,
Span::none(),
initial_addrs,

View File

@ -5,7 +5,10 @@ use std::time::Instant;
use chrono::Utc;
use tracing::Span;
use zebra_chain::serialization::{DateTime32, Duration32};
use zebra_chain::{
parameters::Network::*,
serialization::{DateTime32, Duration32},
};
use crate::{
constants::MAX_ADDRS_IN_ADDRESS_BOOK, meta_addr::MetaAddr,
@ -15,7 +18,7 @@ use crate::{
/// Make sure an empty address book is actually empty.
#[test]
fn address_book_empty() {
let address_book = AddressBook::new("0.0.0.0:0".parse().unwrap(), Span::current());
let address_book = AddressBook::new("0.0.0.0:0".parse().unwrap(), Mainnet, Span::current());
assert_eq!(
address_book
@ -44,6 +47,7 @@ fn address_book_peer_order() {
let addrs = vec![meta_addr1, meta_addr2];
let address_book = AddressBook::new_with_addrs(
"0.0.0.0:0".parse().unwrap(),
Mainnet,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::current(),
addrs,
@ -59,6 +63,7 @@ fn address_book_peer_order() {
let addrs = vec![meta_addr2, meta_addr1];
let address_book = AddressBook::new_with_addrs(
"0.0.0.0:0".parse().unwrap(),
Mainnet,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::current(),
addrs,
@ -77,6 +82,7 @@ fn address_book_peer_order() {
let addrs = vec![meta_addr1, meta_addr2];
let address_book = AddressBook::new_with_addrs(
"0.0.0.0:0".parse().unwrap(),
Mainnet,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::current(),
addrs,
@ -92,6 +98,7 @@ fn address_book_peer_order() {
let addrs = vec![meta_addr2, meta_addr1];
let address_book = AddressBook::new_with_addrs(
"0.0.0.0:0".parse().unwrap(),
Mainnet,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::current(),
addrs,

View File

@ -50,7 +50,11 @@ impl AddressBookUpdater {
// based on the maximum number of inbound and outbound peers.
let (worker_tx, mut worker_rx) = mpsc::channel(config.peerset_total_connection_limit());
let address_book = AddressBook::new(local_listener, span!(Level::TRACE, "address book"));
let address_book = AddressBook::new(
local_listener,
config.network,
span!(Level::TRACE, "address book"),
);
let address_metrics = address_book.address_metrics_watcher();
let address_book = Arc::new(std::sync::Mutex::new(address_book));

View File

@ -8,10 +8,11 @@ use std::{
};
use chrono::Utc;
use zebra_chain::serialization::DateTime32;
use zebra_chain::{parameters::Network, serialization::DateTime32};
use crate::{
constants,
peer::PeerPreference,
protocol::{external::canonical_socket_addr, types::PeerServices},
};
@ -373,6 +374,11 @@ impl MetaAddr {
self.addr
}
/// Return the address preference level for this `MetaAddr`.
pub fn peer_preference(&self) -> Result<PeerPreference, &'static str> {
PeerPreference::new(&self.addr, None)
}
/// Returns the time of the last successful interaction with this peer.
///
/// Initially set to the unverified "last seen time" gossiped by the remote
@ -516,8 +522,9 @@ impl MetaAddr {
&self,
instant_now: Instant,
chrono_now: chrono::DateTime<Utc>,
network: Network,
) -> bool {
self.last_known_info_is_valid_for_outbound()
self.last_known_info_is_valid_for_outbound(network)
&& !self.has_connection_recently_responded(chrono_now)
&& !self.was_connection_recently_attempted(instant_now)
&& !self.has_connection_recently_failed(instant_now)
@ -529,8 +536,8 @@ impl MetaAddr {
///
/// Since the addresses in the address book are unique, this check can be
/// used to permanently reject entire [`MetaAddr`]s.
pub fn address_is_valid_for_outbound(&self) -> bool {
!self.addr.ip().is_unspecified() && self.addr.port() != 0
pub fn address_is_valid_for_outbound(&self, network: Network) -> bool {
PeerPreference::new(&self.addr, network).is_ok()
}
/// Is the last known information for this peer valid for outbound
@ -540,13 +547,13 @@ impl MetaAddr {
/// only be used to:
/// - reject `NeverAttempted...` [`MetaAddrChange`]s, and
/// - temporarily stop outbound connections to a [`MetaAddr`].
pub fn last_known_info_is_valid_for_outbound(&self) -> bool {
pub fn last_known_info_is_valid_for_outbound(&self, network: Network) -> bool {
let is_node = match self.services {
Some(services) => services.contains(PeerServices::NODE_NETWORK),
None => true,
};
is_node && self.address_is_valid_for_outbound()
is_node && self.address_is_valid_for_outbound(network)
}
/// Should this peer considered reachable?
@ -584,8 +591,8 @@ impl MetaAddr {
/// Return a sanitized version of this `MetaAddr`, for sending to a remote peer.
///
/// Returns `None` if this `MetaAddr` should not be sent to remote peers.
pub fn sanitize(&self) -> Option<MetaAddr> {
if !self.last_known_info_is_valid_for_outbound() {
pub fn sanitize(&self, network: Network) -> Option<MetaAddr> {
if !self.last_known_info_is_valid_for_outbound(network) {
return None;
}
@ -853,7 +860,7 @@ impl Ord for MetaAddr {
/// `MetaAddr`s are sorted in approximate reconnection attempt order, but
/// with `Responded` peers sorted first as a group.
///
/// This order should not be used for reconnection attempts: use
/// But this order should not be used for reconnection attempts: use
/// [`reconnection_peers`][rp] instead.
///
/// See [`CandidateSet`] for more details.
@ -866,6 +873,10 @@ impl Ord for MetaAddr {
// First, try states that are more likely to work
let more_reliable_state = self.last_connection_state.cmp(&other.last_connection_state);
// Then, try addresses that are more likely to be valid.
// Currently, this prefers addresses with canonical Zcash ports.
let more_likely_valid = self.peer_preference().cmp(&other.peer_preference());
// # Security and Correctness
//
// Prioritise older attempt times, so we try all peers in each state,
@ -934,6 +945,7 @@ impl Ord for MetaAddr {
let port_tie_breaker = self.addr.port().cmp(&other.addr.port());
more_reliable_state
.then(more_likely_valid)
.then(older_attempt)
.then(older_failure)
.then(older_response)

View File

@ -1,8 +1,10 @@
//! Randomised test data generation for MetaAddr.
use std::net::SocketAddr;
use proptest::{arbitrary::any, collection::vec, prelude::*};
use zebra_chain::serialization::DateTime32;
use zebra_chain::{parameters::Network::*, serialization::DateTime32};
use crate::protocol::external::arbitrary::canonical_socket_addr_strategy;
@ -108,7 +110,7 @@ impl MetaAddrChange {
if change
.into_new_meta_addr()
.expect("unexpected invalid alternate change")
.last_known_info_is_valid_for_outbound()
.last_known_info_is_valid_for_outbound(Mainnet)
{
Some(change)
} else {

View File

@ -11,7 +11,7 @@ use tokio::time::Instant;
use tower::service_fn;
use tracing::Span;
use zebra_chain::serialization::DateTime32;
use zebra_chain::{parameters::Network::*, serialization::DateTime32};
use crate::{
constants::{MAX_ADDRS_IN_ADDRESS_BOOK, MAX_RECENT_PEER_AGE, MIN_PEER_RECONNECTION_DELAY},
@ -43,9 +43,9 @@ proptest! {
fn sanitize_avoids_leaks(addr in MetaAddr::arbitrary()) {
zebra_test::init();
if let Some(sanitized) = addr.sanitize() {
if let Some(sanitized) = addr.sanitize(Mainnet) {
// check that all sanitized addresses are valid for outbound
prop_assert!(addr.last_known_info_is_valid_for_outbound());
prop_assert!(addr.last_known_info_is_valid_for_outbound(Mainnet));
// also check the address, port, and services individually
prop_assert!(!addr.addr.ip().is_unspecified());
prop_assert_ne!(addr.addr.port(), 0);
@ -114,7 +114,7 @@ proptest! {
let mut attempt_count: usize = 0;
for change in changes {
while addr.is_ready_for_connection_attempt(instant_now, chrono_now) {
while addr.is_ready_for_connection_attempt(instant_now, chrono_now, Mainnet) {
attempt_count += 1;
// Assume that this test doesn't last longer than MIN_PEER_RECONNECTION_DELAY
prop_assert!(attempt_count <= 1);
@ -151,6 +151,7 @@ proptest! {
let address_book = AddressBook::new_with_addrs(
local_listener,
Mainnet,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::none(),
address_book_addrs
@ -167,7 +168,7 @@ proptest! {
// regardless of where they have come from
prop_assert_eq!(
book_sanitized_local_listener.cloned(),
expected_local_listener.sanitize(),
expected_local_listener.sanitize(Mainnet),
"address book: {:?}, sanitized_addrs: {:?}, canonical_local_listener: {:?}",
address_book,
sanitized_addrs,
@ -205,6 +206,7 @@ proptest! {
// Check address book update - return value
let mut address_book = AddressBook::new_with_addrs(
local_listener,
Mainnet,
1,
Span::none(),
Vec::new(),
@ -215,8 +217,8 @@ proptest! {
let book_contents: Vec<MetaAddr> = address_book.peers().collect();
// Ignore the same addresses that the address book ignores
let expected_result = if !expected_result.address_is_valid_for_outbound()
|| ( !expected_result.last_known_info_is_valid_for_outbound()
let expected_result = if !expected_result.address_is_valid_for_outbound(Mainnet)
|| ( !expected_result.last_known_info_is_valid_for_outbound(Mainnet)
&& expected_result.last_connection_state.is_never_attempted())
{
None
@ -309,7 +311,7 @@ proptest! {
// Only put valid addresses in the address book.
// This means some tests will start with an empty address book.
let addrs = if addr.last_known_info_is_valid_for_outbound() {
let addrs = if addr.last_known_info_is_valid_for_outbound(Mainnet) {
Some(addr)
} else {
None
@ -317,6 +319,7 @@ proptest! {
let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new_with_addrs(
SocketAddr::from_str("0.0.0.0:0").unwrap(),
Mainnet,
MAX_ADDRS_IN_ADDRESS_BOOK,
Span::none(),
addrs,
@ -355,7 +358,7 @@ proptest! {
LIVE_PEER_INTERVALS,
overall_test_time,
peer_change_interval,
addr.last_known_info_is_valid_for_outbound(),
addr.last_known_info_is_valid_for_outbound(Mainnet),
);
}
@ -422,7 +425,7 @@ proptest! {
let addr = addrs.entry(addr.addr).or_insert(*addr);
let change = changes.get(change_index);
while addr.is_ready_for_connection_attempt(instant_now, chrono_now) {
while addr.is_ready_for_connection_attempt(instant_now, chrono_now, Mainnet) {
*attempt_counts.entry(addr.addr).or_default() += 1;
prop_assert!(
*attempt_counts.get(&addr.addr).unwrap() <= LIVE_PEER_INTERVALS + 1

View File

@ -3,10 +3,15 @@
use std::net::SocketAddr;
use chrono::Utc;
use zebra_chain::serialization::{DateTime32, Duration32};
use zebra_chain::{
parameters::Network::*,
serialization::{DateTime32, Duration32},
};
use crate::{constants::MAX_PEER_ACTIVE_FOR_GOSSIP, protocol::types::PeerServices};
use super::{super::MetaAddr, check};
use crate::{constants::MAX_PEER_ACTIVE_FOR_GOSSIP, protocol::types::PeerServices};
/// Margin of error for time-based tests.
///
@ -39,10 +44,10 @@ fn sanitize_extremes() {
last_connection_state: Default::default(),
};
if let Some(min_sanitized) = min_time_entry.sanitize() {
if let Some(min_sanitized) = min_time_entry.sanitize(Mainnet) {
check::sanitize_avoids_leaks(&min_time_entry, &min_sanitized);
}
if let Some(max_sanitized) = max_time_entry.sanitize() {
if let Some(max_sanitized) = max_time_entry.sanitize(Mainnet) {
check::sanitize_avoids_leaks(&max_time_entry, &max_sanitized);
}
}

View File

@ -1,4 +1,4 @@
//! Peer handling.
//! Peer connection handling.
mod client;
mod connection;
@ -7,6 +7,7 @@ mod error;
mod handshake;
mod load_tracked_client;
mod minimum_peer_version;
mod priority;
#[cfg(any(test, feature = "proptest-impl"))]
pub use client::tests::ClientTestHarness;
@ -27,3 +28,4 @@ pub use error::{ErrorSlot, HandshakeError, PeerError, SharedPeerError};
pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest};
pub use load_tracked_client::LoadTrackedClient;
pub use minimum_peer_version::MinimumPeerVersion;
pub use priority::{AttributePreference, PeerPreference};

View File

@ -0,0 +1,130 @@
//! Prioritizing outbound peer connections based on peer attributes.
use std::net::SocketAddr;
use zebra_chain::parameters::Network;
use AttributePreference::*;
/// A level of preference for a peer attribute.
///
/// Invalid peer attributes are represented as errors.
///
/// Outbound peer connections are initiated in the sorted [order](std::ops::Ord) of this type.
///
/// The derived order depends on the order of the variants in the enum.
/// The variants are sorted in the order they are listed.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub enum AttributePreference {
/// This peer is more likely to be a valid Zcash network peer.
///
/// # Correctness
///
/// This variant must be declared as the first enum variant,
/// so that `Preferred` peers sort before `Acceptable` peers.
Preferred,
/// This peer is possibly a valid Zcash network peer.
Acceptable,
}
/// A level of preference for a peer.
///
/// Outbound peer connections are initiated in the sorted [order](std::ops::Ord) of this type.
///
/// The derived order depends on the order of the fields in the struct.
/// The first field determines the overall order, then later fields sort equal first field values.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
pub struct PeerPreference {
/// Is the peer using the canonical Zcash port for the configured [`Network`]?
canonical_port: AttributePreference,
}
impl AttributePreference {
/// Returns `Preferred` if `is_preferred` is true.
pub fn preferred_from(is_preferred: bool) -> Self {
if is_preferred {
Preferred
} else {
Acceptable
}
}
/// Returns `true` for `Preferred` attributes.
#[allow(dead_code)]
pub fn is_preferred(&self) -> bool {
match self {
Preferred => true,
Acceptable => false,
}
}
}
impl PeerPreference {
/// Return a preference for the peer at `peer_addr` on `network`.
///
/// Use the [`PeerPreference`] [`Ord`] implementation to sort preferred peers first.
pub fn new(
peer_addr: &SocketAddr,
network: impl Into<Option<Network>>,
) -> Result<PeerPreference, &'static str> {
address_is_valid_for_outbound_connections(peer_addr, network)?;
// This check only prefers the configured network,
// because the address book and initial peer connections reject the port used by the other network.
let canonical_port =
AttributePreference::preferred_from([8232, 18232].contains(&peer_addr.port()));
Ok(PeerPreference { canonical_port })
}
}
/// Is the [`SocketAddr`] we have for this peer valid for outbound
/// connections?
///
/// Since the addresses in the address book are unique, this check can be
/// used to permanently reject entire [`MetaAddr`]s.
fn address_is_valid_for_outbound_connections(
peer_addr: &SocketAddr,
network: impl Into<Option<Network>>,
) -> Result<(), &'static str> {
// TODO: make private IP addresses an error unless a debug config is set (#3117)
if peer_addr.ip().is_unspecified() {
return Err("invalid peer IP address: unspecified addresses can not be used for outbound connections");
}
// 0 is an invalid outbound port.
if peer_addr.port() == 0 {
return Err(
"invalid peer port: unspecified ports can not be used for outbound connections",
);
}
// Ignore ports used by similar networks: Flux/ZelCash and misconfigured Zcash.
if let Some(network) = network.into() {
if peer_addr.port() == network.default_port() {
return Ok(());
}
if peer_addr.port() == 8232 {
return Err(
"invalid peer port: port is for Mainnet, but this node is configured for Testnet",
);
} else if peer_addr.port() == 18232 {
return Err(
"invalid peer port: port is for Testnet, but this node is configured for Mainnet",
);
} else if peer_addr.port() == 18344 {
return Err(
"invalid peer port: port is for Regtest, but Zebra does not support that network",
);
} else if [16125, 26125].contains(&peer_addr.port()) {
// 16125/26125 is used by Flux/ZelCash, which uses the same network magic numbers as Zcash,
// so we have to reject it by port
return Err("invalid peer port: port is for a non-Zcash network");
}
}
Ok(())
}

View File

@ -10,15 +10,16 @@ use proptest::{collection::vec, prelude::*};
use tokio::time::{sleep, timeout};
use tracing::Span;
use zebra_chain::serialization::DateTime32;
use zebra_chain::{parameters::Network::*, serialization::DateTime32};
use super::super::{validate_addrs, CandidateSet};
use crate::{
constants::MIN_PEER_CONNECTION_INTERVAL,
meta_addr::{MetaAddr, MetaAddrChange},
AddressBook, BoxError, Request, Response,
};
use super::super::{validate_addrs, CandidateSet};
/// The maximum number of candidates for a "next peer" test.
const MAX_TEST_CANDIDATES: u32 = 4;
@ -64,7 +65,7 @@ proptest! {
});
// Since the address book is empty, there won't be any available peers
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Mainnet, Span::none());
let mut candidate_set = CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);
@ -102,7 +103,7 @@ proptest! {
unreachable!("Mock peer service is never used");
});
let mut address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let mut address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Mainnet, Span::none());
address_book.extend(peers);
let mut candidate_set = CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);

View File

@ -1,3 +1,5 @@
//! Fixed test vectors for CandidateSet.
use std::{
convert::TryInto,
net::{IpAddr, SocketAddr},
@ -10,16 +12,17 @@ use chrono::{DateTime, Duration, Utc};
use tokio::time::{self, Instant};
use tracing::Span;
use zebra_chain::serialization::DateTime32;
use zebra_chain::{parameters::Network::*, serialization::DateTime32};
use zebra_test::mock_service::{MockService, PanicAssertion};
use super::super::{validate_addrs, CandidateSet};
use crate::{
constants::{GET_ADDR_FANOUT, MIN_PEER_GET_ADDR_INTERVAL},
types::{MetaAddr, PeerServices},
AddressBook, Request, Response,
};
use super::super::{validate_addrs, CandidateSet};
/// Test that offset is applied when all addresses have `last_seen` times in the future.
#[test]
fn offsets_last_seen_times_in_the_future() {
@ -136,7 +139,11 @@ fn candidate_set_updates_are_rate_limited() {
let runtime = zebra_test::init_async();
let _guard = runtime.enter();
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let address_book = AddressBook::new(
SocketAddr::from_str("0.0.0.0:0").unwrap(),
Mainnet,
Span::none(),
);
let mut peer_service = MockService::build().for_unit_tests();
let mut candidate_set = CandidateSet::new(
Arc::new(std::sync::Mutex::new(address_book)),
@ -177,7 +184,11 @@ fn candidate_set_update_after_update_initial_is_rate_limited() {
let runtime = zebra_test::init_async();
let _guard = runtime.enter();
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let address_book = AddressBook::new(
SocketAddr::from_str("0.0.0.0:0").unwrap(),
Mainnet,
Span::none(),
);
let mut peer_service = MockService::build().for_unit_tests();
let mut candidate_set = CandidateSet::new(
Arc::new(std::sync::Mutex::new(address_book)),

View File

@ -3,7 +3,11 @@
// Portions of this submodule were adapted from tower-balance,
// which is (c) 2019 Tower Contributors (MIT licensed).
use std::{collections::HashSet, net::SocketAddr, sync::Arc};
use std::{
collections::{BTreeMap, HashSet},
net::SocketAddr,
sync::Arc,
};
use futures::{
future::{self, FutureExt},
@ -30,7 +34,7 @@ use crate::{
address_book_updater::AddressBookUpdater,
constants,
meta_addr::{MetaAddr, MetaAddrChange},
peer::{self, HandshakeRequest, MinimumPeerVersion, OutboundConnectorRequest},
peer::{self, HandshakeRequest, MinimumPeerVersion, OutboundConnectorRequest, PeerPreference},
peer_set::{set::MorePeers, ActiveConnectionCounter, CandidateSet, ConnectionTracker, PeerSet},
AddressBook, BoxError, Config, Request, Response,
};
@ -197,10 +201,6 @@ where
// because zcashd rate-limits `addr`/`addrv2` messages per connection,
// and if we only have one initial peer,
// we need to ensure that its `Response::Addr` is used by the crawler.
//
// TODO: cache the most recent `Response::Addr` returned by each peer.
// If the request times out, return the cached response to the caller.
info!(
?active_initial_peer_count,
"sending initial request for peers"
@ -383,36 +383,62 @@ async fn limit_initial_peers(
config: &Config,
address_book_updater: tokio::sync::mpsc::Sender<MetaAddrChange>,
) -> HashSet<SocketAddr> {
let all_peers = config.initial_peers().await;
let peers_count = all_peers.len();
let all_peers: HashSet<SocketAddr> = config.initial_peers().await;
let mut preferred_peers: BTreeMap<PeerPreference, Vec<SocketAddr>> = BTreeMap::new();
// # Correctness
//
// We can't exit early if we only have a few peers,
// because we still need to shuffle the connection order.
if all_peers.len() > config.peerset_initial_target_size {
let all_peers_count = all_peers.len();
if all_peers_count > config.peerset_initial_target_size {
info!(
"limiting the initial peers list from {} to {}",
peers_count, config.peerset_initial_target_size
all_peers_count, config.peerset_initial_target_size,
);
}
// Split out the `initial_peers` that will be shuffled and returned.
let mut initial_peers: Vec<SocketAddr> = all_peers.iter().cloned().collect();
let (initial_peers, _unused_peers) =
initial_peers.partial_shuffle(&mut rand::thread_rng(), config.peerset_initial_target_size);
// Send every initial peer to the address book.
// Filter out invalid initial peers, and prioritise valid peers for initial connections.
// (This treats initial peers the same way we treat gossiped peers.)
for peer in all_peers {
let peer_addr = MetaAddr::new_initial_peer(peer);
for peer_addr in all_peers {
let preference = PeerPreference::new(&peer_addr, config.network);
match preference {
Ok(preference) => preferred_peers
.entry(preference)
.or_default()
.push(peer_addr),
Err(error) => warn!(
?peer_addr,
?error,
"invalid initial peer from DNS seeder or configured IP address",
),
}
}
// Send every initial peer to the address book, in preferred order.
// (This treats initial peers the same way we treat gossiped peers.)
for peer in preferred_peers.values().flatten() {
let peer_addr = MetaAddr::new_initial_peer(*peer);
// `send` only waits when the channel is full.
// The address book updater runs in its own thread, so we will only wait for a short time.
let _ = address_book_updater.send(peer_addr).await;
}
initial_peers.iter().copied().collect()
// Split out the `initial_peers` that will be shuffled and returned,
// choosing preferred peers first.
let mut initial_peers: HashSet<SocketAddr> = HashSet::new();
for better_peers in preferred_peers.values() {
let mut better_peers = better_peers.clone();
let (chosen_peers, _unused_peers) = better_peers.partial_shuffle(
&mut rand::thread_rng(),
config.peerset_initial_target_size - initial_peers.len(),
);
initial_peers.extend(chosen_peers.iter());
if initial_peers.len() >= config.peerset_initial_target_size {
break;
}
}
initial_peers
}
/// Open a peer connection listener on `config.listen_addr`,

View File

@ -1260,7 +1260,7 @@ where
}
// Manually initialize an address book without a timestamp tracker.
let mut address_book = AddressBook::new(config.listen_addr, Span::current());
let mut address_book = AddressBook::new(config.listen_addr, config.network, Span::current());
// Add enough fake peers to go over the limit, even if the limit is zero.
let over_limit_peers = config.peerset_outbound_connection_limit() * 2 + 1;

View File

@ -307,7 +307,7 @@ impl PeerSetGuard {
let local_listener = "127.0.0.1:1000"
.parse()
.expect("Invalid local listener address");
let address_book = AddressBook::new(local_listener, Span::none());
let address_book = AddressBook::new(local_listener, Network::Mainnet, Span::none());
Arc::new(std::sync::Mutex::new(address_book))
}

View File

@ -4,9 +4,12 @@ use std::convert::TryInto;
use proptest::prelude::*;
use zebra_chain::serialization::{
arbitrary::max_allocation_is_big_enough, TrustedPreallocate, ZcashSerialize,
MAX_PROTOCOL_MESSAGE_LEN,
use zebra_chain::{
parameters::Network::*,
serialization::{
arbitrary::max_allocation_is_big_enough, TrustedPreallocate, ZcashSerialize,
MAX_PROTOCOL_MESSAGE_LEN,
},
};
use crate::{
@ -75,7 +78,7 @@ proptest! {
zebra_test::init();
// We require sanitization before serialization
let addr = addr.sanitize();
let addr = addr.sanitize(Mainnet);
prop_assume!(addr.is_some());
let addr: AddrV1 = addr.unwrap().into();
@ -94,7 +97,7 @@ proptest! {
zebra_test::init();
// We require sanitization before serialization
let addr = addr.sanitize();
let addr = addr.sanitize(Mainnet);
prop_assume!(addr.is_some());
let addr: AddrV1 = addr.unwrap().into();
@ -126,7 +129,7 @@ proptest! {
zebra_test::init();
// We require sanitization before serialization
let addr = addr.sanitize();
let addr = addr.sanitize(Mainnet);
prop_assume!(addr.is_some());
let addr: AddrV2 = addr.unwrap().into();
@ -145,7 +148,7 @@ proptest! {
zebra_test::init();
// We require sanitization before serialization
let addr = addr.sanitize();
let addr = addr.sanitize(Mainnet);
prop_assume!(addr.is_some());
let addr: AddrV2 = addr.unwrap().into();

View File

@ -6,9 +6,12 @@ use bytes::BytesMut;
use proptest::{collection::vec, prelude::*};
use tokio_util::codec::{Decoder, Encoder};
use zebra_chain::serialization::{
SerializationError, ZcashDeserialize, ZcashDeserializeInto, ZcashSerialize,
MAX_PROTOCOL_MESSAGE_LEN,
use zebra_chain::{
parameters::Network::*,
serialization::{
SerializationError, ZcashDeserialize, ZcashDeserializeInto, ZcashSerialize,
MAX_PROTOCOL_MESSAGE_LEN,
},
};
use crate::{
@ -108,7 +111,7 @@ proptest! {
// We require sanitization before serialization,
// but we also need the original address for this test
let sanitized_addr = addr.sanitize();
let sanitized_addr = addr.sanitize(Mainnet);
prop_assume!(sanitized_addr.is_some());
let sanitized_addr = sanitized_addr.unwrap();
@ -181,7 +184,7 @@ proptest! {
// We require sanitization before serialization,
// but we also need the original address for this test
let sanitized_addr = addr.sanitize();
let sanitized_addr = addr.sanitize(Mainnet);
prop_assume!(sanitized_addr.is_some());
let sanitized_addr = sanitized_addr.unwrap();

View File

@ -17,7 +17,7 @@ use tracing::Span;
use zebra_chain::{
amount::Amount,
block::Block,
parameters::Network,
parameters::Network::{self, *},
serialization::ZcashDeserializeInto,
transaction::{UnminedTx, UnminedTxId, VerifiedUnminedTx},
};
@ -692,10 +692,14 @@ async fn setup(
) {
zebra_test::init();
let network = Network::Mainnet;
let network = Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let address_book = AddressBook::new(
SocketAddr::from_str("0.0.0.0:0").unwrap(),
Mainnet,
Span::none(),
);
let address_book = Arc::new(std::sync::Mutex::new(address_book));
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, _read_only_state_service, latest_chain_tip, chain_tip_change) =