Limit the number of inbound peer connections (#2961)
* Limit open inbound connections based on the config * Log inbound connection errors at debug level * Test inbound connection limits * Use clone directly in function call argument lists * Remove an outdated comment * Update tests to use an unbounded channel rather than mem::forget And rename some variables. * Use a lower limit in a slow test and require that it is exceeded
This commit is contained in:
parent
8d01750459
commit
f26a60b801
|
@ -144,6 +144,7 @@ where
|
||||||
|
|
||||||
// Connect the rx end to a PeerSet, wrapping new peers in load instruments.
|
// Connect the rx end to a PeerSet, wrapping new peers in load instruments.
|
||||||
let peer_set = PeerSet::new(
|
let peer_set = PeerSet::new(
|
||||||
|
&config,
|
||||||
PeakEwmaDiscover::new(
|
PeakEwmaDiscover::new(
|
||||||
// Discover interprets an error as stream termination,
|
// Discover interprets an error as stream termination,
|
||||||
// so discard any errored connections...
|
// so discard any errored connections...
|
||||||
|
@ -162,19 +163,20 @@ where
|
||||||
// Connect peerset_tx to the 3 peer sources:
|
// Connect peerset_tx to the 3 peer sources:
|
||||||
//
|
//
|
||||||
// 1. Incoming peer connections, via a listener.
|
// 1. Incoming peer connections, via a listener.
|
||||||
let listen_guard = tokio::spawn(
|
let listen_fut = accept_inbound_connections(
|
||||||
accept_inbound_connections(tcp_listener, listen_handshaker, peerset_tx.clone())
|
config.clone(),
|
||||||
.instrument(Span::current()),
|
tcp_listener,
|
||||||
|
listen_handshaker,
|
||||||
|
peerset_tx.clone(),
|
||||||
);
|
);
|
||||||
|
let listen_guard = tokio::spawn(listen_fut.instrument(Span::current()));
|
||||||
|
|
||||||
// 2. Initial peers, specified in the config.
|
// 2. Initial peers, specified in the config.
|
||||||
let initial_peers_fut = {
|
let initial_peers_fut = add_initial_peers(
|
||||||
let config = config.clone();
|
config.clone(),
|
||||||
let outbound_connector = outbound_connector.clone();
|
outbound_connector.clone(),
|
||||||
let peerset_tx = peerset_tx.clone();
|
peerset_tx.clone(),
|
||||||
add_initial_peers(config, outbound_connector, peerset_tx)
|
);
|
||||||
};
|
|
||||||
|
|
||||||
let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current()));
|
let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current()));
|
||||||
|
|
||||||
// 3. Outgoing peers we connect to in response to load.
|
// 3. Outgoing peers we connect to in response to load.
|
||||||
|
@ -202,9 +204,7 @@ where
|
||||||
let _ = demand_tx.try_send(MorePeers);
|
let _ = demand_tx.try_send(MorePeers);
|
||||||
}
|
}
|
||||||
|
|
||||||
let crawl_fut = {
|
let crawl_fut = crawl_and_dial(
|
||||||
let config = config.clone();
|
|
||||||
crawl_and_dial(
|
|
||||||
config,
|
config,
|
||||||
demand_tx,
|
demand_tx,
|
||||||
demand_rx,
|
demand_rx,
|
||||||
|
@ -212,8 +212,7 @@ where
|
||||||
outbound_connector,
|
outbound_connector,
|
||||||
peerset_tx,
|
peerset_tx,
|
||||||
active_outbound_connections,
|
active_outbound_connections,
|
||||||
)
|
);
|
||||||
};
|
|
||||||
let crawl_guard = tokio::spawn(crawl_fut.instrument(Span::current()));
|
let crawl_guard = tokio::spawn(crawl_fut.instrument(Span::current()));
|
||||||
|
|
||||||
handle_tx.send(vec![listen_guard, crawl_guard]).unwrap();
|
handle_tx.send(vec![listen_guard, crawl_guard]).unwrap();
|
||||||
|
@ -434,8 +433,11 @@ async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) {
|
||||||
///
|
///
|
||||||
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
|
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
|
||||||
/// the [`peer::Client`] result over `peerset_tx`.
|
/// the [`peer::Client`] result over `peerset_tx`.
|
||||||
#[instrument(skip(listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))]
|
///
|
||||||
|
/// Limit the number of active inbound connections based on `config`.
|
||||||
|
#[instrument(skip(config, listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))]
|
||||||
async fn accept_inbound_connections<S>(
|
async fn accept_inbound_connections<S>(
|
||||||
|
config: Config,
|
||||||
listener: TcpListener,
|
listener: TcpListener,
|
||||||
mut handshaker: S,
|
mut handshaker: S,
|
||||||
peerset_tx: mpsc::Sender<PeerChange>,
|
peerset_tx: mpsc::Sender<PeerChange>,
|
||||||
|
@ -447,8 +449,19 @@ where
|
||||||
let mut active_inbound_connections = ActiveConnectionCounter::new_counter();
|
let mut active_inbound_connections = ActiveConnectionCounter::new_counter();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Ok((tcp_stream, addr)) = listener.accept().await {
|
let inbound_result = listener.accept().await;
|
||||||
// The peer already opened a connection, so increment the connection count immediately.
|
if let Ok((tcp_stream, addr)) = inbound_result {
|
||||||
|
if active_inbound_connections.update_count()
|
||||||
|
>= config.peerset_inbound_connection_limit()
|
||||||
|
{
|
||||||
|
// Too many open inbound connections or pending handshakes already.
|
||||||
|
// Close the connection.
|
||||||
|
std::mem::drop(tcp_stream);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The peer already opened a connection to us.
|
||||||
|
// So we want to increment the connection count as soon as possible.
|
||||||
let connection_tracker = active_inbound_connections.track_connection();
|
let connection_tracker = active_inbound_connections.track_connection();
|
||||||
debug!(
|
debug!(
|
||||||
inbound_connections = ?active_inbound_connections.update_count(),
|
inbound_connections = ?active_inbound_connections.update_count(),
|
||||||
|
@ -492,6 +505,8 @@ where
|
||||||
// Zebra can't control how many queued connections are waiting,
|
// Zebra can't control how many queued connections are waiting,
|
||||||
// but most OSes also limit the number of queued inbound connections on a listener port.
|
// but most OSes also limit the number of queued inbound connections on a listener port.
|
||||||
tokio::time::sleep(constants::MIN_PEER_CONNECTION_INTERVAL).await;
|
tokio::time::sleep(constants::MIN_PEER_CONNECTION_INTERVAL).await;
|
||||||
|
} else {
|
||||||
|
debug!(?inbound_result, "error accepting inbound connection");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Security: Let other tasks run after each connection is processed.
|
// Security: Let other tasks run after each connection is processed.
|
||||||
|
@ -536,8 +551,8 @@ enum CrawlerAction {
|
||||||
/// permanent internal error. Transient errors and individual peer errors should
|
/// permanent internal error. Transient errors and individual peer errors should
|
||||||
/// be handled within the crawler.
|
/// be handled within the crawler.
|
||||||
///
|
///
|
||||||
/// Uses `active_outbound_connections` to track the number of active outbound connections
|
/// Uses `active_outbound_connections` to limit the number of active outbound connections
|
||||||
/// in both the initial peers and crawler.
|
/// across both the initial peers and crawler. The limit is based on `config`.
|
||||||
#[instrument(skip(
|
#[instrument(skip(
|
||||||
config,
|
config,
|
||||||
demand_tx,
|
demand_tx,
|
||||||
|
@ -606,7 +621,7 @@ where
|
||||||
// turn the demand into an action, based on the crawler's current state
|
// turn the demand into an action, based on the crawler's current state
|
||||||
_ = demand_rx.next() => {
|
_ = demand_rx.next() => {
|
||||||
if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() {
|
if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() {
|
||||||
// Too many open connections or pending handshakes already
|
// Too many open outbound connections or pending handshakes already
|
||||||
DemandDrop
|
DemandDrop
|
||||||
} else if let Some(candidate) = candidates.next().await {
|
} else if let Some(candidate) = candidates.next().await {
|
||||||
// candidates.next has a short delay, and briefly holds the address
|
// candidates.next has a short delay, and briefly holds the address
|
||||||
|
|
|
@ -24,7 +24,7 @@ use futures::{
|
||||||
channel::{mpsc, oneshot},
|
channel::{mpsc, oneshot},
|
||||||
FutureExt, StreamExt,
|
FutureExt, StreamExt,
|
||||||
};
|
};
|
||||||
use tokio::task::JoinHandle;
|
use tokio::{net::TcpStream, task::JoinHandle};
|
||||||
use tower::{discover::Change, service_fn, Service};
|
use tower::{discover::Change, service_fn, Service};
|
||||||
use tracing::Span;
|
use tracing::Span;
|
||||||
|
|
||||||
|
@ -34,9 +34,12 @@ use zebra_test::net::random_known_port;
|
||||||
use crate::{
|
use crate::{
|
||||||
constants, init,
|
constants, init,
|
||||||
meta_addr::MetaAddr,
|
meta_addr::MetaAddr,
|
||||||
peer::{self, ErrorSlot, OutboundConnectorRequest},
|
peer::{self, ErrorSlot, HandshakeRequest, OutboundConnectorRequest},
|
||||||
peer_set::{
|
peer_set::{
|
||||||
initialize::{add_initial_peers, crawl_and_dial, PeerChange},
|
initialize::{
|
||||||
|
accept_inbound_connections, add_initial_peers, crawl_and_dial, open_listener,
|
||||||
|
PeerChange,
|
||||||
|
},
|
||||||
set::MorePeers,
|
set::MorePeers,
|
||||||
ActiveConnectionCounter, CandidateSet,
|
ActiveConnectionCounter, CandidateSet,
|
||||||
},
|
},
|
||||||
|
@ -51,12 +54,17 @@ use Network::*;
|
||||||
/// Using a very short time can make the crawler not run at all.
|
/// Using a very short time can make the crawler not run at all.
|
||||||
const CRAWLER_TEST_DURATION: Duration = Duration::from_secs(10);
|
const CRAWLER_TEST_DURATION: Duration = Duration::from_secs(10);
|
||||||
|
|
||||||
|
/// The amount of time to run the listener, before testing what it has done.
|
||||||
|
///
|
||||||
|
/// Using a very short time can make the listener not run at all.
|
||||||
|
const LISTENER_TEST_DURATION: Duration = Duration::from_secs(10);
|
||||||
|
|
||||||
/// Test that zebra-network discovers dynamic bind-to-all-interfaces listener ports,
|
/// Test that zebra-network discovers dynamic bind-to-all-interfaces listener ports,
|
||||||
/// and sends them to the `AddressBook`.
|
/// and sends them to the `AddressBook`.
|
||||||
///
|
///
|
||||||
/// Note: This test doesn't cover local interface or public IP address discovery.
|
/// Note: This test doesn't cover local interface or public IP address discovery.
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn local_listener_unspecified_port_unspecified_addr() {
|
async fn local_listener_unspecified_port_unspecified_addr_v4() {
|
||||||
zebra_test::init();
|
zebra_test::init();
|
||||||
|
|
||||||
if zebra_test::net::zebra_skip_network_tests() {
|
if zebra_test::net::zebra_skip_network_tests() {
|
||||||
|
@ -67,6 +75,19 @@ async fn local_listener_unspecified_port_unspecified_addr() {
|
||||||
// (localhost should be enough)
|
// (localhost should be enough)
|
||||||
local_listener_port_with("0.0.0.0:0".parse().unwrap(), Mainnet).await;
|
local_listener_port_with("0.0.0.0:0".parse().unwrap(), Mainnet).await;
|
||||||
local_listener_port_with("0.0.0.0:0".parse().unwrap(), Testnet).await;
|
local_listener_port_with("0.0.0.0:0".parse().unwrap(), Testnet).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test that zebra-network discovers dynamic bind-to-all-interfaces listener ports,
|
||||||
|
/// and sends them to the `AddressBook`.
|
||||||
|
///
|
||||||
|
/// Note: This test doesn't cover local interface or public IP address discovery.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn local_listener_unspecified_port_unspecified_addr_v6() {
|
||||||
|
zebra_test::init();
|
||||||
|
|
||||||
|
if zebra_test::net::zebra_skip_network_tests() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if zebra_test::net::zebra_skip_ipv6_tests() {
|
if zebra_test::net::zebra_skip_ipv6_tests() {
|
||||||
return;
|
return;
|
||||||
|
@ -80,7 +101,7 @@ async fn local_listener_unspecified_port_unspecified_addr() {
|
||||||
/// Test that zebra-network discovers dynamic localhost listener ports,
|
/// Test that zebra-network discovers dynamic localhost listener ports,
|
||||||
/// and sends them to the `AddressBook`.
|
/// and sends them to the `AddressBook`.
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn local_listener_unspecified_port_localhost_addr() {
|
async fn local_listener_unspecified_port_localhost_addr_v4() {
|
||||||
zebra_test::init();
|
zebra_test::init();
|
||||||
|
|
||||||
if zebra_test::net::zebra_skip_network_tests() {
|
if zebra_test::net::zebra_skip_network_tests() {
|
||||||
|
@ -90,6 +111,17 @@ async fn local_listener_unspecified_port_localhost_addr() {
|
||||||
// these tests might fail on machines with unusual IPv4 localhost configs
|
// these tests might fail on machines with unusual IPv4 localhost configs
|
||||||
local_listener_port_with("127.0.0.1:0".parse().unwrap(), Mainnet).await;
|
local_listener_port_with("127.0.0.1:0".parse().unwrap(), Mainnet).await;
|
||||||
local_listener_port_with("127.0.0.1:0".parse().unwrap(), Testnet).await;
|
local_listener_port_with("127.0.0.1:0".parse().unwrap(), Testnet).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test that zebra-network discovers dynamic localhost listener ports,
|
||||||
|
/// and sends them to the `AddressBook`.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn local_listener_unspecified_port_localhost_addr_v6() {
|
||||||
|
zebra_test::init();
|
||||||
|
|
||||||
|
if zebra_test::net::zebra_skip_network_tests() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if zebra_test::net::zebra_skip_ipv6_tests() {
|
if zebra_test::net::zebra_skip_ipv6_tests() {
|
||||||
return;
|
return;
|
||||||
|
@ -102,11 +134,10 @@ async fn local_listener_unspecified_port_localhost_addr() {
|
||||||
|
|
||||||
/// Test that zebra-network propagates fixed localhost listener ports to the `AddressBook`.
|
/// Test that zebra-network propagates fixed localhost listener ports to the `AddressBook`.
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn local_listener_fixed_port_localhost_addr() {
|
async fn local_listener_fixed_port_localhost_addr_v4() {
|
||||||
zebra_test::init();
|
zebra_test::init();
|
||||||
|
|
||||||
let localhost_v4 = "127.0.0.1".parse().unwrap();
|
let localhost_v4 = "127.0.0.1".parse().unwrap();
|
||||||
let localhost_v6 = "::1".parse().unwrap();
|
|
||||||
|
|
||||||
if zebra_test::net::zebra_skip_network_tests() {
|
if zebra_test::net::zebra_skip_network_tests() {
|
||||||
return;
|
return;
|
||||||
|
@ -114,6 +145,18 @@ async fn local_listener_fixed_port_localhost_addr() {
|
||||||
|
|
||||||
local_listener_port_with(SocketAddr::new(localhost_v4, random_known_port()), Mainnet).await;
|
local_listener_port_with(SocketAddr::new(localhost_v4, random_known_port()), Mainnet).await;
|
||||||
local_listener_port_with(SocketAddr::new(localhost_v4, random_known_port()), Testnet).await;
|
local_listener_port_with(SocketAddr::new(localhost_v4, random_known_port()), Testnet).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test that zebra-network propagates fixed localhost listener ports to the `AddressBook`.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn local_listener_fixed_port_localhost_addr_v6() {
|
||||||
|
zebra_test::init();
|
||||||
|
|
||||||
|
let localhost_v6 = "::1".parse().unwrap();
|
||||||
|
|
||||||
|
if zebra_test::net::zebra_skip_network_tests() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if zebra_test::net::zebra_skip_ipv6_tests() {
|
if zebra_test::net::zebra_skip_ipv6_tests() {
|
||||||
return;
|
return;
|
||||||
|
@ -253,10 +296,10 @@ async fn crawler_peer_limit_zero_connect_panic() {
|
||||||
unreachable!("outbound connector should never be called with a zero peer limit")
|
unreachable!("outbound connector should never be called with a zero peer limit")
|
||||||
});
|
});
|
||||||
|
|
||||||
let (_config, mut peerset_tx) =
|
let (_config, mut peerset_rx) =
|
||||||
spawn_crawler_with_peer_limit(0, unreachable_outbound_connector).await;
|
spawn_crawler_with_peer_limit(0, unreachable_outbound_connector).await;
|
||||||
|
|
||||||
let peer_result = peerset_tx.try_next();
|
let peer_result = peerset_rx.try_next();
|
||||||
assert!(
|
assert!(
|
||||||
// `Err(_)` means that no peers are available, and the sender has not been dropped.
|
// `Err(_)` means that no peers are available, and the sender has not been dropped.
|
||||||
// `Ok(None)` means that no peers are available, and the sender has been dropped.
|
// `Ok(None)` means that no peers are available, and the sender has been dropped.
|
||||||
|
@ -277,10 +320,10 @@ async fn crawler_peer_limit_one_connect_error() {
|
||||||
let error_outbound_connector =
|
let error_outbound_connector =
|
||||||
service_fn(|_| async { Err("test outbound connector always returns errors".into()) });
|
service_fn(|_| async { Err("test outbound connector always returns errors".into()) });
|
||||||
|
|
||||||
let (_config, mut peerset_tx) =
|
let (_config, mut peerset_rx) =
|
||||||
spawn_crawler_with_peer_limit(1, error_outbound_connector).await;
|
spawn_crawler_with_peer_limit(1, error_outbound_connector).await;
|
||||||
|
|
||||||
let peer_result = peerset_tx.try_next();
|
let peer_result = peerset_rx.try_next();
|
||||||
assert!(
|
assert!(
|
||||||
// `Err(_)` means that no peers are available, and the sender has not been dropped.
|
// `Err(_)` means that no peers are available, and the sender has not been dropped.
|
||||||
// `Ok(None)` means that no peers are available, and the sender has been dropped.
|
// `Ok(None)` means that no peers are available, and the sender has been dropped.
|
||||||
|
@ -325,12 +368,12 @@ async fn crawler_peer_limit_one_connect_ok_then_drop() {
|
||||||
Ok(Change::Insert(addr, fake_client))
|
Ok(Change::Insert(addr, fake_client))
|
||||||
});
|
});
|
||||||
|
|
||||||
let (config, mut peerset_tx) =
|
let (config, mut peerset_rx) =
|
||||||
spawn_crawler_with_peer_limit(1, success_disconnect_outbound_connector).await;
|
spawn_crawler_with_peer_limit(1, success_disconnect_outbound_connector).await;
|
||||||
|
|
||||||
let mut peer_count: usize = 0;
|
let mut peer_count: usize = 0;
|
||||||
loop {
|
loop {
|
||||||
let peer_result = peerset_tx.try_next();
|
let peer_result = peerset_rx.try_next();
|
||||||
match peer_result {
|
match peer_result {
|
||||||
// A peer handshake succeeded.
|
// A peer handshake succeeded.
|
||||||
Ok(Some(peer_result)) => {
|
Ok(Some(peer_result)) => {
|
||||||
|
@ -368,8 +411,11 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() {
|
||||||
// This test does not require network access, because the outbound connector
|
// This test does not require network access, because the outbound connector
|
||||||
// and peer set are fake.
|
// and peer set are fake.
|
||||||
|
|
||||||
let success_stay_open_outbound_connector =
|
let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded();
|
||||||
service_fn(|req: OutboundConnectorRequest| async move {
|
|
||||||
|
let success_stay_open_outbound_connector = service_fn(move |req: OutboundConnectorRequest| {
|
||||||
|
let peer_tracker_tx = peer_tracker_tx.clone();
|
||||||
|
async move {
|
||||||
let OutboundConnectorRequest {
|
let OutboundConnectorRequest {
|
||||||
addr,
|
addr,
|
||||||
connection_tracker,
|
connection_tracker,
|
||||||
|
@ -385,29 +431,49 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() {
|
||||||
error_slot,
|
error_slot,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Fake the connection being open forever.
|
// Make the connection staying open.
|
||||||
std::mem::forget(connection_tracker);
|
peer_tracker_tx
|
||||||
|
.unbounded_send(connection_tracker)
|
||||||
|
.expect("unexpected error sending to unbounded channel");
|
||||||
|
|
||||||
Ok(Change::Insert(addr, fake_client))
|
Ok(Change::Insert(addr, fake_client))
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let (config, mut peerset_tx) =
|
let (config, mut peerset_rx) =
|
||||||
spawn_crawler_with_peer_limit(1, success_stay_open_outbound_connector).await;
|
spawn_crawler_with_peer_limit(1, success_stay_open_outbound_connector).await;
|
||||||
|
|
||||||
let mut peer_count: usize = 0;
|
let mut peer_change_count: usize = 0;
|
||||||
loop {
|
loop {
|
||||||
let peer_result = peerset_tx.try_next();
|
let peer_change_result = peerset_rx.try_next();
|
||||||
match peer_result {
|
match peer_change_result {
|
||||||
// A peer handshake succeeded.
|
// A peer handshake succeeded.
|
||||||
Ok(Some(peer_result)) => {
|
Ok(Some(peer_change_result)) => {
|
||||||
assert!(
|
assert!(
|
||||||
matches!(peer_result, Ok(Change::Insert(_, _))),
|
matches!(peer_change_result, Ok(Change::Insert(_, _))),
|
||||||
"unexpected connection error: {:?}\n\
|
"unexpected connection error: {:?}\n\
|
||||||
{} previous peers succeeded",
|
{} previous peers succeeded",
|
||||||
peer_result,
|
peer_change_result,
|
||||||
peer_count,
|
peer_change_count,
|
||||||
);
|
);
|
||||||
peer_count += 1;
|
peer_change_count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The channel is closed and there are no messages left in the channel.
|
||||||
|
Ok(None) => break,
|
||||||
|
// The channel is still open, but there are no messages left in the channel.
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut peer_tracker_count: usize = 0;
|
||||||
|
loop {
|
||||||
|
let peer_tracker_result = peer_tracker_rx.try_next();
|
||||||
|
match peer_tracker_result {
|
||||||
|
// We held this peer tracker open until now.
|
||||||
|
Ok(Some(peer_tracker)) => {
|
||||||
|
std::mem::drop(peer_tracker);
|
||||||
|
peer_tracker_count += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// The channel is closed and there are no messages left in the channel.
|
// The channel is closed and there are no messages left in the channel.
|
||||||
|
@ -418,10 +484,19 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() {
|
||||||
}
|
}
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
peer_count <= config.peerset_outbound_connection_limit(),
|
peer_change_count <= config.peerset_outbound_connection_limit(),
|
||||||
"unexpected number of peer connections {}, over limit of {}",
|
"unexpected number of peer changes {}, over limit of {}, had {} peer trackers",
|
||||||
peer_count,
|
peer_change_count,
|
||||||
config.peerset_outbound_connection_limit(),
|
config.peerset_outbound_connection_limit(),
|
||||||
|
peer_tracker_count,
|
||||||
|
);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
peer_tracker_count <= config.peerset_outbound_connection_limit(),
|
||||||
|
"unexpected number of peer trackers {}, over limit of {}, had {} peer changes",
|
||||||
|
peer_tracker_count,
|
||||||
|
config.peerset_outbound_connection_limit(),
|
||||||
|
peer_change_count,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -436,10 +511,10 @@ async fn crawler_peer_limit_default_connect_error() {
|
||||||
let error_outbound_connector =
|
let error_outbound_connector =
|
||||||
service_fn(|_| async { Err("test outbound connector always returns errors".into()) });
|
service_fn(|_| async { Err("test outbound connector always returns errors".into()) });
|
||||||
|
|
||||||
let (_config, mut peerset_tx) =
|
let (_config, mut peerset_rx) =
|
||||||
spawn_crawler_with_peer_limit(None, error_outbound_connector).await;
|
spawn_crawler_with_peer_limit(None, error_outbound_connector).await;
|
||||||
|
|
||||||
let peer_result = peerset_tx.try_next();
|
let peer_result = peerset_rx.try_next();
|
||||||
assert!(
|
assert!(
|
||||||
// `Err(_)` means that no peers are available, and the sender has not been dropped.
|
// `Err(_)` means that no peers are available, and the sender has not been dropped.
|
||||||
// `Ok(None)` means that no peers are available, and the sender has been dropped.
|
// `Ok(None)` means that no peers are available, and the sender has been dropped.
|
||||||
|
@ -484,12 +559,14 @@ async fn crawler_peer_limit_default_connect_ok_then_drop() {
|
||||||
Ok(Change::Insert(addr, fake_client))
|
Ok(Change::Insert(addr, fake_client))
|
||||||
});
|
});
|
||||||
|
|
||||||
let (config, mut peerset_tx) =
|
// TODO: tweak the crawler timeouts and rate-limits so we get over the actual limit
|
||||||
spawn_crawler_with_peer_limit(None, success_disconnect_outbound_connector).await;
|
// (currently, getting over the limit can take 30 seconds or more)
|
||||||
|
let (config, mut peerset_rx) =
|
||||||
|
spawn_crawler_with_peer_limit(15, success_disconnect_outbound_connector).await;
|
||||||
|
|
||||||
let mut peer_count: usize = 0;
|
let mut peer_count: usize = 0;
|
||||||
loop {
|
loop {
|
||||||
let peer_result = peerset_tx.try_next();
|
let peer_result = peerset_rx.try_next();
|
||||||
match peer_result {
|
match peer_result {
|
||||||
// A peer handshake succeeded.
|
// A peer handshake succeeded.
|
||||||
Ok(Some(peer_result)) => {
|
Ok(Some(peer_result)) => {
|
||||||
|
@ -510,14 +587,11 @@ async fn crawler_peer_limit_default_connect_ok_then_drop() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: tweak the crawler timeouts and rate-limits so we get over the actual limit
|
|
||||||
// (currently, getting over the limit can take 30 seconds or more)
|
|
||||||
let lower_limit = config.peerset_outbound_connection_limit() / 3;
|
|
||||||
assert!(
|
assert!(
|
||||||
peer_count > lower_limit,
|
peer_count > config.peerset_outbound_connection_limit(),
|
||||||
"unexpected number of peer connections {}, should be over the limit of {}",
|
"unexpected number of peer connections {}, should be over the limit of {}",
|
||||||
peer_count,
|
peer_count,
|
||||||
lower_limit,
|
config.peerset_outbound_connection_limit(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -530,8 +604,11 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() {
|
||||||
// This test does not require network access, because the outbound connector
|
// This test does not require network access, because the outbound connector
|
||||||
// and peer set are fake.
|
// and peer set are fake.
|
||||||
|
|
||||||
let success_stay_open_outbound_connector =
|
let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded();
|
||||||
service_fn(|req: OutboundConnectorRequest| async move {
|
|
||||||
|
let success_stay_open_outbound_connector = service_fn(move |req: OutboundConnectorRequest| {
|
||||||
|
let peer_tracker_tx = peer_tracker_tx.clone();
|
||||||
|
async move {
|
||||||
let OutboundConnectorRequest {
|
let OutboundConnectorRequest {
|
||||||
addr,
|
addr,
|
||||||
connection_tracker,
|
connection_tracker,
|
||||||
|
@ -547,19 +624,173 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() {
|
||||||
error_slot,
|
error_slot,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Fake the connection being open forever.
|
// Make the connection staying open.
|
||||||
std::mem::forget(connection_tracker);
|
peer_tracker_tx
|
||||||
|
.unbounded_send(connection_tracker)
|
||||||
|
.expect("unexpected error sending to unbounded channel");
|
||||||
|
|
||||||
Ok(Change::Insert(addr, fake_client))
|
Ok(Change::Insert(addr, fake_client))
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// The initial target size is multiplied by 1.5 to give the actual limit.
|
// The initial target size is multiplied by 1.5 to give the actual limit.
|
||||||
let (config, mut peerset_tx) =
|
let (config, mut peerset_rx) =
|
||||||
spawn_crawler_with_peer_limit(None, success_stay_open_outbound_connector).await;
|
spawn_crawler_with_peer_limit(None, success_stay_open_outbound_connector).await;
|
||||||
|
|
||||||
|
let mut peer_change_count: usize = 0;
|
||||||
|
loop {
|
||||||
|
let peer_change_result = peerset_rx.try_next();
|
||||||
|
match peer_change_result {
|
||||||
|
// A peer handshake succeeded.
|
||||||
|
Ok(Some(peer_change_result)) => {
|
||||||
|
assert!(
|
||||||
|
matches!(peer_change_result, Ok(Change::Insert(_, _))),
|
||||||
|
"unexpected connection error: {:?}\n\
|
||||||
|
{} previous peers succeeded",
|
||||||
|
peer_change_result,
|
||||||
|
peer_change_count,
|
||||||
|
);
|
||||||
|
peer_change_count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The channel is closed and there are no messages left in the channel.
|
||||||
|
Ok(None) => break,
|
||||||
|
// The channel is still open, but there are no messages left in the channel.
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut peer_tracker_count: usize = 0;
|
||||||
|
loop {
|
||||||
|
let peer_tracker_result = peer_tracker_rx.try_next();
|
||||||
|
match peer_tracker_result {
|
||||||
|
// We held this peer tracker open until now.
|
||||||
|
Ok(Some(peer_tracker)) => {
|
||||||
|
std::mem::drop(peer_tracker);
|
||||||
|
peer_tracker_count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The channel is closed and there are no messages left in the channel.
|
||||||
|
Ok(None) => break,
|
||||||
|
// The channel is still open, but there are no messages left in the channel.
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
peer_change_count <= config.peerset_outbound_connection_limit(),
|
||||||
|
"unexpected number of peer changes {}, over limit of {}, had {} peer trackers",
|
||||||
|
peer_change_count,
|
||||||
|
config.peerset_outbound_connection_limit(),
|
||||||
|
peer_tracker_count,
|
||||||
|
);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
peer_tracker_count <= config.peerset_outbound_connection_limit(),
|
||||||
|
"unexpected number of peer trackers {}, over limit of {}, had {} peer changes",
|
||||||
|
peer_tracker_count,
|
||||||
|
config.peerset_outbound_connection_limit(),
|
||||||
|
peer_change_count,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test the listener with an inbound peer limit of zero peers, and a handshaker that panics.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn listener_peer_limit_zero_handshake_panic() {
|
||||||
|
zebra_test::init();
|
||||||
|
|
||||||
|
// This test requires an IPv4 network stack with 127.0.0.1 as localhost.
|
||||||
|
if zebra_test::net::zebra_skip_network_tests() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let unreachable_inbound_handshaker = service_fn(|_| async {
|
||||||
|
unreachable!("inbound handshaker should never be called with a zero peer limit")
|
||||||
|
});
|
||||||
|
|
||||||
|
let (_config, mut peerset_rx) =
|
||||||
|
spawn_inbound_listener_with_peer_limit(0, unreachable_inbound_handshaker).await;
|
||||||
|
|
||||||
|
let peer_result = peerset_rx.try_next();
|
||||||
|
assert!(
|
||||||
|
// `Err(_)` means that no peers are available, and the sender has not been dropped.
|
||||||
|
// `Ok(None)` means that no peers are available, and the sender has been dropped.
|
||||||
|
matches!(peer_result, Err(_) | Ok(None)),
|
||||||
|
"unexpected peer when inbound limit is zero: {:?}",
|
||||||
|
peer_result,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test the listener with an inbound peer limit of one peer, and a handshaker that always errors.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn listener_peer_limit_one_handshake_error() {
|
||||||
|
zebra_test::init();
|
||||||
|
|
||||||
|
// This test requires an IPv4 network stack with 127.0.0.1 as localhost.
|
||||||
|
if zebra_test::net::zebra_skip_network_tests() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let error_inbound_handshaker =
|
||||||
|
service_fn(|_| async { Err("test inbound handshaker always returns errors".into()) });
|
||||||
|
|
||||||
|
let (_config, mut peerset_rx) =
|
||||||
|
spawn_inbound_listener_with_peer_limit(1, error_inbound_handshaker).await;
|
||||||
|
|
||||||
|
let peer_result = peerset_rx.try_next();
|
||||||
|
assert!(
|
||||||
|
// `Err(_)` means that no peers are available, and the sender has not been dropped.
|
||||||
|
// `Ok(None)` means that no peers are available, and the sender has been dropped.
|
||||||
|
matches!(peer_result, Err(_) | Ok(None)),
|
||||||
|
"unexpected peer when all handshakes error: {:?}",
|
||||||
|
peer_result,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test the listener with an inbound peer limit of one peer,
|
||||||
|
/// and a handshaker that returns success then disconnects the peer.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn listener_peer_limit_one_handshake_ok_then_drop() {
|
||||||
|
zebra_test::init();
|
||||||
|
|
||||||
|
// This test requires an IPv4 network stack with 127.0.0.1 as localhost.
|
||||||
|
if zebra_test::net::zebra_skip_network_tests() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let success_disconnect_inbound_handshaker = service_fn(|req: HandshakeRequest| async move {
|
||||||
|
let HandshakeRequest {
|
||||||
|
tcp_stream,
|
||||||
|
connected_addr: _,
|
||||||
|
connection_tracker,
|
||||||
|
} = req;
|
||||||
|
|
||||||
|
let (server_tx, _server_rx) = mpsc::channel(0);
|
||||||
|
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
|
||||||
|
let error_slot = ErrorSlot::default();
|
||||||
|
|
||||||
|
let fake_client = peer::Client {
|
||||||
|
shutdown_tx: Some(shutdown_tx),
|
||||||
|
server_tx,
|
||||||
|
error_slot,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Actually close the connection.
|
||||||
|
std::mem::drop(connection_tracker);
|
||||||
|
std::mem::drop(tcp_stream);
|
||||||
|
|
||||||
|
// Give the crawler time to get the message.
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
|
||||||
|
Ok(fake_client)
|
||||||
|
});
|
||||||
|
|
||||||
|
let (config, mut peerset_rx) =
|
||||||
|
spawn_inbound_listener_with_peer_limit(1, success_disconnect_inbound_handshaker).await;
|
||||||
|
|
||||||
let mut peer_count: usize = 0;
|
let mut peer_count: usize = 0;
|
||||||
loop {
|
loop {
|
||||||
let peer_result = peerset_tx.try_next();
|
let peer_result = peerset_rx.try_next();
|
||||||
match peer_result {
|
match peer_result {
|
||||||
// A peer handshake succeeded.
|
// A peer handshake succeeded.
|
||||||
Ok(Some(peer_result)) => {
|
Ok(Some(peer_result)) => {
|
||||||
|
@ -581,10 +812,312 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() {
|
||||||
}
|
}
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
peer_count <= config.peerset_outbound_connection_limit(),
|
peer_count > config.peerset_inbound_connection_limit(),
|
||||||
"unexpected number of peer connections {}, over limit of {}",
|
"unexpected number of peer connections {}, should be over the limit of {}",
|
||||||
peer_count,
|
peer_count,
|
||||||
config.peerset_outbound_connection_limit(),
|
config.peerset_inbound_connection_limit(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test the listener with an inbound peer limit of one peer,
|
||||||
|
/// and a handshaker that returns success then holds the peer open.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn listener_peer_limit_one_handshake_ok_stay_open() {
|
||||||
|
zebra_test::init();
|
||||||
|
|
||||||
|
// This test requires an IPv4 network stack with 127.0.0.1 as localhost.
|
||||||
|
if zebra_test::net::zebra_skip_network_tests() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded();
|
||||||
|
|
||||||
|
let success_stay_open_inbound_handshaker = service_fn(move |req: HandshakeRequest| {
|
||||||
|
let peer_tracker_tx = peer_tracker_tx.clone();
|
||||||
|
async move {
|
||||||
|
let HandshakeRequest {
|
||||||
|
tcp_stream,
|
||||||
|
connected_addr: _,
|
||||||
|
connection_tracker,
|
||||||
|
} = req;
|
||||||
|
|
||||||
|
let (server_tx, _server_rx) = mpsc::channel(0);
|
||||||
|
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
|
||||||
|
let error_slot = ErrorSlot::default();
|
||||||
|
|
||||||
|
let fake_client = peer::Client {
|
||||||
|
shutdown_tx: Some(shutdown_tx),
|
||||||
|
server_tx,
|
||||||
|
error_slot,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Make the connection staying open.
|
||||||
|
peer_tracker_tx
|
||||||
|
.unbounded_send((tcp_stream, connection_tracker))
|
||||||
|
.expect("unexpected error sending to unbounded channel");
|
||||||
|
|
||||||
|
Ok(fake_client)
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let (config, mut peerset_rx) =
|
||||||
|
spawn_inbound_listener_with_peer_limit(1, success_stay_open_inbound_handshaker).await;
|
||||||
|
|
||||||
|
let mut peer_change_count: usize = 0;
|
||||||
|
loop {
|
||||||
|
let peer_change_result = peerset_rx.try_next();
|
||||||
|
match peer_change_result {
|
||||||
|
// A peer handshake succeeded.
|
||||||
|
Ok(Some(peer_change_result)) => {
|
||||||
|
assert!(
|
||||||
|
matches!(peer_change_result, Ok(Change::Insert(_, _))),
|
||||||
|
"unexpected connection error: {:?}\n\
|
||||||
|
{} previous peers succeeded",
|
||||||
|
peer_change_result,
|
||||||
|
peer_change_count,
|
||||||
|
);
|
||||||
|
peer_change_count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The channel is closed and there are no messages left in the channel.
|
||||||
|
Ok(None) => break,
|
||||||
|
// The channel is still open, but there are no messages left in the channel.
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut peer_tracker_count: usize = 0;
|
||||||
|
loop {
|
||||||
|
let peer_tracker_result = peer_tracker_rx.try_next();
|
||||||
|
match peer_tracker_result {
|
||||||
|
// We held this peer connection and tracker open until now.
|
||||||
|
Ok(Some((peer_connection, peer_tracker))) => {
|
||||||
|
std::mem::drop(peer_connection);
|
||||||
|
std::mem::drop(peer_tracker);
|
||||||
|
peer_tracker_count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The channel is closed and there are no messages left in the channel.
|
||||||
|
Ok(None) => break,
|
||||||
|
// The channel is still open, but there are no messages left in the channel.
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
peer_change_count <= config.peerset_inbound_connection_limit(),
|
||||||
|
"unexpected number of peer changes {}, over limit of {}, had {} peer trackers",
|
||||||
|
peer_change_count,
|
||||||
|
config.peerset_inbound_connection_limit(),
|
||||||
|
peer_tracker_count,
|
||||||
|
);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
peer_tracker_count <= config.peerset_inbound_connection_limit(),
|
||||||
|
"unexpected number of peer trackers {}, over limit of {}, had {} peer changes",
|
||||||
|
peer_tracker_count,
|
||||||
|
config.peerset_inbound_connection_limit(),
|
||||||
|
peer_change_count,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test the listener with the default inbound peer limit, and a handshaker that always errors.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn listener_peer_limit_default_handshake_error() {
|
||||||
|
zebra_test::init();
|
||||||
|
|
||||||
|
// This test requires an IPv4 network stack with 127.0.0.1 as localhost.
|
||||||
|
if zebra_test::net::zebra_skip_network_tests() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let error_inbound_handshaker =
|
||||||
|
service_fn(|_| async { Err("test inbound handshaker always returns errors".into()) });
|
||||||
|
|
||||||
|
let (_config, mut peerset_rx) =
|
||||||
|
spawn_inbound_listener_with_peer_limit(None, error_inbound_handshaker).await;
|
||||||
|
|
||||||
|
let peer_result = peerset_rx.try_next();
|
||||||
|
assert!(
|
||||||
|
// `Err(_)` means that no peers are available, and the sender has not been dropped.
|
||||||
|
// `Ok(None)` means that no peers are available, and the sender has been dropped.
|
||||||
|
matches!(peer_result, Err(_) | Ok(None)),
|
||||||
|
"unexpected peer when all handshakes error: {:?}",
|
||||||
|
peer_result,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test the listener with the default inbound peer limit,
|
||||||
|
/// and a handshaker that returns success then disconnects the peer.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn listener_peer_limit_default_handshake_ok_then_drop() {
|
||||||
|
zebra_test::init();
|
||||||
|
|
||||||
|
// This test requires an IPv4 network stack with 127.0.0.1 as localhost.
|
||||||
|
if zebra_test::net::zebra_skip_network_tests() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let success_disconnect_inbound_handshaker = service_fn(|req: HandshakeRequest| async move {
|
||||||
|
let HandshakeRequest {
|
||||||
|
tcp_stream,
|
||||||
|
connected_addr: _,
|
||||||
|
connection_tracker,
|
||||||
|
} = req;
|
||||||
|
|
||||||
|
let (server_tx, _server_rx) = mpsc::channel(0);
|
||||||
|
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
|
||||||
|
let error_slot = ErrorSlot::default();
|
||||||
|
|
||||||
|
let fake_client = peer::Client {
|
||||||
|
shutdown_tx: Some(shutdown_tx),
|
||||||
|
server_tx,
|
||||||
|
error_slot,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Actually close the connection.
|
||||||
|
std::mem::drop(connection_tracker);
|
||||||
|
std::mem::drop(tcp_stream);
|
||||||
|
|
||||||
|
// Give the crawler time to get the message.
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
|
||||||
|
Ok(fake_client)
|
||||||
|
});
|
||||||
|
|
||||||
|
let (config, mut peerset_rx) =
|
||||||
|
spawn_inbound_listener_with_peer_limit(None, success_disconnect_inbound_handshaker).await;
|
||||||
|
|
||||||
|
let mut peer_count: usize = 0;
|
||||||
|
loop {
|
||||||
|
let peer_result = peerset_rx.try_next();
|
||||||
|
match peer_result {
|
||||||
|
// A peer handshake succeeded.
|
||||||
|
Ok(Some(peer_result)) => {
|
||||||
|
assert!(
|
||||||
|
matches!(peer_result, Ok(Change::Insert(_, _))),
|
||||||
|
"unexpected connection error: {:?}\n\
|
||||||
|
{} previous peers succeeded",
|
||||||
|
peer_result,
|
||||||
|
peer_count,
|
||||||
|
);
|
||||||
|
peer_count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The channel is closed and there are no messages left in the channel.
|
||||||
|
Ok(None) => break,
|
||||||
|
// The channel is still open, but there are no messages left in the channel.
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
peer_count > config.peerset_inbound_connection_limit(),
|
||||||
|
"unexpected number of peer connections {}, should be over the limit of {}",
|
||||||
|
peer_count,
|
||||||
|
config.peerset_inbound_connection_limit(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test the listener with the default inbound peer limit,
|
||||||
|
/// and a handshaker that returns success then holds the peer open.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn listener_peer_limit_default_handshake_ok_stay_open() {
|
||||||
|
zebra_test::init();
|
||||||
|
|
||||||
|
// This test requires an IPv4 network stack with 127.0.0.1 as localhost.
|
||||||
|
if zebra_test::net::zebra_skip_network_tests() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded();
|
||||||
|
|
||||||
|
let success_stay_open_inbound_handshaker = service_fn(move |req: HandshakeRequest| {
|
||||||
|
let peer_tracker_tx = peer_tracker_tx.clone();
|
||||||
|
async move {
|
||||||
|
let HandshakeRequest {
|
||||||
|
tcp_stream,
|
||||||
|
connected_addr: _,
|
||||||
|
connection_tracker,
|
||||||
|
} = req;
|
||||||
|
|
||||||
|
let (server_tx, _server_rx) = mpsc::channel(0);
|
||||||
|
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
|
||||||
|
let error_slot = ErrorSlot::default();
|
||||||
|
|
||||||
|
let fake_client = peer::Client {
|
||||||
|
shutdown_tx: Some(shutdown_tx),
|
||||||
|
server_tx,
|
||||||
|
error_slot,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Make the connection staying open.
|
||||||
|
peer_tracker_tx
|
||||||
|
.unbounded_send((tcp_stream, connection_tracker))
|
||||||
|
.expect("unexpected error sending to unbounded channel");
|
||||||
|
|
||||||
|
Ok(fake_client)
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let (config, mut peerset_rx) =
|
||||||
|
spawn_inbound_listener_with_peer_limit(None, success_stay_open_inbound_handshaker).await;
|
||||||
|
|
||||||
|
let mut peer_change_count: usize = 0;
|
||||||
|
loop {
|
||||||
|
let peer_change_result = peerset_rx.try_next();
|
||||||
|
match peer_change_result {
|
||||||
|
// A peer handshake succeeded.
|
||||||
|
Ok(Some(peer_change_result)) => {
|
||||||
|
assert!(
|
||||||
|
matches!(peer_change_result, Ok(Change::Insert(_, _))),
|
||||||
|
"unexpected connection error: {:?}\n\
|
||||||
|
{} previous peers succeeded",
|
||||||
|
peer_change_result,
|
||||||
|
peer_change_count,
|
||||||
|
);
|
||||||
|
peer_change_count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The channel is closed and there are no messages left in the channel.
|
||||||
|
Ok(None) => break,
|
||||||
|
// The channel is still open, but there are no messages left in the channel.
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut peer_tracker_count: usize = 0;
|
||||||
|
loop {
|
||||||
|
let peer_tracker_result = peer_tracker_rx.try_next();
|
||||||
|
match peer_tracker_result {
|
||||||
|
// We held this peer connection and tracker open until now.
|
||||||
|
Ok(Some((peer_connection, peer_tracker))) => {
|
||||||
|
std::mem::drop(peer_connection);
|
||||||
|
std::mem::drop(peer_tracker);
|
||||||
|
peer_tracker_count += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The channel is closed and there are no messages left in the channel.
|
||||||
|
Ok(None) => break,
|
||||||
|
// The channel is still open, but there are no messages left in the channel.
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
peer_change_count <= config.peerset_inbound_connection_limit(),
|
||||||
|
"unexpected number of peer changes {}, over limit of {}, had {} peer trackers",
|
||||||
|
peer_change_count,
|
||||||
|
config.peerset_inbound_connection_limit(),
|
||||||
|
peer_tracker_count,
|
||||||
|
);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
peer_tracker_count <= config.peerset_inbound_connection_limit(),
|
||||||
|
"unexpected number of peer trackers {}, over limit of {}, had {} peer changes",
|
||||||
|
peer_tracker_count,
|
||||||
|
config.peerset_inbound_connection_limit(),
|
||||||
|
peer_change_count,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -801,6 +1334,102 @@ where
|
||||||
(config, peerset_rx)
|
(config, peerset_rx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Run an inbound peer listener with `peerset_initial_target_size` and `handshaker`.
|
||||||
|
///
|
||||||
|
/// Uses the default values for all other config fields.
|
||||||
|
///
|
||||||
|
/// Returns the generated [`Config`], and the peer set receiver.
|
||||||
|
async fn spawn_inbound_listener_with_peer_limit<S>(
|
||||||
|
peerset_initial_target_size: impl Into<Option<usize>>,
|
||||||
|
listen_handshaker: S,
|
||||||
|
) -> (Config, mpsc::Receiver<PeerChange>)
|
||||||
|
where
|
||||||
|
S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError>
|
||||||
|
+ Clone
|
||||||
|
+ Send
|
||||||
|
+ 'static,
|
||||||
|
S::Future: Send + 'static,
|
||||||
|
{
|
||||||
|
// Create a test config that listens on any unused port.
|
||||||
|
let listen_addr = "127.0.0.1:0".parse().unwrap();
|
||||||
|
let mut config = Config {
|
||||||
|
listen_addr,
|
||||||
|
..Config::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(peerset_initial_target_size) = peerset_initial_target_size.into() {
|
||||||
|
config.peerset_initial_target_size = peerset_initial_target_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open the listener port.
|
||||||
|
let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;
|
||||||
|
|
||||||
|
// Make enough inbound connections to go over the limit, even if the limit is zero.
|
||||||
|
// Make the channels large enough to hold all the connections.
|
||||||
|
let over_limit_connections = config.peerset_inbound_connection_limit() * 2 + 1;
|
||||||
|
let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(over_limit_connections);
|
||||||
|
|
||||||
|
// Start listening for connections.
|
||||||
|
let listen_fut = accept_inbound_connections(
|
||||||
|
config.clone(),
|
||||||
|
tcp_listener,
|
||||||
|
listen_handshaker,
|
||||||
|
peerset_tx.clone(),
|
||||||
|
);
|
||||||
|
let listen_task_handle = tokio::spawn(listen_fut);
|
||||||
|
|
||||||
|
// Open inbound connections.
|
||||||
|
let mut outbound_task_handles = Vec::new();
|
||||||
|
for _ in 0..over_limit_connections {
|
||||||
|
let outbound_fut = async move {
|
||||||
|
let outbound_result = TcpStream::connect(listen_addr).await;
|
||||||
|
// Let other tasks run before we block on reading.
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
|
||||||
|
if let Ok(outbound_stream) = outbound_result {
|
||||||
|
// Wait until the listener closes the connection.
|
||||||
|
// The handshaker is fake, so it never sends any data.
|
||||||
|
let readable_result = outbound_stream.readable().await;
|
||||||
|
info!(
|
||||||
|
?readable_result,
|
||||||
|
"outbound connection became readable or errored: \
|
||||||
|
closing connection to test inbound listener"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
// If the connection is closed quickly, we might get errors here.
|
||||||
|
debug!(
|
||||||
|
?outbound_result,
|
||||||
|
"outbound connection error in inbound listener test"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let outbound_task_handle = tokio::spawn(outbound_fut);
|
||||||
|
outbound_task_handles.push(outbound_task_handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let the listener run for a while.
|
||||||
|
tokio::time::sleep(LISTENER_TEST_DURATION).await;
|
||||||
|
|
||||||
|
// Stop the listener and outbound tasks, and let them finish.
|
||||||
|
listen_task_handle.abort();
|
||||||
|
for outbound_task_handle in outbound_task_handles {
|
||||||
|
outbound_task_handle.abort();
|
||||||
|
}
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
|
||||||
|
// Check for panics or errors in the listener.
|
||||||
|
let listen_result = listen_task_handle.now_or_never();
|
||||||
|
assert!(
|
||||||
|
matches!(listen_result, None)
|
||||||
|
|| matches!(listen_result, Some(Err(ref e)) if e.is_cancelled()),
|
||||||
|
"unexpected error or panic in inbound peer listener task: {:?}",
|
||||||
|
listen_result,
|
||||||
|
);
|
||||||
|
|
||||||
|
(config, peerset_rx)
|
||||||
|
}
|
||||||
|
|
||||||
/// Initialize a task that connects to `peer_count` initial peers using the
|
/// Initialize a task that connects to `peer_count` initial peers using the
|
||||||
/// given connector.
|
/// given connector.
|
||||||
///
|
///
|
||||||
|
|
|
@ -80,7 +80,7 @@ use crate::{
|
||||||
external::InventoryHash,
|
external::InventoryHash,
|
||||||
internal::{Request, Response},
|
internal::{Request, Response},
|
||||||
},
|
},
|
||||||
AddressBook, BoxError,
|
AddressBook, BoxError, Config,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// A signal sent by the [`PeerSet`] when it has no ready peers, and gets a request from Zebra.
|
/// A signal sent by the [`PeerSet`] when it has no ready peers, and gets a request from Zebra.
|
||||||
|
@ -134,6 +134,8 @@ where
|
||||||
///
|
///
|
||||||
/// Used for logging diagnostics.
|
/// Used for logging diagnostics.
|
||||||
address_book: Arc<std::sync::Mutex<AddressBook>>,
|
address_book: Arc<std::sync::Mutex<AddressBook>>,
|
||||||
|
/// The configured limit for inbound and outbound connections.
|
||||||
|
peerset_total_connection_limit: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<D> PeerSet<D>
|
impl<D> PeerSet<D>
|
||||||
|
@ -147,6 +149,7 @@ where
|
||||||
{
|
{
|
||||||
/// Construct a peerset which uses `discover` internally.
|
/// Construct a peerset which uses `discover` internally.
|
||||||
pub fn new(
|
pub fn new(
|
||||||
|
config: &Config,
|
||||||
discover: D,
|
discover: D,
|
||||||
demand_signal: mpsc::Sender<MorePeers>,
|
demand_signal: mpsc::Sender<MorePeers>,
|
||||||
handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
|
handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
|
||||||
|
@ -165,6 +168,7 @@ where
|
||||||
inventory_registry: InventoryRegistry::new(inv_stream),
|
inventory_registry: InventoryRegistry::new(inv_stream),
|
||||||
last_peer_log: None,
|
last_peer_log: None,
|
||||||
address_book,
|
address_book,
|
||||||
|
peerset_total_connection_limit: config.peerset_total_connection_limit(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -432,6 +436,17 @@ where
|
||||||
metrics::gauge!("pool.num_ready", num_ready as f64);
|
metrics::gauge!("pool.num_ready", num_ready as f64);
|
||||||
metrics::gauge!("pool.num_unready", num_unready as f64);
|
metrics::gauge!("pool.num_unready", num_unready as f64);
|
||||||
metrics::gauge!("zcash.net.peers", num_peers as f64);
|
metrics::gauge!("zcash.net.peers", num_peers as f64);
|
||||||
|
|
||||||
|
// Security: make sure we haven't exceeded the connection limit
|
||||||
|
if num_peers > self.peerset_total_connection_limit {
|
||||||
|
let address_metrics = self.address_book.lock().unwrap().address_metrics();
|
||||||
|
panic!(
|
||||||
|
"unexpectedly exceeded configured peer set connection limit: \n\
|
||||||
|
peers: {:?}, ready: {:?}, unready: {:?}, \n\
|
||||||
|
address_metrics: {:?}",
|
||||||
|
num_peers, num_ready, num_unready, address_metrics,
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue