feat(diagnostics): Add tokio-console support to zebrad (#4519)

* Always activate tokio/tracing feature

And always build tests with all tokio features.

* Refactor tracing-subscriber init to simplify it

* Add the tokio-console feature and dependencies

* Add optional tokio-console support, and log the installed tracing layers at info level

Uses a tracing Registry for tokio-console, and a fmt::Subscriber otherwise.

* Add some TODOs based on tracing-subscriber features

* Fix up some spans

* Add a TODO for fixing a log filter bug in tokio-console mode
This commit is contained in:
teor 2022-06-15 16:43:20 +10:00 committed by GitHub
parent cc75c3f5f9
commit 83b4e6f975
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 211 additions and 56 deletions

View File

@ -1,4 +1,6 @@
# Zebra cargo configuration
# Flags that apply to all Zebra crates and configurations
[target.'cfg(all())']
rustflags = [
# Zebra standard lints for Rust 1.58+

41
Cargo.lock generated
View File

@ -854,6 +854,42 @@ dependencies = [
"winapi",
]
[[package]]
name = "console-api"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06c5fd425783d81668ed68ec98408a80498fb4ae2fd607797539e1a9dfa3618f"
dependencies = [
"prost",
"prost-types",
"tonic",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31432bc31ff8883bf6a693a79371862f73087822470c82d6a1ec778781ee3978"
dependencies = [
"console-api",
"crossbeam-channel",
"crossbeam-utils",
"futures",
"hdrhistogram",
"humantime",
"prost-types",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic",
"tracing",
"tracing-core",
"tracing-subscriber 0.3.11",
]
[[package]]
name = "const-oid"
version = "0.6.2"
@ -5428,9 +5464,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.22"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03cfcb51380632a72d3111cb8d3447a8d908e577d31beeac006f836383d29a23"
checksum = "f54c8ca710e81886d498c2fd3331b56c93aa248d49de2222ad2742247c60072f"
dependencies = [
"lazy_static",
"valuable",
@ -6403,6 +6439,7 @@ dependencies = [
"atty",
"chrono",
"color-eyre",
"console-subscriber",
"dirs",
"futures",
"gumdrop",

View File

@ -18,9 +18,11 @@ tracing-futures = "0.2.5"
color-eyre = "0.6.1"
ed25519-zebra = "3.0.0"
rand = { version = "0.8.5", package = "rand" }
tokio = { version = "1.19.2", features = ["full"] }
tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] }
tokio-test = "0.4.2"
tower-fallback = { path = "../tower-fallback/" }
tower-test = "0.4.0"
tracing = "0.1.31"
zebra-test = { path = "../zebra-test/" }

View File

@ -12,5 +12,6 @@ futures-core = "0.3.21"
tracing = "0.1.31"
[dev-dependencies]
tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] }
zebra-test = { path = "../zebra-test/" }
tokio = { version = "1.19.2", features = ["full"] }

View File

@ -62,7 +62,8 @@ proptest-derive = { version = "0.3.0", optional = true }
rand = { version = "0.8.5", optional = true, package = "rand" }
rand_chacha = { version = "0.3.1", optional = true }
tokio = { version = "1.19.2", optional = true }
tokio = { version = "1.19.2", features = ["tracing"], optional = true }
# ZF deps
ed25519-zebra = "3.0.0"
@ -79,10 +80,11 @@ tracing = "0.1.31"
proptest = "0.10.1"
proptest-derive = "0.3.0"
rand = { version = "0.8.5", package = "rand" }
rand_chacha = "0.3.1"
tokio = "1.19.2"
tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] }
zebra-test = { path = "../zebra-test/" }

View File

@ -57,7 +57,8 @@ proptest = "0.10.1"
proptest-derive = "0.3.0"
rand07 = { package = "rand", version = "0.7" }
spandoc = "0.2.2"
tokio = { version = "1.19.2", features = ["full"] }
tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] }
tracing-error = "0.2.0"
tracing-subscriber = "0.3.11"

View File

@ -53,7 +53,7 @@ proptest = "0.10.1"
proptest-derive = "0.3.0"
static_assertions = "1.1.0"
tokio = { version = "1.19.2", features = ["test-util"] }
tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] }
toml = "0.5.9"
zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] }

View File

@ -25,7 +25,6 @@ use tokio_stream::wrappers::IntervalStream;
use tower::{
buffer::Buffer, discover::Change, layer::Layer, util::BoxService, Service, ServiceExt,
};
use tracing::Span;
use tracing_futures::Instrument;
use zebra_chain::{chain_tip::ChainTip, parameters::Network};
@ -179,7 +178,7 @@ where
listen_handshaker,
peerset_tx.clone(),
);
let listen_guard = tokio::spawn(listen_fut.instrument(Span::current()));
let listen_guard = tokio::spawn(listen_fut.in_current_span());
// 2. Initial peers, specified in the config.
let initial_peers_fut = add_initial_peers(
@ -188,7 +187,7 @@ where
peerset_tx.clone(),
address_book_updater,
);
let initial_peers_join = tokio::spawn(initial_peers_fut.instrument(Span::current()));
let initial_peers_join = tokio::spawn(initial_peers_fut.in_current_span());
// 3. Outgoing peers we connect to in response to load.
let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());
@ -228,7 +227,7 @@ where
peerset_tx,
active_outbound_connections,
);
let crawl_guard = tokio::spawn(crawl_fut.instrument(Span::current()));
let crawl_guard = tokio::spawn(crawl_fut.in_current_span());
handle_tx
.send(vec![listen_guard, crawl_guard, address_book_updater_guard])
@ -646,15 +645,20 @@ enum CrawlerAction {
///
/// Uses `active_outbound_connections` to limit the number of active outbound connections
/// across both the initial peers and crawler. The limit is based on `config`.
#[instrument(skip(
config,
demand_tx,
demand_rx,
candidates,
outbound_connector,
peerset_tx,
active_outbound_connections,
))]
#[instrument(
skip(
config,
demand_tx,
demand_rx,
candidates,
outbound_connector,
peerset_tx,
active_outbound_connections,
),
fields(
new_peer_interval = ?config.crawl_new_peer_interval,
)
)]
async fn crawl_and_dial<C, S>(
config: Config,
mut demand_tx: futures::channel::mpsc::Sender<MorePeers>,
@ -761,7 +765,8 @@ where
panic!("panic during handshaking with {:?}: {:?} ", candidate, e);
}
})
.instrument(Span::current());
.in_current_span();
handshakes.push(Box::pin(hs_join));
}
DemandCrawl => {

View File

@ -48,7 +48,8 @@ proptest = "0.10.1"
proptest-derive = "0.3.0"
serde_json = "1.0.81"
thiserror = "1.0.31"
tokio = { version = "1.19.2", features = ["full", "test-util"] }
tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] }
zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] }
zebra-state = { path = "../zebra-state", features = ["proptest-impl"] }

View File

@ -27,7 +27,8 @@ rocksdb = { version = "0.18.0", default_features = false, features = ["lz4"] }
serde = { version = "1.0.137", features = ["serde_derive"] }
tempfile = "3.3.0"
thiserror = "1.0.31"
tokio = { version = "1.19.2", features = ["sync"] }
tokio = { version = "1.19.2", features = ["sync", "tracing"] }
tower = { version = "0.4.12", features = ["buffer", "util"] }
tracing = "0.1.31"
@ -48,7 +49,7 @@ proptest-derive = "0.3.0"
halo2 = { package = "halo2_proofs", version = "0.1.0" }
jubjub = "0.9.0"
tokio = { version = "1.19.2", features = ["full"] }
tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] }
zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] }
zebra-test = { path = "../zebra-test/" }

View File

@ -17,7 +17,7 @@ once_cell = "1.12.0"
rand = { version = "0.8.5", package = "rand" }
regex = "1.5.6"
tokio = { version = "1.19.2", features = ["full"] }
tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] }
tower = { version = "0.4.12", features = ["util"] }
futures = "0.3.21"

View File

@ -42,6 +42,19 @@ proptest-impl = ["proptest", "proptest-derive", "zebra-chain/proptest-impl", "ze
# The gRPC tests also need an installed lightwalletd binary
lightwalletd-grpc-tests = ["tonic-build"]
# tokio-console support
#
# To activate this feature, run:
# ```sh
# RUSTFLAGS="--cfg tokio_unstable" cargo build --no-default-features --features="tokio-console" --bin zebrad
# ```
#
# The console-subscriber is incompatible with the tracing/max_level_* features.
#
# For more details, see:
# https://github.com/tokio-rs/console/blob/main/console-subscriber/README.md#enabling-tokio-instrumentation
tokio-console = ["console-subscriber"]
# TODO: replace with environment variables that skip the tests when not set (part of #2995)
test_sync_to_mandatory_checkpoint_mainnet = []
test_sync_to_mandatory_checkpoint_testnet = []
@ -105,6 +118,9 @@ log = "0.4.17"
proptest = { version = "0.10.1", optional = true }
proptest-derive = { version = "0.3.0", optional = true }
# test feature tokio-console
console-subscriber = { version = "0.1.6", optional = true }
[build-dependencies]
vergen = { version = "7.2.1", default-features = false, features = ["cargo", "git"] }
@ -121,7 +137,8 @@ semver = "1.0.10"
# zebra-rpc needs the preserve_order feature, it also makes test results more stable
serde_json = { version = "1.0.81", features = ["preserve_order"] }
tempfile = "3.3.0"
tokio = { version = "1.19.2", features = ["full", "test-util"] }
tokio = { version = "1.19.2", features = ["full", "tracing", "test-util"] }
tokio-stream = "0.1.9"
# test feature lightwalletd-grpc-tests

View File

@ -4,7 +4,6 @@ use abscissa_core::{Component, FrameworkError, FrameworkErrorKind, Shutdown};
use tracing_error::ErrorLayer;
use tracing_subscriber::{
fmt::Formatter, layer::SubscriberExt, reload::Handle, util::SubscriberInitExt, EnvFilter,
FmtSubscriber,
};
use crate::{application::app_version, config::TracingSection};
@ -13,7 +12,15 @@ use super::flame;
/// Abscissa component for initializing the `tracing` subsystem
pub struct Tracing {
filter_handle: Handle<EnvFilter, Formatter>,
/// The installed filter reloading handle, if enabled.
//
// TODO: when fmt::Subscriber supports per-layer filtering, remove the Option
filter_handle: Option<Handle<EnvFilter, Formatter>>,
/// The originally configured filter.
initial_filter: String,
/// The installed flame graph collector, if enabled.
flamegrapher: Option<flame::Grapher>,
}
@ -28,58 +35,129 @@ impl Tracing {
let use_color =
config.force_use_color || (config.use_color && atty::is(atty::Stream::Stdout));
// Construct a tracing subscriber with the supplied filter and enable reloading.
let builder = FmtSubscriber::builder()
.with_ansi(use_color)
.with_env_filter(&filter)
.with_filter_reloading();
let filter_handle = builder.reload_handle();
// Construct a format subscriber with the supplied global logging filter, and enable reloading.
// TODO: when fmt::Subscriber supports per-layer filtering, always enable this code
#[cfg(not(all(feature = "tokio-console", tokio_unstable)))]
let (subscriber, filter_handle) = {
use tracing_subscriber::FmtSubscriber;
let logger = FmtSubscriber::builder()
.with_ansi(use_color)
.with_env_filter(&filter)
.with_filter_reloading();
let filter_handle = logger.reload_handle();
let subscriber = logger.finish().with(ErrorLayer::default());
(subscriber, Some(filter_handle))
};
// Construct a tracing registry with the supplied per-layer logging filter,
// and disable filter reloading.
//
// TODO: when fmt::Subscriber supports per-layer filtering,
// remove this registry code, and layer tokio-console on top of fmt::Subscriber
#[cfg(all(feature = "tokio-console", tokio_unstable))]
let (subscriber, filter_handle) = {
use tracing_subscriber::{fmt, Layer};
let subscriber = tracing_subscriber::registry();
// TODO: find out why crawl_and_dial and try_to_sync evade this filter,
// and why they also don't get the global net/commit span
//
// Using `registry` as the base subscriber, the logs from most other functions get filtered.
// Using `FmtSubscriber` as the base subscriber, all the logs get filtered.
let logger = fmt::Layer::new()
.with_ansi(use_color)
.with_filter(EnvFilter::from(&filter));
let subscriber = subscriber.with(logger);
let span_logger = ErrorLayer::default().with_filter(EnvFilter::from(&filter));
let subscriber = subscriber.with(span_logger);
(subscriber, None)
};
// Add optional layers based on dynamic and compile-time configs
// Add a flamegraph
let (flamelayer, flamegrapher) = if let Some(path) = flame_root {
let (flamelayer, flamegrapher) = flame::layer(path);
(Some(flamelayer), Some(flamegrapher))
} else {
(None, None)
};
let subscriber = subscriber.with(flamelayer);
let journaldlayer = if config.use_journald {
let layer = tracing_journald::layer()
.map_err(|e| FrameworkErrorKind::ComponentError.context(e))?;
// If the global filter can't be used, add a per-layer filter instead.
// TODO: when fmt::Subscriber supports per-layer filtering, always enable this code
#[cfg(all(feature = "tokio-console", tokio_unstable))]
let layer = {
use tracing_subscriber::Layer;
layer.with_filter(EnvFilter::from(&filter))
};
Some(layer)
} else {
None
};
let subscriber = builder.finish().with(ErrorLayer::default());
let subscriber = subscriber.with(journaldlayer);
#[cfg(feature = "enable-sentry")]
let subscriber = subscriber.with(sentry_tracing::layer());
match (flamelayer, journaldlayer) {
(None, None) => subscriber.init(),
(Some(layer1), None) => subscriber.with(layer1).init(),
(None, Some(layer2)) => subscriber.with(layer2).init(),
(Some(layer1), Some(layer2)) => subscriber.with(layer1).with(layer2).init(),
};
// Spawn the console server in the background, and apply the console layer
// TODO: set Builder::poll_duration_histogram_max() if needed
#[cfg(all(feature = "tokio-console", tokio_unstable))]
let subscriber = subscriber.with(console_subscriber::spawn());
// Initialise the global tracing subscriber
subscriber.init();
// Log the tracing stack we just created
tracing::info!(
?filter,
TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL,
LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL,
"started tracing component",
);
if flame_root.is_some() {
info!("installed flamegraph tracing layer");
}
if config.use_journald {
info!(?filter, "installed journald tracing layer");
}
#[cfg(feature = "enable-sentry")]
info!("installed sentry tracing layer");
#[cfg(all(feature = "tokio-console", tokio_unstable))]
info!(
TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL,
LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL,
"installed tokio-console tracing layer",
);
Ok(Self {
filter_handle,
initial_filter: filter,
flamegrapher,
})
}
/// Return the currently-active tracing filter.
pub fn filter(&self) -> String {
self.filter_handle
.with_current(|filter| filter.to_string())
.expect("the subscriber is not dropped before the component is")
if let Some(filter_handle) = self.filter_handle.as_ref() {
filter_handle
.with_current(|filter| filter.to_string())
.expect("the subscriber is not dropped before the component is")
} else {
self.initial_filter.clone()
}
}
/// Reload the currently-active filter with the supplied value.
@ -87,18 +165,26 @@ impl Tracing {
/// This can be used to provide a dynamic tracing filter endpoint.
pub fn reload_filter(&self, filter: impl Into<EnvFilter>) {
let filter = filter.into();
let filter_str = filter.to_string();
self.filter_handle
.reload(filter)
.expect("the subscriber is not dropped before the component is");
if let Some(filter_handle) = self.filter_handle.as_ref() {
tracing::info!(
?filter,
TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL,
LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL,
"reloading tracing filter",
);
tracing::info!(
filter = ?filter_str,
TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL,
LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL,
"reloaded tracing filter",
);
filter_handle
.reload(filter)
.expect("the subscriber is not dropped before the component is");
} else {
tracing::warn!(
?filter,
TRACING_STATIC_MAX_LEVEL = ?tracing::level_filters::STATIC_MAX_LEVEL,
LOG_STATIC_MAX_LEVEL = ?log::STATIC_MAX_LEVEL,
"attempted to reload tracing filter, but filter reloading is disabled",
);
}
}
}