Add extra instrumentation for initialize and handshakes (#2122)

* Instrument the crawl task

When we created the crawl task, we forgot to instrument it with the
global span. This fix makes sure that the git and network span appears on
crawl logs.

* Instrument the connector

* Improve handshake instrumentation

Make some spans debug, so there are not too many spans.

* Add the address to initial peer connection errors
This commit is contained in:
teor 2021-05-18 06:49:16 +10:00 committed by GitHub
parent 7969459b19
commit b0b8b2f61a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 46 additions and 24 deletions

View File

@ -8,6 +8,7 @@ use std::{
use futures::prelude::*; use futures::prelude::*;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tower::{discover::Change, Service, ServiceExt}; use tower::{discover::Change, Service, ServiceExt};
use tracing_futures::Instrument;
use crate::{BoxError, Request, Response}; use crate::{BoxError, Request, Response};
@ -50,13 +51,15 @@ where
fn call(&mut self, addr: SocketAddr) -> Self::Future { fn call(&mut self, addr: SocketAddr) -> Self::Future {
let mut hs = self.handshaker.clone(); let mut hs = self.handshaker.clone();
let connected_addr = ConnectedAddr::new_outbound_direct(addr);
let connector_span = info_span!("connector", peer = ?connected_addr);
async move { async move {
let stream = TcpStream::connect(addr).await?; let stream = TcpStream::connect(addr).await?;
hs.ready_and().await?; hs.ready_and().await?;
let connected_addr = ConnectedAddr::new_outbound_direct(addr);
let client = hs.call((stream, connected_addr)).await?; let client = hs.call((stream, connected_addr)).await?;
Ok(Change::Insert(addr, client)) Ok(Change::Insert(addr, client))
} }
.instrument(connector_span)
.boxed() .boxed()
} }
} }

View File

@ -554,7 +554,7 @@ where
fn call(&mut self, req: HandshakeRequest) -> Self::Future { fn call(&mut self, req: HandshakeRequest) -> Self::Future {
let (tcp_stream, connected_addr) = req; let (tcp_stream, connected_addr) = req;
let negotiator_span = span!(Level::INFO, "negotiator", peer = ?connected_addr); let negotiator_span = debug_span!("negotiator", peer = ?connected_addr);
// set the peer connection span's parent to the global span, as it // set the peer connection span's parent to the global span, as it
// should exist independently of its creation source (inbound // should exist independently of its creation source (inbound
// connection, crawler, initial peer, ...) // connection, crawler, initial peer, ...)
@ -633,7 +633,9 @@ where
// Instrument the peer's rx and tx streams. // Instrument the peer's rx and tx streams.
let inner_conn_span = connection_span.clone();
let peer_tx = peer_tx.with(move |msg: Message| { let peer_tx = peer_tx.with(move |msg: Message| {
let span = debug_span!(parent: inner_conn_span.clone(), "outbound_metric");
// Add a metric for outbound messages. // Add a metric for outbound messages.
metrics::counter!( metrics::counter!(
"zcash.net.out.messages", "zcash.net.out.messages",
@ -645,7 +647,7 @@ where
// because we need the sink to be Unpin, and the With<Fut, ...> // because we need the sink to be Unpin, and the With<Fut, ...>
// returned by .with is Unpin only if Fut is Unpin, and the // returned by .with is Unpin only if Fut is Unpin, and the
// futures generated by async blocks are not Unpin. // futures generated by async blocks are not Unpin.
future::ready(Ok(msg)) future::ready(Ok(msg)).instrument(span)
}); });
// CORRECTNESS // CORRECTNESS
@ -654,11 +656,15 @@ where
// the inbound_ts_collector. // the inbound_ts_collector.
let inbound_ts_collector = timestamp_collector.clone(); let inbound_ts_collector = timestamp_collector.clone();
let inv_collector = inv_collector.clone(); let inv_collector = inv_collector.clone();
let ts_inner_conn_span = connection_span.clone();
let inv_inner_conn_span = connection_span.clone();
let peer_rx = peer_rx let peer_rx = peer_rx
.then(move |msg| { .then(move |msg| {
// Add a metric for inbound messages and errors. // Add a metric for inbound messages and errors.
// Fire a timestamp or failure event. // Fire a timestamp or failure event.
let mut inbound_ts_collector = inbound_ts_collector.clone(); let mut inbound_ts_collector = inbound_ts_collector.clone();
let span =
debug_span!(parent: ts_inner_conn_span.clone(), "inbound_ts_collector");
async move { async move {
match &msg { match &msg {
Ok(msg) => { Ok(msg) => {
@ -694,10 +700,11 @@ where
} }
msg msg
} }
.instrument(span)
}) })
.then(move |msg| { .then(move |msg| {
let inv_collector = inv_collector.clone(); let inv_collector = inv_collector.clone();
let span = debug_span!("inventory_filter"); let span = debug_span!(parent: inv_inner_conn_span.clone(), "inventory_filter");
async move { async move {
if let (Ok(Message::Inv(hashes)), Some(transient_addr)) = if let (Ok(Message::Inv(hashes)), Some(transient_addr)) =
(&msg, connected_addr.get_transient_addr()) (&msg, connected_addr.get_transient_addr())

View File

@ -204,16 +204,22 @@ where
// an indefinite period. We can use `CallAllUnordered` without filling // an indefinite period. We can use `CallAllUnordered` without filling
// the underlying `Inbound` buffer, because we immediately drive this // the underlying `Inbound` buffer, because we immediately drive this
// single `CallAll` to completion, and handshakes have a short timeout. // single `CallAll` to completion, and handshakes have a short timeout.
use tower::util::CallAllUnordered; let mut handshakes: FuturesUnordered<_> = initial_peers
let addr_stream = futures::stream::iter(initial_peers.into_iter()); .into_iter()
let mut handshakes = CallAllUnordered::new(outbound_connector, addr_stream); .map(|addr| {
outbound_connector
.clone()
.oneshot(addr)
.map_err(move |e| (addr, e))
})
.collect();
while let Some(handshake_result) = handshakes.next().await { while let Some(handshake_result) = handshakes.next().await {
// this is verbose, but it's better than just hanging with no output // this is verbose, but it's better than just hanging with no output
if let Err(ref e) = handshake_result { if let Err((addr, ref e)) = handshake_result {
info!(?e, "an initial peer connection failed"); info!(?addr, ?e, "an initial peer connection failed");
} }
tx.send(handshake_result).await?; tx.send(handshake_result.map_err(|(_addr, e)| e)).await?;
} }
Ok(()) Ok(())
@ -251,19 +257,26 @@ where
info!("Opened Zcash protocol endpoint at {}", local_addr); info!("Opened Zcash protocol endpoint at {}", local_addr);
loop { loop {
if let Ok((tcp_stream, addr)) = listener.accept().await { if let Ok((tcp_stream, addr)) = listener.accept().await {
debug!(?addr, "got incoming connection"); let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr);
let accept_span = info_span!("listen_accept", peer = ?connected_addr);
let _guard = accept_span.enter();
debug!("got incoming connection");
handshaker.ready_and().await?; handshaker.ready_and().await?;
// TODO: distinguish between proxied listeners and direct listeners // TODO: distinguish between proxied listeners and direct listeners
let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr); let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr);
// Construct a handshake future but do not drive it yet.... // Construct a handshake future but do not drive it yet....
let handshake = handshaker.call((tcp_stream, connected_addr)); let handshake = handshaker.call((tcp_stream, connected_addr));
// ... instead, spawn a new task to handle this connection // ... instead, spawn a new task to handle this connection
let mut tx2 = tx.clone(); let mut tx2 = tx.clone();
tokio::spawn(async move { tokio::spawn(
if let Ok(client) = handshake.await { async move {
let _ = tx2.send(Ok(Change::Insert(addr, client))).await; if let Ok(client) = handshake.await {
let _ = tx2.send(Ok(Change::Insert(addr, client))).await;
}
} }
}); .instrument(handshaker_span),
);
} }
} }
} }
@ -381,15 +394,14 @@ where
DemandHandshake { candidate } => { DemandHandshake { candidate } => {
// spawn each handshake into an independent task, so it can make // spawn each handshake into an independent task, so it can make
// progress independently of the crawls // progress independently of the crawls
let hs_join = let hs_join = tokio::spawn(dial(candidate, outbound_connector.clone()))
tokio::spawn(dial(candidate, outbound_connector.clone())).map(move |res| { .map(move |res| match res {
match res { Ok(crawler_action) => crawler_action,
Ok(crawler_action) => crawler_action, Err(e) => {
Err(e) => { panic!("panic during handshaking with {:?}: {:?} ", candidate, e);
panic!("panic during handshaking with {:?}: {:?} ", candidate, e);
}
} }
}); })
.instrument(Span::current());
handshakes.push(Box::pin(hs_join)); handshakes.push(Box::pin(hs_join));
} }
DemandCrawl => { DemandCrawl => {