fix(handshake): Add extra timeout logging to peer TCP connections (#6969)

* Add a missing timeout to outbound TCP connections

* Move inbound handshakes into their own function, replacing the manual span

* Delete a useless manual span in zebra_network::config

* Add an extra timeout to the spawned inbound handshake task
This commit is contained in:
teor 2023-06-16 07:11:24 +10:00 committed by GitHub
parent 484f3d746d
commit 1e12a58b5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 99 additions and 63 deletions

View File

@ -320,11 +320,6 @@ impl Config {
Ok(Ok(ip_addrs)) => {
let ip_addrs: Vec<PeerSocketAddr> = ip_addrs.map(canonical_peer_addr).collect();
// if we're logging at debug level,
// the full list of IP addresses will be shown in the log message
let debug_span = debug_span!("", remote_ip_addrs = ?ip_addrs);
let _span_guard = debug_span.enter();
// This log is needed for user debugging, but it's annoying during tests.
#[cfg(not(test))]
info!(seed = ?host, remote_ip_count = ?ip_addrs.len(), "resolved seed peer IP addresses");

View File

@ -83,7 +83,11 @@ pub const PEERSET_BUFFER_SIZE: usize = 3;
/// and receiving a response from a remote peer.
pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(20);
/// The timeout for handshakes when connecting to new peers.
/// The timeout for connections and handshakes when connecting to new peers.
///
/// Outbound TCP connections must complete within this timeout,
/// then the handshake messages get an additional `HANDSHAKE_TIMEOUT` to complete.
/// (Inbound TCP accepts can't have a timeout, because they are handled by the OS.)
///
/// This timeout should remain small, because it helps stop slow peers getting
/// into the peer set. This is particularly important for network-constrained

View File

@ -7,13 +7,14 @@ use std::{
};
use futures::prelude::*;
use tokio::net::TcpStream;
use tokio::{net::TcpStream, time::timeout};
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::chain_tip::{ChainTip, NoChainTip};
use crate::{
constants::HANDSHAKE_TIMEOUT,
peer::{Client, ConnectedAddr, Handshake, HandshakeRequest},
peer_set::ConnectionTracker,
BoxError, PeerSocketAddr, Request, Response,
@ -93,7 +94,7 @@ where
let connector_span = info_span!("connector", peer = ?connected_addr);
async move {
let tcp_stream = TcpStream::connect(*addr).await?;
let tcp_stream = timeout(HANDSHAKE_TIMEOUT, TcpStream::connect(*addr)).await??;
let client = hs
.oneshot(HandshakeRequest::<TcpStream> {
data_stream: tcp_stream,

View File

@ -34,7 +34,7 @@ use zebra_chain::chain_tip::ChainTip;
use crate::{
address_book_updater::AddressBookUpdater,
constants,
constants::{self, HANDSHAKE_TIMEOUT},
meta_addr::{MetaAddr, MetaAddrChange},
peer::{
self, address_is_valid_for_inbound_listeners, HandshakeRequest, MinimumPeerVersion,
@ -100,9 +100,9 @@ pub async fn init<S, C>(
Arc<std::sync::Mutex<AddressBook>>,
)
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + Sync + 'static,
S::Future: Send + 'static,
C: ChainTip + Clone + Send + 'static,
C: ChainTip + Clone + Send + Sync + 'static,
{
// If we want Zebra to operate with no network,
// we should implement a `zebrad` command that doesn't use `zebra-network`.
@ -551,7 +551,7 @@ async fn accept_inbound_connections<S>(
config: Config,
listener: TcpListener,
min_inbound_peer_connection_interval: Duration,
mut handshaker: S,
handshaker: S,
peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
) -> Result<(), BoxError>
where
@ -579,6 +579,7 @@ where
None => unreachable!("handshakes never terminates, because it contains a future that never resolves"),
},
// This future must wait until new connections are available: it can't have a timeout.
inbound_result = listener.accept() => inbound_result,
};
@ -602,51 +603,26 @@ where
"handshaking on an open inbound peer connection"
);
let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr);
let accept_span = info_span!("listen_accept", peer = ?connected_addr);
let _guard = accept_span.enter();
debug!("got incoming connection");
// # Correctness
//
// Holding the drop guard returned by Span::enter across .await points will
// result in incorrect traces if it yields.
//
// This await is okay because the handshaker's `poll_ready` method always returns Ready.
handshaker.ready().await?;
// TODO: distinguish between proxied listeners and direct listeners
let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr);
// Construct a handshake future but do not drive it yet....
let handshake = handshaker.call(HandshakeRequest {
data_stream: tcp_stream,
connected_addr,
let handshake_task = accept_inbound_handshake(
addr,
handshaker.clone(),
tcp_stream,
connection_tracker,
});
// ... instead, spawn a new task to handle this connection
{
let mut peerset_tx = peerset_tx.clone();
peerset_tx.clone(),
)
.await?;
let handshake_task = tokio::spawn(
async move {
let handshake_result = handshake.await;
if let Ok(client) = handshake_result {
// The connection limit makes sure this send doesn't block
let _ = peerset_tx.send((addr, client)).await;
} else {
debug!(?handshake_result, "error handshaking with inbound peer");
}
}
.instrument(handshaker_span),
);
handshakes.push(Box::pin(handshake_task));
}
// We need to drop the guard before yielding.
std::mem::drop(_guard);
// This timeout helps locate inbound peer connection hangs, see #6763 for details.
handshakes.push(Box::pin(
tokio::time::timeout(
// Only trigger this timeout if the inner handshake timeout fails
HANDSHAKE_TIMEOUT + Duration::from_millis(500),
handshake_task,
)
.inspect_err(|_elapsed| {
info!("timeout in spawned accept_inbound_handshake() task")
}),
));
// Rate-limit inbound connection handshakes.
// But sleep longer after a successful connection,
@ -676,6 +652,63 @@ where
}
}
/// Set up a new inbound connection as a Zcash peer.
///
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
/// the [`peer::Client`] result over `peerset_tx`.
#[instrument(skip(handshaker, tcp_stream, connection_tracker, peerset_tx))]
async fn accept_inbound_handshake<S>(
addr: PeerSocketAddr,
mut handshaker: S,
tcp_stream: TcpStream,
connection_tracker: ConnectionTracker,
peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
) -> Result<tokio::task::JoinHandle<()>, BoxError>
where
S: Service<peer::HandshakeRequest<TcpStream>, Response = peer::Client, Error = BoxError>
+ Clone,
S::Future: Send + 'static,
{
let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr);
debug!("got incoming connection");
// # Correctness
//
// Holding the drop guard returned by Span::enter across .await points will
// result in incorrect traces if it yields.
//
// This await is okay because the handshaker's `poll_ready` method always returns Ready.
handshaker.ready().await?;
// TODO: distinguish between proxied listeners and direct listeners
let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr);
// Construct a handshake future but do not drive it yet....
let handshake = handshaker.call(HandshakeRequest {
data_stream: tcp_stream,
connected_addr,
connection_tracker,
});
// ... instead, spawn a new task to handle this connection
let mut peerset_tx = peerset_tx.clone();
let handshake_task = tokio::spawn(
async move {
let handshake_result = handshake.await;
if let Ok(client) = handshake_result {
// The connection limit makes sure this send doesn't block
let _ = peerset_tx.send((addr, client)).await;
} else {
debug!(?handshake_result, "error handshaking with inbound peer");
}
}
.instrument(handshaker_span),
);
Ok(handshake_task)
}
/// An action that the peer crawler can take.
enum CrawlerAction {
/// Drop the demand signal because there are too many pending handshakes.

View File

@ -1453,7 +1453,7 @@ async fn init_with_peer_limit<S>(
default_config: impl Into<Option<Config>>,
) -> Arc<std::sync::Mutex<AddressBook>>
where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + Sync + 'static,
S::Future: Send + 'static,
{
// This test might fail on machines with no configured IPv4 addresses
@ -1610,6 +1610,7 @@ where
S: Service<peer::HandshakeRequest<TcpStream>, Response = peer::Client, Error = BoxError>
+ Clone
+ Send
+ Sync
+ 'static,
S::Future: Send + 'static,
{

View File

@ -6,10 +6,7 @@ use futures::FutureExt;
use indexmap::IndexSet;
use tokio::{sync::oneshot, task::JoinHandle};
use tower::{
buffer::Buffer,
builder::ServiceBuilder,
util::{BoxCloneService, BoxService},
ServiceExt,
buffer::Buffer, builder::ServiceBuilder, load_shed::LoadShed, util::BoxService, ServiceExt,
};
use zebra_chain::{
@ -600,7 +597,12 @@ async fn setup(
// connected peer which responds with isolated_peer_response
Buffer<zebra_network::Client, zebra_network::Request>,
// inbound service
BoxCloneService<zebra_network::Request, zebra_network::Response, BoxError>,
LoadShed<
Buffer<
BoxService<zebra_network::Request, zebra_network::Response, BoxError>,
zebra_network::Request,
>,
>,
// outbound peer set (only has the connected peer)
Buffer<
BoxService<zebra_network::Request, zebra_network::Response, BoxError>,
@ -626,11 +628,11 @@ async fn setup(
// Inbound
let (setup_tx, setup_rx) = oneshot::channel();
let inbound_service = Inbound::new(MAX_INBOUND_CONCURRENCY, setup_rx);
// TODO: add a timeout just above the service, if needed
let inbound_service = ServiceBuilder::new()
.boxed_clone()
.load_shed()
.buffer(10)
.service(inbound_service);
.service(BoxService::new(inbound_service));
// State
// UTXO verification doesn't matter for these tests.