Limit the number of outbound peer connections (#2944)

* Limit the number of outbound connections in the crawler

* Make zebra-network channel bounds depend on config.peerset_initial_target_size

* Bias Zebra towards outbound connections

And turn connection limits into `Config` methods.

* Downgrade some connection logs to debug

* Remove verbose or outdated fields in tracing logs

* Clarify connection limits

Includes:
- `fastmod OUTBOUND_PEER_BIAS_FRACTION OUTBOUND_PEER_BIAS_DENOMINATOR zebra*`
- clarify connection limit documentation

* Clarify inventory channel capacity

* Add zebra_network::initialize tests with limited numbers of peers

* Avoid cooperative async task starvation in the peer crawler and listener

If we don't yield in these loops, they can run for a long time before
tokio forces them to yield.

* Test the crawler with small connection limits

And use the multi-threaded runtime to avoid long hangs.

* Stop using the multi-threaded executor in tests where it's not needed

* Avoid starvation for every connection

Adds yields after inbound successes and initial peer connections.

* Add a crawler peer connection success test

* Add outbound connection limit tests

* Improve outbound tests
This commit is contained in:
teor 2021-10-28 07:28:51 +10:00 committed by GitHub
parent 46fb33a04f
commit 3e03d48799
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 783 additions and 51 deletions

View File

@ -9,7 +9,10 @@ use serde::{de, Deserialize, Deserializer};
use zebra_chain::{parameters::Network, serialization::canonical_socket_addr};
use crate::BoxError;
use crate::{constants, BoxError};
#[cfg(test)]
mod tests;
/// The number of times Zebra will retry each initial peer's DNS resolution,
/// before checking if any other initial peers have returned addresses.
@ -53,6 +56,8 @@ pub struct Config {
/// The initial target size for the peer set.
///
/// Also used to limit the number of inbound and outbound connections made by Zebra.
///
/// If you have a slow network connection, and Zebra is having trouble
/// syncing, try reducing the peer set size. You can also reduce the peer
/// set size to reduce Zebra's bandwidth usage.
@ -72,6 +77,38 @@ pub struct Config {
}
impl Config {
/// Returns the cap on outbound connections that Zebra keeps open at the same time.
/// Once the cap is reached, Zebra stops opening outbound connections.
///
/// # Security
///
/// The cap is the inbound connection limit plus an extra fraction of it
/// (rounded down), so it is never smaller than the inbound limit.
/// This makes Zebra more likely to be connected to peers that it has selected.
pub fn peerset_outbound_connection_limit(&self) -> usize {
    let base_limit = self.peerset_inbound_connection_limit();
    let outbound_bias = base_limit / constants::OUTBOUND_PEER_BIAS_DENOMINATOR;

    base_limit + outbound_bias
}
/// The maximum number of inbound connections that Zebra will accept at the same time.
/// When this limit is reached, Zebra drops new inbound connections without handshaking on them.
///
/// This is the configured `peerset_initial_target_size`, used unchanged.
pub fn peerset_inbound_connection_limit(&self) -> usize {
self.peerset_initial_target_size
}
/// Returns the combined cap on connections that Zebra has open at the same time:
/// the inbound connection limit plus the outbound connection limit.
pub fn peerset_total_connection_limit(&self) -> usize {
    let inbound_limit = self.peerset_inbound_connection_limit();
    let outbound_limit = self.peerset_outbound_connection_limit();

    inbound_limit + outbound_limit
}
/// Resolve the configured initial seed peers for `self.network` into socket addresses.
pub async fn initial_peers(&self) -> HashSet<SocketAddr> {
    // Pick the seed list for the configured network, then resolve it once.
    let seed_peers = match self.network {
        Network::Mainnet => &self.initial_mainnet_peers,
        Network::Testnet => &self.initial_testnet_peers,
    };

    Config::resolve_peers(seed_peers).await
}
/// Concurrently resolves `peers` into zero or more IP addresses, with a
/// timeout of a few seconds on each DNS request.
///
@ -115,14 +152,6 @@ impl Config {
}
}
/// Get the initial seed peers based on the configured network.
///
/// Resolves `initial_mainnet_peers` on Mainnet, or `initial_testnet_peers` on Testnet,
/// using [`Config::resolve_peers`].
pub async fn initial_peers(&self) -> HashSet<SocketAddr> {
match self.network {
Network::Mainnet => Config::resolve_peers(&self.initial_mainnet_peers).await,
Network::Testnet => Config::resolve_peers(&self.initial_testnet_peers).await,
}
}
/// Resolves `host` into zero or more IP addresses, retrying up to
/// `max_retries` times.
///
@ -265,6 +294,7 @@ impl<'de> Deserialize<'de> for Config {
}
let config = DConfig::deserialize(deserializer)?;
// TODO: perform listener DNS lookups asynchronously with a timeout (#1631)
let listen_addr = match config.listen_addr.parse::<SocketAddr>() {
Ok(socket) => Ok(socket),
@ -287,6 +317,3 @@ impl<'de> Deserialize<'de> for Config {
})
}
}
#[cfg(test)]
mod tests;

View File

@ -13,6 +13,24 @@ use zebra_chain::{
serialization::Duration32,
};
/// The fractional bias towards outbound peers in the peer set,
/// if connection limits have been reached.
///
/// Inbound and outbound connections are limited based on
/// [`Config.peerset_initial_target_size`].
///
/// The outbound limit is larger than the inbound limit by:
/// `Config.peerset_initial_target_size / OUTBOUND_PEER_BIAS_DENOMINATOR`.
///
/// This is an integer division, so the extra outbound allowance is rounded down.
/// It is zero when the target size is smaller than the denominator.
///
/// # Security
///
/// This bias helps make sure that Zebra is connected to a majority of peers
/// that it has chosen from its [`AddressBook`].
///
/// Inbound peer connections are initiated by the remote peer,
/// so inbound peer selection is not controlled by the local node.
pub const OUTBOUND_PEER_BIAS_DENOMINATOR: usize = 2;
/// The buffer size for the peer set.
///
/// This should be greater than 1 to avoid sender contention, but also reasonably

View File

@ -12,10 +12,9 @@ mod error;
mod handshake;
use client::{ClientRequest, ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender};
use error::ErrorSlot;
pub use client::Client;
pub use connection::Connection;
pub use connector::{Connector, OutboundConnectorRequest};
pub use error::{HandshakeError, PeerError, SharedPeerError};
pub use error::{ErrorSlot, HandshakeError, PeerError, SharedPeerError};
pub use handshake::{ConnectedAddr, Handshake, HandshakeRequest};

View File

@ -17,21 +17,29 @@ use super::{ErrorSlot, PeerError, SharedPeerError};
/// The "client" duplex half of a peer connection.
pub struct Client {
// Used to shut down the corresponding heartbeat.
// This is always Some except when we take it on drop.
pub(super) shutdown_tx: Option<oneshot::Sender<()>>,
pub(super) server_tx: mpsc::Sender<ClientRequest>,
pub(super) error_slot: ErrorSlot,
/// Used to shut down the corresponding heartbeat.
/// This is always Some except when we take it on drop.
pub(crate) shutdown_tx: Option<oneshot::Sender<()>>,
/// Used to send [`Request`]s to the remote peer.
pub(crate) server_tx: mpsc::Sender<ClientRequest>,
/// A slot for an error shared between the Connection and the Client that uses it.
///
/// `None` unless the connection or client have errored.
pub(crate) error_slot: ErrorSlot,
}
/// A message from the `peer::Client` to the `peer::Server`.
#[derive(Debug)]
pub(super) struct ClientRequest {
/// The actual request.
pub(crate) struct ClientRequest {
/// The actual network request for the peer.
pub request: Request,
/// The return message channel, included because `peer::Client::call` returns a
/// The response [`Message`] channel, included because `peer::Client::call` returns a
/// future that may be moved around before it resolves.
pub tx: oneshot::Sender<Result<Response, SharedPeerError>>,
/// The tracing context for the request, so that work the connection task does
/// processing messages in the context of this request will have correct context.
pub span: tracing::Span,

View File

@ -331,6 +331,8 @@ pub struct Connection<S, Tx> {
pub(super) client_rx: ClientRequestReceiver,
/// A slot for an error shared between the Connection and the Client that uses it.
///
/// `None` unless the connection or client have errored.
pub(super) error_slot: ErrorSlot,
/// A channel for sending requests to the connected peer.

View File

@ -72,7 +72,7 @@ pub enum PeerError {
/// mutex should be held for as short a time as possible. This avoids blocking
/// the async task thread on acquiring the mutex.
#[derive(Default, Clone)]
pub(super) struct ErrorSlot(Arc<std::sync::Mutex<Option<SharedPeerError>>>);
pub struct ErrorSlot(Arc<std::sync::Mutex<Option<SharedPeerError>>>);
impl std::fmt::Debug for ErrorSlot {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {

View File

@ -63,6 +63,11 @@ type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;
/// In addition to returning a service for outbound requests, this method
/// returns a shared [`AddressBook`] updated with last-seen timestamps for
/// connected peers.
///
/// # Panics
///
/// If `config.peerset_initial_target_size` is zero.
/// (zebra-network expects to be able to connect to at least one peer.)
pub async fn init<S, C>(
config: Config,
inbound_service: S,
@ -76,10 +81,25 @@ where
S::Future: Send + 'static,
C: ChainTip + Clone + Send + 'static,
{
// If we want Zebra to operate with no network,
// we should implement a `zebrad` command that doesn't use `zebra-network`.
assert!(
config.peerset_initial_target_size > 0,
"Zebra must be allowed to connect to at least one peer"
);
let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;
let (address_book, timestamp_collector) = TimestampCollector::spawn(listen_addr);
let (inv_sender, inv_receiver) = broadcast::channel(100);
// Create a broadcast channel for peer inventory advertisements.
// If it reaches capacity, this channel drops older inventory advertisements.
//
// When Zebra is at the chain tip with an up-to-date mempool,
// we expect to have at most 1 new transaction per connected peer,
// and 1-2 new blocks across the entire network.
// (The block syncer and mempool crawler handle bulk fetches of blocks and transactions.)
let (inv_sender, inv_receiver) = broadcast::channel(config.peerset_total_connection_limit());
// Construct services that handle inbound handshakes and perform outbound
// handshakes. These use the same handshake service internally to detect
@ -106,10 +126,14 @@ where
)
};
// Create an mpsc channel for peer changes, with a generous buffer.
let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100);
// Create an mpsc channel for peerset demand signaling.
let (mut demand_tx, demand_rx) = mpsc::channel::<MorePeers>(100);
// Create an mpsc channel for peer changes,
// based on the maximum number of inbound and outbound peers.
let (peerset_tx, peerset_rx) =
mpsc::channel::<PeerChange>(config.peerset_total_connection_limit());
// Create an mpsc channel for peerset demand signaling,
// based on the maximum number of outbound peers.
let (mut demand_tx, demand_rx) =
mpsc::channel::<MorePeers>(config.peerset_outbound_connection_limit());
// Create a oneshot to send background task JoinHandles to the peer set
let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
@ -174,9 +198,10 @@ where
let _ = demand_tx.try_send(MorePeers);
}
let crawl_guard = tokio::spawn(
let crawl_fut = {
let config = config.clone();
crawl_and_dial(
config.crawl_new_peer_interval,
config,
demand_tx,
demand_rx,
candidates,
@ -184,8 +209,8 @@ where
peerset_tx,
active_outbound_connections,
)
.instrument(Span::current()),
);
};
let crawl_guard = tokio::spawn(crawl_fut.instrument(Span::current()));
handle_tx.send(vec![listen_guard, crawl_guard]).unwrap();
@ -294,6 +319,11 @@ where
peerset_tx
.send(handshake_result.map_err(|(_addr, e)| e))
.await?;
// Security: Let other tasks run after each connection is processed.
//
// Avoids remote peers starving other Zebra tasks using initial connection successes or errors.
tokio::task::yield_now().await;
}
let outbound_connections = active_outbound_connections.update_count();
@ -406,7 +436,7 @@ where
if let Ok((tcp_stream, addr)) = listener.accept().await {
// The peer already opened a connection, so increment the connection count immediately.
let connection_tracker = active_inbound_connections.track_connection();
info!(
debug!(
inbound_connections = ?active_inbound_connections.update_count(),
"handshaking on an open inbound peer connection"
);
@ -449,6 +479,11 @@ where
// but most OSes also limit the number of queued inbound connections on a listener port.
tokio::time::sleep(constants::MIN_PEER_CONNECTION_INTERVAL).await;
}
// Security: Let other tasks run after each connection is processed.
//
// Avoids remote peers starving other Zebra tasks using inbound connection successes or errors.
tokio::task::yield_now().await;
}
}
@ -476,9 +511,9 @@ enum CrawlerAction {
/// and connect to new peers, and send the resulting `peer::Client`s through the
/// `peerset_tx` channel.
///
/// Crawl for new peers every `crawl_new_peer_interval`, and whenever there is
/// demand, but no new peers in `candidates`. After crawling, try to connect to
/// one new peer using `outbound_connector`.
/// Crawl for new peers every `config.crawl_new_peer_interval`.
/// Also crawl whenever there is demand, but no new peers in `candidates`.
/// After crawling, try to connect to one new peer using `outbound_connector`.
///
/// If a handshake fails, restore the unused demand signal by sending it to
/// `demand_tx`.
@ -487,11 +522,19 @@ enum CrawlerAction {
/// permanent internal error. Transient errors and individual peer errors should
/// be handled within the crawler.
///
/// Uses `active_outbound_connections` to track active outbound connections
/// Uses `active_outbound_connections` to track the number of active outbound connections
/// in both the initial peers and crawler.
#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, peerset_tx,))]
#[instrument(skip(
config,
demand_tx,
demand_rx,
candidates,
outbound_connector,
peerset_tx,
active_outbound_connections,
))]
async fn crawl_and_dial<C, S>(
crawl_new_peer_interval: std::time::Duration,
config: Config,
mut demand_tx: mpsc::Sender<MorePeers>,
mut demand_rx: mpsc::Receiver<MorePeers>,
mut candidates: CandidateSet<S>,
@ -530,7 +573,7 @@ where
handshakes.push(future::pending().boxed());
let mut crawl_timer =
tokio::time::interval(crawl_new_peer_interval).map(|tick| TimerCrawl { tick });
tokio::time::interval(config.crawl_new_peer_interval).map(|tick| TimerCrawl { tick });
loop {
metrics::gauge!(
@ -548,8 +591,8 @@ where
next_timer = crawl_timer.next() => next_timer.expect("timers never terminate"),
// turn the demand into an action, based on the crawler's current state
_ = demand_rx.next() => {
if handshakes.len() > 50 {
// Too many pending handshakes already
if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() {
// Too many open connections or pending handshakes already
DemandDrop
} else if let Some(candidate) = candidates.next().await {
// candidates.next has a short delay, and briefly holds the address
@ -566,13 +609,13 @@ where
// This is set to trace level because when the peerset is
// congested it can generate a lot of demand signal very
// rapidly.
trace!("too many in-flight handshakes, dropping demand signal");
trace!("too many open connections or in-flight handshakes, dropping demand signal");
continue;
}
DemandHandshake { candidate } => {
// Increment the connection count before we spawn the connection.
let outbound_connection_tracker = active_outbound_connections.track_connection();
info!(
debug!(
outbound_connections = ?active_outbound_connections.update_count(),
"opening an outbound peer connection"
);
@ -635,6 +678,11 @@ where
let _ = demand_tx.try_send(MorePeers);
}
}
// Security: Let other tasks run after each crawler action is processed.
//
// Avoids remote peers starving other Zebra tasks using outbound connection errors.
tokio::task::yield_now().await;
}
}

View File

@ -1,4 +1,4 @@
//! Specific configs used for zebra-network initialization tests.
//! zebra-network initialization tests using fixed configs.
//!
//! ## Failures due to Port Conflicts
//!
@ -13,19 +13,43 @@
//! If it does not have any IPv4 interfaces, or IPv4 localhost is not on `127.0.0.1`,
//! skip all the network tests by setting the `ZEBRA_SKIP_NETWORK_TESTS` environmental variable.
use std::{collections::HashSet, net::SocketAddr};
use std::{
collections::HashSet,
net::{Ipv4Addr, SocketAddr},
sync::Arc,
time::Duration,
};
use tower::service_fn;
use futures::{
channel::{mpsc, oneshot},
FutureExt,
};
use tower::{discover::Change, service_fn, Service};
use tracing::Span;
use zebra_chain::{chain_tip::NoChainTip, parameters::Network};
use zebra_chain::{chain_tip::NoChainTip, parameters::Network, serialization::DateTime32};
use zebra_test::net::random_known_port;
use crate::Config;
use super::super::init;
use crate::{
init,
meta_addr::MetaAddr,
peer::{self, ErrorSlot, OutboundConnectorRequest},
peer_set::{
initialize::{crawl_and_dial, PeerChange},
set::MorePeers,
ActiveConnectionCounter, CandidateSet,
},
protocol::types::PeerServices,
AddressBook, BoxError, Config, Request, Response,
};
use Network::*;
/// The amount of time to run the crawler, before testing what it has done.
///
/// Using a very short time can make the crawler not run at all.
///
/// The network-dependent `peer_limit_one_*` and `peer_limit_two_*` tests, and
/// `spawn_crawler_with_peer_limit`, each sleep for this long,
/// so every test that uses it takes at least this long to run.
const CRAWLER_TEST_DURATION: Duration = Duration::from_secs(10);
/// Test that zebra-network discovers dynamic bind-to-all-interfaces listener ports,
/// and sends them to the `AddressBook`.
///
@ -98,6 +122,473 @@ async fn local_listener_fixed_port_localhost_addr() {
local_listener_port_with(SocketAddr::new(localhost_v6, random_known_port()), Testnet).await;
}
/// Test zebra-network with a peer limit of zero peers on mainnet.
/// (Zebra does not support this mode of operation.)
///
/// `init` asserts that the peer limit is non-zero, so this test is expected to panic.
#[tokio::test]
#[should_panic]
async fn peer_limit_zero_mainnet() {
    zebra_test::init();

    // This test should not require network access, because the connection limit is zero.
    let unreachable_inbound_service =
        service_fn(|_| async { unreachable!("inbound service should never be called") });

    let address_book = init_with_peer_limit(0, unreachable_inbound_service, Mainnet).await;

    // Read each value in its own statement, so the mutex guard is released before
    // the next lock. Locking again inside the `assert_eq!` message arguments would
    // re-lock the mutex while the guard from the compared expression is still alive,
    // and the failure path would deadlock or panic instead of reporting the metrics.
    let peer_count = address_book.lock().unwrap().peers().count();
    let address_metrics = address_book.lock().unwrap().address_metrics();

    assert_eq!(
        peer_count, 0,
        "expected no peers in Mainnet address book, but got: {:?}",
        address_metrics
    );
}
/// Test zebra-network with a peer limit of zero peers on testnet.
/// (Zebra does not support this mode of operation.)
///
/// `init` asserts that the peer limit is non-zero, so this test is expected to panic.
#[tokio::test]
#[should_panic]
async fn peer_limit_zero_testnet() {
    zebra_test::init();

    // This test should not require network access, because the connection limit is zero.
    let unreachable_inbound_service =
        service_fn(|_| async { unreachable!("inbound service should never be called") });

    let address_book = init_with_peer_limit(0, unreachable_inbound_service, Testnet).await;

    // Read each value in its own statement, so the mutex guard is released before
    // the next lock. Locking again inside the `assert_eq!` message arguments would
    // re-lock the mutex while the guard from the compared expression is still alive,
    // and the failure path would deadlock or panic instead of reporting the metrics.
    let peer_count = address_book.lock().unwrap().peers().count();
    let address_metrics = address_book.lock().unwrap().address_metrics();

    assert_eq!(
        peer_count, 0,
        "expected no peers in Testnet address book, but got: {:?}",
        address_metrics
    );
}
/// Smoke test: run zebra-network on mainnet with an initial target size of one,
/// which gives a limit of one inbound and one outbound peer.
#[tokio::test]
async fn peer_limit_one_mainnet() {
    zebra_test::init();

    if zebra_test::net::zebra_skip_network_tests() {
        return;
    }

    // The inbound service answers every request with `Nil`.
    // The returned address book is not inspected, so it is dropped straight away.
    drop(init_with_peer_limit(1, service_fn(|_| async { Ok(Response::Nil) }), Mainnet).await);

    // Let the crawler run for a while.
    tokio::time::sleep(CRAWLER_TEST_DURATION).await;

    // Any number of address book peers is valid here, because some peers might have failed.
}
/// Smoke test: run zebra-network on testnet with an initial target size of one,
/// which gives a limit of one inbound and one outbound peer.
#[tokio::test]
async fn peer_limit_one_testnet() {
    zebra_test::init();

    if zebra_test::net::zebra_skip_network_tests() {
        return;
    }

    // The inbound service answers every request with `Nil`.
    // The returned address book is not inspected, so it is dropped straight away.
    drop(init_with_peer_limit(1, service_fn(|_| async { Ok(Response::Nil) }), Testnet).await);

    // Let the crawler run for a while.
    tokio::time::sleep(CRAWLER_TEST_DURATION).await;

    // Any number of address book peers is valid here, because some peers might have failed.
}
/// Smoke test: run zebra-network on mainnet with an initial target size of two,
/// which gives a limit of two inbound and three outbound peers.
#[tokio::test]
async fn peer_limit_two_mainnet() {
    zebra_test::init();

    if zebra_test::net::zebra_skip_network_tests() {
        return;
    }

    // The inbound service answers every request with `Nil`.
    // The returned address book is not inspected, so it is dropped straight away.
    drop(init_with_peer_limit(2, service_fn(|_| async { Ok(Response::Nil) }), Mainnet).await);

    // Let the crawler run for a while.
    tokio::time::sleep(CRAWLER_TEST_DURATION).await;

    // Any number of address book peers is valid here, because some peers might have failed.
}
/// Smoke test: run zebra-network on testnet with an initial target size of two,
/// which gives a limit of two inbound and three outbound peers.
#[tokio::test]
async fn peer_limit_two_testnet() {
    zebra_test::init();

    if zebra_test::net::zebra_skip_network_tests() {
        return;
    }

    // The inbound service answers every request with `Nil`.
    // The returned address book is not inspected, so it is dropped straight away.
    drop(init_with_peer_limit(2, service_fn(|_| async { Ok(Response::Nil) }), Testnet).await);

    // Let the crawler run for a while.
    tokio::time::sleep(CRAWLER_TEST_DURATION).await;

    // Any number of address book peers is valid here, because some peers might have failed.
}
/// Run the crawler with an outbound peer limit of zero, using a connector that panics
/// if it is ever called, and check that no peer reaches the peer set channel.
#[tokio::test]
async fn crawler_peer_limit_zero_connect_panic() {
    zebra_test::init();

    // This test does not require network access, because the outbound connector
    // and peer set are fake.
    let panicking_connector = service_fn(|_| async {
        unreachable!("outbound connector should never be called with a zero peer limit")
    });

    let (_config, mut peerset_rx) = spawn_crawler_with_peer_limit(0, panicking_connector).await;

    let peer_result = peerset_rx.try_next();

    // The only failing outcome is `Ok(Some(_))`: a peer change was delivered.
    // `Err(_)` means that no peers are available, and the sender has not been dropped.
    // `Ok(None)` means that no peers are available, and the sender has been dropped.
    assert!(
        !matches!(peer_result, Ok(Some(_))),
        "unexpected peer when outbound limit is zero: {:?}",
        peer_result,
    );
}
/// Run the crawler with an outbound peer limit of one, using a connector that fails
/// every connection, and check that no peer reaches the peer set channel.
#[tokio::test]
async fn crawler_peer_limit_one_connect_error() {
    zebra_test::init();

    // This test does not require network access, because the outbound connector
    // and peer set are fake.
    let failing_connector =
        service_fn(|_| async { Err("test outbound connector always returns errors".into()) });

    let (_config, mut peerset_rx) = spawn_crawler_with_peer_limit(1, failing_connector).await;

    let peer_result = peerset_rx.try_next();

    // The only failing outcome is `Ok(Some(_))`: a peer change was delivered.
    // `Err(_)` means that no peers are available, and the sender has not been dropped.
    // `Ok(None)` means that no peers are available, and the sender has been dropped.
    assert!(
        !matches!(peer_result, Ok(Some(_))),
        "unexpected peer when all connections error: {:?}",
        peer_result,
    );
}
/// Test the crawler with an outbound peer limit of one peer,
/// and a connector that returns success then disconnects the peer.
///
/// Each fake connection releases its connection tracker straight away, so the crawler
/// is expected to open more connections in total than the outbound limit.
#[tokio::test]
async fn crawler_peer_limit_one_connect_ok_then_drop() {
    zebra_test::init();

    // This test does not require network access, because the outbound connector
    // and peer set are fake.
    let success_disconnect_outbound_connector =
        service_fn(|req: OutboundConnectorRequest| async move {
            let OutboundConnectorRequest {
                addr,
                connection_tracker,
            } = req;

            // The receiving halves are dropped immediately: this client is never used.
            let (server_tx, _server_rx) = mpsc::channel(0);
            let (shutdown_tx, _shutdown_rx) = oneshot::channel();
            let error_slot = ErrorSlot::default();

            let fake_client = peer::Client {
                shutdown_tx: Some(shutdown_tx),
                server_tx,
                error_slot,
            };

            // Fake the connection closing.
            std::mem::drop(connection_tracker);

            // Give the crawler time to get the message.
            tokio::task::yield_now().await;

            Ok(Change::Insert(addr, fake_client))
        });

    let (config, mut peerset_rx) =
        spawn_crawler_with_peer_limit(1, success_disconnect_outbound_connector).await;

    let mut peer_count: usize = 0;

    // Drain the channel. The loop stops when there are no messages left,
    // whether the channel is still open (`Err(_)`) or closed (`Ok(None)`).
    while let Ok(Some(peer_result)) = peerset_rx.try_next() {
        // A peer handshake succeeded.
        assert!(
            matches!(peer_result, Ok(Change::Insert(_, _))),
            "unexpected connection error: {:?}\n\
             {} previous peers succeeded",
            peer_result,
            peer_count,
        );
        peer_count += 1;
    }

    // The check is strict, so the message says "over" rather than "at least".
    assert!(
        peer_count > config.peerset_outbound_connection_limit(),
        "unexpected number of peer connections {}, should be over the limit of {}",
        peer_count,
        config.peerset_outbound_connection_limit(),
    );
}
/// Test the crawler with an outbound peer limit of one peer,
/// and a connector that returns success then holds the peer open.
#[tokio::test]
async fn crawler_peer_limit_one_connect_ok_stay_open() {
zebra_test::init();
// This test does not require network access, because the outbound connector
// and peer set are fake.
let success_stay_open_outbound_connector =
service_fn(|req: OutboundConnectorRequest| async move {
let OutboundConnectorRequest {
addr,
connection_tracker,
} = req;
let (server_tx, _server_rx) = mpsc::channel(0);
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
let error_slot = ErrorSlot::default();
let fake_client = peer::Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot,
};
// Fake the connection being open forever.
std::mem::forget(connection_tracker);
Ok(Change::Insert(addr, fake_client))
});
let (config, mut peerset_tx) =
spawn_crawler_with_peer_limit(1, success_stay_open_outbound_connector).await;
let mut peer_count: usize = 0;
loop {
let peer_result = peerset_tx.try_next();
match peer_result {
// A peer handshake succeeded.
Ok(Some(peer_result)) => {
assert!(
matches!(peer_result, Ok(Change::Insert(_, _))),
"unexpected connection error: {:?}\n\
{} previous peers succeeded",
peer_result,
peer_count,
);
peer_count += 1;
}
// The channel is closed and there are no messages left in the channel.
Ok(None) => break,
// The channel is still open, but there are no messages left in the channel.
Err(_) => break,
}
}
assert!(
peer_count <= config.peerset_outbound_connection_limit(),
"unexpected number of peer connections {}, over limit of {}",
peer_count,
config.peerset_outbound_connection_limit(),
);
}
/// Run the crawler with the default outbound peer limit, using a connector that fails
/// every connection, and check that no peer reaches the peer set channel.
#[tokio::test]
async fn crawler_peer_limit_default_connect_error() {
    zebra_test::init();

    // This test does not require network access, because the outbound connector
    // and peer set are fake.
    let failing_connector =
        service_fn(|_| async { Err("test outbound connector always returns errors".into()) });

    // `None` keeps the default `peerset_initial_target_size`.
    let (_config, mut peerset_rx) = spawn_crawler_with_peer_limit(None, failing_connector).await;

    let peer_result = peerset_rx.try_next();

    // The only failing outcome is `Ok(Some(_))`: a peer change was delivered.
    // `Err(_)` means that no peers are available, and the sender has not been dropped.
    // `Ok(None)` means that no peers are available, and the sender has been dropped.
    assert!(
        !matches!(peer_result, Ok(Some(_))),
        "unexpected peer when all connections error: {:?}",
        peer_result,
    );
}
/// Test the crawler with the default outbound peer limit,
/// and a connector that returns success then disconnects the peer.
#[tokio::test]
async fn crawler_peer_limit_default_connect_ok_then_drop() {
zebra_test::init();
// This test does not require network access, because the outbound connector
// and peer set are fake.
let success_disconnect_outbound_connector =
service_fn(|req: OutboundConnectorRequest| async move {
let OutboundConnectorRequest {
addr,
connection_tracker,
} = req;
let (server_tx, _server_rx) = mpsc::channel(0);
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
let error_slot = ErrorSlot::default();
let fake_client = peer::Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot,
};
// Fake the connection closing.
std::mem::drop(connection_tracker);
// Give the crawler time to get the message.
tokio::task::yield_now().await;
Ok(Change::Insert(addr, fake_client))
});
let (config, mut peerset_tx) =
spawn_crawler_with_peer_limit(None, success_disconnect_outbound_connector).await;
let mut peer_count: usize = 0;
loop {
let peer_result = peerset_tx.try_next();
match peer_result {
// A peer handshake succeeded.
Ok(Some(peer_result)) => {
assert!(
matches!(peer_result, Ok(Change::Insert(_, _))),
"unexpected connection error: {:?}\n\
{} previous peers succeeded",
peer_result,
peer_count,
);
peer_count += 1;
}
// The channel is closed and there are no messages left in the channel.
Ok(None) => break,
// The channel is still open, but there are no messages left in the channel.
Err(_) => break,
}
}
// TODO: tweak the crawler timeouts and rate-limits so we get over the actual limit
// (currently, getting over the limit can take 30 seconds or more)
let lower_limit = config.peerset_outbound_connection_limit() / 3;
assert!(
peer_count > lower_limit,
"unexpected number of peer connections {}, should be over the limit of {}",
peer_count,
lower_limit,
);
}
/// Test the crawler with the default outbound peer limit,
/// and a connector that returns success then holds the peer open.
#[tokio::test]
async fn crawler_peer_limit_default_connect_ok_stay_open() {
zebra_test::init();
// This test does not require network access, because the outbound connector
// and peer set are fake.
let success_stay_open_outbound_connector =
service_fn(|req: OutboundConnectorRequest| async move {
let OutboundConnectorRequest {
addr,
connection_tracker,
} = req;
let (server_tx, _server_rx) = mpsc::channel(0);
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
let error_slot = ErrorSlot::default();
let fake_client = peer::Client {
shutdown_tx: Some(shutdown_tx),
server_tx,
error_slot,
};
// Fake the connection being open forever.
std::mem::forget(connection_tracker);
Ok(Change::Insert(addr, fake_client))
});
// The initial target size is multiplied by 1.5 to give the actual limit.
let (config, mut peerset_tx) =
spawn_crawler_with_peer_limit(None, success_stay_open_outbound_connector).await;
let mut peer_count: usize = 0;
loop {
let peer_result = peerset_tx.try_next();
match peer_result {
// A peer handshake succeeded.
Ok(Some(peer_result)) => {
assert!(
matches!(peer_result, Ok(Change::Insert(_, _))),
"unexpected connection error: {:?}\n\
{} previous peers succeeded",
peer_result,
peer_count,
);
peer_count += 1;
}
// The channel is closed and there are no messages left in the channel.
Ok(None) => break,
// The channel is still open, but there are no messages left in the channel.
Err(_) => break,
}
}
assert!(
peer_count <= config.peerset_outbound_connection_limit(),
"unexpected number of peer connections {}, over limit of {}",
peer_count,
config.peerset_outbound_connection_limit(),
);
}
/// Open a local listener on `listen_addr` for `network`.
/// Asserts that the local listener address works as expected.
async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) {
let config = Config {
listen_addr,
@ -133,3 +624,142 @@ async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) {
"IP addresses are correctly propagated"
);
}
/// Initialize a peer set with `peerset_initial_target_size` and `inbound_service` on `network`.
/// Returns the newly created [`AddressBook`] for testing.
async fn init_with_peer_limit<S>(
peerset_initial_target_size: usize,
inbound_service: S,
network: Network,
) -> Arc<std::sync::Mutex<AddressBook>>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send + 'static,
{
// This test might fail on machines with no configured IPv4 addresses
// (localhost should be enough).
let unused_v4 = "0.0.0.0:0".parse().unwrap();
let config = Config {
peerset_initial_target_size,
network,
listen_addr: unused_v4,
..Config::default()
};
let (_peer_service, address_book) = init(config, inbound_service, NoChainTip).await;
address_book
}
/// Run a peer crawler with `peerset_initial_target_size` and `outbound_connector`.
///
/// Uses the default values for all other config fields.
/// Passing `None` also keeps the default `peerset_initial_target_size`.
///
/// The crawler is given a fake address book, a fake peer set, and more demand signals
/// than the outbound connection limit. It runs for [`CRAWLER_TEST_DURATION`],
/// then it is aborted.
///
/// Returns the generated [`Config`], and the peer set receiver.
///
/// # Panics
///
/// - If the number of fake peers does not fit in the final octet of their addresses.
/// - If the crawler task panicked or returned before it was aborted.
/// - If the address book does not contain exactly the fake peers at the end of the run.
async fn spawn_crawler_with_peer_limit<C>(
    peerset_initial_target_size: impl Into<Option<usize>>,
    outbound_connector: C,
) -> (Config, mpsc::Receiver<PeerChange>)
where
    C: Service<
            OutboundConnectorRequest,
            Response = Change<SocketAddr, peer::Client>,
            Error = BoxError,
        > + Clone
        + Send
        + 'static,
    C::Future: Send + 'static,
{
    // Create a test config.
    let mut config = Config::default();
    if let Some(peerset_initial_target_size) = peerset_initial_target_size.into() {
        config.peerset_initial_target_size = peerset_initial_target_size;
    }

    // Manually initialize an address book without a timestamp tracker.
    let mut address_book = AddressBook::new(config.listen_addr, Span::current());

    // Add enough fake peers to go over the limit, even if the limit is zero.
    let over_limit_peers = config.peerset_outbound_connection_limit() * 2 + 1;

    // Each fake peer is identified by the last octet of its address only.
    // Check the range once, so the cast below can never silently wrap around
    // and produce duplicate addresses.
    assert!(
        over_limit_peers <= usize::from(u8::MAX) + 1,
        "too many fake peers ({}) to give each one a unique final address octet",
        over_limit_peers,
    );

    let mut fake_peer = None;
    for address_number in 0..over_limit_peers {
        // The cast is lossless: `address_number < over_limit_peers <= 256`.
        let addr = SocketAddr::new(Ipv4Addr::new(127, 1, 1, address_number as u8).into(), 1);
        let addr =
            MetaAddr::new_gossiped_meta_addr(addr, PeerServices::NODE_NETWORK, DateTime32::now());
        fake_peer = Some(addr);
        let addr = addr.new_gossiped_change();

        address_book.update(addr);
    }

    // Create a fake peer set.
    let nil_peer_set = service_fn(move |req| async move {
        let rsp = match req {
            // Return the correct response variant for Peers requests,
            // re-using one of the peers we already provided.
            Request::Peers => Response::Peers(vec![fake_peer.unwrap()]),
            _ => unreachable!("unexpected request: {:?}", req),
        };

        Ok(rsp)
    });

    let address_book = Arc::new(std::sync::Mutex::new(address_book));

    // Make the channels large enough to hold all the peers.
    let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(over_limit_peers);
    let (mut demand_tx, demand_rx) = mpsc::channel::<MorePeers>(over_limit_peers);

    let candidates = CandidateSet::new(address_book.clone(), nil_peer_set);

    // In zebra_network::initialize() the counter would already have some initial peer connections,
    // but in this test we start with an empty counter.
    let active_outbound_connections = ActiveConnectionCounter::new_counter();

    // Add fake demand over the limit.
    for _ in 0..over_limit_peers {
        let _ = demand_tx.try_send(MorePeers);
    }

    // Start the crawler.
    let crawl_fut = crawl_and_dial(
        config.clone(),
        demand_tx,
        demand_rx,
        candidates,
        outbound_connector,
        peerset_tx,
        active_outbound_connections,
    );
    let crawl_task_handle = tokio::spawn(crawl_fut);

    // Let the crawler run for a while.
    tokio::time::sleep(CRAWLER_TEST_DURATION).await;

    // Stop the crawler and let it finish.
    crawl_task_handle.abort();
    tokio::task::yield_now().await;

    // Check for panics or errors in the crawler.
    // The task is either still pending (`None`), or it was cancelled by the abort.
    let crawl_result = crawl_task_handle.now_or_never();
    assert!(
        matches!(crawl_result, None)
            || matches!(crawl_result, Some(Err(ref e)) if e.is_cancelled()),
        "unexpected error or panic in peer crawler task: {:?}",
        crawl_result,
    );

    // Check the final address book contents.
    //
    // Read each value in its own statement, so the mutex guard is released before
    // the next lock. Locking again inside the `assert_eq!` message arguments would
    // re-lock the mutex while the guard from the compared expression is still alive,
    // and the failure path would deadlock or panic instead of reporting the metrics.
    let peer_count = address_book.lock().unwrap().peers().count();
    let address_metrics = address_book.lock().unwrap().address_metrics();

    assert_eq!(
        peer_count, over_limit_peers,
        "expected {} peers in Mainnet address book, but got: {:?}",
        over_limit_peers, address_metrics
    );

    (config, peerset_rx)
}