Separate state commits into checkpoint and full block verify

* stop committing to the state in the ChainVerifier
* commit to the state in the BlockVerifier
* commit to the state in the CheckpointVerifier

Co-authored-by: Jane Lusby <jlusby42@gmail.com>
This commit is contained in:
teor 2020-09-03 14:23:57 +10:00 committed by Henry de Valence
parent 1b76cb0250
commit 6a79953ab6
4 changed files with 156 additions and 76 deletions

View File

@ -39,9 +39,7 @@ where
+ 'static,
S::Future: Send + 'static,
{
/// The underlying `ZebraState`, possibly wrapped in other services.
// TODO: contextual verification
#[allow(dead_code)]
/// The underlying state service, possibly wrapped in other services.
state_service: S,
}
@ -74,7 +72,7 @@ where
}
fn call(&mut self, block: Arc<Block>) -> Self::Future {
let mut state = self.state_service.clone();
let mut state_service = self.state_service.clone();
// TODO(jlusby): Error = Report, handle errors from state_service.
async move {
@ -147,7 +145,7 @@ where
metrics::counter!("block.waiting.count", 1);
let previous_block = BlockVerifier::await_block(
&mut state,
&mut state_service,
previous_block_hash,
expected_height,
)
@ -170,7 +168,20 @@ where
);
metrics::counter!("block.verified.block.count", 1);
Ok(hash)
// We need to add the block after the previous block is in the state,
// and before this future returns. Otherwise, blocks could be
// committed out of order.
let ready_state = state_service
.ready_and()
.await?;
match ready_state.call(zebra_state::Request::AddBlock { block }).await? {
zebra_state::Response::Added { hash: committed_hash } => {
assert_eq!(committed_hash, hash, "state returned wrong hash: hashes must be equal");
Ok(hash)
}
_ => Err(format!("adding block {:?} {:?} to state failed", height, hash))?,
}
}
.boxed()
}
@ -191,9 +202,12 @@ where
/// Get the block for `hash`, using `state`.
///
/// If there is no block for that hash, returns `Ok(None)`.
/// Returns an error if `state.poll_ready` errors.
async fn get_block(state: &mut S, hash: block::Hash) -> Result<Option<Arc<Block>>, Report> {
let block = state
/// Returns an error if `state_service.poll_ready` errors.
async fn get_block(
state_service: &mut S,
hash: block::Hash,
) -> Result<Option<Arc<Block>>, Report> {
let block = state_service
.ready_and()
.await
.map_err(|e| eyre!(e))?
@ -208,16 +222,16 @@ where
Ok(block)
}
/// Wait until a block with `hash` is in `state`.
/// Wait until a block with `hash` is in `state_service`.
///
/// Returns an error if `state.poll_ready` errors.
/// Returns an error if `state_service.poll_ready` errors.
async fn await_block(
state: &mut S,
state_service: &mut S,
hash: block::Hash,
height: block::Height,
) -> Result<Arc<Block>, Report> {
loop {
match BlockVerifier::get_block(state, hash).await? {
match BlockVerifier::get_block(state_service, hash).await? {
Some(block) => return Ok(block),
// Busy-waiting is only a temporary solution to waiting for blocks.
// Replace with the contextual verification RFC design
@ -232,9 +246,9 @@ where
/// Return a block verification service, using the provided state service.
///
/// The block verifier holds a state service of type `S`, used as context for
/// block validation. This state is pluggable to allow for testing or
/// instrumentation.
/// The block verifier holds a state service of type `S`, into which newly
/// verified blocks will be committed. This state is pluggable to allow for
/// testing or instrumentation.
///
/// The returned type is opaque to allow instrumentation or other wrappers, but
/// can be boxed for storage. It is also `Clone` to allow sharing of a

View File

@ -39,10 +39,17 @@ const MAX_EXPECTED_BLOCK_GAP: u32 = 100_000;
/// A wrapper type that holds the `ChainVerifier`'s `CheckpointVerifier`, and
/// its associated state.
#[derive(Clone)]
struct ChainCheckpointVerifier {
struct ChainCheckpointVerifier<S>
where
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S::Future: Send + 'static,
{
/// The underlying `CheckpointVerifier`, wrapped in a buffer, so we can
/// clone and share it with futures.
verifier: Buffer<CheckpointVerifier, Arc<Block>>,
verifier: Buffer<CheckpointVerifier<S>, Arc<Block>>,
/// The maximum checkpoint height for `checkpoint_verifier`.
max_height: block::Height,
@ -67,10 +74,7 @@ where
/// associated state.
///
/// None if all the checkpoints have been verified.
checkpoint: Option<ChainCheckpointVerifier>,
/// The underlying `ZebraState`, possibly wrapped in other services.
state_service: S,
checkpoint: Option<ChainCheckpointVerifier<S>>,
/// The most recent block height that was submitted to the verifier.
///
@ -103,20 +107,19 @@ where
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// We don't expect the state or verifiers to exert backpressure on our
// users, so we don't need to call `state_service.poll_ready()` here.
// We don't expect the verifiers to exert backpressure on our
// users, so we don't need to call the verifier's `poll_ready` here.
// (And we don't know which verifier to choose at this point, anyway.)
Poll::Ready(Ok(()))
}
fn call(&mut self, block: Arc<Block>) -> Self::Future {
// TODO(jlusby): Error = Report, handle errors from state_service.
// TODO(jlusby): Error = Report
let height = block.coinbase_height();
let hash = block.hash();
let span = tracing::debug_span!("block_verify", ?height, ?hash,);
let mut block_verifier = self.block_verifier.clone();
let mut state_service = self.state_service.clone();
let checkpoint_verifier = self.checkpoint.clone().map(|c| c.verifier);
let max_checkpoint_height = self.checkpoint.clone().map(|c| c.max_height);
@ -148,18 +151,20 @@ where
tracing::debug!("large block height gap: this block or the previous block is out of order");
}
block_verifier
let verified_hash = block_verifier
.ready_and()
.await?
.call(block.clone())
.await?;
assert_eq!(verified_hash, hash, "block verifier returned wrong hash: hashes must be equal");
} else {
checkpoint_verifier
let verified_hash = checkpoint_verifier
.expect("missing checkpoint verifier: verifier must be Some if max checkpoint height is Some")
.ready_and()
.await?
.call(block.clone())
.await?;
assert_eq!(verified_hash, hash, "checkpoint verifier returned wrong hash: hashes must be equal");
}
tracing::trace!(?height, ?hash, "verified block");
@ -169,15 +174,7 @@ where
);
metrics::counter!("chain.verified.block.count", 1);
let add_block = state_service
.ready_and()
.await?
.call(zebra_state::Request::AddBlock { block });
match add_block.await? {
zebra_state::Response::Added { hash } => Ok(hash),
_ => Err("adding block to zebra-state failed".into()),
}
Ok(hash)
}
.instrument(span)
.boxed()
@ -253,10 +250,6 @@ where
/// Return a chain verification service, using the provided block verifier,
/// checkpoint list, and state service.
///
/// The chain verifier holds a state service of type `S`, used as context for
/// block validation and to which newly verified blocks will be committed. This
/// state is pluggable to allow for testing or instrumentation.
///
/// The returned type is opaque to allow instrumentation or other wrappers, but
/// can be boxed for storage. It is also `Clone` to allow sharing of a
/// verification service.
@ -320,12 +313,12 @@ where
(Some(initial_height), _, Some(max_checkpoint_height)) if (initial_height > max_checkpoint_height) => None,
// No list, no checkpoint verifier
(_, None, _) => None,
(_, Some(_), None) => panic!("Missing max checkpoint height: height must be Some if verifier is Some"),
// We've done all the checks we need to create a checkpoint verifier
(_, Some(list), Some(max_height)) => Some(
ChainCheckpointVerifier {
verifier: Buffer::new(CheckpointVerifier::from_checkpoint_list(list, initial_tip), 1),
verifier: Buffer::new(CheckpointVerifier::from_checkpoint_list(list, initial_tip, state_service), 1),
max_height,
}),
};
@ -334,7 +327,6 @@ where
ChainVerifier {
block_verifier,
checkpoint,
state_service,
last_block_height: initial_height,
},
1,

View File

@ -36,7 +36,7 @@ use std::{
task::{Context, Poll},
};
use tokio::sync::oneshot;
use tower::Service;
use tower::{Service, ServiceExt};
use zebra_chain::{
block::{self, Block},
@ -87,7 +87,14 @@ pub const MAX_CHECKPOINT_HEIGHT_GAP: usize = 2_000;
/// Verifies blocks using a supplied list of checkpoints. There must be at
/// least one checkpoint for the genesis block.
#[derive(Debug)]
pub struct CheckpointVerifier {
pub struct CheckpointVerifier<S>
where
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S::Future: Send + 'static,
{
// Inputs
//
/// The checkpoint list for this verifier.
@ -96,6 +103,9 @@ pub struct CheckpointVerifier {
/// The hash of the initial tip, if any.
initial_tip_hash: Option<block::Hash>,
/// The underlying state service, possibly wrapped in other services.
state_service: S,
// Queued Blocks
//
/// A queue of unverified blocks.
@ -117,17 +127,30 @@ pub struct CheckpointVerifier {
/// The CheckpointVerifier implementation.
///
/// Contains non-service utility functions for CheckpointVerifiers.
impl CheckpointVerifier {
impl<S> CheckpointVerifier<S>
where
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S::Future: Send + 'static,
{
/// Return a checkpoint verification service for `network`, using the
/// hard-coded checkpoint list. If `initial_tip` is Some(_), the
/// verifier starts at that initial tip, which does not have to be in the
/// hard-coded checkpoint list.
/// hard-coded checkpoint list, and the provided `state_service`.
///
/// If `initial_tip` is Some(_), the verifier starts at that initial tip.
/// The initial tip can be between the checkpoints in the hard-coded
/// checkpoint list.
///
/// The checkpoint verifier holds a state service of type `S`, into which newly
/// verified blocks will be committed. This state is pluggable to allow for
/// testing or instrumentation.
///
/// This function should be called only once for a particular network, rather
/// than constructing multiple verification services for the same network. To
/// clone a CheckpointVerifier, you might need to wrap it in a
/// `tower::Buffer` service.
pub fn new(network: Network, initial_tip: Option<Arc<Block>>) -> Self {
pub fn new(network: Network, initial_tip: Option<Arc<Block>>, state_service: S) -> Self {
let checkpoint_list = CheckpointList::new(network);
let max_height = checkpoint_list.max_height();
let initial_height = initial_tip.clone().map(|b| b.coinbase_height()).flatten();
@ -137,38 +160,44 @@ impl CheckpointVerifier {
?initial_height,
"initialising CheckpointVerifier"
);
Self::from_checkpoint_list(checkpoint_list, initial_tip)
Self::from_checkpoint_list(checkpoint_list, initial_tip, state_service)
}
/// Return a checkpoint verification service using `list` and `initial_tip`.
/// Return a checkpoint verification service using `list`, `initial_tip`,
/// and `state_service`.
///
/// Assumes that the provided genesis checkpoint is correct.
///
/// Callers should prefer `CheckpointVerifier::new`, which uses the
/// hard-coded checkpoint lists. See `CheckpointVerifier::new` and
/// `CheckpointList::from_list` for more details.
//
// This function is designed for use in tests.
/// hard-coded checkpoint lists, or `CheckpointList::from_list` if you need
/// to specify a custom checkpoint list. See those functions for more
/// details.
///
/// This function is designed for use in tests.
#[allow(dead_code)]
pub(crate) fn from_list(
list: impl IntoIterator<Item = (block::Height, block::Hash)>,
initial_tip: Option<Arc<Block>>,
state_service: S,
) -> Result<Self, Error> {
Ok(Self::from_checkpoint_list(
CheckpointList::from_list(list)?,
initial_tip,
state_service,
))
}
/// Return a checkpoint verification service using `checkpoint_list` and
/// `initial_tip`.
/// Return a checkpoint verification service using `checkpoint_list`,
/// `initial_tip`, and `state_service`.
///
/// Assumes that the provided genesis checkpoint is correct.
///
/// Callers should prefer `CheckpointVerifier::new`, which uses the
/// hard-coded checkpoint lists. See `CheckpointVerifier::new` and
/// `CheckpointList::from_list` for more details.
/// hard-coded checkpoint lists. See that function for more details.
pub(crate) fn from_checkpoint_list(
checkpoint_list: CheckpointList,
initial_tip: Option<Arc<Block>>,
state_service: S,
) -> Self {
// All the initialisers should call this function, so we only have to
// change fields or default values in one place.
@ -193,6 +222,7 @@ impl CheckpointVerifier {
CheckpointVerifier {
checkpoint_list,
initial_tip_hash,
state_service,
queued: BTreeMap::new(),
verifier_progress,
}
@ -702,7 +732,14 @@ impl CheckpointVerifier {
}
/// CheckpointVerifier rejects pending futures on drop.
impl Drop for CheckpointVerifier {
impl<S> Drop for CheckpointVerifier<S>
where
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S::Future: Send + 'static,
{
/// Send an error on `tx` for any `QueuedBlock`s that haven't been verified.
///
/// We can't implement `Drop` on QueuedBlock, because `send()` consumes
@ -727,7 +764,14 @@ impl Drop for CheckpointVerifier {
/// The CheckpointVerifier service implementation.
///
/// After verification, the block futures resolve to their hashes.
impl Service<Arc<Block>> for CheckpointVerifier {
impl<S> Service<Arc<Block>> for CheckpointVerifier<S>
where
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S::Future: Send + 'static,
{
type Response = block::Hash;
type Error = Error;
type Future =
@ -741,11 +785,11 @@ impl Service<Arc<Block>> for CheckpointVerifier {
}
fn call(&mut self, block: Arc<Block>) -> Self::Future {
// TODO(jlusby): Error = Report
let state_service = self.state_service.clone();
// Queue the block for verification, until we receive all the blocks for
// the current checkpoint range.
let rx = self.queue_block(block);
let rx = self.queue_block(block.clone());
// Try to verify from the previous checkpoint to a target checkpoint.
//
@ -754,15 +798,31 @@ impl Service<Arc<Block>> for CheckpointVerifier {
// on the next call(). Failures always reject a block, so we know
// there will be at least one more call().
//
// TODO(teor): retry on failure (low priority, failures should be rare)
// We don't retry with a smaller range on failure, because failures
// should be rare.
self.process_checkpoint_range();
metrics::gauge!("checkpoint.queued_slots", self.queued.len() as i64);
async move {
// Remove the Result<..., RecvError> wrapper from the channel future
rx.await
.expect("CheckpointVerifier does not leave dangling receivers")
match rx.await.expect(
"unexpected closed receiver: CheckpointVerifier does not leave dangling receivers",
) {
Ok(hash) => {
let verified_hash = match state_service
.oneshot(zebra_state::Request::AddBlock { block })
.await? {
zebra_state::Response::Added { hash } => hash,
_ => unreachable!("unexpected response type: state service should return the correct response type for each request"),
};
assert_eq!(
verified_hash, hash,
"state service returned wrong hash: hashes must be equal"
);
Ok(hash)
}
Err(e) => Err(e)?,
}
}
.boxed()
}

View File

@ -12,6 +12,7 @@ use tokio::{stream::StreamExt, time::timeout};
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::parameters::Network::*;
use zebra_chain::serialization::ZcashDeserialize;
/// The timeout we apply to each verify future during testing.
@ -44,8 +45,10 @@ async fn single_item_checkpoint_list() -> Result<(), Report> {
.cloned()
.collect();
let state_service = zebra_state::init(zebra_state::Config::ephemeral(), Mainnet);
let mut checkpoint_verifier =
CheckpointVerifier::from_list(genesis_checkpoint_list, None).map_err(|e| eyre!(e))?;
CheckpointVerifier::from_list(genesis_checkpoint_list, None, state_service)
.map_err(|e| eyre!(e))?;
assert_eq!(
checkpoint_verifier.previous_checkpoint_height(),
@ -124,8 +127,10 @@ async fn multi_item_checkpoint_list() -> Result<(), Report> {
.map(|(_block, height, hash)| (*height, *hash))
.collect();
let state_service = zebra_state::init(zebra_state::Config::ephemeral(), Mainnet);
let mut checkpoint_verifier =
CheckpointVerifier::from_list(checkpoint_list, None).map_err(|e| eyre!(e))?;
CheckpointVerifier::from_list(checkpoint_list, None, state_service)
.map_err(|e| eyre!(e))?;
assert_eq!(
checkpoint_verifier.previous_checkpoint_height(),
@ -261,8 +266,10 @@ async fn continuous_blockchain(restart_height: Option<block::Height>) -> Result<
let initial_tip = restart_height
.map(|block::Height(height)| &blockchain[height as usize].0)
.cloned();
let state_service = zebra_state::init(zebra_state::Config::ephemeral(), Mainnet);
let mut checkpoint_verifier =
CheckpointVerifier::from_list(checkpoint_list, initial_tip).map_err(|e| eyre!(e))?;
CheckpointVerifier::from_list(checkpoint_list, initial_tip, state_service)
.map_err(|e| eyre!(e))?;
// Setup checks
if restart_height
@ -382,8 +389,10 @@ async fn block_higher_than_max_checkpoint_fail() -> Result<(), Report> {
.cloned()
.collect();
let state_service = zebra_state::init(zebra_state::Config::ephemeral(), Mainnet);
let mut checkpoint_verifier =
CheckpointVerifier::from_list(genesis_checkpoint_list, None).map_err(|e| eyre!(e))?;
CheckpointVerifier::from_list(genesis_checkpoint_list, None, state_service)
.map_err(|e| eyre!(e))?;
assert_eq!(
checkpoint_verifier.previous_checkpoint_height(),
@ -457,8 +466,10 @@ async fn wrong_checkpoint_hash_fail() -> Result<(), Report> {
.cloned()
.collect();
let state_service = zebra_state::init(zebra_state::Config::ephemeral(), Mainnet);
let mut checkpoint_verifier =
CheckpointVerifier::from_list(genesis_checkpoint_list, None).map_err(|e| eyre!(e))?;
CheckpointVerifier::from_list(genesis_checkpoint_list, None, state_service)
.map_err(|e| eyre!(e))?;
assert_eq!(
checkpoint_verifier.previous_checkpoint_height(),
@ -637,8 +648,10 @@ async fn checkpoint_drop_cancel() -> Result<(), Report> {
.map(|(_block, height, hash)| (*height, *hash))
.collect();
let state_service = zebra_state::init(zebra_state::Config::ephemeral(), Mainnet);
let mut checkpoint_verifier =
CheckpointVerifier::from_list(checkpoint_list, None).map_err(|e| eyre!(e))?;
CheckpointVerifier::from_list(checkpoint_list, None, state_service)
.map_err(|e| eyre!(e))?;
assert_eq!(
checkpoint_verifier.previous_checkpoint_height(),
@ -721,8 +734,9 @@ async fn hard_coded_mainnet() -> Result<(), Report> {
Arc::<Block>::zcash_deserialize(&zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES[..])?;
let hash0 = block0.hash();
let state_service = zebra_state::init(zebra_state::Config::ephemeral(), Mainnet);
// Use the hard-coded checkpoint list
let mut checkpoint_verifier = CheckpointVerifier::new(Network::Mainnet, None);
let mut checkpoint_verifier = CheckpointVerifier::new(Network::Mainnet, None, state_service);
assert_eq!(
checkpoint_verifier.previous_checkpoint_height(),