fix(test) switch zebrad to a non-blocking tracing logger (#5032)

* adds non-blocking writer for tracing subscriber

* use non_blocking writer for the fmt::Layer with the tokio-console feature as well

* adds doc comment to _guard field

* adds acceptance test

* update filter_handle type to use NonBlocking

* adds more detail on lossy non-blocking writer and sets tracing.filter to "trace" in acceptance test

* drops ZebradApp before process::exit(1) in the event of a FrameworkError

* reduces buffered lines limit to 8000

* adds tracing.buffer_limit config and some comments

* update acceptance.rs

* fix acceptance test

* fixes ambiguous phrasing in comment

* updates zebrad/src/application.rs

* Find out what the join error is in the GitHub runner tests

* updates acceptance test to use recv_timeout instead of always waiting 10 seconds, removes unnecessary echo command, and reduces # of rpc requests to 500

* see if sleeping for a few seconds before exiting helps the macOS test pass

* Expand exit sleep docs

Co-authored-by: Arya <aryasolhi@gmail.com>

Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Arya 2022-09-07 03:39:30 -04:00 committed by GitHub
parent fb2a1e8595
commit d9fae6e311
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 148 additions and 4 deletions

12
Cargo.lock generated
View File

@ -5532,6 +5532,17 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-appender"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e"
dependencies = [
"crossbeam-channel",
"time 0.3.14",
"tracing-subscriber 0.3.11",
]
[[package]]
name = "tracing-attributes"
version = "0.1.19"
@ -6638,6 +6649,7 @@ dependencies = [
"tonic-build",
"tower",
"tracing",
"tracing-appender",
"tracing-error",
"tracing-flame",
"tracing-futures",

View File

@ -108,6 +108,11 @@ impl FinalizedState {
// So we want to drop it before we exit.
std::mem::drop(new_state);
// Drops tracing log output that hasn't already been written to stdout
// since this exits before calling drop on the WorkerGuard for the logger thread.
// This is okay for now because this is test-only code
//
// TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
Self::exit_process();
}
}
@ -356,6 +361,11 @@ impl FinalizedState {
// We're just about to do a forced exit, so it's ok to do a forced db shutdown
self.db.shutdown(true);
// Drops tracing log output that hasn't already been written to stdout
// since this exits before calling drop on the WorkerGuard for the logger thread.
// This is okay for now because this is test-only code
//
// TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout
Self::exit_process();
}
@ -392,6 +402,13 @@ impl FinalizedState {
let _ = stdout().lock().flush();
let _ = stderr().lock().flush();
// Give the logger thread some time to flush any remaining lines to stdout,
// and yield so that tests pass on macOS
std::thread::sleep(std::time::Duration::from_secs(3));
// Exits before calling drop on the WorkerGuard for the logger thread,
// dropping any lines that haven't already been written to stdout.
// This is okay for now because this is test-only code
std::process::exit(0);
}
}

View File

@ -99,6 +99,7 @@ tinyvec = { version = "1.6.0", features = ["rustc_1_55"] }
thiserror = "1.0.34"
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
tracing-appender = "0.2.2"
tracing-error = "0.2.0"
tracing-futures = "0.2.5"
tracing = "0.1.31"

View File

@ -6,7 +6,7 @@ use self::entry_point::EntryPoint;
use std::{fmt::Write as _, io::Write as _, process};
use abscissa_core::{
application::{self, fatal_error, AppCell},
application::{self, AppCell},
config::{self, Configurable},
status_err,
terminal::{component::Terminal, stderr, stdout, ColorChoice},
@ -18,6 +18,13 @@ use zebra_state::constants::{DATABASE_FORMAT_VERSION, LOCK_FILE_ERROR};
use crate::{commands::ZebradCmd, components::tracing::Tracing, config::ZebradConfig};
/// Print a fatal error message and exit the process with status code 1.
///
/// The message has the form `<app_name> fatal error: <err>` and is emitted
/// through the `status_err!` macro. This function never returns.
///
/// The application name is passed as an owned `String` rather than a reference
/// to the application, so callers can move the application out (and run its
/// destructor) before calling this function.
///
/// See <https://docs.rs/abscissa_core/latest/src/abscissa_core/application/exit.rs.html#7-10>
fn fatal_error(app_name: String, err: &dyn std::error::Error) -> ! {
status_err!("{} fatal error: {}", app_name, err);
process::exit(1)
}
/// Application state
pub static APPLICATION: AppCell<ZebradApp> = AppCell::new();
@ -462,7 +469,11 @@ impl Application for ZebradApp {
let _ = stderr().lock().flush();
if let Err(e) = self.state().components.shutdown(self, shutdown) {
fatal_error(self, &e)
let app_name = self.name().to_string();
// Swap out a fake app so we can trigger the destructor on the original
let _ = std::mem::take(self);
fatal_error(app_name, &e);
}
// Swap out a fake app so we can trigger the destructor on the original

View File

@ -3,9 +3,15 @@
use abscissa_core::{Component, FrameworkError, Shutdown};
use tracing_error::ErrorLayer;
use tracing_subscriber::{
fmt::Formatter, layer::SubscriberExt, reload::Handle, util::SubscriberInitExt, EnvFilter,
fmt::{format, Formatter},
layer::SubscriberExt,
reload::Handle,
util::SubscriberInitExt,
EnvFilter,
};
use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
use crate::{application::app_version, config::TracingSection};
#[cfg(feature = "flamegraph")]
@ -16,7 +22,12 @@ pub struct Tracing {
/// The installed filter reloading handle, if enabled.
//
// TODO: when fmt::Subscriber supports per-layer filtering, remove the Option
filter_handle: Option<Handle<EnvFilter, Formatter>>,
filter_handle: Option<
Handle<
EnvFilter,
Formatter<format::DefaultFields, format::Format<format::Full>, NonBlocking>,
>,
>,
/// The originally configured filter.
initial_filter: String,
@ -24,6 +35,10 @@ pub struct Tracing {
/// The installed flame graph collector, if enabled.
#[cfg(feature = "flamegraph")]
flamegrapher: Option<flame::Grapher>,
/// Drop guard for worker thread of non-blocking logger,
/// responsible for flushing any remaining logs when the program terminates
_guard: WorkerGuard,
}
impl Tracing {
@ -32,6 +47,13 @@ impl Tracing {
let filter = config.filter.unwrap_or_else(|| "".to_string());
let flame_root = &config.flamegraph;
// Builds a lossy NonBlocking logger with a default line limit of 128_000 or an explicit buffer_limit.
// The write method queues lines down a bounded channel with this capacity to a worker thread that writes to stdout.
// Increments error_counter and drops lines when the buffer is full.
let (non_blocking, _guard) = NonBlockingBuilder::default()
.buffered_lines_limit(config.buffer_limit.max(100))
.finish(std::io::stdout());
// Only use color if tracing output is being sent to a terminal or if it was explicitly
// forced to.
let use_color =
@ -47,6 +69,7 @@ impl Tracing {
let logger = FmtSubscriber::builder()
.with_ansi(use_color)
.with_writer(non_blocking)
.with_env_filter(&filter);
// Enable reloading if that feature is selected.
@ -82,6 +105,7 @@ impl Tracing {
// Using `FmtSubscriber` as the base subscriber, all the logs get filtered.
let logger = fmt::Layer::new()
.with_ansi(use_color)
.with_writer(non_blocking)
.with_filter(EnvFilter::from(&filter));
let subscriber = subscriber.with(logger);
@ -185,6 +209,7 @@ impl Tracing {
initial_filter: filter,
#[cfg(feature = "flamegraph")]
flamegrapher,
_guard,
})
}

View File

@ -93,6 +93,12 @@ pub struct TracingSection {
/// verification of every 1000th block.
pub filter: Option<String>,
/// The buffer_limit size sets the number of log lines that can be queued by the tracing subscriber
/// to be written to stdout before logs are dropped.
///
/// Defaults to 128,000 with a minimum of 100.
pub buffer_limit: usize,
/// The address used for an ad-hoc RPC endpoint allowing dynamic control of the tracing filter.
///
/// Install Zebra using `cargo install --features=filter-reload` to enable this config.
@ -140,6 +146,7 @@ impl Default for TracingSection {
use_color: true,
force_use_color: false,
filter: None,
buffer_limit: 128_000,
endpoint_addr: None,
flamegraph: None,
use_journald: false,

View File

@ -1239,6 +1239,77 @@ async fn rpc_endpoint(parallel_cpu_threads: bool) -> Result<()> {
Ok(())
}
/// Checks that `zebrad` keeps answering RPC requests while producing more log
/// output than its tracing buffer can hold.
///
/// The test starts `zebrad` with the `trace` filter and a `buffer_limit` of 100,
/// waits for the RPC endpoint to open, then sends 500 `getinfo` requests and
/// asserts that every one succeeds. Finally it kills the child process and
/// checks that it exited because it was killed.
///
/// The test body runs as a task on a dedicated tokio runtime, and the test
/// thread waits up to 45 seconds for it. Only an actual timeout is reported as
/// a hang: if the task fails or panics, its real error (or the join error) is
/// returned instead.
#[test]
fn non_blocking_logger() -> Result<()> {
    use futures::FutureExt;
    use std::{sync::mpsc, time::Duration};

    let rt = tokio::runtime::Runtime::new().unwrap();
    let (done_tx, done_rx) = mpsc::channel();

    let test_task_handle: tokio::task::JoinHandle<Result<()>> = rt.spawn(async move {
        let _init_guard = zebra_test::init();

        // Write a configuration that has RPC listen_addr set
        // [Note on port conflict](#Note on port conflict)
        let mut config = random_known_rpc_port_config(false)?;
        config.tracing.filter = Some("trace".to_string());
        config.tracing.buffer_limit = 100;
        let zebra_rpc_address = config.rpc.listen_addr.unwrap();

        let dir = testdir()?.with_config(&mut config)?;
        let mut child = dir.spawn_child(args!["start"])?;

        // Wait until port is open.
        child.expect_stdout_line_matches(
            format!("Opened RPC endpoint at {}", zebra_rpc_address).as_str(),
        )?;

        // Create an http client
        let client = reqwest::Client::new();

        // Most of Zebra's lines are 100-200 characters long, so 500 requests should print enough
        // to fill the unix pipe, fill the channel that tracing logs are queued onto,
        // and drop logs rather than block execution.
        for _ in 0..500 {
            let res = client
                .post(format!("http://{}", zebra_rpc_address))
                .body(r#"{"jsonrpc":"1.0","method":"getinfo","params":[],"id":123}"#)
                .header("Content-Type", "application/json")
                .send()
                .await?;

            // Test that zebrad rpc endpoint is still responding to requests
            assert!(res.status().is_success());
        }

        child.kill(false)?;

        let output = child.wait_with_output()?;
        let output = output.assert_failure()?;

        // [Note on port conflict](#Note on port conflict)
        output
            .assert_was_killed()
            .wrap_err("Possible port conflict. Are there other acceptance tests running?")?;

        done_tx.send(())?;

        Ok(())
    });

    // Wait until the spawned task finishes, or return an error after 45 seconds.
    //
    // `done_tx` is dropped without sending when the task returns an error or panics,
    // which makes `recv_timeout` fail with `Disconnected` straight away. That is not a
    // hang, so fall through and report the task's own result from the join handle.
    match done_rx.recv_timeout(Duration::from_secs(45)) {
        Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => {}
        Err(mpsc::RecvTimeoutError::Timeout) => {
            return Err(eyre!("unexpected test task hang"));
        }
    }

    rt.shutdown_timeout(Duration::from_secs(3));

    match test_task_handle.now_or_never() {
        Some(Ok(result)) => result,
        Some(Err(error)) => Err(eyre!("join error: {:?}", error)),
        None => Err(eyre!("unexpected test task hang")),
    }
}
/// Make sure `lightwalletd` works with Zebra, when both their states are empty.
///
/// This test only runs when the `ZEBRA_TEST_LIGHTWALLETD` env var is set.