Gossip dynamic local listener ports to peers (#2277)

* Gossip dynamically allocated listener ports to peers

Previously, Zebra would either gossip port `0`, which is invalid, or skip
gossiping its own dynamically allocated listener port.

* Improve "no configured peers" warning

And downgrade from error to warning, because inbound-only nodes are a
valid use case.

* Move random_known_port to zebra-test

* Add tests for dynamic local listener ports and the AddressBook

Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>
This commit is contained in:
teor 2021-06-23 07:59:06 +10:00 committed by GitHub
parent e87933e167
commit bcd5f2c50d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 263 additions and 107 deletions

2
Cargo.lock generated
View File

@ -4554,6 +4554,7 @@ dependencies = [
"owo-colors 2.0.0",
"pretty_assertions",
"proptest",
"rand 0.8.4",
"regex",
"spandoc",
"tempdir",
@ -4597,7 +4598,6 @@ dependencies = [
"metrics-exporter-prometheus",
"once_cell",
"pin-project 1.0.7",
"rand 0.8.4",
"regex",
"semver 1.0.3",
"sentry",

View File

@ -12,7 +12,7 @@ use tracing::Span;
use zebra_chain::serialization::canonical_socket_addr;
use crate::{meta_addr::MetaAddrChange, types::MetaAddr, Config, PeerAddrState};
use crate::{meta_addr::MetaAddrChange, types::MetaAddr, PeerAddrState};
/// A database of peer listener addresses, their advertised services, and
/// information on when they were last seen.
@ -89,14 +89,15 @@ pub struct AddressMetrics {
#[allow(clippy::len_without_is_empty)]
impl AddressBook {
/// Construct an `AddressBook` with the given `config` and [`tracing::Span`].
pub fn new(config: &Config, span: Span) -> AddressBook {
/// Construct an [`AddressBook`] with the given `local_listener` and
/// [`tracing::Span`].
pub fn new(local_listener: SocketAddr, span: Span) -> AddressBook {
let constructor_span = span.clone();
let _guard = constructor_span.enter();
let mut new_book = AddressBook {
by_addr: HashMap::default(),
local_listener: canonical_socket_addr(config.listen_addr),
local_listener: canonical_socket_addr(local_listener),
span,
last_address_log: None,
};
@ -105,18 +106,18 @@ impl AddressBook {
new_book
}
/// Construct an [`AddressBook`] with the given [`Config`],
/// Construct an [`AddressBook`] with the given `local_listener`,
/// [`tracing::Span`], and addresses.
///
/// This constructor can be used to break address book invariants,
/// so it should only be used in tests.
#[cfg(any(test, feature = "proptest-impl"))]
pub fn new_with_addrs(
config: &Config,
local_listener: SocketAddr,
span: Span,
addrs: impl IntoIterator<Item = MetaAddr>,
) -> AddressBook {
let mut new_book = AddressBook::new(config, span);
let mut new_book = AddressBook::new(local_listener, span);
let addrs = addrs
.into_iter()
@ -135,7 +136,7 @@ impl AddressBook {
/// Get the local listener address.
///
/// This address contains minimal state, but it is not sanitized.
pub fn get_local_listener(&self) -> MetaAddr {
pub fn local_listener_meta_addr(&self) -> MetaAddr {
MetaAddr::new_local_listener_change(&self.local_listener)
.into_new_meta_addr()
.expect("unexpected invalid new local listener addr")
@ -151,7 +152,7 @@ impl AddressBook {
// Unconditionally add our local listener address to the advertised peers,
// to replace any self-connection failures. The address book and change
// constructors make sure that the SocketAddr is canonical.
let local_listener = self.get_local_listener();
let local_listener = self.local_listener_meta_addr();
peers.insert(local_listener.addr, local_listener);
// Then sanitize and shuffle

View File

@ -68,7 +68,11 @@ impl Config {
use futures::stream::StreamExt;
if peers.is_empty() {
error!("no initial peers in the network config. Hint: you must configure at least one peer or DNS seeder to run Zebra");
warn!(
"no initial peers in the network config. \
Hint: you must configure at least one peer IP or DNS seeder to run Zebra, \
or make sure Zebra's listener port gets inbound connections."
);
return HashSet::new();
}

View File

@ -5,6 +5,7 @@ use std::{
convert::{TryFrom, TryInto},
env,
net::SocketAddr,
str::FromStr,
sync::Arc,
time::Duration,
};
@ -26,7 +27,7 @@ use crate::{
},
peer_set::candidate_set::CandidateSet,
protocol::types::PeerServices,
AddressBook, Config,
AddressBook,
};
/// The number of test cases to use for proptest that have verbose failures.
@ -275,11 +276,14 @@ proptest! {
) {
zebra_test::init();
let config = Config { listen_addr: local_listener, ..Config::default() };
let address_book = AddressBook::new_with_addrs(&config, Span::none(), address_book_addrs);
let address_book = AddressBook::new_with_addrs(
local_listener,
Span::none(),
address_book_addrs
);
let sanitized_addrs = address_book.sanitized();
let expected_local_listener = address_book.get_local_listener();
let expected_local_listener = address_book.local_listener_meta_addr();
let canonical_local_listener = canonical_socket_addr(local_listener);
let book_sanitized_local_listener = sanitized_addrs.iter().find(|meta_addr| meta_addr.addr == canonical_local_listener );
@ -339,7 +343,11 @@ proptest! {
None
};
let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new_with_addrs(&Config::default(), Span::none(), addrs)));
let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new_with_addrs(
SocketAddr::from_str("0.0.0.0:0").unwrap(),
Span::none(),
addrs,
)));
let peer_service = service_fn(|_| async { unreachable!("Service should not be called") });
let mut candidate_set = CandidateSet::new(address_book.clone(), peer_service);

View File

@ -1,5 +1,7 @@
use std::{
env,
net::SocketAddr,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};
@ -16,8 +18,9 @@ use zebra_chain::serialization::DateTime32;
use super::super::{validate_addrs, CandidateSet};
use crate::{
constants::MIN_PEER_CONNECTION_INTERVAL, meta_addr::MetaAddrChange, types::MetaAddr,
AddressBook, BoxError, Config, Request, Response,
constants::MIN_PEER_CONNECTION_INTERVAL,
meta_addr::{MetaAddr, MetaAddrChange},
AddressBook, BoxError, Request, Response,
};
/// The maximum number of candidates for a "next peer" test.
@ -67,7 +70,7 @@ proptest! {
});
// Since the address book is empty, there won't be any available peers
let address_book = AddressBook::new(&Config::default(), Span::none());
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let mut candidate_set = CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);
@ -103,7 +106,7 @@ proptest! {
unreachable!("Mock peer service is never used");
});
let mut address_book = AddressBook::new(&Config::default(), Span::none());
let mut address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
address_book.extend(peers);
let mut candidate_set = CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);

View File

@ -3,6 +3,7 @@ use std::{
convert::TryInto,
iter,
net::{IpAddr, SocketAddr},
str::FromStr,
sync::Arc,
time::Duration as StdDuration,
};
@ -23,7 +24,7 @@ use super::super::{validate_addrs, CandidateSet};
use crate::{
constants::{GET_ADDR_FANOUT, MIN_PEER_GET_ADDR_INTERVAL},
types::{MetaAddr, PeerServices},
AddressBook, Config, Request, Response,
AddressBook, Request, Response,
};
/// Test that offset is applied when all addresses have `last_seen` times in the future.
@ -144,7 +145,7 @@ fn candidate_set_updates_are_rate_limited() {
let runtime = Runtime::new().expect("Failed to create Tokio runtime");
let _guard = runtime.enter();
let address_book = AddressBook::new(&Config::default(), Span::none());
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let (peer_service, call_count) = mock_peer_service();
let mut candidate_set =
CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);
@ -179,7 +180,7 @@ fn candidate_set_update_after_update_initial_is_rate_limited() {
let runtime = Runtime::new().expect("Failed to create Tokio runtime");
let _guard = runtime.enter();
let address_book = AddressBook::new(&Config::default(), Span::none());
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let (peer_service, call_count) = mock_peer_service();
let mut candidate_set =
CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);

View File

@ -31,6 +31,9 @@ use super::CandidateSet;
use super::PeerSet;
use peer::Client;
#[cfg(test)]
mod tests;
type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;
/// Initialize a peer set.
@ -64,7 +67,9 @@ where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send + 'static,
{
let (address_book, timestamp_collector) = TimestampCollector::spawn(&config);
let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;
let (address_book, timestamp_collector) = TimestampCollector::spawn(listen_addr);
let (inv_sender, inv_receiver) = broadcast::channel(100);
// Construct services that handle inbound handshakes and perform outbound
@ -115,24 +120,8 @@ where
let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);
// 1. Incoming peer connections, via a listener.
// Warn if we're configured using the wrong network port.
use Network::*;
let wrong_net = match config.network {
Mainnet => Testnet,
Testnet => Mainnet,
};
if config.listen_addr.port() == wrong_net.default_port() {
warn!(
"We are configured with port {} for {:?}, but that port is the default port for {:?}",
config.listen_addr.port(),
config.network,
wrong_net
);
}
let listen_guard = tokio::spawn(
listen(config.listen_addr, listen_handshaker, peerset_tx.clone())
accept_inbound_connections(tcp_listener, listen_handshaker, peerset_tx.clone())
.instrument(Span::current()),
);
@ -232,23 +221,37 @@ where
Ok(())
}
/// Listens for peer connections on `addr`, then sets up each connection as a
/// Zcash peer.
/// Open a peer connection listener on `config.listen_addr`,
/// returning the opened [`TcpListener`], and the address it is bound to.
///
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
/// the `Client` result over `tx`.
#[instrument(skip(tx, handshaker))]
async fn listen<S>(
addr: SocketAddr,
mut handshaker: S,
tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError> + Clone,
S::Future: Send + 'static,
{
info!("Trying to open Zcash protocol endpoint at {}...", addr);
let listener_result = TcpListener::bind(addr).await;
/// If the listener is configured to use an automatically chosen port (port `0`),
/// then the returned address will contain the actual port.
///
/// # Panics
///
/// If opening the listener fails.
#[instrument(skip(config), fields(addr = ?config.listen_addr))]
async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) {
// Warn if we're configured using the wrong network port.
use Network::*;
let wrong_net = match config.network {
Mainnet => Testnet,
Testnet => Mainnet,
};
if config.listen_addr.port() == wrong_net.default_port() {
warn!(
"We are configured with port {} for {:?}, but that port is the default port for {:?}",
config.listen_addr.port(),
config.network,
wrong_net
);
}
info!(
"Trying to open Zcash protocol endpoint at {}...",
config.listen_addr
);
let listener_result = TcpListener::bind(config.listen_addr).await;
let listener = match listener_result {
Ok(l) => l,
@ -256,12 +259,33 @@ where
"Opening Zcash network protocol listener {:?} failed: {:?}. \
Hint: Check if another zebrad or zcashd process is running. \
Try changing the network listen_addr in the Zebra config.",
addr, e,
config.listen_addr, e,
),
};
let local_addr = listener.local_addr()?;
let local_addr = listener
.local_addr()
.expect("unexpected missing local addr for open listener");
info!("Opened Zcash protocol endpoint at {}", local_addr);
(listener, local_addr)
}
/// Listens for peer connections on `addr`, then sets up each connection as a
/// Zcash peer.
///
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
/// the [`Client`][peer::Client] result over `tx`.
#[instrument(skip(listener, handshaker, tx), fields(listener_addr = ?listener.local_addr()))]
async fn accept_inbound_connections<S>(
listener: TcpListener,
mut handshaker: S,
tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError> + Clone,
S::Future: Send + 'static,
{
loop {
if let Ok((tcp_stream, addr)) = listener.accept().await {
let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr);

View File

@ -0,0 +1,3 @@
//! Tests for zebra-network initialization
mod vectors;

View File

@ -0,0 +1,103 @@
//! Specific configs used for zebra-network initialization tests.
//!
//! ### Note on port conflict
//!
//! If the test has a port conflict with another test, or another process, then it will fail.
//! If these conflicts cause test failures, run the tests in an isolated environment.
use std::{collections::HashSet, net::SocketAddr};
use tower::service_fn;
use zebra_chain::parameters::Network;
use zebra_test::net::random_known_port;
use crate::Config;
use super::super::init;
use Network::*;
/// Check that zebra-network learns the OS-assigned port of a dynamic listener
/// that is bound to all interfaces, and records it in the `AddressBook`.
///
/// Note: local interface and public IP address discovery are not covered here.
#[tokio::test]
async fn local_listener_unspecified_port_unspecified_addr() {
    zebra_test::init();

    // The IPv4 wildcard address is checked first, then the IPv6 one.
    // The IPv6 checks might fail on machines without IPv6.
    for unspecified_addr in &["0.0.0.0:0", "[::]:0"] {
        local_listener_port_with(unspecified_addr.parse().unwrap(), Mainnet).await;
        local_listener_port_with(unspecified_addr.parse().unwrap(), Testnet).await;
    }
}
/// Check that zebra-network learns the OS-assigned port of a dynamic listener
/// that is bound to localhost, and records it in the `AddressBook`.
#[tokio::test]
async fn local_listener_unspecified_port_localhost_addr() {
    zebra_test::init();

    // The IPv4 checks might fail on machines with unusual IPv4 localhost configs.
    // The IPv6 checks might fail on machines without IPv6.
    for localhost_addr in &["127.0.0.1:0", "[::1]:0"] {
        local_listener_port_with(localhost_addr.parse().unwrap(), Mainnet).await;
        local_listener_port_with(localhost_addr.parse().unwrap(), Testnet).await;
    }
}
/// Check that a fixed listener port on localhost reaches the `AddressBook` unchanged.
#[tokio::test]
async fn local_listener_fixed_port_localhost_addr() {
    zebra_test::init();

    // The IPv4 checks might fail on machines with unusual IPv4 localhost configs.
    // The IPv6 checks might fail on machines without IPv6.
    for localhost in &["127.0.0.1", "::1"] {
        let localhost_ip = localhost.parse::<std::net::IpAddr>().unwrap();

        // Each check picks its own fresh random port.
        local_listener_port_with(SocketAddr::new(localhost_ip, random_known_port()), Mainnet).await;
        local_listener_port_with(SocketAddr::new(localhost_ip, random_known_port()), Testnet).await;
    }
}
/// Initialize zebra-network listening on `listen_addr` for `network`, then check
/// the local listener address stored in the returned address book.
///
/// A configured port of `0` must be replaced by a non-zero port; any other
/// configured port must be stored unchanged. The IP address must always match
/// `listen_addr`.
async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) {
    // Empty initial peer sets stop Zebra making outbound connections.
    let no_initial_peers = HashSet::new;
    let config = Config {
        listen_addr,
        network,
        initial_mainnet_peers: no_initial_peers(),
        initial_testnet_peers: no_initial_peers(),
        ..Config::default()
    };

    let inbound_service =
        service_fn(|_| async { unreachable!("inbound service should never be called") });

    let (_peer_service, address_book) = init(config, inbound_service).await;

    // The lock guard is a temporary, so it is released at the end of this statement.
    let recorded_addr = address_book
        .lock()
        .unwrap()
        .local_listener_meta_addr()
        .addr;

    match listen_addr.port() {
        0 => assert_ne!(
            recorded_addr.port(),
            0,
            "dynamic ports are replaced with OS-assigned ports"
        ),
        configured_port => assert_eq!(
            recorded_addr.port(),
            configured_port,
            "fixed ports are correctly propagated"
        ),
    }

    assert_eq!(
        recorded_addr.ip(),
        listen_addr.ip(),
        "IP addresses are correctly propagated"
    );
}

View File

@ -1,21 +1,23 @@
//! The timestamp collector collects liveness information from peers.
use std::sync::Arc;
use std::{net::SocketAddr, sync::Arc};
use futures::{channel::mpsc, prelude::*};
use crate::{meta_addr::MetaAddrChange, AddressBook, Config};
use crate::{meta_addr::MetaAddrChange, AddressBook};
/// The timestamp collector hooks into incoming message streams for each peer and
/// records per-connection last-seen timestamps into an [`AddressBook`].
pub struct TimestampCollector {}
impl TimestampCollector {
/// Spawn a new [`TimestampCollector`] task, and return handles for the
/// transmission channel for timestamp events and for the [`AddressBook`] it
/// updates.
/// Spawn a new [`TimestampCollector`] task, updating a new [`AddressBook`]
/// configured with a `local_listener`.
///
/// Returns handles for the transmission channel for timestamp events, and
/// the address book.
pub fn spawn(
config: &Config,
local_listener: SocketAddr,
) -> (
Arc<std::sync::Mutex<AddressBook>>,
mpsc::Sender<MetaAddrChange>,
@ -24,7 +26,7 @@ impl TimestampCollector {
const TIMESTAMP_WORKER_BUFFER_SIZE: usize = 100;
let (worker_tx, mut worker_rx) = mpsc::channel(TIMESTAMP_WORKER_BUFFER_SIZE);
let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new(
config,
local_listener,
span!(Level::TRACE, "timestamp collector"),
)));
let worker_address_book = address_book.clone();

View File

@ -10,18 +10,21 @@ edition = "2018"
[dependencies]
hex = "0.4.3"
lazy_static = "1.4.0"
proptest = "0.10.1"
rand = "0.8"
regex = "1.4.6"
tower = { version = "0.4", features = ["util"] }
futures = "0.3.15"
color-eyre = "0.5.11"
owo-colors = "2.0.0"
pretty_assertions = "0.7.2"
spandoc = "0.2.0"
thiserror = "1.0.25"
tracing = "0.1.26"
tracing-subscriber = "0.2.18"
tracing-error = "0.1.2"
spandoc = "0.2.0"
regex = "1.4.6"
thiserror = "1.0.25"
pretty_assertions = "0.7.2"
owo-colors = "2.0.0"
proptest = "0.10.1"
[dev-dependencies]
tempdir = "0.3.7"

View File

@ -19,6 +19,7 @@ use std::sync::Once;
#[allow(missing_docs)]
pub mod command;
pub mod net;
pub mod prelude;
pub mod transcript;
pub mod vectors;
@ -37,6 +38,7 @@ pub fn init() {
EnvFilter::try_new("warn")
.unwrap()
.add_directive("zebra_consensus=error".parse().unwrap())
.add_directive("zebra_network=error".parse().unwrap())
.add_directive("zebrad=error".parse().unwrap())
});

36
zebra-test/src/net.rs Normal file
View File

@ -0,0 +1,36 @@
//! Network testing utility functions for Zebra.
use rand::Rng;
/// Returns a random port number from the ephemeral port range.
///
/// The returned port is in `53500..=60999`.
///
/// Does not check if the port is already in use. It's impossible to do this
/// check in a reliable, cross-platform way.
///
/// ## Usage
///
/// If you want a once-off random unallocated port, use
/// `random_unallocated_port`. Don't use this function if you don't need
/// to - it has a small risk of port conflicts.
///
/// Use this function when you need to use the same random port multiple
/// times. For example: setting up both ends of a connection, or re-using
/// the same port multiple times.
pub fn random_known_port() -> u16 {
    // Use the intersection of the IANA/Windows/macOS ephemeral port range,
    // and the Linux ephemeral port range:
    //   - https://en.wikipedia.org/wiki/Ephemeral_port#Range
    // excluding ports less than 53500, to avoid:
    //   - typical Hyper-V reservations up to 52000:
    //      - https://github.com/googlevr/gvr-unity-sdk/issues/1002
    //      - https://github.com/docker/for-win/issues/3171
    //   - the MOM-Clear port 51515
    //      - https://docs.microsoft.com/en-us/troubleshoot/windows-server/networking/service-overview-and-network-port-requirements
    //   - the LDAP Kerberos byte-swapped reservation 53249
    //      - https://docs.microsoft.com/en-us/troubleshoot/windows-server/identity/ldap-kerberos-server-reset-tcp-sessions
    //   - macOS and Windows sequential ephemeral port allocations,
    //     starting from 49152:
    //      - https://dataplane.org/ephemeralports.html
    //
    // The range is inclusive, so the upper bound 60999 can also be chosen.
    // A half-open `53500..60999` range would never return it.
    rand::thread_rng().gen_range(53500..=60999)
}

View File

@ -20,7 +20,6 @@ gumdrop = "0.7"
serde = { version = "1", features = ["serde_derive"] }
toml = "0.5"
chrono = "0.4"
rand = "0.8"
hyper = { version = "0.14.0-dev", features = ["full"] }
futures = "0.3"

View File

@ -34,6 +34,7 @@ use zebra_network::constants::PORT_IN_USE_ERROR;
use zebra_state::constants::LOCK_FILE_ERROR;
use zebra_test::{
command::{ContextFrom, TestDirExt},
net::random_known_port,
prelude::*,
};
use zebrad::config::ZebradConfig;
@ -926,40 +927,6 @@ fn sync_past_mandatory_checkpoint_testnet() {
sync_past_mandatory_checkpoint(network).unwrap();
}
/// Returns a random port number from the ephemeral port range.
///
/// The returned port is in `53500..=60999`.
///
/// Does not check if the port is already in use. It's impossible to do this
/// check in a reliable, cross-platform way.
///
/// ## Usage
///
/// If you want a once-off random unallocated port, use
/// `random_unallocated_port`. Don't use this function if you don't need
/// to - it has a small risk of port conflicts.
///
/// Use this function when you need to use the same random port multiple
/// times. For example: setting up both ends of a connection, or re-using
/// the same port multiple times.
fn random_known_port() -> u16 {
    // Use the intersection of the IANA/Windows/macOS ephemeral port range,
    // and the Linux ephemeral port range:
    //   - https://en.wikipedia.org/wiki/Ephemeral_port#Range
    // excluding ports less than 53500, to avoid:
    //   - typical Hyper-V reservations up to 52000:
    //      - https://github.com/googlevr/gvr-unity-sdk/issues/1002
    //      - https://github.com/docker/for-win/issues/3171
    //   - the MOM-Clear port 51515
    //      - https://docs.microsoft.com/en-us/troubleshoot/windows-server/networking/service-overview-and-network-port-requirements
    //   - the LDAP Kerberos byte-swapped reservation 53249
    //      - https://docs.microsoft.com/en-us/troubleshoot/windows-server/identity/ldap-kerberos-server-reset-tcp-sessions
    //   - macOS and Windows sequential ephemeral port allocations,
    //     starting from 49152:
    //      - https://dataplane.org/ephemeralports.html
    use rand::Rng;

    // The range is inclusive, so the upper bound 60999 can also be chosen.
    // A half-open `53500..60999` range would never return it.
    rand::thread_rng().gen_range(53500..=60999)
}
/// Returns the "magic" port number that tells the operating system to
/// choose a random unallocated port.
///