//! Checkpoint-based block verification.
//!
//! Checkpoint-based verification uses a list of checkpoint hashes to
//! speed up the initial chain sync for Zebra. This list is distributed
//! with Zebra.
//!
//! The checkpoint verifier queues pending blocks. Once there is a
//! chain from the previous checkpoint to a target checkpoint, it
//! verifies all the blocks in that chain, and sends accepted blocks to
//! the state service as finalized chain state, skipping contextual
//! verification checks.
//!
//! Verification starts at the first checkpoint, which is the genesis
//! block for the configured network.
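//!
//! # Example
//!
//! A minimal usage sketch, not compiled as a doctest: `state_service` stands in
//! for any service that accepts `zebra_state` requests, and `block` is an
//! `Arc<Block>` received from the network.
//!
//! ```ignore
//! use tower::{Service, ServiceExt};
//! use zebra_chain::parameters::Network;
//!
//! // Start from the genesis block, using the hard-coded checkpoint list.
//! let mut verifier = CheckpointVerifier::new(Network::Mainnet, None, state_service);
//!
//! // Queue a block for checkpoint verification. The returned future only
//! // resolves once a continuous chain from the previous checkpoint to a
//! // target checkpoint has been queued and verified.
//! let verified_hash = verifier.ready_and().await?.call(block).await?;
//! ```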
use std::{
collections::{BTreeMap, HashSet},
future::Future,
ops::{Bound, Bound::*},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures_util::FutureExt;
use thiserror::Error;
use tokio::sync::oneshot;
use tower::{Service, ServiceExt};
use tracing::instrument;
use zebra_chain::{
block::{self, Block},
parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH},
};
use zebra_state as zs;
use crate::BoxError;
pub(crate) mod list;
mod types;
#[cfg(test)]
mod tests;
pub(crate) use list::CheckpointList;
use types::{Progress, Progress::*};
use types::{TargetHeight, TargetHeight::*};
/// An unverified block, which is in the queue for checkpoint verification.
#[derive(Debug)]
struct QueuedBlock {
/// The block data.
block: Arc<Block>,
/// `block`'s cached header hash.
hash: block::Hash,
/// The transmitting end of the oneshot channel for this block's result.
tx: oneshot::Sender<Result<block::Hash, VerifyCheckpointError>>,
}
/// A list of unverified blocks at a particular height.
///
/// Typically contains a single block, but might contain more if a peer
/// has an old chain fork. (Or sends us a bad block.)
///
/// The CheckpointVerifier avoids creating zero-block lists.
type QueuedBlockList = Vec<QueuedBlock>;
/// The maximum number of queued blocks at any one height.
///
/// This value is a tradeoff between:
/// - rejecting bad blocks: if we queue more blocks, we need fewer network
/// retries, but use a bit more CPU when verifying,
/// - avoiding a memory DoS: if we queue fewer blocks, we use less memory.
///
/// Memory usage is controlled by the sync service, because it controls block
/// downloads. When the verifier services process blocks, they reduce memory
/// usage by committing blocks to the disk state. (Or dropping invalid blocks.)
pub const MAX_QUEUED_BLOCKS_PER_HEIGHT: usize = 4;
/// We limit the maximum number of blocks in each checkpoint. Each block uses a
/// constant amount of memory for the supporting data structures and futures.
///
/// We choose a checkpoint gap that allows us to verify one checkpoint for
/// every `ObtainTips` or `ExtendTips` response.
///
/// `zcashd`'s maximum `FindBlocks` response size is 500 hashes. `zebrad` uses
/// 1 hash to verify the tip, and discards 1-2 hashes to work around `zcashd`
/// bugs. So the most efficient gap is slightly less than 500 blocks.
pub const MAX_CHECKPOINT_HEIGHT_GAP: usize = 400;
/// We limit the memory usage and download contention for each checkpoint,
/// based on the cumulative size of the serialized blocks in the chain.
///
/// Deserialized blocks (in memory) are slightly larger than serialized blocks
/// (on the network or disk). But they should be within a constant factor of the
/// serialized size.
pub const MAX_CHECKPOINT_BYTE_COUNT: u64 = 32 * 1024 * 1024;
/// A checkpointing block verifier.
///
/// Verifies blocks using a supplied list of checkpoints. There must be at
/// least one checkpoint for the genesis block.
#[derive(Debug)]
pub struct CheckpointVerifier<S>
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
/// The checkpoint list for this verifier.
checkpoint_list: CheckpointList,
/// The hash of the initial tip, if any.
initial_tip_hash: Option<block::Hash>,
/// The underlying state service, possibly wrapped in other services.
state_service: S,
/// A queue of unverified blocks.
///
/// Contains a list of unverified blocks at each block height. In most cases,
/// the checkpoint verifier will store zero or one block at each height.
///
/// Blocks are verified in order, once there is a chain from the previous
/// checkpoint to a target checkpoint.
///
/// The first checkpoint does not have any ancestors, so it only verifies the
/// genesis block.
queued: BTreeMap<block::Height, QueuedBlockList>,
/// The current progress of this verifier.
verifier_progress: Progress<block::Height>,
}
impl<S> CheckpointVerifier<S>
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
/// Return a checkpoint verification service for `network`, using the
/// hard-coded checkpoint list, and the provided `state_service`.
///
/// If `initial_tip` is Some(_), the verifier starts at that initial tip.
/// The initial tip can be between the checkpoints in the hard-coded
/// checkpoint list.
///
/// The checkpoint verifier holds a state service of type `S`, into which newly
/// verified blocks will be committed. This state is pluggable to allow for
/// testing or instrumentation.
///
/// This function should be called only once for a particular network, rather
/// than constructing multiple verification services for the same network. To
/// clone a CheckpointVerifier, you might need to wrap it in a
/// `tower::Buffer` service.
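///
/// For example, to resume checkpoint verification from a previously verified
/// tip (a sketch, not compiled: `restart_tip` and `state_service` are
/// placeholders for values loaded from the finalized state on startup):
///
/// ```ignore
/// // `restart_tip` is the `(block::Height, block::Hash)` pair of the highest
/// // block already committed to the finalized state.
/// let verifier = CheckpointVerifier::new(Network::Mainnet, Some(restart_tip), state_service);
/// ```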
#[allow(dead_code)]
pub fn new(
network: Network,
initial_tip: Option<(block::Height, block::Hash)>,
state_service: S,
) -> Self {
let checkpoint_list = CheckpointList::new(network);
let max_height = checkpoint_list.max_height();
tracing::info!(
?max_height,
?network,
?initial_tip,
"initialising CheckpointVerifier"
);
Self::from_checkpoint_list(checkpoint_list, initial_tip, state_service)
}
/// Return a checkpoint verification service using `list`, `initial_tip`,
/// and `state_service`.
///
/// Assumes that the provided genesis checkpoint is correct.
///
/// Callers should prefer `CheckpointVerifier::new`, which uses the
/// hard-coded checkpoint lists, or `CheckpointList::from_list` if you need
/// to specify a custom checkpoint list. See those functions for more
/// details.
///
/// This function is designed for use in tests.
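///
/// For example (a sketch, not compiled: `genesis_hash` must be the hash of the
/// configured network's genesis block, and `state_service` any compatible
/// state service):
///
/// ```ignore
/// // A list with a single genesis checkpoint only verifies the genesis block.
/// let verifier = CheckpointVerifier::from_list(
///     vec![(block::Height(0), genesis_hash)],
///     None,
///     state_service,
/// )?;
/// ```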
#[allow(dead_code)]
pub(crate) fn from_list(
list: impl IntoIterator<Item = (block::Height, block::Hash)>,
initial_tip: Option<(block::Height, block::Hash)>,
state_service: S,
) -> Result<Self, VerifyCheckpointError> {
Ok(Self::from_checkpoint_list(
CheckpointList::from_list(list).map_err(VerifyCheckpointError::CheckpointList)?,
initial_tip,
state_service,
))
}
/// Return a checkpoint verification service using `checkpoint_list`,
/// `initial_tip`, and `state_service`.
///
/// Assumes that the provided genesis checkpoint is correct.
///
/// Callers should prefer `CheckpointVerifier::new`, which uses the
/// hard-coded checkpoint lists. See that function for more details.
pub(crate) fn from_checkpoint_list(
checkpoint_list: CheckpointList,
initial_tip: Option<(block::Height, block::Hash)>,
state_service: S,
) -> Self {
// All the initialisers should call this function, so we only have to
// change fields or default values in one place.
let (initial_tip_hash, verifier_progress) = match initial_tip {
Some((height, hash)) => {
if height >= checkpoint_list.max_height() {
(None, Progress::FinalCheckpoint)
} else {
metrics::gauge!("checkpoint.verified.height", height.0 as f64);
metrics::gauge!("checkpoint.processing.next.height", height.0 as f64);
(Some(hash), Progress::InitialTip(height))
}
}
// We start by verifying the genesis block, by itself
None => (None, Progress::BeforeGenesis),
};
CheckpointVerifier {
checkpoint_list,
initial_tip_hash,
state_service,
queued: BTreeMap::new(),
verifier_progress,
}
}
/// Return the current verifier's progress.
///
/// If verification has not started yet, returns `BeforeGenesis`,
/// or `InitialTip(height)` if there were cached verified blocks.
///
/// If verification is ongoing, returns `PreviousCheckpoint(height)`.
/// `height` increases as checkpoints are verified.
///
/// If verification has finished, returns `FinalCheckpoint`.
fn previous_checkpoint_height(&self) -> Progress<block::Height> {
self.verifier_progress
}
/// Return the start of the current checkpoint range.
///
/// Returns None if verification has finished.
fn current_start_bound(&self) -> Option<Bound<block::Height>> {
match self.previous_checkpoint_height() {
BeforeGenesis => Some(Unbounded),
InitialTip(height) | PreviousCheckpoint(height) => Some(Excluded(height)),
FinalCheckpoint => None,
}
}
/// Return the target checkpoint height that we want to verify.
///
/// If we need more blocks, returns `WaitingForBlocks`.
///
/// If the queued blocks are continuous from the previous checkpoint to a
/// target checkpoint, returns `Checkpoint(height)`. The target checkpoint
/// can be multiple checkpoints ahead of the previous checkpoint.
///
/// `height` increases as checkpoints are verified.
///
/// If verification has finished, returns `FinishedVerifying`.
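///
/// For example (illustrative heights, not a real checkpoint list): if the
/// previous checkpoint is at height 400, the checkpoint list contains heights
/// 600 and 800, and heights 401..=805 are all queued without gaps, this
/// returns `Checkpoint(Height(800))`. If the queued blocks only reach height
/// 599, it returns `WaitingForBlocks`.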
fn target_checkpoint_height(&self) -> TargetHeight {
// Find the height we want to start searching at
let start_height = match self.previous_checkpoint_height() {
// Check if we have the genesis block as a special case, to simplify the loop
BeforeGenesis if !self.queued.contains_key(&block::Height(0)) => {
tracing::trace!("Waiting for genesis block");
metrics::counter!("checkpoint.waiting.count", 1);
return WaitingForBlocks;
}
BeforeGenesis => block::Height(0),
InitialTip(height) | PreviousCheckpoint(height) => height,
FinalCheckpoint => return FinishedVerifying,
};
// Find the end of the continuous sequence of blocks, starting at the
// last verified checkpoint. If there is no verified checkpoint, start
// *after* the genesis block (which we checked above).
//
// If `btree_map::Range` implemented `ExactSizeIterator`, it would be
// much faster to walk the checkpoint list, and compare the length of
// the `btree_map::Range` to the block height difference between
// checkpoints. (In maps, keys are unique, so we don't need to check
// each height value.)
//
// But at the moment, this implementation is slightly faster, because
// it stops after the first gap.
let mut pending_height = start_height;
for (&height, _) in self.queued.range((Excluded(pending_height), Unbounded)) {
// If the queued blocks are continuous.
if height == block::Height(pending_height.0 + 1) {
pending_height = height;
} else {
let gap = height.0 - pending_height.0;
// Try to log a useful message when checkpointing has issues
tracing::trace!(contiguous_height = ?pending_height,
next_height = ?height,
?gap,
"Waiting for more checkpoint blocks");
break;
}
}
metrics::gauge!(
"checkpoint.queued.continuous.height",
pending_height.0 as f64
);
// Now find the start of the checkpoint range
let start = self.current_start_bound().expect(
"if verification has finished, we should have returned earlier in the function",
);
// Find the highest checkpoint at or below pending_height, excluding
// any previously verified checkpoints
let target_checkpoint = self
.checkpoint_list
.max_height_in_range((start, Included(pending_height)));
tracing::trace!(
checkpoint_start = ?start,
highest_contiguous_block = ?pending_height,
?target_checkpoint
);
if let Some(block::Height(target_checkpoint)) = target_checkpoint {
metrics::gauge!(
"checkpoint.processing.next.height",
target_checkpoint as f64
);
} else {
// Use the start height if there is no potential next checkpoint
metrics::gauge!("checkpoint.processing.next.height", start_height.0 as f64);
metrics::counter!("checkpoint.waiting.count", 1);
}
target_checkpoint
.map(Checkpoint)
.unwrap_or(WaitingForBlocks)
}
/// Return the most recently verified checkpoint's hash.
///
/// See `previous_checkpoint_height()` for details.
fn previous_checkpoint_hash(&self) -> Progress<block::Hash> {
match self.previous_checkpoint_height() {
BeforeGenesis => BeforeGenesis,
InitialTip(_) => self
.initial_tip_hash
.map(InitialTip)
.expect("initial tip height must have an initial tip hash"),
PreviousCheckpoint(height) => self
.checkpoint_list
.hash(height)
.map(PreviousCheckpoint)
.expect("every checkpoint height must have a hash"),
FinalCheckpoint => FinalCheckpoint,
}
}
/// Check that `height` is valid and able to be verified.
///
/// Returns an error if:
/// - the block's height is greater than the maximum checkpoint
/// - there are no checkpoints
/// - the block's height is less than or equal to the previously verified
/// checkpoint
/// - verification has finished
fn check_height(&self, height: block::Height) -> Result<(), VerifyCheckpointError> {
if height > self.checkpoint_list.max_height() {
Err(VerifyCheckpointError::TooHigh {
height,
max_height: self.checkpoint_list.max_height(),
})?;
}
match self.previous_checkpoint_height() {
// Any height is valid
BeforeGenesis => {}
// Greater heights are valid
InitialTip(previous_height) | PreviousCheckpoint(previous_height)
if (height <= previous_height) =>
{
let e = Err(VerifyCheckpointError::AlreadyVerified {
height,
verified_height: previous_height,
});
tracing::trace!(?e);
e?;
}
InitialTip(_) | PreviousCheckpoint(_) => {}
// We're finished, so no checkpoint height is valid
FinalCheckpoint => Err(VerifyCheckpointError::Finished)?,
};
Ok(())
}
/// Increase the current checkpoint height to `verified_height`, if it is a
/// valid checkpoint height.
fn update_progress(&mut self, verified_height: block::Height) {
if let Some(max_height) = self.queued.keys().next_back() {
metrics::gauge!("checkpoint.queued.max.height", max_height.0 as f64);
} else {
// use f64::NAN as a sentinel value for "None", because 0 is a valid height
metrics::gauge!("checkpoint.queued.max.height", f64::NAN);
}
metrics::gauge!("checkpoint.queued_slots", self.queued.len() as f64);
// Ignore blocks that are below the previous checkpoint, or otherwise
// have invalid heights.
//
// We ignore out-of-order verification, such as:
// - the height is less than the previous checkpoint height, or
// - the previous checkpoint height is the maximum height (checkpoint
//   verification has finished),
// because futures might not resolve in height order.
if self.check_height(verified_height).is_err() {
return;
}
// Ignore heights that aren't checkpoint heights
if verified_height == self.checkpoint_list.max_height() {
metrics::gauge!("checkpoint.verified.height", verified_height.0 as f64);
self.verifier_progress = FinalCheckpoint;
} else if self.checkpoint_list.contains(verified_height) {
metrics::gauge!("checkpoint.verified.height", verified_height.0 as f64);
self.verifier_progress = PreviousCheckpoint(verified_height);
// We're done with the initial tip hash now
self.initial_tip_hash = None;
}
}
/// Check that the block height and Merkle root are valid.
///
/// Checking the Merkle root ensures that the block hash binds the block
/// contents. To prevent malleability (CVE-2012-2459), we also need to check
/// whether the transaction hashes are unique.
fn check_block(&self, block: &Block) -> Result<block::Height, VerifyCheckpointError> {
let block_height = block
.coinbase_height()
.ok_or(VerifyCheckpointError::CoinbaseHeight { hash: block.hash() })?;
self.check_height(block_height)?;
let transaction_hashes = block
.transactions
.iter()
.map(|tx| tx.hash())
.collect::<Vec<_>>();
let merkle_root = transaction_hashes.iter().cloned().collect();
// Check that the Merkle root is valid.
if block.header.merkle_root != merkle_root {
return Err(VerifyCheckpointError::BadMerkleRoot {
expected: block.header.merkle_root,
actual: merkle_root,
});
}
// To prevent malleability (CVE-2012-2459), we also need to check
// whether the transaction hashes are unique. Collecting into a HashSet
// deduplicates, so this checks that there are no duplicate transaction
// hashes, preventing Merkle root malleability.
if transaction_hashes.len() != transaction_hashes.iter().collect::<HashSet<_>>().len() {
return Err(VerifyCheckpointError::DuplicateTransaction);
}
Ok(block_height)
}
/// Queue `block` for verification, and return the `Receiver` for the
/// block's verification result.
///
/// Verification will finish when the chain to the next checkpoint is
/// complete, and the caller will be notified via the channel.
///
/// If the block fails the height or Merkle root checks, sends an error on
/// the returned channel, and does not queue the block.
fn queue_block(
&mut self,
block: Arc<Block>,
) -> oneshot::Receiver<Result<block::Hash, VerifyCheckpointError>> {
// Set up a oneshot channel to send results
let (tx, rx) = oneshot::channel();
// Check that the height and Merkle roots are valid.
let height = match self.check_block(&block) {
Ok(height) => height,
Err(error) => {
tx.send(Err(error)).expect("rx has not been dropped yet");
return rx;
}
};
// Since we're using Arc<Block>, each entry is a single pointer to the
// Arc. But there are a lot of QueuedBlockLists in the queue, so we keep
// allocations as small as possible.
let qblocks = self
.queued
.entry(height)
.or_insert_with(|| QueuedBlockList::with_capacity(1));
let hash = block.hash();
// Replace older requests by newer ones by swapping the oneshot.
for qb in qblocks.iter_mut() {
if qb.hash == hash {
let e = VerifyCheckpointError::NewerRequest { height, hash };
tracing::trace!(?e, "failing older of duplicate requests");
let old_tx = std::mem::replace(&mut qb.tx, tx);
let _ = old_tx.send(Err(e));
return rx;
}
}
// Memory DoS resistance: limit the queued blocks at each height
if qblocks.len() >= MAX_QUEUED_BLOCKS_PER_HEIGHT {
let e = VerifyCheckpointError::QueuedLimit;
tracing::warn!(?e);
let _ = tx.send(Err(e));
return rx;
}
// Add the block to the list of queued blocks at this height
let new_qblock = QueuedBlock { block, hash, tx };
// This is a no-op for the first block in each QueuedBlockList.
qblocks.reserve_exact(1);
qblocks.push(new_qblock);
metrics::gauge!(
"checkpoint.queued.max.height",
self.queued
.keys()
.next_back()
.expect("queued has at least one entry")
.0 as f64
);
let is_checkpoint = self.checkpoint_list.contains(height);
tracing::debug!(?height, ?hash, ?is_checkpoint, "queued block");
rx
}
/// During checkpoint range processing, process all the blocks at `height`.
///
/// Returns the first valid block. If there is no valid block, returns None.
fn process_height(
&mut self,
height: block::Height,
expected_hash: block::Hash,
) -> Option<QueuedBlock> {
let mut qblocks = self
.queued
.remove(&height)
.expect("the current checkpoint range has continuous Vec<QueuedBlock>s");
assert!(
!qblocks.is_empty(),
"the current checkpoint range has continous Blocks"
);
// Check interim checkpoints
if let Some(checkpoint_hash) = self.checkpoint_list.hash(height) {
// We assume the checkpoints are valid, and we have verified back
// from the target checkpoint, so the expected hash must also be valid.
// If this assertion fails, it is probably a bad checkpoint list, a
// zebra bug, or a bad chain (in a testing mode like regtest).
assert_eq!(expected_hash, checkpoint_hash,
"checkpoints in the range should match: bad checkpoint list, zebra bug, or bad chain"
);
}
// Find a queued block at this height, which is part of the hash chain.
//
// There are two possible outcomes here:
// - one of the blocks matches the chain (the common case)
// - no blocks match the chain, verification has failed for this range
// If there are any side-chain blocks, they fail validation.
let mut valid_qblock = None;
for qblock in qblocks.drain(..) {
if qblock.hash == expected_hash {
if valid_qblock.is_none() {
// The first valid block at the current height
valid_qblock = Some(qblock);
} else {
unreachable!("unexpected duplicate block {:?} {:?}: duplicate blocks should be rejected before being queued",
height, qblock.hash);
}
} else {
tracing::info!(?height, ?qblock.hash, ?expected_hash,
"Side chain hash at height in CheckpointVerifier");
let _ = qblock
.tx
.send(Err(VerifyCheckpointError::UnexpectedSideChain {
found: qblock.hash,
expected: expected_hash,
}));
}
}
valid_qblock
}
/// Try to verify from the previous checkpoint to a target checkpoint.
///
/// Send `Ok` for the blocks that are in the chain, and `Err` for side-chain
/// blocks.
///
/// Does nothing if we are waiting for more blocks, or if verification has
/// finished.
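///
/// For example (illustrative heights): if the target checkpoint is at height
/// 800, we take the queued block at height 800 whose hash matches the
/// checkpoint hash, then require the block at height 799 to match that
/// block's `previous_block_hash`, and so on, down to the block just above
/// the previous checkpoint, whose `previous_block_hash` must equal the
/// previous checkpoint's hash.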
fn process_checkpoint_range(&mut self) {
// If this code shows up in profiles, we can try the following
// optimisations:
// - only check the chain when the length of the queue is greater
// than or equal to the length of a checkpoint interval
// (note: the genesis checkpoint interval is only one block long)
// - cache the height of the last continuous chain as a new field in
// self, and start at that height during the next check.
// Return early if verification has finished
let previous_checkpoint_hash = match self.previous_checkpoint_hash() {
// Since genesis blocks are hard-coded in zcashd, and not verified
// like other blocks, the genesis parent hash is set by the
// consensus parameters.
BeforeGenesis => GENESIS_PREVIOUS_BLOCK_HASH,
InitialTip(hash) | PreviousCheckpoint(hash) => hash,
FinalCheckpoint => return,
};
// Return early if we're still waiting for more blocks
let (target_checkpoint_height, mut expected_hash) = match self.target_checkpoint_height() {
Checkpoint(height) => (
height,
self.checkpoint_list
.hash(height)
.expect("every checkpoint height must have a hash"),
),
WaitingForBlocks => {
return;
}
FinishedVerifying => {
unreachable!("the FinalCheckpoint case should have returned earlier")
}
};
// Keep the old previous checkpoint height, to make sure we're making
// progress
let old_prev_check_height = self.previous_checkpoint_height();
// Work out which blocks and checkpoints we're checking
let current_range = (
self.current_start_bound()
.expect("earlier code checks if verification has finished"),
Included(target_checkpoint_height),
);
let range_heights: Vec<block::Height> = self
.queued
.range_mut(current_range)
.rev()
.map(|(key, _)| *key)
.collect();
// A list of pending valid blocks, in reverse chain order
let mut rev_valid_blocks = Vec::new();
// Check all the blocks, and discard all the bad blocks
for current_height in range_heights {
let valid_qblock = self.process_height(current_height, expected_hash);
if let Some(qblock) = valid_qblock {
expected_hash = qblock.block.header.previous_block_hash;
// Add the block to the end of the pending block list
// (since we're walking the chain backwards, the list is
// in reverse chain order)
rev_valid_blocks.push(qblock);
} else {
// The last block height we processed did not have any blocks
// with a matching hash, so chain verification has failed.
tracing::info!(
?current_height,
?current_range,
"No valid blocks at height in CheckpointVerifier"
);
// We kept all the matching blocks down to this height, in
// anticipation of the chain verifying. But the chain is
// incomplete, so we have to put them back in the queue.
//
// The order here shouldn't matter, but add the blocks in
// height order, for consistency.
for vblock in rev_valid_blocks.drain(..).rev() {
let height = vblock
.block
.coinbase_height()
.expect("queued blocks have a block height");
self.queued.entry(height).or_default().push(vblock);
}
// Make sure the current progress hasn't changed
assert_eq!(
self.previous_checkpoint_height(),
old_prev_check_height,
"we must not change the previous checkpoint on failure"
);
// We've reduced the target
//
// This check should be cheap, because we just reduced the target
let current_target = self.target_checkpoint_height();
assert!(
current_target == WaitingForBlocks
|| current_target < Checkpoint(target_checkpoint_height),
"we must decrease or eliminate our target on failure"
);
// Stop verifying, and wait for the next valid block
return;
}
}
// The checkpoint and the parent hash must match.
// See the detailed checkpoint comparison comment above.
assert_eq!(
expected_hash, previous_checkpoint_hash,
"the previous checkpoint should match: bad checkpoint list, zebra bug, or bad chain"
);
let block_count = rev_valid_blocks.len();
tracing::info!(?block_count, ?current_range, "verified checkpoint range");
metrics::counter!("checkpoint.verified.block.count", block_count as _);
// All the blocks we've kept are valid, so let's verify them
// in height order.
for qblock in rev_valid_blocks.drain(..).rev() {
// Sending can fail, but there's nothing we can do about it.
let _ = qblock.tx.send(Ok(qblock.hash));
}
// Finally, update the checkpoint bounds
self.update_progress(target_checkpoint_height);
// Ensure that we're making progress
let new_progress = self.previous_checkpoint_height();
assert!(
new_progress > old_prev_check_height,
"we must make progress on success"
);
// We met the old target
if new_progress == FinalCheckpoint {
assert_eq!(
target_checkpoint_height,
self.checkpoint_list.max_height(),
"we finish at the maximum checkpoint"
);
} else {
assert_eq!(
new_progress,
PreviousCheckpoint(target_checkpoint_height),
"the new previous checkpoint must match the old target"
);
}
// We processed all available checkpoints
//
// We've cleared the target range, so this check should be cheap
let new_target = self.target_checkpoint_height();
assert!(
new_target == WaitingForBlocks || new_target == FinishedVerifying,
"processing must cover all available checkpoints"
);
}
}
/// CheckpointVerifier rejects pending futures on drop.
impl<S> Drop for CheckpointVerifier<S>
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
/// Send an error on `tx` for any `QueuedBlock`s that haven't been verified.
///
/// We can't implement `Drop` on QueuedBlock, because `send()` consumes
/// `tx`. And `tx` doesn't implement `Copy` or `Default` (for `take()`).
fn drop(&mut self) {
let drop_keys: Vec<_> = self.queued.keys().cloned().collect();
for key in drop_keys {
let mut qblocks = self
.queued
.remove(&key)
.expect("each entry is only removed once");
for qblock in qblocks.drain(..) {
// Sending can fail, but there's nothing we can do about it.
let _ = qblock.tx.send(Err(VerifyCheckpointError::Dropped));
}
}
}
}
#[derive(Debug, Error)]
pub enum VerifyCheckpointError {
#[error("checkpoint request after checkpointing finished")]
Finished,
#[error("block at {height:?} is higher than the maximum checkpoint {max_height:?}")]
TooHigh {
height: block::Height,
max_height: block::Height,
},
#[error("block {height:?} is less than or equal to the verified tip {verified_height:?}")]
AlreadyVerified {
height: block::Height,
verified_height: block::Height,
},
#[error("rejected older of duplicate verification requests for block at {height:?} {hash:?}")]
NewerRequest {
height: block::Height,
hash: block::Hash,
},
#[error("the block {hash:?} does not have a coinbase height")]
CoinbaseHeight { hash: block::Hash },
#[error("merkle root {actual:?} does not match expected {expected:?}")]
BadMerkleRoot {
actual: block::merkle::Root,
expected: block::merkle::Root,
},
#[error("duplicate transactions in block")]
DuplicateTransaction,
#[error("checkpoint verifier was dropped")]
Dropped,
#[error(transparent)]
CommitFinalized(BoxError),
#[error(transparent)]
CheckpointList(BoxError),
#[error("too many queued blocks at this height")]
QueuedLimit,
#[error("the block hash does not match the chained checkpoint hash, expected {expected:?} found {found:?}")]
UnexpectedSideChain {
expected: block::Hash,
found: block::Hash,
},
}
/// The CheckpointVerifier service implementation.
///
/// After verification, the block futures resolve to their hashes.
impl<S> Service<Arc<Block>> for CheckpointVerifier<S>
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
type Response = block::Hash;
type Error = VerifyCheckpointError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
#[instrument(name = "checkpoint", skip(self, block))]
fn call(&mut self, block: Arc<Block>) -> Self::Future {
// Immediately reject all incoming blocks that arrive after we've finished.
if let FinalCheckpoint = self.previous_checkpoint_height() {
return async { Err(VerifyCheckpointError::Finished) }.boxed();
}
let rx = self.queue_block(block.clone());
self.process_checkpoint_range();
metrics::gauge!("checkpoint.queued_slots", self.queued.len() as f64);
// Because the checkpoint verifier duplicates state from the state
// service (it tracks which checkpoints have been verified), we must
// commit blocks transactionally on a per-checkpoint basis. Otherwise,
// the checkpoint verifier's state could desync from the underlying
// state service. Among other problems, this could cause the checkpoint
// verifier to reject blocks not already in the state as
// already-verified.
//
// To commit blocks transactionally on a per-checkpoint basis, we must
// commit all verified blocks in a checkpoint range, regardless of
// whether or not the response futures for each block were dropped.
//
// We accomplish this by spawning a new task containing the
// commit-if-verified logic. This task will always execute, except if
// the program is interrupted, in which case there is no longer a
// checkpoint verifier to keep in sync with the state.
let mut state_service = self.state_service.clone();
let commit_finalized_block = tokio::spawn(async move {
let hash = rx
.await
.expect("CheckpointVerifier does not leave dangling receivers")?;
// Once we get a verified hash, we must commit it to the chain state
// as a finalized block, or exit the program, so .expect rather than
// propagate errors from the state service.
match state_service
.ready_and()
.await
.expect("Verified checkpoints must be committed transactionally")
.call(zs::Request::CommitFinalizedBlock(block.into()))
.await
.expect("Verified checkpoints must be committed transactionally")
{
zs::Response::Committed(committed_hash) => {
assert_eq!(committed_hash, hash, "state must commit correct hash");
Ok(hash)
}
_ => unreachable!("wrong response for CommitFinalizedBlock"),
}
});
async move {
commit_finalized_block
.await
.expect("commit_finalized_block should not panic")
}
.boxed()
}
}