From b8b1239ac4e0ca829efea217634adfcff4815aac Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 21 Jul 2020 11:34:23 +1000 Subject: [PATCH] feature: Implement a basic ChainVerifier service The ChainVerifier service chooses between CheckpointVerifier and BlockVerifier, based on the block's height. --- zebra-consensus/src/chain.rs | 178 ++++++++++++ zebra-consensus/src/chain/tests.rs | 368 +++++++++++++++++++++++++ zebra-consensus/src/checkpoint.rs | 10 +- zebra-consensus/src/checkpoint/list.rs | 2 +- zebra-consensus/src/lib.rs | 1 + 5 files changed, 555 insertions(+), 4 deletions(-) create mode 100644 zebra-consensus/src/chain.rs create mode 100644 zebra-consensus/src/chain/tests.rs diff --git a/zebra-consensus/src/chain.rs b/zebra-consensus/src/chain.rs new file mode 100644 index 000000000..d8f0b1946 --- /dev/null +++ b/zebra-consensus/src/chain.rs @@ -0,0 +1,178 @@ +//! Chain state updates for Zebra. +//! +//! Chain state updates occur in multiple stages: +//! - verify blocks (using `BlockVerifier` or `CheckpointVerifier`) +//! - update the list of verified blocks on disk +//! - create the chain state needed to verify child blocks +//! - choose the best tip from all the available chain tips +//! - update the mature chain state on disk +//! - prune orphaned side-chains +//! +//! Chain state updates are provided via a `tower::Service`, to support +//! backpressure and batch verification. + +#[cfg(test)] +mod tests; + +use crate::checkpoint::CheckpointVerifier; + +use futures_util::FutureExt; +use std::{ + error, + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tower::{buffer::Buffer, Service, ServiceExt}; + +use zebra_chain::block::{Block, BlockHeaderHash}; +use zebra_chain::types::BlockHeight; + +struct ChainVerifier { + /// The underlying `BlockVerifier`, possibly wrapped in other services. + block_verifier: BV, + + /// The underlying `CheckpointVerifier`, wrapped in a buffer, so we can + /// clone and share it with futures. + checkpoint_verifier: Buffer>, + /// The maximum checkpoint height for `checkpoint_verifier`. + max_checkpoint_height: BlockHeight, + + /// The underlying `ZebraState`, possibly wrapped in other services. + state_service: S, +} + +/// The error type for the ChainVerifier Service. +// TODO(jlusby): Error = Report ? +type Error = Box; + +/// The ChainVerifier service implementation. +/// +/// After verification, blocks are added to the underlying state service. +impl Service> for ChainVerifier +where + BV: Service, Response = BlockHeaderHash, Error = Error> + Send + Clone + 'static, + BV::Future: Send + 'static, + S: Service + + Send + + Clone + + 'static, + S::Future: Send + 'static, +{ + type Response = BlockHeaderHash; + type Error = Error; + type Future = + Pin> + Send + 'static>>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + // We don't expect the state or verifiers to exert backpressure on our + // users, so we don't need to call `state_service.poll_ready()` here. + // (And we don't know which verifier to choose at this point, anyway.) + Poll::Ready(Ok(())) + } + + fn call(&mut self, block: Arc) -> Self::Future { + // TODO(jlusby): Error = Report, handle errors from state_service. + let mut block_verifier = self.block_verifier.clone(); + let mut checkpoint_verifier = self.checkpoint_verifier.clone(); + let mut state_service = self.state_service.clone(); + let max_checkpoint_height = self.max_checkpoint_height; + + async move { + // Call a verifier based on the block height and checkpoints + // + // TODO(teor): for post-sapling checkpoint blocks, allow callers + // to use BlockVerifier, CheckpointVerifier, or both. + match block.coinbase_height() { + Some(height) if (height <= max_checkpoint_height) => { + checkpoint_verifier + .ready_and() + .await? + .call(block.clone()) + .await? + } + Some(_) => { + block_verifier + .ready_and() + .await? + .call(block.clone()) + .await? + } + None => return Err("Invalid block: must have a coinbase height".into()), + }; + + // TODO(teor): + // - handle chain reorgs + // - adjust state_service "unique block height" conditions + + // `Tower::Buffer` requires a 1:1 relationship between `poll()`s + // and `call()`s, because it reserves a buffer slot in each + // `call()`. + let add_block = state_service + .ready_and() + .await? + .call(zebra_state::Request::AddBlock { block }); + + match add_block.await? { + zebra_state::Response::Added { hash } => Ok(hash), + _ => Err("adding block to zebra-state failed".into()), + } + } + .boxed() + } +} + +/// Return a chain verification service, using the provided verifier and state +/// services. +/// +/// The chain verifier holds a state service of type `S`, used as context for +/// block validation and to which newly verified blocks will be committed. This +/// state is pluggable to allow for testing or instrumentation. +/// +/// The returned type is opaque to allow instrumentation or other wrappers, but +/// can be boxed for storage. It is also `Clone` to allow sharing of a +/// verification service. +/// +/// This function should only be called once for a particular state service and +/// verifiers (and the result be shared, cloning if needed). Constructing +/// multiple services from the same underlying state might cause synchronisation +/// bugs. +// +// Only used by tests and other modules +#[allow(dead_code)] +pub fn init( + block_verifier: BV, + checkpoint_verifier: CheckpointVerifier, + state_service: S, +) -> impl Service< + Arc, + Response = BlockHeaderHash, + Error = Error, + Future = impl Future>, +> + Send + + Clone + + 'static +where + BV: Service, Response = BlockHeaderHash, Error = Error> + Send + Clone + 'static, + BV::Future: Send + 'static, + S: Service + + Send + + Clone + + 'static, + S::Future: Send + 'static, +{ + let max_checkpoint_height = checkpoint_verifier.list().max_height(); + // Wrap the checkpoint verifier in a buffer, so we can share it + let checkpoint_verifier = Buffer::new(checkpoint_verifier, 1); + + Buffer::new( + ChainVerifier { + block_verifier, + checkpoint_verifier, + max_checkpoint_height, + state_service, + }, + 1, + ) +} diff --git a/zebra-consensus/src/chain/tests.rs b/zebra-consensus/src/chain/tests.rs new file mode 100644 index 000000000..d7d286fc6 --- /dev/null +++ b/zebra-consensus/src/chain/tests.rs @@ -0,0 +1,368 @@ +//! Tests for chain verification + +use super::*; + +use crate::checkpoint::CheckpointList; + +use color_eyre::eyre::Report; +use color_eyre::eyre::{bail, eyre}; +use futures::future::TryFutureExt; +use std::mem::drop; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use tokio::time::timeout; +use tower::{util::ServiceExt, Service}; + +use zebra_chain::block::{Block, BlockHeader}; +use zebra_chain::serialization::ZcashDeserialize; +use zebra_chain::Network::{self, *}; + +/// The timeout we apply to each verify future during testing. +/// +/// The checkpoint verifier uses `tokio::sync::oneshot` channels as futures. +/// If the verifier doesn't send a message on the channel, any tests that +/// await the channel future will hang. +/// +/// This value is set to a large value, to avoid spurious failures due to +/// high system load. +const VERIFY_TIMEOUT_SECONDS: u64 = 10; + +/// Generate a block with no transactions (not even a coinbase transaction). +/// +/// The generated block should fail validation. +pub fn block_no_transactions() -> Block { + Block { + header: BlockHeader::zcash_deserialize(&zebra_test::vectors::DUMMY_HEADER[..]).unwrap(), + transactions: Vec::new(), + } +} + +/// Return a new `(chain_verifier, state_service)` using `checkpoint_list`. +/// +/// Also creates a new block verfier and checkpoint verifier, so it can +/// initialise the chain verifier. +fn verifiers_from_checkpoint_list( + checkpoint_list: CheckpointList, +) -> ( + impl Service< + Arc, + Response = BlockHeaderHash, + Error = Error, + Future = impl Future>, + > + Send + + Clone + + 'static, + impl Service< + zebra_state::Request, + Response = zebra_state::Response, + Error = Error, + Future = impl Future>, + > + Send + + Clone + + 'static, +) { + let state_service = zebra_state::in_memory::init(); + let block_verifier = crate::block::init(state_service.clone()); + let checkpoint_verifier = + crate::checkpoint::CheckpointVerifier::from_checkpoint_list(checkpoint_list); + let chain_verifier = super::init(block_verifier, checkpoint_verifier, state_service.clone()); + + (chain_verifier, state_service) +} + +/// Return a new `(chain_verifier, state_service)` using the hard-coded +/// checkpoint list for `network`. +fn verifiers_from_network( + network: Network, +) -> ( + impl Service< + Arc, + Response = BlockHeaderHash, + Error = Error, + Future = impl Future>, + > + Send + + Clone + + 'static, + impl Service< + zebra_state::Request, + Response = zebra_state::Response, + Error = Error, + Future = impl Future>, + > + Send + + Clone + + 'static, +) { + verifiers_from_checkpoint_list(CheckpointList::new(network)) +} + +#[tokio::test] +async fn verify_block_test() -> Result<(), Report> { + verify_block().await +} + +/// Test that block verifies work +/// +/// Uses a custom checkpoint list, containing only the genesis block. Since the +/// maximum checkpoint height is 0, non-genesis blocks are verified using the +/// BlockVerifier. +#[spandoc::spandoc] +async fn verify_block() -> Result<(), Report> { + zebra_test::init(); + + // Parse the genesis block + let mut checkpoint_data = Vec::new(); + let block0 = + Arc::::zcash_deserialize(&zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES[..])?; + let hash0: BlockHeaderHash = block0.as_ref().into(); + checkpoint_data.push(( + block0.coinbase_height().expect("test block has height"), + hash0, + )); + + // Make a checkpoint list containing the genesis block + let checkpoint_list: BTreeMap = + checkpoint_data.iter().cloned().collect(); + let checkpoint_list = CheckpointList::from_list(checkpoint_list).map_err(|e| eyre!(e))?; + + let (mut chain_verifier, _) = verifiers_from_checkpoint_list(checkpoint_list); + + let block1 = Arc::::zcash_deserialize(&zebra_test::vectors::BLOCK_MAINNET_1_BYTES[..])?; + let hash1: BlockHeaderHash = block1.as_ref().into(); + + /// SPANDOC: Make sure the verifier service is ready + let ready_verifier_service = chain_verifier.ready_and().await.map_err(|e| eyre!(e))?; + /// SPANDOC: Set up the future + let verify_future = timeout( + Duration::from_secs(VERIFY_TIMEOUT_SECONDS), + ready_verifier_service.call(block1.clone()), + ); + /// SPANDOC: Verify the block + // TODO(teor || jlusby): check error kind + let verify_response = verify_future + .map_err(|e| eyre!(e)) + .await + .expect("timeout should not happen") + .expect("block should verify"); + + assert_eq!(verify_response, hash1); + + Ok(()) +} + +#[tokio::test] +async fn verify_checkpoint_test() -> Result<(), Report> { + verify_checkpoint().await +} + +/// Test that checkpoint verifies work +#[spandoc::spandoc] +async fn verify_checkpoint() -> Result<(), Report> { + zebra_test::init(); + + let block = + Arc::::zcash_deserialize(&zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES[..])?; + let hash: BlockHeaderHash = block.as_ref().into(); + + let (mut chain_verifier, _) = verifiers_from_network(Mainnet); + + /// SPANDOC: Make sure the verifier service is ready + let ready_verifier_service = chain_verifier.ready_and().await.map_err(|e| eyre!(e))?; + /// SPANDOC: Set up the future + let verify_future = timeout( + Duration::from_secs(VERIFY_TIMEOUT_SECONDS), + ready_verifier_service.call(block.clone()), + ); + /// SPANDOC: Verify the block + // TODO(teor || jlusby): check error kind + let verify_response = verify_future + .map_err(|e| eyre!(e)) + .await + .expect("timeout should not happen") + .expect("block should verify"); + + assert_eq!(verify_response, hash); + + Ok(()) +} + +#[tokio::test] +async fn verify_fail_no_coinbase_test() -> Result<(), Report> { + verify_fail_no_coinbase().await +} + +/// Test that blocks with no coinbase height are rejected by the ChainVerifier +/// +/// ChainVerifier uses the block height to decide between the CheckpointVerifier +/// and BlockVerifier. This is the error case, where there is no height. +#[spandoc::spandoc] +async fn verify_fail_no_coinbase() -> Result<(), Report> { + zebra_test::init(); + + let block = block_no_transactions(); + let hash: BlockHeaderHash = (&block).into(); + + let (mut chain_verifier, mut state_service) = verifiers_from_network(Mainnet); + + /// SPANDOC: Make sure the verifier service is ready + let ready_verifier_service = chain_verifier.ready_and().await.map_err(|e| eyre!(e))?; + /// SPANDOC: Set up the future to verify the block + let verify_future = timeout( + Duration::from_secs(VERIFY_TIMEOUT_SECONDS), + ready_verifier_service.call(block.into()), + ); + /// SPANDOC: Verify the block + // TODO(teor || jlusby): check error kind + let _ = verify_future + .map_err(|e| eyre!(e)) + .await + .expect("timeout should not happen") + .unwrap_err(); + + /// SPANDOC: Make sure the state service is ready + let ready_state_service = state_service.ready_and().await.map_err(|e| eyre!(e))?; + /// SPANDOC: The state should not contain failed blocks + let _ = ready_state_service + .call(zebra_state::Request::GetBlock { hash }) + .await + .expect_err("failed block should not be in state"); + + Ok(()) +} + +#[tokio::test] +async fn round_trip_checkpoint_test() -> Result<(), Report> { + round_trip_checkpoint().await +} + +/// Test that state updates work +#[spandoc::spandoc] +async fn round_trip_checkpoint() -> Result<(), Report> { + zebra_test::init(); + + let block = + Arc::::zcash_deserialize(&zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES[..])?; + let hash: BlockHeaderHash = block.as_ref().into(); + + let (mut chain_verifier, mut state_service) = verifiers_from_network(Mainnet); + + /// SPANDOC: Make sure the verifier service is ready + let ready_verifier_service = chain_verifier.ready_and().await.map_err(|e| eyre!(e))?; + /// SPANDOC: Set up the future + let verify_future = timeout( + Duration::from_secs(VERIFY_TIMEOUT_SECONDS), + ready_verifier_service.call(block.clone()), + ); + /// SPANDOC: Verify the block + // TODO(teor || jlusby): check error kind + let verify_response = verify_future + .map_err(|e| eyre!(e)) + .await + .expect("timeout should not happen") + .expect("block should verify"); + + assert_eq!(verify_response, hash); + + /// SPANDOC: Make sure the state service is ready + let ready_state_service = state_service.ready_and().await.map_err(|e| eyre!(e))?; + /// SPANDOC: Make sure the block was added to the state + let state_response = ready_state_service + .call(zebra_state::Request::GetBlock { hash }) + .await + .map_err(|e| eyre!(e))?; + + if let zebra_state::Response::Block { + block: returned_block, + } = state_response + { + assert_eq!(block, returned_block); + } else { + bail!("unexpected response kind: {:?}", state_response); + } + + Ok(()) +} + +#[tokio::test] +async fn verify_fail_add_block_checkpoint_test() -> Result<(), Report> { + verify_fail_add_block_checkpoint().await +} + +/// Test that the state rejects duplicate block adds +#[spandoc::spandoc] +async fn verify_fail_add_block_checkpoint() -> Result<(), Report> { + zebra_test::init(); + + let block = + Arc::::zcash_deserialize(&zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES[..])?; + let hash: BlockHeaderHash = block.as_ref().into(); + + let (mut chain_verifier, mut state_service) = verifiers_from_network(Mainnet); + + /// SPANDOC: Make sure the verifier service is ready (1/2) + let ready_verifier_service = chain_verifier.ready_and().await.map_err(|e| eyre!(e))?; + /// SPANDOC: Set up the future to verify the block for the first time + let verify_future = timeout( + Duration::from_secs(VERIFY_TIMEOUT_SECONDS), + ready_verifier_service.call(block.clone()), + ); + /// SPANDOC: Verify the block for the first time + // TODO(teor || jlusby): check error kind + let verify_response = verify_future + .map_err(|e| eyre!(e)) + .await + .expect("timeout should not happen") + .expect("block should verify"); + + assert_eq!(verify_response, hash); + + /// SPANDOC: Make sure the state service is ready (1/2) + let ready_state_service = state_service.ready_and().await.map_err(|e| eyre!(e))?; + /// SPANDOC: Make sure the block was added to the state + let state_response = ready_state_service + .call(zebra_state::Request::GetBlock { hash }) + .await + .map_err(|e| eyre!(e))?; + + if let zebra_state::Response::Block { + block: returned_block, + } = state_response + { + assert_eq!(block, returned_block); + } else { + bail!("unexpected response kind: {:?}", state_response); + } + + /// SPANDOC: Make sure the verifier service is ready (2/2) + let ready_verifier_service = chain_verifier.ready_and().await.map_err(|e| eyre!(e))?; + /// SPANDOC: Set up the future to verify the block for the first time + let verify_future = timeout( + Duration::from_secs(VERIFY_TIMEOUT_SECONDS), + ready_verifier_service.call(block.clone()), + ); + /// SPANDOC: Verify the block for the first time + // TODO(teor): ignore duplicate block verifies? + // TODO(teor || jlusby): check error kind + let _ = verify_future + .map_err(|e| eyre!(e)) + .await + .expect("timeout should not happen") + .unwrap_err(); + + /// SPANDOC: Make sure the state service is ready (2/2) + let ready_state_service = state_service.ready_and().await.map_err(|e| eyre!(e))?; + /// SPANDOC: But the state should still return the original block we added + let state_response = ready_state_service + .call(zebra_state::Request::GetBlock { hash }) + .await + .map_err(|e| eyre!(e))?; + + if let zebra_state::Response::Block { + block: returned_block, + } = state_response + { + assert_eq!(block, returned_block); + } else { + bail!("unexpected response kind: {:?}", state_response); + } + + Ok(()) +} diff --git a/zebra-consensus/src/checkpoint.rs b/zebra-consensus/src/checkpoint.rs index fe79b9ade..790fe1b36 100644 --- a/zebra-consensus/src/checkpoint.rs +++ b/zebra-consensus/src/checkpoint.rs @@ -13,13 +13,13 @@ //! Verification is provided via a `tower::Service`, to support backpressure and batch //! verification. -mod list; +pub(crate) mod list; mod types; #[cfg(test)] mod tests; -use list::CheckpointList; +pub(crate) use list::CheckpointList; use types::{Progress, Progress::*}; use types::{Target, Target::*}; @@ -82,7 +82,7 @@ pub const MAX_QUEUED_BLOCKS_PER_HEIGHT: usize = 4; /// Verifies blocks using a supplied list of checkpoints. There must be at /// least one checkpoint for the genesis block. #[derive(Debug)] -struct CheckpointVerifier { +pub struct CheckpointVerifier { // Inputs // /// The checkpoint list for this verifier. @@ -158,6 +158,10 @@ impl CheckpointVerifier { } } + pub(crate) fn list(&self) -> &CheckpointList { + &self.checkpoint_list + } + /// Return the current verifier's progress. /// /// If verification has not started yet, returns `BeforeGenesis`. diff --git a/zebra-consensus/src/checkpoint/list.rs b/zebra-consensus/src/checkpoint/list.rs index 45b1bdf47..14aecf34e 100644 --- a/zebra-consensus/src/checkpoint/list.rs +++ b/zebra-consensus/src/checkpoint/list.rs @@ -32,7 +32,7 @@ type Error = Box; /// (zcashd allows chain reorganizations up to 99 blocks, and prunes /// orphaned side-chains after 288 blocks.) #[derive(Debug)] -pub struct CheckpointList(BTreeMap); +pub(crate) struct CheckpointList(BTreeMap); impl FromStr for CheckpointList { type Err = Error; diff --git a/zebra-consensus/src/lib.rs b/zebra-consensus/src/lib.rs index 8f6f52af7..67c32991d 100644 --- a/zebra-consensus/src/lib.rs +++ b/zebra-consensus/src/lib.rs @@ -16,6 +16,7 @@ #![allow(clippy::try_err)] pub mod block; +pub mod chain; pub mod checkpoint; pub mod mempool; pub mod parameters;