diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index d01a6a314..ab355b6fc 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -144,6 +144,7 @@ where // Connect the rx end to a PeerSet, wrapping new peers in load instruments. let peer_set = PeerSet::new( + &config, PeakEwmaDiscover::new( // Discover interprets an error as stream termination, // so discard any errored connections... @@ -162,19 +163,20 @@ where // Connect peerset_tx to the 3 peer sources: // // 1. Incoming peer connections, via a listener. - let listen_guard = tokio::spawn( - accept_inbound_connections(tcp_listener, listen_handshaker, peerset_tx.clone()) - .instrument(Span::current()), + let listen_fut = accept_inbound_connections( + config.clone(), + tcp_listener, + listen_handshaker, + peerset_tx.clone(), ); + let listen_guard = tokio::spawn(listen_fut.instrument(Span::current())); // 2. Initial peers, specified in the config. - let initial_peers_fut = { - let config = config.clone(); - let outbound_connector = outbound_connector.clone(); - let peerset_tx = peerset_tx.clone(); - add_initial_peers(config, outbound_connector, peerset_tx) - }; - + let initial_peers_fut = add_initial_peers( + config.clone(), + outbound_connector.clone(), + peerset_tx.clone(), + ); let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current())); // 3. Outgoing peers we connect to in response to load. @@ -202,18 +204,15 @@ where let _ = demand_tx.try_send(MorePeers); } - let crawl_fut = { - let config = config.clone(); - crawl_and_dial( - config, - demand_tx, - demand_rx, - candidates, - outbound_connector, - peerset_tx, - active_outbound_connections, - ) - }; + let crawl_fut = crawl_and_dial( + config, + demand_tx, + demand_rx, + candidates, + outbound_connector, + peerset_tx, + active_outbound_connections, + ); let crawl_guard = tokio::spawn(crawl_fut.instrument(Span::current())); handle_tx.send(vec![listen_guard, crawl_guard]).unwrap(); @@ -434,8 +433,11 @@ async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) { /// /// Uses `handshaker` to perform a Zcash network protocol handshake, and sends /// the [`peer::Client`] result over `peerset_tx`. -#[instrument(skip(listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))] +/// +/// Limit the number of active inbound connections based on `config`. +#[instrument(skip(config, listener, handshaker, peerset_tx), fields(listener_addr = ?listener.local_addr()))] async fn accept_inbound_connections( + config: Config, listener: TcpListener, mut handshaker: S, peerset_tx: mpsc::Sender, @@ -447,8 +449,19 @@ where let mut active_inbound_connections = ActiveConnectionCounter::new_counter(); loop { - if let Ok((tcp_stream, addr)) = listener.accept().await { - // The peer already opened a connection, so increment the connection count immediately. + let inbound_result = listener.accept().await; + if let Ok((tcp_stream, addr)) = inbound_result { + if active_inbound_connections.update_count() + >= config.peerset_inbound_connection_limit() + { + // Too many open inbound connections or pending handshakes already. + // Close the connection. + std::mem::drop(tcp_stream); + continue; + } + + // The peer already opened a connection to us. + // So we want to increment the connection count as soon as possible. let connection_tracker = active_inbound_connections.track_connection(); debug!( inbound_connections = ?active_inbound_connections.update_count(), @@ -492,6 +505,8 @@ where // Zebra can't control how many queued connections are waiting, // but most OSes also limit the number of queued inbound connections on a listener port. tokio::time::sleep(constants::MIN_PEER_CONNECTION_INTERVAL).await; + } else { + debug!(?inbound_result, "error accepting inbound connection"); } // Security: Let other tasks run after each connection is processed. @@ -536,8 +551,8 @@ enum CrawlerAction { /// permanent internal error. Transient errors and individual peer errors should /// be handled within the crawler. /// -/// Uses `active_outbound_connections` to track the number of active outbound connections -/// in both the initial peers and crawler. +/// Uses `active_outbound_connections` to limit the number of active outbound connections +/// across both the initial peers and crawler. The limit is based on `config`. #[instrument(skip( config, demand_tx, @@ -606,7 +621,7 @@ where // turn the demand into an action, based on the crawler's current state _ = demand_rx.next() => { if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() { - // Too many open connections or pending handshakes already + // Too many open outbound connections or pending handshakes already DemandDrop } else if let Some(candidate) = candidates.next().await { // candidates.next has a short delay, and briefly holds the address diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index 71366de8a..245fb212b 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -24,7 +24,7 @@ use futures::{ channel::{mpsc, oneshot}, FutureExt, StreamExt, }; -use tokio::task::JoinHandle; +use tokio::{net::TcpStream, task::JoinHandle}; use tower::{discover::Change, service_fn, Service}; use tracing::Span; @@ -34,9 +34,12 @@ use zebra_test::net::random_known_port; use crate::{ constants, init, meta_addr::MetaAddr, - peer::{self, ErrorSlot, OutboundConnectorRequest}, + peer::{self, ErrorSlot, HandshakeRequest, OutboundConnectorRequest}, peer_set::{ - initialize::{add_initial_peers, crawl_and_dial, PeerChange}, + initialize::{ + accept_inbound_connections, add_initial_peers, crawl_and_dial, open_listener, + PeerChange, + }, set::MorePeers, ActiveConnectionCounter, CandidateSet, }, @@ -51,12 +54,17 @@ use Network::*; /// Using a very short time can make the crawler not run at all. const CRAWLER_TEST_DURATION: Duration = Duration::from_secs(10); +/// The amount of time to run the listener, before testing what it has done. +/// +/// Using a very short time can make the listener not run at all. +const LISTENER_TEST_DURATION: Duration = Duration::from_secs(10); + /// Test that zebra-network discovers dynamic bind-to-all-interfaces listener ports, /// and sends them to the `AddressBook`. /// /// Note: This test doesn't cover local interface or public IP address discovery. #[tokio::test] -async fn local_listener_unspecified_port_unspecified_addr() { +async fn local_listener_unspecified_port_unspecified_addr_v4() { zebra_test::init(); if zebra_test::net::zebra_skip_network_tests() { @@ -67,6 +75,19 @@ async fn local_listener_unspecified_port_unspecified_addr() { // (localhost should be enough) local_listener_port_with("0.0.0.0:0".parse().unwrap(), Mainnet).await; local_listener_port_with("0.0.0.0:0".parse().unwrap(), Testnet).await; +} + +/// Test that zebra-network discovers dynamic bind-to-all-interfaces listener ports, +/// and sends them to the `AddressBook`. +/// +/// Note: This test doesn't cover local interface or public IP address discovery. +#[tokio::test] +async fn local_listener_unspecified_port_unspecified_addr_v6() { + zebra_test::init(); + + if zebra_test::net::zebra_skip_network_tests() { + return; + } if zebra_test::net::zebra_skip_ipv6_tests() { return; @@ -80,7 +101,7 @@ async fn local_listener_unspecified_port_unspecified_addr() { /// Test that zebra-network discovers dynamic localhost listener ports, /// and sends them to the `AddressBook`. #[tokio::test] -async fn local_listener_unspecified_port_localhost_addr() { +async fn local_listener_unspecified_port_localhost_addr_v4() { zebra_test::init(); if zebra_test::net::zebra_skip_network_tests() { @@ -90,6 +111,17 @@ async fn local_listener_unspecified_port_localhost_addr() { // these tests might fail on machines with unusual IPv4 localhost configs local_listener_port_with("127.0.0.1:0".parse().unwrap(), Mainnet).await; local_listener_port_with("127.0.0.1:0".parse().unwrap(), Testnet).await; +} + +/// Test that zebra-network discovers dynamic localhost listener ports, +/// and sends them to the `AddressBook`. +#[tokio::test] +async fn local_listener_unspecified_port_localhost_addr_v6() { + zebra_test::init(); + + if zebra_test::net::zebra_skip_network_tests() { + return; + } if zebra_test::net::zebra_skip_ipv6_tests() { return; @@ -102,11 +134,10 @@ async fn local_listener_unspecified_port_localhost_addr() { /// Test that zebra-network propagates fixed localhost listener ports to the `AddressBook`. #[tokio::test] -async fn local_listener_fixed_port_localhost_addr() { +async fn local_listener_fixed_port_localhost_addr_v4() { zebra_test::init(); let localhost_v4 = "127.0.0.1".parse().unwrap(); - let localhost_v6 = "::1".parse().unwrap(); if zebra_test::net::zebra_skip_network_tests() { return; @@ -114,6 +145,18 @@ async fn local_listener_fixed_port_localhost_addr() { local_listener_port_with(SocketAddr::new(localhost_v4, random_known_port()), Mainnet).await; local_listener_port_with(SocketAddr::new(localhost_v4, random_known_port()), Testnet).await; +} + +/// Test that zebra-network propagates fixed localhost listener ports to the `AddressBook`. +#[tokio::test] +async fn local_listener_fixed_port_localhost_addr_v6() { + zebra_test::init(); + + let localhost_v6 = "::1".parse().unwrap(); + + if zebra_test::net::zebra_skip_network_tests() { + return; + } if zebra_test::net::zebra_skip_ipv6_tests() { return; @@ -253,10 +296,10 @@ async fn crawler_peer_limit_zero_connect_panic() { unreachable!("outbound connector should never be called with a zero peer limit") }); - let (_config, mut peerset_tx) = + let (_config, mut peerset_rx) = spawn_crawler_with_peer_limit(0, unreachable_outbound_connector).await; - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); assert!( // `Err(_)` means that no peers are available, and the sender has not been dropped. // `Ok(None)` means that no peers are available, and the sender has been dropped. @@ -277,10 +320,10 @@ async fn crawler_peer_limit_one_connect_error() { let error_outbound_connector = service_fn(|_| async { Err("test outbound connector always returns errors".into()) }); - let (_config, mut peerset_tx) = + let (_config, mut peerset_rx) = spawn_crawler_with_peer_limit(1, error_outbound_connector).await; - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); assert!( // `Err(_)` means that no peers are available, and the sender has not been dropped. // `Ok(None)` means that no peers are available, and the sender has been dropped. @@ -325,12 +368,12 @@ async fn crawler_peer_limit_one_connect_ok_then_drop() { Ok(Change::Insert(addr, fake_client)) }); - let (config, mut peerset_tx) = + let (config, mut peerset_rx) = spawn_crawler_with_peer_limit(1, success_disconnect_outbound_connector).await; let mut peer_count: usize = 0; loop { - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); match peer_result { // A peer handshake succeeded. Ok(Some(peer_result)) => { @@ -368,8 +411,11 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() { // This test does not require network access, because the outbound connector // and peer set are fake. - let success_stay_open_outbound_connector = - service_fn(|req: OutboundConnectorRequest| async move { + let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded(); + + let success_stay_open_outbound_connector = service_fn(move |req: OutboundConnectorRequest| { + let peer_tracker_tx = peer_tracker_tx.clone(); + async move { let OutboundConnectorRequest { addr, connection_tracker, @@ -385,29 +431,49 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() { error_slot, }; - // Fake the connection being open forever. - std::mem::forget(connection_tracker); + // Make the connection staying open. + peer_tracker_tx + .unbounded_send(connection_tracker) + .expect("unexpected error sending to unbounded channel"); Ok(Change::Insert(addr, fake_client)) - }); + } + }); - let (config, mut peerset_tx) = + let (config, mut peerset_rx) = spawn_crawler_with_peer_limit(1, success_stay_open_outbound_connector).await; - let mut peer_count: usize = 0; + let mut peer_change_count: usize = 0; loop { - let peer_result = peerset_tx.try_next(); - match peer_result { + let peer_change_result = peerset_rx.try_next(); + match peer_change_result { // A peer handshake succeeded. - Ok(Some(peer_result)) => { + Ok(Some(peer_change_result)) => { assert!( - matches!(peer_result, Ok(Change::Insert(_, _))), + matches!(peer_change_result, Ok(Change::Insert(_, _))), "unexpected connection error: {:?}\n\ {} previous peers succeeded", - peer_result, - peer_count, + peer_change_result, + peer_change_count, ); - peer_count += 1; + peer_change_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + let mut peer_tracker_count: usize = 0; + loop { + let peer_tracker_result = peer_tracker_rx.try_next(); + match peer_tracker_result { + // We held this peer tracker open until now. + Ok(Some(peer_tracker)) => { + std::mem::drop(peer_tracker); + peer_tracker_count += 1; } // The channel is closed and there are no messages left in the channel. @@ -418,10 +484,19 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() { } assert!( - peer_count <= config.peerset_outbound_connection_limit(), - "unexpected number of peer connections {}, over limit of {}", - peer_count, + peer_change_count <= config.peerset_outbound_connection_limit(), + "unexpected number of peer changes {}, over limit of {}, had {} peer trackers", + peer_change_count, config.peerset_outbound_connection_limit(), + peer_tracker_count, + ); + + assert!( + peer_tracker_count <= config.peerset_outbound_connection_limit(), + "unexpected number of peer trackers {}, over limit of {}, had {} peer changes", + peer_tracker_count, + config.peerset_outbound_connection_limit(), + peer_change_count, ); } @@ -436,10 +511,10 @@ async fn crawler_peer_limit_default_connect_error() { let error_outbound_connector = service_fn(|_| async { Err("test outbound connector always returns errors".into()) }); - let (_config, mut peerset_tx) = + let (_config, mut peerset_rx) = spawn_crawler_with_peer_limit(None, error_outbound_connector).await; - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); assert!( // `Err(_)` means that no peers are available, and the sender has not been dropped. // `Ok(None)` means that no peers are available, and the sender has been dropped. @@ -484,12 +559,14 @@ async fn crawler_peer_limit_default_connect_ok_then_drop() { Ok(Change::Insert(addr, fake_client)) }); - let (config, mut peerset_tx) = - spawn_crawler_with_peer_limit(None, success_disconnect_outbound_connector).await; + // TODO: tweak the crawler timeouts and rate-limits so we get over the actual limit + // (currently, getting over the limit can take 30 seconds or more) + let (config, mut peerset_rx) = + spawn_crawler_with_peer_limit(15, success_disconnect_outbound_connector).await; let mut peer_count: usize = 0; loop { - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); match peer_result { // A peer handshake succeeded. Ok(Some(peer_result)) => { @@ -510,14 +587,11 @@ async fn crawler_peer_limit_default_connect_ok_then_drop() { } } - // TODO: tweak the crawler timeouts and rate-limits so we get over the actual limit - // (currently, getting over the limit can take 30 seconds or more) - let lower_limit = config.peerset_outbound_connection_limit() / 3; assert!( - peer_count > lower_limit, + peer_count > config.peerset_outbound_connection_limit(), "unexpected number of peer connections {}, should be over the limit of {}", peer_count, - lower_limit, + config.peerset_outbound_connection_limit(), ); } @@ -530,8 +604,11 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() { // This test does not require network access, because the outbound connector // and peer set are fake. - let success_stay_open_outbound_connector = - service_fn(|req: OutboundConnectorRequest| async move { + let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded(); + + let success_stay_open_outbound_connector = service_fn(move |req: OutboundConnectorRequest| { + let peer_tracker_tx = peer_tracker_tx.clone(); + async move { let OutboundConnectorRequest { addr, connection_tracker, @@ -547,19 +624,173 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() { error_slot, }; - // Fake the connection being open forever. - std::mem::forget(connection_tracker); + // Make the connection staying open. + peer_tracker_tx + .unbounded_send(connection_tracker) + .expect("unexpected error sending to unbounded channel"); Ok(Change::Insert(addr, fake_client)) - }); + } + }); // The initial target size is multiplied by 1.5 to give the actual limit. - let (config, mut peerset_tx) = + let (config, mut peerset_rx) = spawn_crawler_with_peer_limit(None, success_stay_open_outbound_connector).await; + let mut peer_change_count: usize = 0; + loop { + let peer_change_result = peerset_rx.try_next(); + match peer_change_result { + // A peer handshake succeeded. + Ok(Some(peer_change_result)) => { + assert!( + matches!(peer_change_result, Ok(Change::Insert(_, _))), + "unexpected connection error: {:?}\n\ + {} previous peers succeeded", + peer_change_result, + peer_change_count, + ); + peer_change_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + let mut peer_tracker_count: usize = 0; + loop { + let peer_tracker_result = peer_tracker_rx.try_next(); + match peer_tracker_result { + // We held this peer tracker open until now. + Ok(Some(peer_tracker)) => { + std::mem::drop(peer_tracker); + peer_tracker_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + assert!( + peer_change_count <= config.peerset_outbound_connection_limit(), + "unexpected number of peer changes {}, over limit of {}, had {} peer trackers", + peer_change_count, + config.peerset_outbound_connection_limit(), + peer_tracker_count, + ); + + assert!( + peer_tracker_count <= config.peerset_outbound_connection_limit(), + "unexpected number of peer trackers {}, over limit of {}, had {} peer changes", + peer_tracker_count, + config.peerset_outbound_connection_limit(), + peer_change_count, + ); +} + +/// Test the listener with an inbound peer limit of zero peers, and a handshaker that panics. +#[tokio::test] +async fn listener_peer_limit_zero_handshake_panic() { + zebra_test::init(); + + // This test requires an IPv4 network stack with 127.0.0.1 as localhost. + if zebra_test::net::zebra_skip_network_tests() { + return; + } + + let unreachable_inbound_handshaker = service_fn(|_| async { + unreachable!("inbound handshaker should never be called with a zero peer limit") + }); + + let (_config, mut peerset_rx) = + spawn_inbound_listener_with_peer_limit(0, unreachable_inbound_handshaker).await; + + let peer_result = peerset_rx.try_next(); + assert!( + // `Err(_)` means that no peers are available, and the sender has not been dropped. + // `Ok(None)` means that no peers are available, and the sender has been dropped. + matches!(peer_result, Err(_) | Ok(None)), + "unexpected peer when inbound limit is zero: {:?}", + peer_result, + ); +} + +/// Test the listener with an inbound peer limit of one peer, and a handshaker that always errors. +#[tokio::test] +async fn listener_peer_limit_one_handshake_error() { + zebra_test::init(); + + // This test requires an IPv4 network stack with 127.0.0.1 as localhost. + if zebra_test::net::zebra_skip_network_tests() { + return; + } + + let error_inbound_handshaker = + service_fn(|_| async { Err("test inbound handshaker always returns errors".into()) }); + + let (_config, mut peerset_rx) = + spawn_inbound_listener_with_peer_limit(1, error_inbound_handshaker).await; + + let peer_result = peerset_rx.try_next(); + assert!( + // `Err(_)` means that no peers are available, and the sender has not been dropped. + // `Ok(None)` means that no peers are available, and the sender has been dropped. + matches!(peer_result, Err(_) | Ok(None)), + "unexpected peer when all handshakes error: {:?}", + peer_result, + ); +} + +/// Test the listener with an inbound peer limit of one peer, +/// and a handshaker that returns success then disconnects the peer. +#[tokio::test] +async fn listener_peer_limit_one_handshake_ok_then_drop() { + zebra_test::init(); + + // This test requires an IPv4 network stack with 127.0.0.1 as localhost. + if zebra_test::net::zebra_skip_network_tests() { + return; + } + + let success_disconnect_inbound_handshaker = service_fn(|req: HandshakeRequest| async move { + let HandshakeRequest { + tcp_stream, + connected_addr: _, + connection_tracker, + } = req; + + let (server_tx, _server_rx) = mpsc::channel(0); + let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let error_slot = ErrorSlot::default(); + + let fake_client = peer::Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot, + }; + + // Actually close the connection. + std::mem::drop(connection_tracker); + std::mem::drop(tcp_stream); + + // Give the crawler time to get the message. + tokio::task::yield_now().await; + + Ok(fake_client) + }); + + let (config, mut peerset_rx) = + spawn_inbound_listener_with_peer_limit(1, success_disconnect_inbound_handshaker).await; + let mut peer_count: usize = 0; loop { - let peer_result = peerset_tx.try_next(); + let peer_result = peerset_rx.try_next(); match peer_result { // A peer handshake succeeded. Ok(Some(peer_result)) => { @@ -581,10 +812,312 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() { } assert!( - peer_count <= config.peerset_outbound_connection_limit(), - "unexpected number of peer connections {}, over limit of {}", + peer_count > config.peerset_inbound_connection_limit(), + "unexpected number of peer connections {}, should be over the limit of {}", peer_count, - config.peerset_outbound_connection_limit(), + config.peerset_inbound_connection_limit(), + ); +} + +/// Test the listener with an inbound peer limit of one peer, +/// and a handshaker that returns success then holds the peer open. +#[tokio::test] +async fn listener_peer_limit_one_handshake_ok_stay_open() { + zebra_test::init(); + + // This test requires an IPv4 network stack with 127.0.0.1 as localhost. + if zebra_test::net::zebra_skip_network_tests() { + return; + } + + let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded(); + + let success_stay_open_inbound_handshaker = service_fn(move |req: HandshakeRequest| { + let peer_tracker_tx = peer_tracker_tx.clone(); + async move { + let HandshakeRequest { + tcp_stream, + connected_addr: _, + connection_tracker, + } = req; + + let (server_tx, _server_rx) = mpsc::channel(0); + let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let error_slot = ErrorSlot::default(); + + let fake_client = peer::Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot, + }; + + // Make the connection staying open. + peer_tracker_tx + .unbounded_send((tcp_stream, connection_tracker)) + .expect("unexpected error sending to unbounded channel"); + + Ok(fake_client) + } + }); + + let (config, mut peerset_rx) = + spawn_inbound_listener_with_peer_limit(1, success_stay_open_inbound_handshaker).await; + + let mut peer_change_count: usize = 0; + loop { + let peer_change_result = peerset_rx.try_next(); + match peer_change_result { + // A peer handshake succeeded. + Ok(Some(peer_change_result)) => { + assert!( + matches!(peer_change_result, Ok(Change::Insert(_, _))), + "unexpected connection error: {:?}\n\ + {} previous peers succeeded", + peer_change_result, + peer_change_count, + ); + peer_change_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + let mut peer_tracker_count: usize = 0; + loop { + let peer_tracker_result = peer_tracker_rx.try_next(); + match peer_tracker_result { + // We held this peer connection and tracker open until now. + Ok(Some((peer_connection, peer_tracker))) => { + std::mem::drop(peer_connection); + std::mem::drop(peer_tracker); + peer_tracker_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + assert!( + peer_change_count <= config.peerset_inbound_connection_limit(), + "unexpected number of peer changes {}, over limit of {}, had {} peer trackers", + peer_change_count, + config.peerset_inbound_connection_limit(), + peer_tracker_count, + ); + + assert!( + peer_tracker_count <= config.peerset_inbound_connection_limit(), + "unexpected number of peer trackers {}, over limit of {}, had {} peer changes", + peer_tracker_count, + config.peerset_inbound_connection_limit(), + peer_change_count, + ); +} + +/// Test the listener with the default inbound peer limit, and a handshaker that always errors. +#[tokio::test] +async fn listener_peer_limit_default_handshake_error() { + zebra_test::init(); + + // This test requires an IPv4 network stack with 127.0.0.1 as localhost. + if zebra_test::net::zebra_skip_network_tests() { + return; + } + + let error_inbound_handshaker = + service_fn(|_| async { Err("test inbound handshaker always returns errors".into()) }); + + let (_config, mut peerset_rx) = + spawn_inbound_listener_with_peer_limit(None, error_inbound_handshaker).await; + + let peer_result = peerset_rx.try_next(); + assert!( + // `Err(_)` means that no peers are available, and the sender has not been dropped. + // `Ok(None)` means that no peers are available, and the sender has been dropped. + matches!(peer_result, Err(_) | Ok(None)), + "unexpected peer when all handshakes error: {:?}", + peer_result, + ); +} + +/// Test the listener with the default inbound peer limit, +/// and a handshaker that returns success then disconnects the peer. +#[tokio::test] +async fn listener_peer_limit_default_handshake_ok_then_drop() { + zebra_test::init(); + + // This test requires an IPv4 network stack with 127.0.0.1 as localhost. + if zebra_test::net::zebra_skip_network_tests() { + return; + } + + let success_disconnect_inbound_handshaker = service_fn(|req: HandshakeRequest| async move { + let HandshakeRequest { + tcp_stream, + connected_addr: _, + connection_tracker, + } = req; + + let (server_tx, _server_rx) = mpsc::channel(0); + let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let error_slot = ErrorSlot::default(); + + let fake_client = peer::Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot, + }; + + // Actually close the connection. + std::mem::drop(connection_tracker); + std::mem::drop(tcp_stream); + + // Give the crawler time to get the message. + tokio::task::yield_now().await; + + Ok(fake_client) + }); + + let (config, mut peerset_rx) = + spawn_inbound_listener_with_peer_limit(None, success_disconnect_inbound_handshaker).await; + + let mut peer_count: usize = 0; + loop { + let peer_result = peerset_rx.try_next(); + match peer_result { + // A peer handshake succeeded. + Ok(Some(peer_result)) => { + assert!( + matches!(peer_result, Ok(Change::Insert(_, _))), + "unexpected connection error: {:?}\n\ + {} previous peers succeeded", + peer_result, + peer_count, + ); + peer_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + assert!( + peer_count > config.peerset_inbound_connection_limit(), + "unexpected number of peer connections {}, should be over the limit of {}", + peer_count, + config.peerset_inbound_connection_limit(), + ); +} + +/// Test the listener with the default inbound peer limit, +/// and a handshaker that returns success then holds the peer open. +#[tokio::test] +async fn listener_peer_limit_default_handshake_ok_stay_open() { + zebra_test::init(); + + // This test requires an IPv4 network stack with 127.0.0.1 as localhost. + if zebra_test::net::zebra_skip_network_tests() { + return; + } + + let (peer_tracker_tx, mut peer_tracker_rx) = mpsc::unbounded(); + + let success_stay_open_inbound_handshaker = service_fn(move |req: HandshakeRequest| { + let peer_tracker_tx = peer_tracker_tx.clone(); + async move { + let HandshakeRequest { + tcp_stream, + connected_addr: _, + connection_tracker, + } = req; + + let (server_tx, _server_rx) = mpsc::channel(0); + let (shutdown_tx, _shutdown_rx) = oneshot::channel(); + let error_slot = ErrorSlot::default(); + + let fake_client = peer::Client { + shutdown_tx: Some(shutdown_tx), + server_tx, + error_slot, + }; + + // Make the connection staying open. + peer_tracker_tx + .unbounded_send((tcp_stream, connection_tracker)) + .expect("unexpected error sending to unbounded channel"); + + Ok(fake_client) + } + }); + + let (config, mut peerset_rx) = + spawn_inbound_listener_with_peer_limit(None, success_stay_open_inbound_handshaker).await; + + let mut peer_change_count: usize = 0; + loop { + let peer_change_result = peerset_rx.try_next(); + match peer_change_result { + // A peer handshake succeeded. + Ok(Some(peer_change_result)) => { + assert!( + matches!(peer_change_result, Ok(Change::Insert(_, _))), + "unexpected connection error: {:?}\n\ + {} previous peers succeeded", + peer_change_result, + peer_change_count, + ); + peer_change_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + let mut peer_tracker_count: usize = 0; + loop { + let peer_tracker_result = peer_tracker_rx.try_next(); + match peer_tracker_result { + // We held this peer connection and tracker open until now. + Ok(Some((peer_connection, peer_tracker))) => { + std::mem::drop(peer_connection); + std::mem::drop(peer_tracker); + peer_tracker_count += 1; + } + + // The channel is closed and there are no messages left in the channel. + Ok(None) => break, + // The channel is still open, but there are no messages left in the channel. + Err(_) => break, + } + } + + assert!( + peer_change_count <= config.peerset_inbound_connection_limit(), + "unexpected number of peer changes {}, over limit of {}, had {} peer trackers", + peer_change_count, + config.peerset_inbound_connection_limit(), + peer_tracker_count, + ); + + assert!( + peer_tracker_count <= config.peerset_inbound_connection_limit(), + "unexpected number of peer trackers {}, over limit of {}, had {} peer changes", + peer_tracker_count, + config.peerset_inbound_connection_limit(), + peer_change_count, ); } @@ -801,6 +1334,102 @@ where (config, peerset_rx) } +/// Run an inbound peer listener with `peerset_initial_target_size` and `handshaker`. +/// +/// Uses the default values for all other config fields. +/// +/// Returns the generated [`Config`], and the peer set receiver. +async fn spawn_inbound_listener_with_peer_limit( + peerset_initial_target_size: impl Into>, + listen_handshaker: S, +) -> (Config, mpsc::Receiver) +where + S: Service + + Clone + + Send + + 'static, + S::Future: Send + 'static, +{ + // Create a test config that listens on any unused port. + let listen_addr = "127.0.0.1:0".parse().unwrap(); + let mut config = Config { + listen_addr, + ..Config::default() + }; + + if let Some(peerset_initial_target_size) = peerset_initial_target_size.into() { + config.peerset_initial_target_size = peerset_initial_target_size; + } + + // Open the listener port. + let (tcp_listener, listen_addr) = open_listener(&config.clone()).await; + + // Make enough inbound connections to go over the limit, even if the limit is zero. + // Make the channels large enough to hold all the connections. + let over_limit_connections = config.peerset_inbound_connection_limit() * 2 + 1; + let (peerset_tx, peerset_rx) = mpsc::channel::(over_limit_connections); + + // Start listening for connections. + let listen_fut = accept_inbound_connections( + config.clone(), + tcp_listener, + listen_handshaker, + peerset_tx.clone(), + ); + let listen_task_handle = tokio::spawn(listen_fut); + + // Open inbound connections. + let mut outbound_task_handles = Vec::new(); + for _ in 0..over_limit_connections { + let outbound_fut = async move { + let outbound_result = TcpStream::connect(listen_addr).await; + // Let other tasks run before we block on reading. + tokio::task::yield_now().await; + + if let Ok(outbound_stream) = outbound_result { + // Wait until the listener closes the connection. + // The handshaker is fake, so it never sends any data. + let readable_result = outbound_stream.readable().await; + info!( + ?readable_result, + "outbound connection became readable or errored: \ + closing connection to test inbound listener" + ); + } else { + // If the connection is closed quickly, we might get errors here. + debug!( + ?outbound_result, + "outbound connection error in inbound listener test" + ); + } + }; + + let outbound_task_handle = tokio::spawn(outbound_fut); + outbound_task_handles.push(outbound_task_handle); + } + + // Let the listener run for a while. + tokio::time::sleep(LISTENER_TEST_DURATION).await; + + // Stop the listener and outbound tasks, and let them finish. + listen_task_handle.abort(); + for outbound_task_handle in outbound_task_handles { + outbound_task_handle.abort(); + } + tokio::task::yield_now().await; + + // Check for panics or errors in the listener. + let listen_result = listen_task_handle.now_or_never(); + assert!( + matches!(listen_result, None) + || matches!(listen_result, Some(Err(ref e)) if e.is_cancelled()), + "unexpected error or panic in inbound peer listener task: {:?}", + listen_result, + ); + + (config, peerset_rx) +} + /// Initialize a task that connects to `peer_count` initial peers using the /// given connector. /// diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index d224d2568..653185996 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -80,7 +80,7 @@ use crate::{ external::InventoryHash, internal::{Request, Response}, }, - AddressBook, BoxError, + AddressBook, BoxError, Config, }; /// A signal sent by the [`PeerSet`] when it has no ready peers, and gets a request from Zebra. @@ -134,6 +134,8 @@ where /// /// Used for logging diagnostics. address_book: Arc>, + /// The configured limit for inbound and outbound connections. + peerset_total_connection_limit: usize, } impl PeerSet @@ -147,6 +149,7 @@ where { /// Construct a peerset which uses `discover` internally. pub fn new( + config: &Config, discover: D, demand_signal: mpsc::Sender, handle_rx: tokio::sync::oneshot::Receiver>>>, @@ -165,6 +168,7 @@ where inventory_registry: InventoryRegistry::new(inv_stream), last_peer_log: None, address_book, + peerset_total_connection_limit: config.peerset_total_connection_limit(), } } @@ -432,6 +436,17 @@ where metrics::gauge!("pool.num_ready", num_ready as f64); metrics::gauge!("pool.num_unready", num_unready as f64); metrics::gauge!("zcash.net.peers", num_peers as f64); + + // Security: make sure we haven't exceeded the connection limit + if num_peers > self.peerset_total_connection_limit { + let address_metrics = self.address_book.lock().unwrap().address_metrics(); + panic!( + "unexpectedly exceeded configured peer set connection limit: \n\ + peers: {:?}, ready: {:?}, unready: {:?}, \n\ + address_metrics: {:?}", + num_peers, num_ready, num_unready, address_metrics, + ); + } } }