feat(ui): Add a terminal-based progress bar to Zebra (#6235)

* Implement Display and to_string() for NetworkUpgrade

* Add a progress-bar feature to zebrad

* Add the progress bar writer to the tracing component

* Add a block progress bar transmitter

* Correctly shut down the progress bar, and shut it down on an interrupt

* Make it clearer that the progress task never exits

* Add a config for writing logs to a file

* Add a progress-bar feature to zebra-network

* Add a progress bar for the address book size

* Add progress bars for never attempted and failed peers

* Add an optional limit and label to connection counters

* Add open connection progress bars

* Improve CheckpointList API and CheckpointVerifier debugging

* Add checkpoint index and checkpoint queue progress bars

* Security: Limit the number of non-finalized chains tracked by Zebra

* Make some NonFinalizedState methods available with proptest-impl

* Add a non-finalized chain count progress bar

* Track the last fork height for newly forked chains

* Add a should_count_metrics to Chain

* Add a display method for PartialCumulativeWork

* Add a progress bar for each chain fork

* Add a NonFinalizedState::disable_metrics() method and switch to using it

* Move metrics out of Chain because we can't update Arc<Chain>

* Fix: consistently use best chain order when searching chains

* Track Chain progress bars in NonFinalizedState

* Display work as bits, not a multiple of the target difficulty

* Handle negative fork lengths by reporting "No fork"

* Correctly disable unused fork bars

* clippy: rewrite using `match _.cmp(_) { ... }`

* Initial mempool progress bar implementation

* Update Cargo.lock

* Add the actual transaction size as a description to the cost bar

* Only show mempool progress bars after first activation

* Add queued and rejected mempool progress bars

* Clarify cost note is actual size

* Add tracing.log_file config and progress-bar feature to zebrad docs

* Derive Clone for Chain

* Upgrade to howudoin 0.1.2 and remove some bug workarounds

* Directly call the debug formatter to Display a Network

Co-authored-by: Arya <aryasolhi@gmail.com>

* Rename the address count metric to num_addresses

Co-authored-by: Arya <aryasolhi@gmail.com>

* Simplify reverse checkpoint lookup

Co-authored-by: Arya <aryasolhi@gmail.com>

* Simplify progress bar shutdown code

Co-authored-by: Arya <aryasolhi@gmail.com>

* Remove unused MIN_TRANSPARENT_TX_MEMPOOL_SIZE

* Document that the progress task runs forever

* Fix progress log formatting

* If progress-bar is on, log to a file by default

* Create missing directories for log files

* Add file security docs for running Zebra with elevated permissions

* Document automatic log file, spell progress-bar correctly

---------

Co-authored-by: Arya <aryasolhi@gmail.com>
teor 2023-04-13 18:42:17 +10:00 committed by GitHub
parent 804b63553c
commit 166526a088
32 changed files with 1149 additions and 87 deletions
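
Most of the changes below follow the same pattern: create a labelled howudoin bar behind the optional progress-bar feature, update its position and length from existing diagnostics code, and close it on shutdown or cancellation. A minimal sketch of that pattern, using only the howudoin 0.1.2 calls that appear in this diff (the label and values are placeholders):

#[cfg(feature = "progress-bar")]
let example_bar = howudoin::new().label("Example Bar");

#[cfg(feature = "progress-bar")]
if matches!(howudoin::cancelled(), Some(true)) {
    // The user interrupted Zebra, or the bars were shut down.
    example_bar.close();
} else {
    // A real bar reports a metric and its limit here.
    example_bar.set_pos(42u64).set_len(100u64);
}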

View File

@ -806,6 +806,7 @@ dependencies = [
"encode_unicode",
"lazy_static",
"libc",
"unicode-width",
"windows-sys 0.42.0",
]
@ -1456,6 +1457,19 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "flume"
version = "0.10.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"pin-project",
"spin 0.9.7",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -1632,8 +1646,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31"
dependencies = [
"cfg-if 1.0.0",
"js-sys",
"libc",
"wasi 0.11.0+wasi-snapshot-preview1",
"wasm-bindgen",
]
[[package]]
@ -1890,6 +1906,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "howudoin"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f34059280f617a59ee59a0455e93460d67e5c76dec42dd262d38f0f390f437b2"
dependencies = [
"flume",
"indicatif",
"parking_lot 0.12.1",
]
[[package]]
name = "http"
version = "0.2.9"
@ -2088,6 +2115,18 @@ dependencies = [
"serde",
]
[[package]]
name = "indicatif"
version = "0.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cef509aa9bc73864d6756f0d34d35504af3cf0844373afe9b8669a5b8005a729"
dependencies = [
"console",
"number_prefix",
"portable-atomic",
"unicode-width",
]
[[package]]
name = "inferno"
version = "0.11.15"
@ -2604,6 +2643,15 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "nanorand"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
dependencies = [
"getrandom 0.2.8",
]
[[package]]
name = "native-tls"
version = "0.2.11"
@ -2710,6 +2758,12 @@ dependencies = [
"libc",
]
[[package]]
name = "number_prefix"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "object"
version = "0.30.3"
@ -3676,7 +3730,7 @@ dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"spin 0.5.2",
"untrusted",
"web-sys",
"winapi",
@ -4273,6 +4327,15 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0959fd6f767df20b231736396e4f602171e00d95205676286e79d4a4eb67bef"
dependencies = [
"lock_api",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
@ -5732,6 +5795,7 @@ dependencies = [
"futures-util",
"halo2_proofs",
"hex",
"howudoin",
"jubjub 0.9.0",
"lazy_static",
"metrics",
@ -5771,6 +5835,7 @@ dependencies = [
"chrono",
"futures",
"hex",
"howudoin",
"humantime-serde",
"indexmap",
"lazy_static",
@ -5861,6 +5926,7 @@ dependencies = [
"futures",
"halo2_proofs",
"hex",
"howudoin",
"indexmap",
"insta",
"itertools",
@ -5948,9 +6014,11 @@ dependencies = [
"futures",
"gumdrop",
"hex",
"howudoin",
"humantime-serde",
"hyper",
"indexmap",
"indicatif",
"inferno",
"jsonrpc-core",
"lazy_static",

View File

@ -50,6 +50,13 @@ pub enum NetworkUpgrade {
Nu5,
}
impl fmt::Display for NetworkUpgrade {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// Same as the debug representation for now
fmt::Debug::fmt(self, f)
}
}
/// Mainnet network upgrade activation heights.
///
/// This is actually a bijective map, but it is const, so we use a vector, and
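
Since the new Display impl just falls through to the derived Debug output, the progress bars can print the upgrade name directly. A small illustrative sketch (the expected string follows from the variant name shown above):

// Display delegates to Debug, so a unit variant prints as its name.
let upgrade = NetworkUpgrade::Nu5;
assert_eq!(upgrade.to_string(), "Nu5");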

View File

@ -29,9 +29,12 @@ pub use joinsplit::JoinSplitData;
pub use lock_time::LockTime;
pub use memo::Memo;
pub use sapling::FieldNotPresent;
pub use serialize::SerializedTransaction;
pub use serialize::{
SerializedTransaction, MIN_TRANSPARENT_TX_SIZE, MIN_TRANSPARENT_TX_V4_SIZE,
MIN_TRANSPARENT_TX_V5_SIZE,
};
pub use sighash::{HashType, SigHash};
pub use unmined::{UnminedTx, UnminedTxId, VerifiedUnminedTx};
pub use unmined::{UnminedTx, UnminedTxId, VerifiedUnminedTx, MEMPOOL_TRANSACTION_COST_THRESHOLD};
use crate::{
amount::{Amount, Error as AmountError, NegativeAllowed, NonNegative},

View File

@ -962,9 +962,19 @@ pub(crate) const MIN_TRANSPARENT_OUTPUT_SIZE: u64 = 8 + 1;
///
/// Shielded transfers are much larger than transparent transfers,
/// so this is the minimum transaction size.
pub(crate) const MIN_TRANSPARENT_TX_SIZE: u64 =
pub const MIN_TRANSPARENT_TX_SIZE: u64 =
MIN_TRANSPARENT_INPUT_SIZE + 4 + MIN_TRANSPARENT_OUTPUT_SIZE;
/// The minimum transaction size for v4 transactions.
///
/// v4 transactions also have an expiry height.
pub const MIN_TRANSPARENT_TX_V4_SIZE: u64 = MIN_TRANSPARENT_TX_SIZE + 4;
/// The minimum transaction size for v5 transactions.
///
/// v5 transactions also have an expiry height and a consensus branch ID.
pub const MIN_TRANSPARENT_TX_V5_SIZE: u64 = MIN_TRANSPARENT_TX_SIZE + 4 + 4;
/// No valid Zcash message contains more transactions than can fit in a single block
///
/// `tx` messages contain a single transaction, and `block` messages are limited to the maximum

View File

@ -54,7 +54,7 @@ mod zip317;
/// > transparent transactions because of their size.
///
/// [ZIP-401]: https://zips.z.cash/zip-0401
const MEMPOOL_TRANSACTION_COST_THRESHOLD: u64 = 4000;
pub const MEMPOOL_TRANSACTION_COST_THRESHOLD: u64 = 4000;
/// When a transaction pays a fee less than the conventional fee,
/// this low fee penalty is added to its cost for mempool eviction.

View File

@ -664,6 +664,34 @@ impl PartialCumulativeWork {
pub fn as_u128(self) -> u128 {
self.0
}
/// Returns a floating-point work multiplier that can be used for display.
/// The returned value is the work as a multiple of the target difficulty limit for `network`.
pub fn difficulty_multiplier_for_display(&self, network: Network) -> f64 {
// This calculation is similar to the `getdifficulty` RPC, see that code for details.
let pow_limit = ExpandedDifficulty::target_difficulty_limit(network)
.to_compact()
.to_work()
.expect("target difficult limit is valid work");
// Convert to u128 then f64.
let pow_limit = pow_limit.as_u128() as f64;
let work = self.as_u128() as f64;
work / pow_limit
}
/// Returns floating-point work bits that can be used for display.
/// The returned value is the number of hash bits represented by the work.
pub fn difficulty_bits_for_display(&self) -> f64 {
// This calculation is similar to `zcashd`'s bits display in its logs.
// Convert to u128 then f64.
let work = self.as_u128() as f64;
work.log2()
}
}
impl From<Work> for PartialCumulativeWork {
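
For a rough sense of the bits display, a made-up value: cumulative work equivalent to 2^51 hash evaluations renders as 51.0 bits.

// Made-up example: cumulative work of 2^51 hashes.
let work = (1u128 << 51) as f64;
println!("Work {:.1} bits", work.log2()); // prints "Work 51.0 bits"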

View File

@ -10,6 +10,11 @@ default = []
# Production features that activate extra dependencies, or extra features in dependencies
progress-bar = [
"howudoin",
"zebra-state/progress-bar",
]
# Experimental mining RPC support
getblocktemplate-rpcs = [
"zebra-state/getblocktemplate-rpcs",
@ -56,6 +61,9 @@ zebra-state = { path = "../zebra-state" }
zebra-node-services = { path = "../zebra-node-services" }
zebra-chain = { path = "../zebra-chain" }
# prod feature progress-bar
howudoin = { version = "0.1.2", optional = true }
# Test-only dependencies
proptest = { version = "1.1.0", optional = true }
proptest-derive = { version = "0.3.0", optional = true }

View File

@ -117,7 +117,6 @@ fn progress_from_tip(
///
/// Verifies blocks using a supplied list of checkpoints. There must be at
/// least one checkpoint for the genesis block.
#[derive(Debug)]
pub struct CheckpointVerifier<S>
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
@ -156,6 +155,30 @@ where
/// A channel to send requests to reset the verifier,
/// passing the tip of the state.
reset_sender: mpsc::Sender<Option<(block::Height, block::Hash)>>,
/// Queued block height progress transmitter.
#[cfg(feature = "progress-bar")]
queued_blocks_bar: howudoin::Tx,
/// Verified checkpoint progress transmitter.
#[cfg(feature = "progress-bar")]
verified_checkpoint_bar: howudoin::Tx,
}
impl<S> std::fmt::Debug for CheckpointVerifier<S>
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CheckpointVerifier")
.field("checkpoint_list", &self.checkpoint_list)
.field("network", &self.network)
.field("initial_tip_hash", &self.initial_tip_hash)
.field("queued", &self.queued)
.field("verifier_progress", &self.verifier_progress)
.finish()
}
}
impl<S> CheckpointVerifier<S>
@ -240,7 +263,14 @@ where
progress_from_tip(&checkpoint_list, initial_tip);
let (sender, receiver) = mpsc::channel();
CheckpointVerifier {
#[cfg(feature = "progress-bar")]
let queued_blocks_bar = howudoin::new().label("Queued Checkpoint Blocks");
#[cfg(feature = "progress-bar")]
let verified_checkpoint_bar = howudoin::new().label("Verified Checkpoints");
let verifier = CheckpointVerifier {
checkpoint_list,
network,
initial_tip_hash,
@ -249,6 +279,82 @@ where
verifier_progress,
reset_receiver: receiver,
reset_sender: sender,
#[cfg(feature = "progress-bar")]
queued_blocks_bar,
#[cfg(feature = "progress-bar")]
verified_checkpoint_bar,
};
if verifier_progress.is_final_checkpoint() {
verifier.finish_diagnostics();
} else {
verifier.verified_checkpoint_diagnostics(verifier_progress.height());
}
verifier
}
/// Update diagnostics for queued blocks.
fn queued_block_diagnostics(&self, height: block::Height, hash: block::Hash) {
let max_queued_height = self
.queued
.keys()
.next_back()
.expect("queued has at least one entry");
metrics::gauge!("checkpoint.queued.max.height", max_queued_height.0 as f64);
let is_checkpoint = self.checkpoint_list.contains(height);
tracing::debug!(?height, ?hash, ?is_checkpoint, "queued block");
#[cfg(feature = "progress-bar")]
if matches!(howudoin::cancelled(), Some(true)) {
self.finish_diagnostics();
} else {
self.queued_blocks_bar
.set_pos(max_queued_height.0)
.set_len(u64::from(self.checkpoint_list.max_height().0));
}
}
/// Update diagnostics for verified checkpoints.
fn verified_checkpoint_diagnostics(&self, verified_height: impl Into<Option<block::Height>>) {
let Some(verified_height) = verified_height.into() else {
// We don't know if we have already finished, or haven't started yet,
// so don't register any progress
return;
};
metrics::gauge!("checkpoint.verified.height", verified_height.0 as f64);
let checkpoint_index = self.checkpoint_list.prev_checkpoint_index(verified_height);
let checkpoint_count = self.checkpoint_list.len();
metrics::gauge!("checkpoint.verified.count", checkpoint_index as f64);
tracing::debug!(
?verified_height,
?checkpoint_index,
?checkpoint_count,
"verified checkpoint",
);
#[cfg(feature = "progress-bar")]
if matches!(howudoin::cancelled(), Some(true)) {
self.finish_diagnostics();
} else {
self.verified_checkpoint_bar
.set_pos(u64::try_from(checkpoint_index).expect("fits in u64"))
.set_len(u64::try_from(checkpoint_count).expect("fits in u64"));
}
}
/// Finish checkpoint verifier diagnostics.
fn finish_diagnostics(&self) {
#[cfg(feature = "progress-bar")]
{
self.queued_blocks_bar.close();
self.verified_checkpoint_bar.close();
}
}
@ -257,6 +363,8 @@ where
let (initial_tip_hash, verifier_progress) = progress_from_tip(&self.checkpoint_list, tip);
self.initial_tip_hash = initial_tip_hash;
self.verifier_progress = verifier_progress;
self.verified_checkpoint_diagnostics(verifier_progress.height());
}
/// Return the current verifier's progress.
@ -452,18 +560,21 @@ where
// Ignore heights that aren't checkpoint heights
if verified_height == self.checkpoint_list.max_height() {
metrics::gauge!("checkpoint.verified.height", verified_height.0 as f64);
self.verifier_progress = FinalCheckpoint;
tracing::info!(
final_checkpoint_height = ?verified_height,
"verified final checkpoint: starting full validation",
);
self.verified_checkpoint_diagnostics(verified_height);
self.finish_diagnostics();
} else if self.checkpoint_list.contains(verified_height) {
metrics::gauge!("checkpoint.verified.height", verified_height.0 as f64);
self.verifier_progress = PreviousCheckpoint(verified_height);
// We're done with the initial tip hash now
self.initial_tip_hash = None;
self.verified_checkpoint_diagnostics(verified_height);
}
}
@ -568,17 +679,7 @@ where
qblocks.reserve_exact(1);
qblocks.push(new_qblock);
metrics::gauge!(
"checkpoint.queued.max.height",
self.queued
.keys()
.next_back()
.expect("queued has at least one entry")
.0 as f64,
);
let is_checkpoint = self.checkpoint_list.contains(height);
tracing::debug!(?height, ?hash, ?is_checkpoint, "queued block");
self.queued_block_diagnostics(height, hash);
Ok(req_block)
}
@ -818,6 +919,8 @@ where
/// We can't implement `Drop` on QueuedBlock, because `send()` consumes
/// `tx`. And `tx` doesn't implement `Copy` or `Default` (for `take()`).
fn drop(&mut self) {
self.finish_diagnostics();
let drop_keys: Vec<_> = self.queued.keys().cloned().collect();
for key in drop_keys {
let mut qblocks = self

View File

@ -201,4 +201,21 @@ impl CheckpointList {
pub fn iter(&self) -> impl Iterator<Item = (&block::Height, &block::Hash)> {
self.0.iter()
}
/// Returns the zero-based index of the checkpoint at `height`.
/// If `height` is not a checkpoint height, returns the index of the checkpoint immediately before that height.
pub fn prev_checkpoint_index(&self, height: block::Height) -> usize {
self.0
.keys()
.rposition(|&key| key <= height)
.expect("checkpoints must start at the genesis block height 0")
}
/// Returns the number of checkpoints in the list.
//
// Checkpoint lists are never empty by construction.
#[allow(clippy::len_without_is_empty)]
pub fn len(&self) -> usize {
self.0.len()
}
}
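
As an illustration of the index semantics (the checkpoint heights here are hypothetical):

// Hypothetical list with checkpoints at heights 0, 400_000, and 800_000:
//   prev_checkpoint_index(Height(400_000)) == 1  // exact checkpoint height
//   prev_checkpoint_index(Height(650_000)) == 1  // rounds down to the previous checkpoint
//   prev_checkpoint_index(Height(0))       == 0  // genesis is always checkpoint 0
//   len()                                  == 3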

View File

@ -62,6 +62,31 @@ impl PartialOrd for Progress<block::Height> {
}
}
impl Progress<block::Height> {
/// Returns the contained height, or `None` if the progress has finished, or has not started.
pub fn height(&self) -> Option<block::Height> {
match self {
BeforeGenesis => None,
InitialTip(height) => Some(*height),
PreviousCheckpoint(height) => Some(*height),
FinalCheckpoint => None,
}
}
}
impl<HeightOrHash> Progress<HeightOrHash> {
/// Returns `true` if the progress is before the genesis block.
#[allow(dead_code)]
pub fn is_before_genesis(&self) -> bool {
matches!(self, BeforeGenesis)
}
/// Returns `true` if the progress is at or after the final checkpoint block.
pub fn is_final_checkpoint(&self) -> bool {
matches!(self, FinalCheckpoint)
}
}
/// A `CheckpointVerifier`'s target checkpoint height, based on the current
/// queue.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]

View File

@ -4,6 +4,18 @@ use serde::{Deserialize, Serialize};
/// Configuration for parallel semantic verification:
/// <https://zebra.zfnd.org/dev/rfcs/0002-parallel-verification.html#definitions>
///
/// Automatically downloads the Zcash Sprout and Sapling parameters to the default directory:
/// - Linux: `$HOME/.zcash-params`
/// - macOS: `$HOME/Library/Application Support/ZcashParams`
/// - Windows: `%APPDATA%\ZcashParams` or `C:\Users\%USERNAME%\AppData\ZcashParams`
///
/// # Security
///
/// If you are running Zebra with elevated permissions ("root"), create the
/// parameters directory before running Zebra, and make sure the Zebra user
/// account has exclusive access to that directory, and other users can't modify
/// its parent directories.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Config {

View File

@ -9,8 +9,17 @@ edition = "2021"
[features]
default = []
# Production features that activate extra dependencies, or extra features in dependencies
progress-bar = [
"howudoin",
]
# Wait until `arti-client`'s dependency `x25519-dalek v1.2.0` is updated to a higher version. (#5492)
# tor = ["arti-client", "tor-rtcompat"]
# Testing features that activate extra dependencies
proptest-impl = ["proptest", "proptest-derive", "zebra-chain/proptest-impl"]
[dependencies]
@ -41,6 +50,9 @@ tracing-futures = "0.2.5"
tracing-error = { version = "0.2.0", features = ["traced-error"] }
tracing = "0.1.37"
# prod feature progress-bar
howudoin = { version = "0.1.2", optional = true }
# tor dependencies
# Wait until `arti-client`'s dependency `x25519-dalek v1.2.0` is updated to a higher version. (#5492)
# arti-client = { version = "0.0.2", optional = true }

View File

@ -94,25 +94,31 @@ pub struct AddressBook {
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct AddressMetrics {
/// The number of addresses in the `Responded` state.
responded: usize,
pub responded: usize,
/// The number of addresses in the `NeverAttemptedGossiped` state.
never_attempted_gossiped: usize,
pub never_attempted_gossiped: usize,
/// The number of addresses in the `NeverAttemptedAlternate` state.
never_attempted_alternate: usize,
pub never_attempted_alternate: usize,
/// The number of addresses in the `Failed` state.
failed: usize,
pub failed: usize,
/// The number of addresses in the `AttemptPending` state.
attempt_pending: usize,
pub attempt_pending: usize,
/// The number of `Responded` addresses within the liveness limit.
recently_live: usize,
pub recently_live: usize,
/// The number of `Responded` addresses outside the liveness limit.
recently_stopped_responding: usize,
pub recently_stopped_responding: usize,
/// The number of addresses in the address book, regardless of their states.
pub num_addresses: usize,
/// The maximum number of addresses in the address book.
pub address_limit: usize,
}
#[allow(clippy::len_without_is_empty)]
@ -497,6 +503,8 @@ impl AddressBook {
.checked_sub(recently_live)
.expect("all recently live peers must have responded");
let num_addresses = self.len();
AddressMetrics {
responded,
never_attempted_gossiped,
@ -505,6 +513,8 @@ impl AddressBook {
attempt_pending,
recently_live,
recently_stopped_responding,
num_addresses,
address_limit: self.addr_limit,
}
}

View File

@ -7,7 +7,7 @@ use tokio::{
sync::{mpsc, watch},
task::JoinHandle,
};
use tracing::Span;
use tracing::{Level, Span};
use crate::{
address_book::AddressMetrics, meta_addr::MetaAddrChange, AddressBook, BoxError, Config,
@ -44,8 +44,6 @@ impl AddressBookUpdater {
watch::Receiver<AddressMetrics>,
JoinHandle<Result<(), BoxError>>,
) {
use tracing::Level;
// Create an mpsc channel for peerset address book updates,
// based on the maximum number of inbound and outbound peers.
let (worker_tx, mut worker_rx) = mpsc::channel(config.peerset_total_connection_limit());
@ -58,6 +56,18 @@ impl AddressBookUpdater {
let address_metrics = address_book.address_metrics_watcher();
let address_book = Arc::new(std::sync::Mutex::new(address_book));
#[cfg(feature = "progress-bar")]
let (mut address_info, address_bar, never_bar, failed_bar) = {
let address_bar = howudoin::new().label("Known Peers");
(
address_metrics.clone(),
address_bar,
howudoin::new_with_parent(address_bar.id()).label("Never Attempted Peers"),
howudoin::new_with_parent(address_bar.id()).label("Failed Peers"),
)
};
let worker_address_book = address_book.clone();
let worker = move || {
info!("starting the address book updater");
@ -73,6 +83,42 @@ impl AddressBookUpdater {
.lock()
.expect("mutex should be unpoisoned")
.update(event);
#[cfg(feature = "progress-bar")]
if matches!(howudoin::cancelled(), Some(true)) {
address_bar.close();
never_bar.close();
failed_bar.close();
} else if address_info.has_changed()? {
// We don't track:
// - attempt pending because it's always small
// - responded because it's the remaining attempted-but-not-failed peers
// - recently live because it's similar to the connected peer counts
let address_info = *address_info.borrow_and_update();
address_bar
.set_pos(u64::try_from(address_info.num_addresses).expect("fits in u64"))
.set_len(u64::try_from(address_info.address_limit).expect("fits in u64"));
let never_attempted = address_info.never_attempted_alternate
+ address_info.never_attempted_gossiped;
never_bar
.set_pos(u64::try_from(never_attempted).expect("fits in u64"))
.set_len(u64::try_from(address_info.address_limit).expect("fits in u64"));
failed_bar
.set_pos(u64::try_from(address_info.failed).expect("fits in u64"))
.set_len(u64::try_from(address_info.address_limit).expect("fits in u64"));
}
}
#[cfg(feature = "progress-bar")]
{
address_bar.close();
never_bar.close();
failed_bar.close();
}
let error = Err(AllAddressBookUpdaterSendersClosed.into());

View File

@ -259,7 +259,10 @@ where
let mut handshake_success_total: usize = 0;
let mut handshake_error_total: usize = 0;
let mut active_outbound_connections = ActiveConnectionCounter::new_counter();
let mut active_outbound_connections = ActiveConnectionCounter::new_counter_with(
config.peerset_outbound_connection_limit(),
"Outbound Connections",
);
info!(
initial_peer_count = ?initial_peers.len(),
@ -517,7 +520,10 @@ where
+ Clone,
S::Future: Send + 'static,
{
let mut active_inbound_connections = ActiveConnectionCounter::new_counter();
let mut active_inbound_connections = ActiveConnectionCounter::new_counter_with(
config.peerset_inbound_connection_limit(),
"Inbound Connections",
);
let mut handshakes = FuturesUnordered::new();
// Keeping an unresolved future in the pool means the stream never terminates.

View File

@ -3,7 +3,7 @@
//! These types can be used to count any kind of active resource.
//! But they are currently used to track the number of open connections.
use std::fmt;
use std::{fmt, sync::Arc};
use tokio::sync::mpsc;
@ -23,17 +23,30 @@ pub struct ActiveConnectionCounter {
/// The number of active peers tracked using this counter.
count: usize,
/// The limit for this type of connection, for diagnostics only.
/// The caller must enforce the limit by ignoring, delaying, or dropping connections.
limit: usize,
/// The label for this connection counter, typically its type.
label: Arc<str>,
/// The channel used to send closed connection notifications.
close_notification_tx: mpsc::UnboundedSender<ConnectionClosed>,
/// The channel used to receive closed connection notifications.
close_notification_rx: mpsc::UnboundedReceiver<ConnectionClosed>,
/// Active connection count progress transmitter.
#[cfg(feature = "progress-bar")]
connection_bar: howudoin::Tx,
}
impl fmt::Debug for ActiveConnectionCounter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ActiveConnectionCounter")
.field("label", &self.label)
.field("count", &self.count)
.field("limit", &self.limit)
.finish()
}
}
@ -41,13 +54,28 @@ impl fmt::Debug for ActiveConnectionCounter {
impl ActiveConnectionCounter {
/// Create and return a new active connection counter.
pub fn new_counter() -> Self {
Self::new_counter_with(usize::MAX, "Active Connections")
}
/// Create and return a new active connection counter with `limit` and `label`.
/// The caller must check and enforce limits using [`update_count()`](Self::update_count).
pub fn new_counter_with<S: ToString>(limit: usize, label: S) -> Self {
// The number of items in this channel is bounded by the connection limit.
let (close_notification_tx, close_notification_rx) = mpsc::unbounded_channel();
let label = label.to_string();
#[cfg(feature = "progress-bar")]
let connection_bar = howudoin::new().label(label.clone());
Self {
count: 0,
limit,
label: label.into(),
close_notification_rx,
close_notification_tx,
#[cfg(feature = "progress-bar")]
connection_bar,
}
}
@ -71,20 +99,36 @@ impl ActiveConnectionCounter {
debug!(
open_connections = ?self.count,
?previous_connections,
"a peer connection was closed"
limit = ?self.limit,
label = ?self.label,
"a peer connection was closed",
);
}
trace!(
open_connections = ?self.count,
?previous_connections,
"updated active connection count"
limit = ?self.limit,
label = ?self.label,
"updated active connection count",
);
#[cfg(feature = "progress-bar")]
self.connection_bar
.set_pos(u64::try_from(self.count).expect("fits in u64"))
.set_len(u64::try_from(self.limit).expect("fits in u64"));
self.count
}
}
impl Drop for ActiveConnectionCounter {
fn drop(&mut self) {
#[cfg(feature = "progress-bar")]
self.connection_bar.close();
}
}
/// A per-connection tracker.
///
/// [`ActiveConnectionCounter`] creates a tracker instance for each active connection.
@ -92,25 +136,37 @@ impl ActiveConnectionCounter {
pub struct ConnectionTracker {
/// The channel used to send closed connection notifications on drop.
close_notification_tx: mpsc::UnboundedSender<ConnectionClosed>,
/// The label for this connection counter, typically its type.
label: Arc<str>,
}
impl fmt::Debug for ConnectionTracker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConnectionTracker").finish()
f.debug_tuple("ConnectionTracker")
.field(&self.label)
.finish()
}
}
impl ConnectionTracker {
/// Create and return a new active connection tracker, and add 1 to `counter`.
/// All connection trackers share a label with their connection counter.
///
/// When the returned tracker is dropped, `counter` will be notified, and decreased by 1.
fn new(counter: &mut ActiveConnectionCounter) -> Self {
counter.count += 1;
debug!(open_connections = ?counter.count, "opening a new peer connection");
debug!(
open_connections = ?counter.count,
limit = ?counter.limit,
label = ?counter.label,
"opening a new peer connection",
);
Self {
close_notification_tx: counter.close_notification_tx.clone(),
label: counter.label.clone(),
}
}
}
@ -118,6 +174,8 @@ impl ConnectionTracker {
impl Drop for ConnectionTracker {
/// Notifies the corresponding connection counter that the connection has closed.
fn drop(&mut self) {
debug!(label = ?self.label, "closing a peer connection");
// We ignore disconnected errors, because the receiver can be dropped
// before some connections are dropped.
//
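
A rough sketch of how these counters are meant to be used, based only on the methods shown in this diff (the limit is a placeholder, and the method that hands out ConnectionTrackers is outside this hunk). The counter reports the limit for diagnostics but never enforces it:

let mut outbound = ActiveConnectionCounter::new_counter_with(
    75, // placeholder limit, e.g. config.peerset_outbound_connection_limit()
    "Outbound Connections",
);

// Each open connection holds a ConnectionTracker; dropping the tracker sends a
// close notification, and the next update_count() call subtracts it.
let open_connections = outbound.update_count();
if open_connections >= 75 {
    // The caller must ignore, delay, or drop new connections itself.
}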

View File

@ -9,6 +9,10 @@ edition = "2021"
# Production features that activate extra dependencies, or extra features in dependencies
progress-bar = [
"howudoin",
]
# Experimental mining RPC support
getblocktemplate-rpcs = [
"zebra-chain/getblocktemplate-rpcs",
@ -56,8 +60,12 @@ elasticsearch = { version = "8.5.0-alpha.1", package = "elasticsearch", optional
serde_json = { version = "1.0.95", package = "serde_json", optional = true }
zebra-chain = { path = "../zebra-chain" }
zebra-test = { path = "../zebra-test/", optional = true }
# prod feature progress-bar
howudoin = { version = "0.1.2", optional = true }
# test feature proptest-impl
zebra-test = { path = "../zebra-test/", optional = true }
proptest = { version = "1.1.0", optional = true }
proptest-derive = { version = "0.3.0", optional = true }

View File

@ -44,6 +44,13 @@ pub struct Config {
/// | macOS | `$HOME/Library/Caches/zebra` | `/Users/Alice/Library/Caches/zebra` |
/// | Windows | `{FOLDERID_LocalAppData}\zebra` | `C:\Users\Alice\AppData\Local\zebra` |
/// | Other | `std::env::current_dir()/cache/zebra` | `/cache/zebra` |
///
/// # Security
///
/// If you are running Zebra with elevated permissions ("root"), create the
/// directory for this file before running Zebra, and make sure the Zebra user
/// account has exclusive access to that directory, and other users can't modify
/// its parent directories.
pub cache_dir: PathBuf,
/// Whether to use an ephemeral database.

View File

@ -1779,7 +1779,8 @@ impl Service<ReadRequest> for ReadStateService {
// blocks into the db) is not mutated here.
//
// TODO: Convert `CommitBlockError` to a new `ValidateProposalError`?
latest_non_finalized_state.should_count_metrics = false;
latest_non_finalized_state.disable_metrics();
write::validate_and_commit_non_finalized(
&state.db,
&mut latest_non_finalized_state,

View File

@ -35,23 +35,71 @@ pub(crate) use chain::Chain;
/// which returns a shared reference to the database.
///
/// Most chain data is clone-on-write using [`Arc`].
#[derive(Clone, Debug)]
pub struct NonFinalizedState {
/// Verified, non-finalized chains, in ascending order.
/// Verified, non-finalized chains, in ascending work order.
///
/// The best chain is `chain_iter().next()`.
/// Using `chain_set.last()` or `chain_set.iter().next_back()` is deprecated, and should migrate to `chain_iter().next()`.
/// The best chain is [`NonFinalizedState::best_chain()`], or `chain_iter().next()`.
/// Using `chain_set.last()` or `chain_set.iter().next_back()` is deprecated,
/// callers should migrate to `chain_iter().next()`.
chain_set: BTreeSet<Arc<Chain>>,
/// The configured Zcash network.
pub network: Network,
#[cfg(feature = "getblocktemplate-rpcs")]
// Diagnostics
//
/// Configures the non-finalized state to count metrics.
///
/// Used for skipping metrics counting when testing block proposals
/// Used for skipping metrics and progress bars when testing block proposals
/// with a commit to a cloned non-finalized state.
pub should_count_metrics: bool,
//
// TODO: make this field private and set it via an argument to NonFinalizedState::new()
#[cfg(feature = "getblocktemplate-rpcs")]
should_count_metrics: bool,
/// Number of chain forks transmitter.
#[cfg(feature = "progress-bar")]
chain_count_bar: Option<howudoin::Tx>,
/// A chain fork length transmitter for each [`Chain`] in [`chain_set`](Self.chain_set).
///
/// Because `chain_set` contains `Arc<Chain>`s, it is difficult to update the metrics state
/// on each chain. ([`Arc`]s are read-only, and we don't want to clone them just for metrics.)
#[cfg(feature = "progress-bar")]
chain_fork_length_bars: Vec<howudoin::Tx>,
}
impl std::fmt::Debug for NonFinalizedState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut f = f.debug_struct("NonFinalizedState");
f.field("chain_set", &self.chain_set)
.field("network", &self.network);
#[cfg(feature = "getblocktemplate-rpcs")]
f.field("should_count_metrics", &self.should_count_metrics);
f.finish()
}
}
impl Clone for NonFinalizedState {
fn clone(&self) -> Self {
Self {
chain_set: self.chain_set.clone(),
network: self.network,
#[cfg(feature = "getblocktemplate-rpcs")]
should_count_metrics: self.should_count_metrics,
// Don't track progress in clones.
#[cfg(feature = "progress-bar")]
chain_count_bar: None,
#[cfg(feature = "progress-bar")]
chain_fork_length_bars: Vec::new(),
}
}
}
impl NonFinalizedState {
@ -62,6 +110,10 @@ impl NonFinalizedState {
network,
#[cfg(feature = "getblocktemplate-rpcs")]
should_count_metrics: true,
#[cfg(feature = "progress-bar")]
chain_count_bar: None,
#[cfg(feature = "progress-bar")]
chain_fork_length_bars: Vec::new(),
}
}
@ -75,9 +127,11 @@ impl NonFinalizedState {
///
/// If the internal states are different, it returns `false`,
/// even if the chains and blocks are equal.
#[cfg(test)]
pub(crate) fn eq_internal_state(&self, other: &NonFinalizedState) -> bool {
// this method must be updated every time a field is added to NonFinalizedState
#[cfg(any(test, feature = "proptest-impl"))]
#[allow(dead_code)]
pub fn eq_internal_state(&self, other: &NonFinalizedState) -> bool {
// this method must be updated every time a consensus-critical field is added to NonFinalizedState
// (diagnostic fields can be ignored)
self.chain_set.len() == other.chain_set.len()
&& self
@ -109,6 +163,8 @@ impl NonFinalizedState {
// The first chain is the chain with the lowest work.
self.chain_set.pop_first();
}
self.update_metrics_bars();
}
/// Insert `chain` into `self.chain_set`, then limit the number of tracked chains.
@ -145,7 +201,7 @@ impl NonFinalizedState {
}
// for each remaining chain in side_chains
for mut side_chain in side_chains {
for mut side_chain in side_chains.rev() {
if side_chain.non_finalized_root_hash() != best_chain_root.hash {
// If we popped the root, the chain would be empty or orphaned,
// so just drop it now.
@ -230,6 +286,7 @@ impl NonFinalizedState {
finalized_state.history_tree(),
finalized_state.finalized_value_pool(),
);
let (height, hash) = (prepared.height, prepared.hash);
// If the block is invalid, return the error, and drop the newly created chain fork
@ -458,7 +515,8 @@ impl NonFinalizedState {
}
/// Returns `true` if the best chain contains `sprout_nullifier`.
#[cfg(test)]
#[cfg(any(test, feature = "proptest-impl"))]
#[allow(dead_code)]
pub fn best_contains_sprout_nullifier(&self, sprout_nullifier: &sprout::Nullifier) -> bool {
self.best_chain()
.map(|best_chain| best_chain.sprout_nullifiers.contains(sprout_nullifier))
@ -466,7 +524,8 @@ impl NonFinalizedState {
}
/// Returns `true` if the best chain contains `sapling_nullifier`.
#[cfg(test)]
#[cfg(any(test, feature = "proptest-impl"))]
#[allow(dead_code)]
pub fn best_contains_sapling_nullifier(
&self,
sapling_nullifier: &zebra_chain::sapling::Nullifier,
@ -477,7 +536,8 @@ impl NonFinalizedState {
}
/// Returns `true` if the best chain contains `orchard_nullifier`.
#[cfg(test)]
#[cfg(any(test, feature = "proptest-impl"))]
#[allow(dead_code)]
pub fn best_contains_orchard_nullifier(
&self,
orchard_nullifier: &zebra_chain::orchard::Nullifier,
@ -489,7 +549,7 @@ impl NonFinalizedState {
/// Return the non-finalized portion of the current best chain.
pub fn best_chain(&self) -> Option<&Arc<Chain>> {
self.chain_set.iter().next_back()
self.chain_set.iter().rev().next()
}
/// Return the number of chains.
@ -516,6 +576,7 @@ impl NonFinalizedState {
let fork_chain = self
.chain_set
.iter()
.rev()
.find_map(|chain| chain.fork(parent_hash))
.ok_or(ValidateContextError::NotReadyToBeCommitted)?;
@ -524,10 +585,19 @@ impl NonFinalizedState {
}
}
/// Should this `NonFinalizedState` instance track metrics and progress bars?
#[allow(dead_code)]
fn should_count_metrics(&self) -> bool {
#[cfg(feature = "getblocktemplate-rpcs")]
return self.should_count_metrics;
#[cfg(not(feature = "getblocktemplate-rpcs"))]
return true;
}
/// Update the metrics after `block` is committed
fn update_metrics_for_committed_block(&self, height: block::Height, hash: block::Hash) {
#[cfg(feature = "getblocktemplate-rpcs")]
if !self.should_count_metrics {
if !self.should_count_metrics() {
return;
}
@ -536,13 +606,8 @@ impl NonFinalizedState {
if self
.best_chain()
.unwrap()
.blocks
.iter()
.next_back()
.unwrap()
.1
.hash
.expect("metrics are only updated after initialization")
.non_finalized_tip_hash()
== hash
{
metrics::counter!("state.memory.best.committed.block.count", 1);
@ -554,8 +619,7 @@ impl NonFinalizedState {
/// Update the metrics after `self.chain_set` is modified
fn update_metrics_for_chains(&self) {
#[cfg(feature = "getblocktemplate-rpcs")]
if !self.should_count_metrics {
if !self.should_count_metrics() {
return;
}
@ -565,4 +629,122 @@ impl NonFinalizedState {
self.best_chain_len() as f64,
);
}
/// Update the progress bars after any chain is modified.
/// This includes both chain forks and committed blocks.
fn update_metrics_bars(&mut self) {
// TODO: make chain_count_bar interior mutable, move to update_metrics_for_committed_block()
if !self.should_count_metrics() {
#[allow(clippy::needless_return)]
return;
}
#[cfg(feature = "progress-bar")]
{
use std::cmp::Ordering::*;
if matches!(howudoin::cancelled(), Some(true)) {
self.disable_metrics();
return;
}
// Update the chain count bar
if self.chain_count_bar.is_none() {
self.chain_count_bar = Some(howudoin::new().label("Chain Forks"));
}
let chain_count_bar = self
.chain_count_bar
.as_ref()
.expect("just initialized if missing");
let finalized_tip_height = self
.best_chain()
.map(|chain| chain.non_finalized_root_height().0 - 1);
chain_count_bar
.set_pos(u64::try_from(self.chain_count()).expect("fits in u64"))
.set_len(u64::try_from(MAX_NON_FINALIZED_CHAIN_FORKS).expect("fits in u64"));
if let Some(finalized_tip_height) = finalized_tip_height {
chain_count_bar.desc(format!("Finalized Root {finalized_tip_height}"));
}
// Update each chain length bar, creating or deleting bars as needed
let prev_length_bars = self.chain_fork_length_bars.len();
match self.chain_count().cmp(&prev_length_bars) {
Greater => self
.chain_fork_length_bars
.resize_with(self.chain_count(), howudoin::new),
Less => {
let redundant_bars = self.chain_fork_length_bars.split_off(self.chain_count());
for bar in redundant_bars {
bar.close();
}
}
Equal => {}
}
// It doesn't matter what chain the bar was previously used for,
// because we update everything based on the latest chain in that position.
for (chain_length_bar, chain) in
std::iter::zip(self.chain_fork_length_bars.iter(), self.chain_iter())
{
let fork_height = chain
.last_fork_height
.unwrap_or_else(|| chain.non_finalized_tip_height())
.0;
// We need to initialize and set all the values of the bar here, because:
// - the bar might have been newly created, or
// - the chain this bar was previously assigned to might have changed position.
chain_length_bar
.label(format!("Fork {fork_height}"))
.set_pos(u64::try_from(chain.len()).expect("fits in u64"))
.set_len(u64::from(
zebra_chain::transparent::MIN_TRANSPARENT_COINBASE_MATURITY,
));
// display work as bits
let mut desc = format!(
"Work {:.1} bits",
chain.partial_cumulative_work.difficulty_bits_for_display(),
);
if let Some(recent_fork_height) = chain.recent_fork_height() {
let recent_fork_length = chain
.recent_fork_length()
.expect("just checked recent fork height");
desc.push_str(&format!(
" at {recent_fork_height:?} + {recent_fork_length} blocks"
));
}
chain_length_bar.desc(desc);
}
}
}
/// Stop tracking metrics for this non-finalized state and all its chains.
pub fn disable_metrics(&mut self) {
#[cfg(feature = "getblocktemplate-rpcs")]
{
self.should_count_metrics = false;
}
#[cfg(feature = "progress-bar")]
{
let count_bar = self.chain_count_bar.take().into_iter();
let fork_bars = self.chain_fork_length_bars.drain(..);
count_bar.chain(fork_bars).for_each(howudoin::Tx::close);
}
}
}
impl Drop for NonFinalizedState {
fn drop(&mut self) {
self.disable_metrics();
}
}

View File

@ -36,7 +36,9 @@ use self::index::TransparentTransfers;
pub mod index;
#[derive(Debug, Clone)]
/// A single non-finalized partial chain, from the child of the finalized tip,
/// to a non-finalized chain tip.
#[derive(Clone, Debug)]
pub struct Chain {
// Note: `eq_internal_state()` must be updated every time a field is added to [`Chain`].
@ -176,6 +178,22 @@ pub struct Chain {
/// When a new chain is created from the finalized tip,
/// it is initialized with the finalized tip chain value pool balances.
pub(crate) chain_value_pools: ValueBalance<NonNegative>,
// Diagnostics
//
/// The last height this chain forked at. Diagnostics only.
///
/// This field is only used for metrics, it is not consensus-critical, and it is not checked
/// for equality.
///
/// We keep the same last fork height in both sides of a clone, because every new block clones
/// a chain, even if it's just growing that chain.
pub(super) last_fork_height: Option<Height>,
// # Note
//
// Most diagnostics are implemented on the NonFinalizedState, rather than each chain.
// Some diagnostics only use the best chain, and others need to modify the Chain state,
// but that's difficult with `Arc<Chain>`s.
}
impl Chain {
@ -213,6 +231,7 @@ impl Chain {
partial_cumulative_work: Default::default(),
history_trees_by_height: Default::default(),
chain_value_pools: finalized_tip_chain_value_pools,
last_fork_height: None,
};
chain.add_sprout_tree_and_anchor(finalized_tip_height, sprout_note_commitment_tree);
@ -233,8 +252,8 @@ impl Chain {
///
/// If the internal states are different, it returns `false`,
/// even if the blocks in the two chains are equal.
#[cfg(test)]
pub(crate) fn eq_internal_state(&self, other: &Chain) -> bool {
#[cfg(any(test, feature = "proptest-impl"))]
pub fn eq_internal_state(&self, other: &Chain) -> bool {
// blocks, heights, hashes
self.blocks == other.blocks &&
self.height_by_hash == other.height_by_hash &&
@ -276,6 +295,25 @@ impl Chain {
self.chain_value_pools == other.chain_value_pools
}
/// Returns the last fork height if that height is still in the non-finalized state.
/// Otherwise, if that fork has been finalized, returns `None`.
#[allow(dead_code)]
pub fn recent_fork_height(&self) -> Option<Height> {
self.last_fork_height
.filter(|last| last >= &self.non_finalized_root_height())
}
/// Returns this chain fork's length, if its fork is still in the non-finalized state.
/// Otherwise, if the fork has been finalized, returns `None`.
#[allow(dead_code)]
pub fn recent_fork_length(&self) -> Option<u32> {
let fork_length = self.non_finalized_tip_height() - self.recent_fork_height()?;
// If the fork is above the tip, it is invalid, so just return `None`
// (Ignoring invalid data is ok because this is metrics-only code.)
fork_length.try_into().ok()
}
/// Push a contextually valid non-finalized block into this chain as the new tip.
///
/// If the block is invalid, drops this chain, and returns an error.
@ -338,6 +376,8 @@ impl Chain {
// Revert blocks above the fork
while forked.non_finalized_tip_hash() != fork_tip {
forked.pop_tip();
forked.last_fork_height = Some(forked.non_finalized_tip_height());
}
Some(forked)
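
A worked example of the new fork diagnostics, with hypothetical heights:

// Hypothetical chain: non-finalized root at height 2_000_000,
// last_fork_height = Some(Height(2_000_003)), tip at height 2_000_010.
//
//   recent_fork_height() == Some(Height(2_000_003))  // still above the root
//   recent_fork_length() == Some(7)                  // 2_000_010 - 2_000_003
//
// Once the fork point drops into the finalized state, both return None,
// and the fork bar's description omits the fork position.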

View File

@ -43,6 +43,14 @@ flamegraph = ["tracing-flame", "inferno"]
journald = ["tracing-journald"]
filter-reload = ["hyper"]
progress-bar = [
"howudoin",
"indicatif",
"zebra-consensus/progress-bar",
"zebra-state/progress-bar",
"zebra-network/progress-bar",
]
prometheus = ["metrics-exporter-prometheus"]
# Production features that modify dependency behaviour
@ -160,6 +168,10 @@ metrics-exporter-prometheus = { version = "0.11.0", default-features = false, fe
# we only use `log` to set and print the static log levels in transitive dependencies
log = "0.4.17"
# prod feature progress-bar
howudoin = { version = "0.1.2", features = ["term-line"], optional = true }
indicatif = { version = "0.17.3", optional = true }
# test feature proptest-impl
proptest = { version = "1.1.0", optional = true }
proptest-derive = { version = "0.3.0", optional = true }

View File

@ -61,9 +61,10 @@
//! * answers RPC client requests using the State Service and Mempool Service
//! * submits client transactions to the node's mempool
//!
//! Zebra also has diagnostic support
//! Zebra also has diagnostic support:
//! * [metrics](https://github.com/ZcashFoundation/zebra/blob/main/book/src/user/metrics.md)
//! * [tracing](https://github.com/ZcashFoundation/zebra/blob/main/book/src/user/tracing.md)
//! * [progress-bar](https://docs.rs/howudoin/0.1.1/howudoin)
//!
//! Some of the diagnostic features are optional, and need to be enabled at compile-time.
@ -310,11 +311,12 @@ impl StartCmd {
.map(|_| info!("transaction gossip task exited"))
.map_err(|e| eyre!(e)),
// The progress task runs forever, unless it panics.
// So we don't need to provide an exit status for it.
progress_result = &mut progress_task_handle => {
info!("chain progress task exited");
progress_result
.expect("unexpected panic in the chain progress task");
info!("chain progress task exited");
Ok(())
}
// Unlike other tasks, we expect the download task to finish while Zebra is running.

View File

@ -63,7 +63,7 @@ pub use error::MempoolError;
pub use gossip::gossip_mempool_transaction_id;
pub use queue_checker::QueueChecker;
pub use storage::{
ExactTipRejectionError, SameEffectsChainRejectionError, SameEffectsTipRejectionError,
ExactTipRejectionError, SameEffectsChainRejectionError, SameEffectsTipRejectionError, Storage,
};
#[cfg(test)]
@ -102,7 +102,7 @@ enum ActiveState {
///
/// Only components internal to the [`Mempool`] struct are allowed to
/// inject transactions into `storage`, as transactions must be verified beforehand.
storage: storage::Storage,
storage: Storage,
/// The transaction download and verify stream.
tx_downloads: Pin<Box<InboundTxDownloads>>,
@ -141,6 +141,57 @@ impl ActiveState {
}
}
}
/// Returns the number of pending transactions waiting for download or verify,
/// or zero if the mempool is disabled.
#[cfg(feature = "progress-bar")]
fn queued_transaction_count(&self) -> usize {
match self {
ActiveState::Disabled => 0,
ActiveState::Enabled { tx_downloads, .. } => tx_downloads.in_flight(),
}
}
/// Returns the number of transactions in storage, or zero if the mempool is disabled.
#[cfg(feature = "progress-bar")]
fn transaction_count(&self) -> usize {
match self {
ActiveState::Disabled => 0,
ActiveState::Enabled { storage, .. } => storage.transaction_count(),
}
}
/// Returns the cost of the transactions in the mempool, according to ZIP-401.
/// Returns zero if the mempool is disabled.
#[cfg(feature = "progress-bar")]
fn total_cost(&self) -> u64 {
match self {
ActiveState::Disabled => 0,
ActiveState::Enabled { storage, .. } => storage.total_cost(),
}
}
/// Returns the total serialized size of the verified transactions in the set,
/// or zero if the mempool is disabled.
///
/// See [`Storage::total_serialized_size()`] for details.
#[cfg(feature = "progress-bar")]
pub fn total_serialized_size(&self) -> usize {
match self {
ActiveState::Disabled => 0,
ActiveState::Enabled { storage, .. } => storage.total_serialized_size(),
}
}
/// Returns the number of rejected transaction hashes in storage,
/// or zero if the mempool is disabled.
#[cfg(feature = "progress-bar")]
fn rejected_transaction_count(&mut self) -> usize {
match self {
ActiveState::Disabled => 0,
ActiveState::Enabled { storage, .. } => storage.rejected_transaction_count(),
}
}
}
/// Mempool async management and query service.
@ -183,6 +234,28 @@ pub struct Mempool {
/// Sender part of a gossip transactions channel.
/// Used to broadcast transaction ids to peers.
transaction_sender: watch::Sender<HashSet<UnminedTxId>>,
// Diagnostics
//
/// Queued transactions pending download or verification transmitter.
/// Only displayed after the mempool's first activation.
#[cfg(feature = "progress-bar")]
queued_count_bar: Option<howudoin::Tx>,
/// Number of mempool transactions transmitter.
/// Only displayed after the mempool's first activation.
#[cfg(feature = "progress-bar")]
transaction_count_bar: Option<howudoin::Tx>,
/// Mempool transaction cost transmitter.
/// Only displayed after the mempool's first activation.
#[cfg(feature = "progress-bar")]
transaction_cost_bar: Option<howudoin::Tx>,
/// Rejected transactions transmitter.
/// Only displayed after the mempool's first activation.
#[cfg(feature = "progress-bar")]
rejected_count_bar: Option<howudoin::Tx>,
}
impl Mempool {
@ -209,6 +282,14 @@ impl Mempool {
state,
tx_verifier,
transaction_sender,
#[cfg(feature = "progress-bar")]
queued_count_bar: None,
#[cfg(feature = "progress-bar")]
transaction_count_bar: None,
#[cfg(feature = "progress-bar")]
transaction_cost_bar: None,
#[cfg(feature = "progress-bar")]
rejected_count_bar: None,
};
// Make sure `is_enabled` is accurate.
@ -312,6 +393,118 @@ impl Mempool {
.copied()
.collect()
}
/// Update metrics for the mempool.
fn update_metrics(&mut self) {
// Shutdown if needed
#[cfg(feature = "progress-bar")]
if matches!(howudoin::cancelled(), Some(true)) {
self.disable_metrics();
return;
}
// Initialize if just activated
#[cfg(feature = "progress-bar")]
if self.is_enabled()
&& (self.queued_count_bar.is_none()
|| self.transaction_count_bar.is_none()
|| self.transaction_cost_bar.is_none()
|| self.rejected_count_bar.is_none())
{
let max_transaction_count = self.config.tx_cost_limit
/ zebra_chain::transaction::MEMPOOL_TRANSACTION_COST_THRESHOLD;
self.queued_count_bar = Some(
howudoin::new()
.label("Mempool Queue")
.set_pos(0u64)
.set_len(
u64::try_from(downloads::MAX_INBOUND_CONCURRENCY).expect("fits in u64"),
),
);
self.transaction_count_bar = Some(
howudoin::new()
.label("Mempool Txs")
.set_pos(0u64)
.set_len(max_transaction_count),
);
self.transaction_cost_bar = Some(
howudoin::new()
.label("Mempool Cost")
.set_pos(0u64)
.set_len(self.config.tx_cost_limit)
.fmt_as_bytes(true),
);
self.rejected_count_bar = Some(
howudoin::new()
.label("Mempool Rejects")
.set_pos(0u64)
.set_len(
u64::try_from(storage::MAX_EVICTION_MEMORY_ENTRIES).expect("fits in u64"),
),
);
}
// Update if the mempool has ever been active
#[cfg(feature = "progress-bar")]
if let (
Some(queued_count_bar),
Some(transaction_count_bar),
Some(transaction_cost_bar),
Some(rejected_count_bar),
) = (
self.queued_count_bar,
self.transaction_count_bar,
self.transaction_cost_bar,
self.rejected_count_bar,
) {
let queued_count = self.active_state.queued_transaction_count();
let transaction_count = self.active_state.transaction_count();
let transaction_cost = self.active_state.total_cost();
let transaction_size = self.active_state.total_serialized_size();
let transaction_size =
indicatif::HumanBytes(transaction_size.try_into().expect("fits in u64"));
let rejected_count = self.active_state.rejected_transaction_count();
queued_count_bar.set_pos(u64::try_from(queued_count).expect("fits in u64"));
transaction_count_bar.set_pos(u64::try_from(transaction_count).expect("fits in u64"));
// Display the cost and cost limit, with the actual size as a description.
//
// Costs can be much higher than the transaction size due to the
// MEMPOOL_TRANSACTION_COST_THRESHOLD minimum cost.
transaction_cost_bar
.set_pos(transaction_cost)
.desc(format!("Actual size {transaction_size}"));
rejected_count_bar.set_pos(u64::try_from(rejected_count).expect("fits in u64"));
}
}
/// Disable metrics for the mempool.
fn disable_metrics(&self) {
#[cfg(feature = "progress-bar")]
{
if let Some(bar) = self.queued_count_bar {
bar.close()
}
if let Some(bar) = self.transaction_count_bar {
bar.close()
}
if let Some(bar) = self.transaction_cost_bar {
bar.close()
}
if let Some(bar) = self.rejected_count_bar {
bar.close()
}
}
}
}
impl Service<Request> for Mempool {
@ -329,6 +522,8 @@ impl Service<Request> for Mempool {
// When the mempool is disabled we still return that the service is ready.
// Otherwise, callers could block waiting for the mempool to be enabled.
if !self.is_enabled() {
self.update_metrics();
return Poll::Ready(Ok(()));
}
@ -371,6 +566,8 @@ impl Service<Request> for Mempool {
}
}
self.update_metrics();
return Poll::Ready(Ok(()));
}
@ -469,6 +666,8 @@ impl Service<Request> for Mempool {
}
}
self.update_metrics();
Poll::Ready(Ok(()))
}
@ -564,6 +763,10 @@ impl Service<Request> for Mempool {
})
.map(|result| result.map_err(BoxError::from))
.collect();
// We've added transactions to the queue
self.update_metrics();
async move { Ok(Response::Queued(rsp)) }.boxed()
}
@ -626,3 +829,9 @@ impl Service<Request> for Mempool {
}
}
}
impl Drop for Mempool {
fn drop(&mut self) {
self.disable_metrics();
}
}
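
For context on the bar lengths above, assuming the ZIP-401 default cost limit of 80,000,000 (tx_cost_limit is configurable in practice) and the 4,000 threshold exported earlier in this diff:

let tx_cost_limit: u64 = 80_000_000; // assumed ZIP-401 default
let cost_threshold: u64 = 4_000; // MEMPOOL_TRANSACTION_COST_THRESHOLD
let max_transaction_count = tx_cost_limit / cost_threshold;
assert_eq!(max_transaction_count, 20_000); // length of the "Mempool Txs" bar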

View File

@ -450,7 +450,6 @@ where
}
/// Get the number of currently in-flight download tasks.
// Note: copied from zebrad/src/components/sync/downloads.rs
#[allow(dead_code)]
pub fn in_flight(&self) -> usize {
self.pending.len()

View File

@ -433,6 +433,19 @@ impl Storage {
self.verified.transaction_count()
}
/// Returns the cost of the transactions in the mempool, according to ZIP-401.
#[allow(dead_code)]
pub fn total_cost(&self) -> u64 {
self.verified.total_cost()
}
/// Returns the total serialized size of the verified transactions in the set.
///
/// See [`VerifiedSet::total_serialized_size()`] for details.
pub fn total_serialized_size(&self) -> usize {
self.verified.total_serialized_size()
}
/// Returns the set of [`UnminedTx`]es with exactly matching `tx_ids` in the
/// mempool.
///

View File

@ -1,3 +1,5 @@
//! The set of verified transactions in the mempool.
use std::{
borrow::Cow,
collections::{HashSet, VecDeque},
@ -12,6 +14,10 @@ use zebra_chain::{
use super::super::SameEffectsTipRejectionError;
// Imports for doc links
#[allow(unused_imports)]
use zebra_chain::transaction::MEMPOOL_TRANSACTION_COST_THRESHOLD;
/// The set of verified transactions stored in the mempool.
///
/// This also caches all the spent outputs from the transactions in the mempool. The spent
@ -30,7 +36,7 @@ pub struct VerifiedSet {
/// serialized.
transactions_serialized_size: usize,
/// The total cost of the verified transactons in the set.
/// The total cost of the verified transactions in the set.
total_cost: u64,
/// The set of spent out points by the verified transactions.
@ -82,6 +88,14 @@ impl VerifiedSet {
self.total_cost
}
/// Returns the total serialized size of the verified transactions in the set.
///
/// This can be less than the total cost, because the minimum transaction cost
/// is based on the [`MEMPOOL_TRANSACTION_COST_THRESHOLD`].
pub fn total_serialized_size(&self) -> usize {
self.transactions_serialized_size
}
/// Returns `true` if the set of verified transactions contains the transaction with the
/// specified [`UnminedTxId`].
pub fn contains(&self, id: &UnminedTxId) -> bool {
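
To see why the serialized size can be smaller than the cost, a sketch of the ZIP-401 cost floor (ignoring the low-fee penalty):

// A small 1_000-byte transaction is still charged the minimum cost.
let serialized_size: u64 = 1_000;
let cost = serialized_size.max(4_000); // MEMPOOL_TRANSACTION_COST_THRESHOLD
assert!(cost > serialized_size);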

View File

@ -1,8 +1,8 @@
//! Progress tracking for blockchain syncing.
use std::{ops::Add, time::Duration};
use std::{cmp::min, ops::Add, time::Duration};
use chrono::Utc;
use chrono::{TimeZone, Utc};
use num_integer::div_ceil;
use zebra_chain::{
@ -20,6 +20,9 @@ use crate::components::sync::SyncStatus;
/// The amount of time between progress logs.
const LOG_INTERVAL: Duration = Duration::from_secs(60);
/// The amount of time between progress bar updates.
const PROGRESS_BAR_INTERVAL: Duration = Duration::from_secs(5);
/// The number of blocks we consider to be close to the tip.
///
/// Most chain forks are 1-7 blocks long.
@ -53,7 +56,8 @@ const SYNC_PERCENT_FRAC_DIGITS: usize = 3;
// TODO: change to HeightDiff?
const MIN_BLOCKS_MINED_AFTER_CHECKPOINT_UPDATE: u32 = 10;
/// Logs Zebra's estimated progress towards the chain tip every minute or so.
/// Logs Zebra's estimated progress towards the chain tip every minute or so, and
/// updates a terminal progress bar every few seconds.
///
/// TODO:
/// - log progress towards, remaining blocks before, and remaining time to next network upgrade
@ -62,7 +66,7 @@ pub async fn show_block_chain_progress(
network: Network,
latest_chain_tip: impl ChainTip,
sync_status: SyncStatus,
) {
) -> ! {
// The minimum number of extra blocks after the highest checkpoint, based on:
// - the non-finalized state limit, and
// - the minimum number of extra blocks mined between a checkpoint update,
@ -91,9 +95,16 @@ pub async fn show_block_chain_progress(
// after fixing slow syncing near tip (#3375)
let min_state_block_interval = max_block_spacing.unwrap_or(target_block_spacing * 4) * 2;
// Formatted string for logging.
// Formatted strings for logging.
let target_block_spacing = humantime_seconds(
target_block_spacing
.to_std()
.expect("constant fits in std::Duration"),
);
let max_block_spacing = max_block_spacing
.map(|duration| duration.to_string())
.map(|duration| {
humantime_seconds(duration.to_std().expect("constant fits in std::Duration"))
})
.unwrap_or_else(|| "None".to_string());
// The last time we downloaded and verified at least one block.
@ -106,6 +117,16 @@ pub async fn show_block_chain_progress(
// Initialized to the genesis height to simplify the code.
let mut last_state_change_height = Height(0);
// The last time we logged an update.
// Initialized with the Unix epoch, to simplify the code while still staying in the std range.
let mut last_log_time = Utc
.timestamp_opt(0, 0)
.single()
.expect("in-range number of seconds and valid nanosecond");
#[cfg(feature = "progress-bar")]
let block_bar = howudoin::new().label("Blocks");
loop {
let now = Utc::now();
let is_syncer_stopped = sync_status.is_close_to_tip();
@ -120,6 +141,27 @@ pub async fn show_block_chain_progress(
.expect("unexpected empty state: estimate requires a block height");
let network_upgrade = NetworkUpgrade::current(network, current_height);
// Send progress reports for block height
#[cfg(feature = "progress-bar")]
if matches!(howudoin::cancelled(), Some(true)) {
block_bar.close();
} else {
block_bar
.set_pos(current_height.0)
.set_len(u64::from(estimated_height.0))
.desc(network_upgrade.to_string());
}
// Skip logging if it isn't time for it yet
let elapsed_since_log = (now - last_log_time)
.to_std()
.expect("elapsed times are in range");
if elapsed_since_log < LOG_INTERVAL {
continue;
} else {
last_log_time = now;
}
// Work out the sync progress towards the estimated tip.
let sync_progress = f64::from(current_height.0) / f64::from(estimated_height.0);
let sync_percent = format!(
@ -167,6 +209,10 @@ pub async fn show_block_chain_progress(
and your computer clock and time zone",
time_since_last_state_block_chrono.num_minutes(),
);
// TODO: use add_warn(), but only add each warning once
#[cfg(feature = "progress-bar")]
block_bar.desc("chain updates have stalled");
} else if is_syncer_stopped && remaining_sync_blocks > MIN_SYNC_WARNING_BLOCKS {
// We've stopped syncing blocks, but we estimate we're a long way from the tip.
//
@ -182,6 +228,9 @@ pub async fn show_block_chain_progress(
Hint: check your network connection, \
and your computer clock and time zone",
);
#[cfg(feature = "progress-bar")]
block_bar.desc("sync is very slow, or estimated tip is wrong");
} else if is_syncer_stopped && current_height <= after_checkpoint_height {
// We've stopped syncing blocks,
// but we're below the minimum height estimated from our checkpoints.
@ -203,6 +252,9 @@ pub async fn show_block_chain_progress(
Dev Hint: were the checkpoints updated in the last {} minutes?",
min_minutes_after_checkpoint_update,
);
#[cfg(feature = "progress-bar")]
block_bar.desc("sync is very slow");
} else if is_syncer_stopped {
// We've stayed near the tip for a while, and we've stopped syncing lots of blocks.
// So we're mostly using gossiped blocks now.
@ -214,6 +266,9 @@ pub async fn show_block_chain_progress(
%time_since_last_state_block,
"finished initial sync to chain tip, using gossiped blocks",
);
#[cfg(feature = "progress-bar")]
block_bar.desc(format!("{}: initial sync finished", network_upgrade));
} else if remaining_sync_blocks <= MAX_CLOSE_TO_TIP_BLOCKS {
// We estimate we're near the tip, but we have been syncing lots of blocks recently.
// We might also be using some gossiped blocks.
@ -226,6 +281,9 @@ pub async fn show_block_chain_progress(
"close to finishing initial sync, \
confirming using syncer and gossiped blocks",
);
#[cfg(feature = "progress-bar")]
block_bar.desc(format!("{}: initial sync almost finished", network_upgrade));
} else {
// We estimate we're far from the tip, and we've been syncing lots of blocks.
info!(
@ -250,6 +308,9 @@ pub async fn show_block_chain_progress(
Hint: check your network connection, \
and your computer clock and time zone",
);
#[cfg(feature = "progress-bar")]
block_bar.desc("can't download genesis block");
} else {
// We're waiting for the genesis block to be committed to the state,
// before we can estimate the best chain tip.
@ -258,9 +319,12 @@ pub async fn show_block_chain_progress(
current_height = %"None",
"initial sync is waiting to download the genesis block",
);
#[cfg(feature = "progress-bar")]
block_bar.desc("waiting to download genesis block");
}
}
tokio::time::sleep(LOG_INTERVAL).await;
tokio::time::sleep(min(LOG_INTERVAL, PROGRESS_BAR_INTERVAL)).await;
}
}

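The loop above now runs at two rates: the progress bar is updated on every pass, while the log output is throttled to `LOG_INTERVAL`, and the sleep uses the shorter of the two intervals. A self-contained sketch of that split, simplified to `std::time::Instant` instead of chrono, with comments standing in for the real bar update and log line; it assumes tokio's `macros`, `rt`, and `time` features:

use std::time::{Duration, Instant};

const LOG_INTERVAL: Duration = Duration::from_secs(60);
const PROGRESS_BAR_INTERVAL: Duration = Duration::from_secs(5);

#[tokio::main]
async fn main() {
    // `None` means we haven't logged yet, so the first pass always logs.
    let mut last_log_time: Option<Instant> = None;

    loop {
        // Fast path: runs every PROGRESS_BAR_INTERVAL.
        // (placeholder for block_bar.set_pos(...).set_len(...).desc(...))

        // Slow path: runs at most once per LOG_INTERVAL.
        if last_log_time.map_or(true, |t| t.elapsed() >= LOG_INTERVAL) {
            last_log_time = Some(Instant::now());
            println!("estimated progress log line");
        }

        // Sleep at the faster of the two rates so the bar stays current.
        tokio::time::sleep(LOG_INTERVAL.min(PROGRESS_BAR_INTERVAL)).await;
    }
}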

@ -111,8 +111,12 @@ mod imp {
.expect("Failed to register signal handler")
.recv()
.await;
zebra_chain::shutdown::set_shutting_down();
#[cfg(feature = "progress-bar")]
howudoin::disable();
info!(
// use target to remove 'imp' from output
target: "zebrad::signal",
@ -131,8 +135,12 @@ mod imp {
tokio::signal::ctrl_c()
.await
.expect("listening for ctrl-c signal should never fail");
zebra_chain::shutdown::set_shutting_down();
#[cfg(feature = "progress-bar")]
howudoin::disable();
info!(
// use target to remove 'imp' from output
target: "zebrad::signal",

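Both signal handlers now follow the same ordering: set the shutdown flag, tear down the progress bars, then log the exit. A standalone sketch of that ordering for the ctrl-c path, with a local `AtomicBool` standing in for `zebra_chain::shutdown` and `println!` standing in for the tracing macros; it assumes tokio's `signal` feature:

use std::sync::atomic::{AtomicBool, Ordering};

// Stand-in for the global flag set by zebra_chain::shutdown::set_shutting_down().
static IS_SHUTTING_DOWN: AtomicBool = AtomicBool::new(false);

#[tokio::main]
async fn main() {
    tokio::signal::ctrl_c()
        .await
        .expect("listening for ctrl-c signal should never fail");

    // 1. Tell other tasks to stop starting new work.
    IS_SHUTTING_DOWN.store(true, Ordering::SeqCst);

    // 2. Tear down the progress bars first, so the final log lines stay readable.
    #[cfg(feature = "progress-bar")]
    howudoin::disable();

    // 3. Log the shutdown, then let the process exit.
    println!("received ctrl-c, starting shutdown");
}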

@ -92,6 +92,13 @@ pub struct Config {
/// The need to create two files means that we will slightly manipulate the
/// path given to us to create the two representations.
///
/// # Security
///
/// If you are running Zebra with elevated permissions ("root"), create the
/// directory for this file before running Zebra, and make sure the Zebra user
/// account has exclusive access to that directory, and other users can't modify
/// its parent directories.
///
/// # Example
///
/// Given `flamegraph = "flamegraph"` we will generate a `flamegraph.svg` and
@ -102,7 +109,19 @@ pub struct Config {
pub flamegraph: Option<PathBuf>,
/// If set to a path, write the tracing logs to that path.
///
/// By default, logs are sent to the terminal standard output.
/// But if the `progress-bar` feature is activated, logs are sent to the standard log file path:
/// - Linux: `$XDG_STATE_HOME/zebrad.log` or `$HOME/.local/state/zebrad.log`
/// - macOS: `$HOME/Library/Application Support/zebrad.log`
/// - Windows: `%LOCALAPPDATA%\zebrad.log` or `C:\Users\%USERNAME%\AppData\Local\zebrad.log`
///
/// # Security
///
/// If you are running Zebra with elevated permissions ("root"), create the
/// directory for this file before running Zebra, and make sure the Zebra user
/// account has exclusive access to that directory, and other users can't modify
/// its parent directories.
pub log_file: Option<PathBuf>,
/// The use_journald flag sends tracing events to systemd-journald, on Linux
@ -114,6 +133,11 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
#[cfg(feature = "progress-bar")]
let default_log_file = dirs::state_dir()
.or_else(dirs::data_local_dir)
.map(|dir| dir.join("zebrad.log"));
Self {
use_color: true,
force_use_color: false,
@ -121,7 +145,10 @@ impl Default for Config {
buffer_limit: 128_000,
endpoint_addr: None,
flamegraph: None,
#[cfg(not(feature = "progress-bar"))]
log_file: None,
#[cfg(feature = "progress-bar")]
log_file: default_log_file,
use_journald: false,
}
}

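Both security notes in this config ask operators who run Zebra as root to create the log directory beforehand and keep other users out of it. A Unix-only sketch of one way an operator could check that before starting Zebra; the directory path and the "no group or world write" policy are assumptions for illustration, not Zebra behavior:

use std::{fs, os::unix::fs::PermissionsExt, path::Path};

/// Illustrative check: the directory exists and is not group- or world-writable.
fn log_dir_is_private(dir: &Path) -> bool {
    match fs::metadata(dir) {
        Ok(metadata) => metadata.is_dir() && metadata.permissions().mode() & 0o022 == 0,
        // A missing directory also fails: create it before starting Zebra, as the docs advise.
        Err(_) => false,
    }
}

fn main() {
    // Hypothetical location chosen by an operator, not Zebra's default path.
    let dir = Path::new("/var/log/zebrad");

    if log_dir_is_private(dir) {
        println!("ok: {} exists and only its owner can write to it", dir.display());
    } else {
        println!("warning: {} is missing, or other users can write to it", dir.display());
    }
}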

@ -1,8 +1,12 @@
//! The Abscissa component for Zebra's `tracing` implementation.
use std::{fs::File, io::Write};
use std::{
fs::{self, File},
io::Write,
};
use abscissa_core::{Component, FrameworkError, Shutdown};
use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
use tracing_error::ErrorLayer;
use tracing_subscriber::{
fmt::{format, Formatter},
@ -12,8 +16,6 @@ use tracing_subscriber::{
EnvFilter,
};
use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
use crate::{application::app_version, components::tracing::Config};
#[cfg(feature = "flamegraph")]
@ -42,7 +44,9 @@ pub struct Tracing {
flamegrapher: Option<flame::Grapher>,
/// Drop guard for worker thread of non-blocking logger,
/// responsible for flushing any remaining logs when the program terminates
/// responsible for flushing any remaining logs when the program terminates.
//
// Correctness: must be listed last in the struct, so it drops after other drops have logged.
_guard: WorkerGuard,
}
@ -54,7 +58,32 @@ impl Tracing {
let flame_root = &config.flamegraph;
let writer = if let Some(log_file) = config.log_file.as_ref() {
println!("running zebra, sending logs to {log_file:?}...");
println!("running zebra");
// Make sure the directory for the log file exists.
// If the log is configured in the current directory, it won't have a parent directory.
//
// # Security
//
// If the user is running Zebra with elevated permissions ("root"), they should
// create the log file directory before running Zebra, and make sure the Zebra user
// account has exclusive access to that directory, and other users can't modify
// its parent directories.
//
// This avoids a TOCTOU security issue in the Rust filesystem API.
let log_file_dir = log_file.parent();
if let Some(log_file_dir) = log_file_dir {
if !log_file_dir.exists() {
println!("directory for log file {log_file:?} does not exist, trying to create it...");
if let Err(create_dir_error) = fs::create_dir_all(log_file_dir) {
println!("failed to create directory for log file: {create_dir_error}");
println!("trying log file anyway...");
}
}
}
println!("sending logs to {log_file:?}...");
let log_file = File::options().append(true).create(true).open(log_file)?;
Box::new(log_file) as BoxWrite
} else {
@ -220,6 +249,27 @@ impl Tracing {
"installed tokio-console tracing layer",
);
// Write any progress reports sent by other tasks to the terminal
//
// TODO: move this to its own module?
#[cfg(feature = "progress-bar")]
{
use howudoin::consumers::TermLine;
use std::time::Duration;
// Stops flickering during the initial sync.
const PROGRESS_BAR_DEBOUNCE: Duration = Duration::from_secs(2);
let terminal_consumer = TermLine::with_debounce(PROGRESS_BAR_DEBOUNCE);
howudoin::init(terminal_consumer);
info!("activated progress bar");
if config.log_file.is_some() {
eprintln!("waiting for initial progress reports...");
}
}
Ok(Self {
filter_handle,
initial_filter: filter,
@ -295,6 +345,16 @@ impl<A: abscissa_core::Application> Component<A> for Tracing {
.map_err(|e| FrameworkErrorKind::ComponentError.context(e))?
}
#[cfg(feature = "progress-bar")]
howudoin::disable();
Ok(())
}
}
impl Drop for Tracing {
fn drop(&mut self) {
#[cfg(feature = "progress-bar")]
howudoin::disable();
}
}

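Taken together with the progress.rs changes, the howudoin wiring in this commit has a simple lifecycle: install one terminal consumer per process, let any task create bar transmitters and update them, and call `disable()` once on shutdown. A compressed sketch using only the howudoin 0.1 calls that appear in this commit, assuming the `TermLine` consumer runs on its own thread so a plain `main()` is enough; the label, length, and sleep are placeholders:

use std::time::Duration;

use howudoin::consumers::TermLine;

fn main() {
    // 1. One consumer per process, installed once at startup.
    //    The debounce stops the terminal flickering during fast updates.
    howudoin::init(TermLine::with_debounce(Duration::from_secs(2)));

    // 2. Any task can create a transmitter and update it as work progresses.
    let bar = howudoin::new().label("Blocks");
    for height in 0u32..=100 {
        bar.set_pos(height)
            .set_len(u64::from(100u32))
            .desc(format!("height {height}"));
        std::thread::sleep(Duration::from_millis(20));
    }
    bar.close();

    // 3. On shutdown, remove all bars so the final log output stays readable.
    howudoin::disable();
}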

@ -62,6 +62,8 @@
//! ### Metrics
//!
//! * `prometheus`: export metrics to prometheus.
//! * `progress-bar`: shows key metrics in the terminal using progress bars,
//! and automatically configures Zebra to send logs to a file.
//!
//! Read the [metrics](https://zebra.zfnd.org/user/metrics.html) section of the book
//! for more details.
@ -69,6 +71,7 @@
//! ### Tracing
//!
//! Sending traces to different subscribers:
//! * configuring a `tracing.log_file`: appends traces to a file on disk.
//! * `journald`: send tracing spans and events to `systemd-journald`.
//! * `sentry`: send crash and panic events to sentry.io.
//! * `flamegraph`: generate a flamegraph of tracing spans.