feature: Add tracing to chain and checkpoint verifiers
This commit is contained in:
parent
9b97ebbd61
commit
52002ac3c5
|
@ -80,33 +80,64 @@ where
|
||||||
let mut state_service = self.state_service.clone();
|
let mut state_service = self.state_service.clone();
|
||||||
let max_checkpoint_height = self.max_checkpoint_height;
|
let max_checkpoint_height = self.max_checkpoint_height;
|
||||||
|
|
||||||
|
let height = block.coinbase_height();
|
||||||
|
// Report each 1000th block at info level
|
||||||
|
let info_log = match height {
|
||||||
|
Some(BlockHeight(height)) if (height % 1000 == 0) => true,
|
||||||
|
_ => false,
|
||||||
|
};
|
||||||
|
|
||||||
|
if info_log {
|
||||||
|
tracing::info!(?height, "ChainVerifier received block");
|
||||||
|
} else {
|
||||||
|
tracing::debug!(?height, "ChainVerifier received block");
|
||||||
|
}
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
// Call a verifier based on the block height and checkpoints
|
|
||||||
//
|
|
||||||
// TODO(teor): for post-sapling checkpoint blocks, allow callers
|
// TODO(teor): for post-sapling checkpoint blocks, allow callers
|
||||||
// to use BlockVerifier, CheckpointVerifier, or both.
|
// to use BlockVerifier, CheckpointVerifier, or both.
|
||||||
match block.coinbase_height() {
|
|
||||||
|
// Call a verifier based on the block height and checkpoints.
|
||||||
|
let hash = match height {
|
||||||
Some(height) if (height <= max_checkpoint_height) => {
|
Some(height) if (height <= max_checkpoint_height) => {
|
||||||
|
if info_log {
|
||||||
|
tracing::info!(?height, "sending block to CheckpointVerifier");
|
||||||
|
} else {
|
||||||
|
tracing::debug!(?height, "sending block to CheckpointVerifier");
|
||||||
|
}
|
||||||
checkpoint_verifier
|
checkpoint_verifier
|
||||||
.ready_and()
|
.ready_and()
|
||||||
.await?
|
.await?
|
||||||
.call(block.clone())
|
.call(block.clone())
|
||||||
.await?
|
.await?
|
||||||
}
|
}
|
||||||
Some(_) => {
|
Some(height) => {
|
||||||
|
if info_log {
|
||||||
|
tracing::info!(?height, "sending block to BlockVerifier");
|
||||||
|
} else {
|
||||||
|
tracing::debug!(?height, "sending block to BlockVerifier");
|
||||||
|
}
|
||||||
block_verifier
|
block_verifier
|
||||||
.ready_and()
|
.ready_and()
|
||||||
.await?
|
.await?
|
||||||
.call(block.clone())
|
.call(block.clone())
|
||||||
.await?
|
.await?
|
||||||
}
|
}
|
||||||
None => return Err("Invalid block: must have a coinbase height".into()),
|
None => {
|
||||||
|
tracing::debug!("rejecting block with no coinbase height");
|
||||||
|
return Err("Invalid block: must have a coinbase height".into());
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO(teor):
|
// TODO(teor):
|
||||||
// - handle chain reorgs
|
// - handle chain reorgs
|
||||||
// - adjust state_service "unique block height" conditions
|
// - adjust state_service "unique block height" conditions
|
||||||
|
|
||||||
|
if info_log {
|
||||||
|
tracing::info!(?height, ?hash, "ChainVerifier sent block to state");
|
||||||
|
} else {
|
||||||
|
tracing::debug!(?height, ?hash, "ChainVerifier sent block to state");
|
||||||
|
}
|
||||||
// `Tower::Buffer` requires a 1:1 relationship between `poll()`s
|
// `Tower::Buffer` requires a 1:1 relationship between `poll()`s
|
||||||
// and `call()`s, because it reserves a buffer slot in each
|
// and `call()`s, because it reserves a buffer slot in each
|
||||||
// `call()`.
|
// `call()`.
|
||||||
|
@ -153,6 +184,8 @@ where
|
||||||
+ 'static,
|
+ 'static,
|
||||||
S::Future: Send + 'static,
|
S::Future: Send + 'static,
|
||||||
{
|
{
|
||||||
|
tracing::debug!(?network, "initialising ChainVerifier from network");
|
||||||
|
|
||||||
let block_verifier = crate::block::init(state_service.clone());
|
let block_verifier = crate::block::init(state_service.clone());
|
||||||
let checkpoint_verifier = CheckpointVerifier::new(network);
|
let checkpoint_verifier = CheckpointVerifier::new(network);
|
||||||
|
|
||||||
|
@ -197,6 +230,11 @@ where
|
||||||
S::Future: Send + 'static,
|
S::Future: Send + 'static,
|
||||||
{
|
{
|
||||||
let max_checkpoint_height = checkpoint_verifier.list().max_height();
|
let max_checkpoint_height = checkpoint_verifier.list().max_height();
|
||||||
|
tracing::debug!(
|
||||||
|
?max_checkpoint_height,
|
||||||
|
"initialising ChainVerifier with max checkpoint height"
|
||||||
|
);
|
||||||
|
|
||||||
// Wrap the checkpoint verifier in a buffer, so we can share it
|
// Wrap the checkpoint verifier in a buffer, so we can share it
|
||||||
let checkpoint_verifier = Buffer::new(checkpoint_verifier, 1);
|
let checkpoint_verifier = Buffer::new(checkpoint_verifier, 1);
|
||||||
|
|
||||||
|
|
|
@ -118,7 +118,10 @@ impl CheckpointVerifier {
|
||||||
/// Clone a CheckpointVerifier, you might need to wrap it in a
|
/// Clone a CheckpointVerifier, you might need to wrap it in a
|
||||||
/// `tower::Buffer` service.
|
/// `tower::Buffer` service.
|
||||||
pub fn new(network: Network) -> Self {
|
pub fn new(network: Network) -> Self {
|
||||||
Self::from_checkpoint_list(CheckpointList::new(network))
|
let checkpoint_list = CheckpointList::new(network);
|
||||||
|
let max_height = checkpoint_list.max_height();
|
||||||
|
tracing::info!(?max_height, ?network, "initialising CheckpointVerifier");
|
||||||
|
Self::from_checkpoint_list(checkpoint_list)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return a checkpoint verification service using `list`.
|
/// Return a checkpoint verification service using `list`.
|
||||||
|
@ -328,10 +331,27 @@ impl CheckpointVerifier {
|
||||||
// Set up a oneshot channel to send results
|
// Set up a oneshot channel to send results
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let height = block.coinbase_height();
|
||||||
|
// Report each 1000th block at info level
|
||||||
|
let info_log = match height {
|
||||||
|
Some(BlockHeight(height)) if (height % 1000 == 0) => true,
|
||||||
|
_ => false,
|
||||||
|
};
|
||||||
|
if info_log {
|
||||||
|
tracing::info!(?height, "queue_block received block");
|
||||||
|
} else {
|
||||||
|
tracing::debug!(?height, "queue_block received block");
|
||||||
|
}
|
||||||
|
|
||||||
// Check for a valid height
|
// Check for a valid height
|
||||||
let height = match self.check_block(&block) {
|
let height = match self.check_block(&block) {
|
||||||
Ok(height) => height,
|
Ok(height) => height,
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
|
tracing::warn!(
|
||||||
|
?height,
|
||||||
|
?error,
|
||||||
|
"queue_block rejected block with block height error"
|
||||||
|
);
|
||||||
// Sending might fail, depending on what the caller does with rx,
|
// Sending might fail, depending on what the caller does with rx,
|
||||||
// but there's nothing we can do about it.
|
// but there's nothing we can do about it.
|
||||||
let _ = tx.send(Err(error));
|
let _ = tx.send(Err(error));
|
||||||
|
@ -349,6 +369,10 @@ impl CheckpointVerifier {
|
||||||
|
|
||||||
// Memory DoS resistance: limit the queued blocks at each height
|
// Memory DoS resistance: limit the queued blocks at each height
|
||||||
if qblocks.len() >= MAX_QUEUED_BLOCKS_PER_HEIGHT {
|
if qblocks.len() >= MAX_QUEUED_BLOCKS_PER_HEIGHT {
|
||||||
|
tracing::warn!(
|
||||||
|
?height,
|
||||||
|
"queue_block rejected block with too many blocks at height error"
|
||||||
|
);
|
||||||
let _ = tx.send(Err("too many queued blocks at this height".into()));
|
let _ = tx.send(Err("too many queued blocks at this height".into()));
|
||||||
return rx;
|
return rx;
|
||||||
}
|
}
|
||||||
|
@ -360,6 +384,12 @@ impl CheckpointVerifier {
|
||||||
qblocks.reserve_exact(1);
|
qblocks.reserve_exact(1);
|
||||||
qblocks.push(new_qblock);
|
qblocks.push(new_qblock);
|
||||||
|
|
||||||
|
if info_log {
|
||||||
|
tracing::info!(?height, "queue_block added block to queue");
|
||||||
|
} else {
|
||||||
|
tracing::debug!(?height, "queue_block added block to queue");
|
||||||
|
}
|
||||||
|
|
||||||
rx
|
rx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -615,10 +645,29 @@ impl Service<Arc<Block>> for CheckpointVerifier {
|
||||||
fn call(&mut self, block: Arc<Block>) -> Self::Future {
|
fn call(&mut self, block: Arc<Block>) -> Self::Future {
|
||||||
// TODO(jlusby): Error = Report
|
// TODO(jlusby): Error = Report
|
||||||
|
|
||||||
|
let height = block.coinbase_height();
|
||||||
|
// Report each 1000th block at info level
|
||||||
|
let info_log = match height {
|
||||||
|
Some(BlockHeight(height)) if (height % 1000 == 0) => true,
|
||||||
|
_ => false,
|
||||||
|
};
|
||||||
|
|
||||||
|
if info_log {
|
||||||
|
tracing::info!(?height, "CheckpointVerifier received block");
|
||||||
|
} else {
|
||||||
|
tracing::debug!(?height, "CheckpointVerifier received block");
|
||||||
|
}
|
||||||
|
|
||||||
// Queue the block for verification, until we receive all the blocks for
|
// Queue the block for verification, until we receive all the blocks for
|
||||||
// the current checkpoint range.
|
// the current checkpoint range.
|
||||||
let rx = self.queue_block(block);
|
let rx = self.queue_block(block);
|
||||||
|
|
||||||
|
if info_log {
|
||||||
|
tracing::info!(?height, "CheckpointVerifier added block to queue");
|
||||||
|
} else {
|
||||||
|
tracing::debug!(?height, "CheckpointVerifier added block to queue");
|
||||||
|
}
|
||||||
|
|
||||||
// Try to verify from the previous checkpoint to a target checkpoint.
|
// Try to verify from the previous checkpoint to a target checkpoint.
|
||||||
//
|
//
|
||||||
// If there are multiple checkpoints in the target range, and one of
|
// If there are multiple checkpoints in the target range, and one of
|
||||||
|
@ -629,6 +678,12 @@ impl Service<Arc<Block>> for CheckpointVerifier {
|
||||||
// TODO(teor): retry on failure (low priority, failures should be rare)
|
// TODO(teor): retry on failure (low priority, failures should be rare)
|
||||||
self.process_checkpoint_range();
|
self.process_checkpoint_range();
|
||||||
|
|
||||||
|
if info_log {
|
||||||
|
tracing::info!(?height, "CheckpointVerifier processed checkpoint range");
|
||||||
|
} else {
|
||||||
|
tracing::debug!(?height, "CheckpointVerifier processed checkpoint range");
|
||||||
|
}
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
// Remove the Result<..., RecvError> wrapper from the channel future
|
// Remove the Result<..., RecvError> wrapper from the channel future
|
||||||
rx.await
|
rx.await
|
||||||
|
|
Loading…
Reference in New Issue