Improve tracing output in chain verifier

This commit is contained in:
Henry de Valence 2020-07-22 11:54:52 -07:00
parent 7d4e717182
commit c2c2a28e8b
5 changed files with 36 additions and 77 deletions

View File

@ -25,6 +25,7 @@ use std::{
task::{Context, Poll},
};
use tower::{buffer::Buffer, Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::block::{Block, BlockHeaderHash};
use zebra_chain::types::BlockHeight;
@ -90,15 +91,12 @@ where
let mut state_service = self.state_service.clone();
let max_checkpoint_height = self.max_checkpoint_height;
let span = tracing::debug_span!(
"block_verify",
height = ?block.coinbase_height(),
hash = ?BlockHeaderHash::from(block.as_ref())
);
let height = block.coinbase_height();
// Report each 1000th block at info level
let info_log = matches!(height, Some(BlockHeight(height)) if (height % 1000 == 0));
if info_log {
tracing::info!(?height, "ChainVerifier received block");
} else {
tracing::debug!(?height, "ChainVerifier received block");
}
// Log a warning on unexpected high blocks
let is_unexpected_high_block = match height {
@ -122,11 +120,6 @@ where
// Call a verifier based on the block height and checkpoints.
let hash = match height {
Some(height) if (height <= max_checkpoint_height) => {
if info_log {
tracing::info!(?height, "sending block to CheckpointVerifier");
} else {
tracing::debug!(?height, "sending block to CheckpointVerifier");
}
checkpoint_verifier
.ready_and()
.await?
@ -140,11 +133,6 @@ where
tracing::warn!(?height, "unexpected high block");
}
if info_log {
tracing::info!(?height, "sending block to BlockVerifier");
} else {
tracing::debug!(?height, "sending block to BlockVerifier");
}
block_verifier
.ready_and()
.await?
@ -161,14 +149,6 @@ where
// - handle chain reorgs
// - adjust state_service "unique block height" conditions
if info_log {
tracing::info!(?height, ?hash, "ChainVerifier sent block to state");
} else {
tracing::debug!(?height, ?hash, "ChainVerifier sent block to state");
}
// `Tower::Buffer` requires a 1:1 relationship between `poll()`s
// and `call()`s, because it reserves a buffer slot in each
// `call()`.
let add_block = state_service
.ready_and()
.await?
@ -179,6 +159,7 @@ where
_ => Err("adding block to zebra-state failed".into()),
}
}
.instrument(span)
.boxed()
}
}

View File

@ -198,7 +198,11 @@ impl CheckpointVerifier {
// Find the height we want to start searching at
let mut pending_height = match self.previous_checkpoint_height() {
// Check if we have the genesis block as a special case, to simplify the loop
BeforeGenesis if !self.queued.contains_key(&BlockHeight(0)) => return WaitingForBlocks,
BeforeGenesis if !self.queued.contains_key(&BlockHeight(0)) => {
// XXX scratch tracing line for debugging, delete this
tracing::debug!("beforegenesis if !self.queued.contains_key(&BlockHeight(0))");
return WaitingForBlocks;
}
BeforeGenesis => BlockHeight(0),
PreviousCheckpoint(height) => height,
FinalCheckpoint => return FinishedVerifying,
@ -235,6 +239,12 @@ impl CheckpointVerifier {
.checkpoint_list
.max_height_in_range((start, Included(pending_height)));
tracing::debug!(
checkpoint_start = ?start,
highest_contiguous_block = ?pending_height,
?target_checkpoint
);
target_checkpoint
.map(Checkpoint)
.unwrap_or(WaitingForBlocks)
@ -331,24 +341,11 @@ impl CheckpointVerifier {
// Set up a oneshot channel to send results
let (tx, rx) = oneshot::channel();
let height = block.coinbase_height();
// Report each 1000th block at info level
let info_log = matches!(height, Some(BlockHeight(height)) if (height % 1000 == 0));
if info_log {
tracing::info!(?height, "queue_block received block");
} else {
tracing::debug!(?height, "queue_block received block");
}
// Check for a valid height
let height = match self.check_block(&block) {
Ok(height) => height,
Err(error) => {
tracing::warn!(
?height,
?error,
"queue_block rejected block with block height error"
);
tracing::warn!(?error);
// Sending might fail, depending on what the caller does with rx,
// but there's nothing we can do about it.
let _ = tx.send(Err(error));
@ -366,11 +363,9 @@ impl CheckpointVerifier {
// Memory DoS resistance: limit the queued blocks at each height
if qblocks.len() >= MAX_QUEUED_BLOCKS_PER_HEIGHT {
tracing::warn!(
?height,
"queue_block rejected block with too many blocks at height error"
);
let _ = tx.send(Err("too many queued blocks at this height".into()));
let e = "too many queued blocks at this height".into();
tracing::warn!(?e);
let _ = tx.send(Err(e));
return rx;
}
@ -381,12 +376,7 @@ impl CheckpointVerifier {
qblocks.reserve_exact(1);
qblocks.push(new_qblock);
if info_log {
tracing::info!(?height, "queue_block added block to queue");
} else {
tracing::debug!(?height, "queue_block added block to queue");
}
tracing::debug!("queued block");
rx
}
@ -480,6 +470,11 @@ impl CheckpointVerifier {
.hash(height)
.expect("every checkpoint height must have a hash"),
),
WaitingForBlocks => {
tracing::debug!("waiting for blocks to complete checkpoint range");
return;
}
// XXX(hdevalence) should this be unreachable!("called after finished") ?
_ => return,
};
@ -642,26 +637,10 @@ impl Service<Arc<Block>> for CheckpointVerifier {
fn call(&mut self, block: Arc<Block>) -> Self::Future {
// TODO(jlusby): Error = Report
let height = block.coinbase_height();
// Report each 1000th block at info level
let info_log = matches!(height, Some(BlockHeight(height)) if (height % 1000 == 0));
if info_log {
tracing::info!(?height, "CheckpointVerifier received block");
} else {
tracing::debug!(?height, "CheckpointVerifier received block");
}
// Queue the block for verification, until we receive all the blocks for
// the current checkpoint range.
let rx = self.queue_block(block);
if info_log {
tracing::info!(?height, "CheckpointVerifier added block to queue");
} else {
tracing::debug!(?height, "CheckpointVerifier added block to queue");
}
// Try to verify from the previous checkpoint to a target checkpoint.
//
// If there are multiple checkpoints in the target range, and one of
@ -672,12 +651,6 @@ impl Service<Arc<Block>> for CheckpointVerifier {
// TODO(teor): retry on failure (low priority, failures should be rare)
self.process_checkpoint_range();
if info_log {
tracing::info!(?height, "CheckpointVerifier processed checkpoint range");
} else {
tracing::debug!(?height, "CheckpointVerifier processed checkpoint range");
}
async move {
// Remove the Result<..., RecvError> wrapper from the channel future
rx.await

View File

@ -15,13 +15,13 @@ lazy_static = "1.4.0"
hex = "0.4.2"
sled = "0.33.0"
serde = { version = "1", features = ["serde_derive"] }
tracing = "0.1"
tracing-futures = "0.2"
[dev-dependencies]
tokio = { version = "0.2.22", features = ["full"] }
zebra-test = { path = "../zebra-test/" }
spandoc = "0.2"
tracing = "0.1.16"
tracing-futures = "0.2.4"
tempdir = "0.3.7"
color-eyre = "0.5"
once_cell = "1.4"

View File

@ -38,6 +38,7 @@ impl Service<Request> for InMemoryState {
}
fn call(&mut self, req: Request) -> Self::Future {
tracing::debug!(?req);
match req {
Request::AddBlock { block } => {
let result = self

View File

@ -72,8 +72,12 @@ where
//
// If there are any prospective tips, call ExtendTips. Continue this step until there are no more prospective tips.
while !self.prospective_tips.is_empty() {
info!("extending prospective tips");
tracing::debug!("extending prospective tips");
self.extend_tips().await?;
tracing::debug!(
pending.len = self.pending_blocks.len(),
limit = LOOKAHEAD_LIMIT
);
// Check whether we need to wait for existing block download tasks to finish
while self.pending_blocks.len() > LOOKAHEAD_LIMIT {