diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index 547152e59..a128b59f2 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -8,6 +8,7 @@ use std::{ use futures::prelude::*; use tokio::net::TcpStream; use tower::{discover::Change, Service, ServiceExt}; +use tracing_futures::Instrument; use crate::{BoxError, Request, Response}; @@ -50,13 +51,15 @@ where fn call(&mut self, addr: SocketAddr) -> Self::Future { let mut hs = self.handshaker.clone(); + let connected_addr = ConnectedAddr::new_outbound_direct(addr); + let connector_span = info_span!("connector", peer = ?connected_addr); async move { let stream = TcpStream::connect(addr).await?; hs.ready_and().await?; - let connected_addr = ConnectedAddr::new_outbound_direct(addr); let client = hs.call((stream, connected_addr)).await?; Ok(Change::Insert(addr, client)) } + .instrument(connector_span) .boxed() } } diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index c276dbd09..62ca2c992 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -554,7 +554,7 @@ where fn call(&mut self, req: HandshakeRequest) -> Self::Future { let (tcp_stream, connected_addr) = req; - let negotiator_span = span!(Level::INFO, "negotiator", peer = ?connected_addr); + let negotiator_span = debug_span!("negotiator", peer = ?connected_addr); // set the peer connection span's parent to the global span, as it // should exist independently of its creation source (inbound // connection, crawler, initial peer, ...) @@ -633,7 +633,9 @@ where // Instrument the peer's rx and tx streams. + let inner_conn_span = connection_span.clone(); let peer_tx = peer_tx.with(move |msg: Message| { + let span = debug_span!(parent: inner_conn_span.clone(), "outbound_metric"); // Add a metric for outbound messages. metrics::counter!( "zcash.net.out.messages", @@ -645,7 +647,7 @@ where // because we need the sink to be Unpin, and the With // returned by .with is Unpin only if Fut is Unpin, and the // futures generated by async blocks are not Unpin. - future::ready(Ok(msg)) + future::ready(Ok(msg)).instrument(span) }); // CORRECTNESS @@ -654,11 +656,15 @@ where // the inbound_ts_collector. let inbound_ts_collector = timestamp_collector.clone(); let inv_collector = inv_collector.clone(); + let ts_inner_conn_span = connection_span.clone(); + let inv_inner_conn_span = connection_span.clone(); let peer_rx = peer_rx .then(move |msg| { // Add a metric for inbound messages and errors. // Fire a timestamp or failure event. let mut inbound_ts_collector = inbound_ts_collector.clone(); + let span = + debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector"); async move { match &msg { Ok(msg) => { @@ -694,10 +700,11 @@ where } msg } + .instrument(span) }) .then(move |msg| { let inv_collector = inv_collector.clone(); - let span = debug_span!("inventory_filter"); + let span = debug_span!(parent: inv_inner_conn_span.clone(), "inventory_filter"); async move { if let (Ok(Message::Inv(hashes)), Some(transient_addr)) = (&msg, connected_addr.get_transient_addr()) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index c8dffc0a1..2ed4d8f0c 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -204,16 +204,22 @@ where // an indefinite period. We can use `CallAllUnordered` without filling // the underlying `Inbound` buffer, because we immediately drive this // single `CallAll` to completion, and handshakes have a short timeout. - use tower::util::CallAllUnordered; - let addr_stream = futures::stream::iter(initial_peers.into_iter()); - let mut handshakes = CallAllUnordered::new(outbound_connector, addr_stream); + let mut handshakes: FuturesUnordered<_> = initial_peers + .into_iter() + .map(|addr| { + outbound_connector + .clone() + .oneshot(addr) + .map_err(move |e| (addr, e)) + }) + .collect(); while let Some(handshake_result) = handshakes.next().await { // this is verbose, but it's better than just hanging with no output - if let Err(ref e) = handshake_result { - info!(?e, "an initial peer connection failed"); + if let Err((addr, ref e)) = handshake_result { + info!(?addr, ?e, "an initial peer connection failed"); } - tx.send(handshake_result).await?; + tx.send(handshake_result.map_err(|(_addr, e)| e)).await?; } Ok(()) @@ -251,19 +257,26 @@ where info!("Opened Zcash protocol endpoint at {}", local_addr); loop { if let Ok((tcp_stream, addr)) = listener.accept().await { - debug!(?addr, "got incoming connection"); + let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr); + let accept_span = info_span!("listen_accept", peer = ?connected_addr); + let _guard = accept_span.enter(); + + debug!("got incoming connection"); handshaker.ready_and().await?; // TODO: distinguish between proxied listeners and direct listeners - let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr); + let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr); // Construct a handshake future but do not drive it yet.... let handshake = handshaker.call((tcp_stream, connected_addr)); // ... instead, spawn a new task to handle this connection let mut tx2 = tx.clone(); - tokio::spawn(async move { - if let Ok(client) = handshake.await { - let _ = tx2.send(Ok(Change::Insert(addr, client))).await; + tokio::spawn( + async move { + if let Ok(client) = handshake.await { + let _ = tx2.send(Ok(Change::Insert(addr, client))).await; + } } - }); + .instrument(handshaker_span), + ); } } } @@ -381,15 +394,14 @@ where DemandHandshake { candidate } => { // spawn each handshake into an independent task, so it can make // progress independently of the crawls - let hs_join = - tokio::spawn(dial(candidate, outbound_connector.clone())).map(move |res| { - match res { - Ok(crawler_action) => crawler_action, - Err(e) => { - panic!("panic during handshaking with {:?}: {:?} ", candidate, e); - } + let hs_join = tokio::spawn(dial(candidate, outbound_connector.clone())) + .map(move |res| match res { + Ok(crawler_action) => crawler_action, + Err(e) => { + panic!("panic during handshaking with {:?}: {:?} ", candidate, e); } - }); + }) + .instrument(Span::current()); handshakes.push(Box::pin(hs_join)); } DemandCrawl => {