From 93cc6957b1a9ca77df312d445eca294bcff7b40a Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Wed, 9 Sep 2020 18:53:40 -0700 Subject: [PATCH] consensus: partially update to new state API This disables one test that can't be easily fixed at the moment, because it tests the wrong thing: the checkpoint and block verifiers will produce different transcripts. It also disables the initial_tip logic for now, pending simplification of the ChainVerifier logic. --- zebra-consensus/src/block.rs | 117 +++++++----------------- zebra-consensus/src/block/check.rs | 8 +- zebra-consensus/src/chain.rs | 51 ++++++----- zebra-consensus/src/chain/tests.rs | 75 ++++++++------- zebra-consensus/src/checkpoint.rs | 100 +++++++++----------- zebra-consensus/src/checkpoint/tests.rs | 2 +- zebra-consensus/src/lib.rs | 3 + 7 files changed, 150 insertions(+), 206 deletions(-) diff --git a/zebra-consensus/src/block.rs b/zebra-consensus/src/block.rs index 57e4d6bd2..77a2153cd 100644 --- a/zebra-consensus/src/block.rs +++ b/zebra-consensus/src/block.rs @@ -8,57 +8,44 @@ //! Verification is provided via a `tower::Service`, to support backpressure and batch //! verification. -mod check; - -#[cfg(test)] -mod tests; - -use chrono::Utc; -use color_eyre::eyre::{eyre, Report}; -use futures_util::FutureExt; use std::{ - error, future::Future, pin::Pin, sync::Arc, task::{Context, Poll}, }; + +use chrono::Utc; +use futures_util::FutureExt; use tower::{buffer::Buffer, Service, ServiceExt}; use zebra_chain::block::{self, Block}; +use zebra_state as zs; + +use crate::BoxError; + +mod check; +#[cfg(test)] +mod tests; /// A service that verifies blocks. #[derive(Debug)] struct BlockVerifier where - S: Service - + Send - + Clone - + 'static, + S: Service + Send + Clone + 'static, S::Future: Send + 'static, { /// The underlying state service, possibly wrapped in other services. state_service: S, } -/// The error type for the BlockVerifier Service. -// TODO(jlusby): Error = Report ? -type Error = Box; - -/// The BlockVerifier service implementation. -/// -/// The state service is only used for contextual verification. -/// (The `ChainVerifier` updates the state.) impl Service> for BlockVerifier where - S: Service - + Send - + Clone - + 'static, + S: Service + Send + Clone + 'static, S::Future: Send + 'static, { type Response = block::Hash; - type Error = Error; + type Error = BoxError; type Future = Pin> + Send + 'static>>; @@ -76,6 +63,19 @@ where async move { let hash = block.hash(); + // Check that this block is actually a new block. + match state_service.ready_and().await?.call(zs::Request::Depth(hash)).await? { + zs::Response::Depth(Some(depth)) => { + return Err(format!( + "block {} is already in the chain at depth {:?}", + hash, + depth, + ).into()) + }, + zs::Response::Depth(None) => {}, + _ => unreachable!("wrong response to Request::Depth"), + } + // These checks only apply to generated blocks. We check the block // height for parsed blocks when we deserialize them. let height = block @@ -89,13 +89,6 @@ where block::Height::MAX))?; } - // Check that this block is actually a new block - if BlockVerifier::get_block(&mut state_service, hash).await?.is_some() { - Err(format!("duplicate block {:?} {:?}: block has already been verified", - height, - hash))?; - } - // Do the difficulty checks first, to raise the threshold for // attacks that use any other fields. let difficulty_threshold = block @@ -130,60 +123,19 @@ where ); metrics::counter!("block.verified.block.count", 1); - // Commit the block in the future - the state will handle out of - // order blocks. - let ready_state = state_service - .ready_and() - .await?; - - match ready_state.call(zebra_state::Request::AddBlock { block }).await? { - zebra_state::Response::Added { hash: committed_hash } => { + // Finally, submit the block for contextual verification. + match state_service.oneshot(zs::Request::CommitBlock{ block }).await? { + zs::Response::Committed(committed_hash) => { assert_eq!(committed_hash, hash, "state returned wrong hash: hashes must be equal"); Ok(hash) } - _ => Err(format!("adding block {:?} {:?} to state failed", height, hash))?, + _ => unreachable!("wrong response to CommitBlock"), } } .boxed() } } -/// The BlockVerifier implementation. -/// -/// The state service is only used for contextual verification. -/// (The `ChainVerifier` updates the state.) -impl BlockVerifier -where - S: Service - + Send - + Clone - + 'static, - S::Future: Send + 'static, -{ - /// Get the block for `hash`, using `state`. - /// - /// If there is no block for that hash, returns `Ok(None)`. - /// Returns an error if `state_service.poll_ready` errors. - async fn get_block( - state_service: &mut S, - hash: block::Hash, - ) -> Result>, Report> { - let block = state_service - .ready_and() - .await - .map_err(|e| eyre!(e))? - .call(zebra_state::Request::GetBlock { hash }) - .await - .map(|response| match response { - zebra_state::Response::Block { block } => block, - _ => unreachable!("GetBlock request can only result in Response::Block"), - }) - .ok(); - - Ok(block) - } -} - /// Return a block verification service, using the provided state service. /// /// The block verifier holds a state service of type `S`, into which newly @@ -202,16 +154,13 @@ pub fn init( ) -> impl Service< Arc, Response = block::Hash, - Error = Error, - Future = impl Future>, + Error = BoxError, + Future = impl Future>, > + Send + Clone + 'static where - S: Service - + Send - + Clone - + 'static, + S: Service + Send + Clone + 'static, S::Future: Send + 'static, { Buffer::new(BlockVerifier { state_service }, 1) diff --git a/zebra-consensus/src/block/check.rs b/zebra-consensus/src/block/check.rs index b4f54ba7f..8533a7d72 100644 --- a/zebra-consensus/src/block/check.rs +++ b/zebra-consensus/src/block/check.rs @@ -1,12 +1,14 @@ //! Consensus check functions -use super::*; use chrono::{DateTime, Utc}; + use zebra_chain::{ block::{Block, Header}, work::equihash, }; +use crate::BoxError; + /// Check that there is exactly one coinbase transaction in `Block`, and that /// the coinbase transaction is the first transaction in the block. /// @@ -15,7 +17,7 @@ use zebra_chain::{ /// fees paid by transactions included in this block." [§3.10][3.10] /// /// [3.10]: https://zips.z.cash/protocol/protocol.pdf#coinbasetransactions -pub fn is_coinbase_first(block: &Block) -> Result<(), Error> { +pub fn is_coinbase_first(block: &Block) -> Result<(), BoxError> { let first = block .transactions .get(0) @@ -49,6 +51,6 @@ pub fn is_equihash_solution_valid(header: &Header) -> Result<(), equihash::Error /// accepted." [§7.5][7.5] /// /// [7.5]: https://zips.z.cash/protocol/protocol.pdf#blockheader -pub fn is_time_valid_at(header: &Header, now: DateTime) -> Result<(), Error> { +pub fn is_time_valid_at(header: &Header, now: DateTime) -> Result<(), BoxError> { header.is_time_valid_at(now) } diff --git a/zebra-consensus/src/chain.rs b/zebra-consensus/src/chain.rs index 969729c70..5319f3ec9 100644 --- a/zebra-consensus/src/chain.rs +++ b/zebra-consensus/src/chain.rs @@ -28,8 +28,12 @@ use std::{ use tower::{buffer::Buffer, Service, ServiceExt}; use tracing_futures::Instrument; -use zebra_chain::block::{self, Block}; -use zebra_chain::parameters::{Network, NetworkUpgrade::Sapling}; +use zebra_chain::{ + block::{self, Block}, + parameters::{Network, NetworkUpgrade::Sapling}, +}; + +use zebra_state as zs; /// The maximum expected gap between blocks. /// @@ -41,10 +45,7 @@ const MAX_EXPECTED_BLOCK_GAP: u32 = 100_000; #[derive(Clone)] struct ChainCheckpointVerifier where - S: Service - + Send - + Clone - + 'static, + S: Service + Send + Clone + 'static, S::Future: Send + 'static, { /// The underlying `CheckpointVerifier`, wrapped in a buffer, so we can @@ -61,10 +62,7 @@ struct ChainVerifier where BV: Service, Response = block::Hash, Error = Error> + Send + Clone + 'static, BV::Future: Send + 'static, - S: Service - + Send - + Clone - + 'static, + S: Service + Send + Clone + 'static, S::Future: Send + 'static, { /// The underlying `BlockVerifier`, possibly wrapped in other services. @@ -95,10 +93,7 @@ impl Service> for ChainVerifier where BV: Service, Response = block::Hash, Error = Error> + Send + Clone + 'static, BV::Future: Send + 'static, - S: Service - + Send - + Clone - + 'static, + S: Service + Send + Clone + 'static, S::Future: Send + 'static, { type Response = block::Hash; @@ -222,15 +217,26 @@ pub async fn init( + Clone + 'static where - S: Service - + Send - + Clone - + 'static, + S: Service + Send + Clone + 'static, S::Future: Send + 'static, { - let initial_tip = zebra_state::current_tip(state_service.clone()) + /* + let initial_tip = if let zs::Response::Tip(tip) = state_service + .ready_and() .await - .expect("State service poll_ready is Ok"); + .unwrap() + .call(zs::Request::Tip) + .await + .unwrap() + { + tip + } else { + unreachable!("wrong response to Request::Tip"); + }; + */ + // TODO: restore this after figuring out what data is required, + // after simplification of the chainverifier code. + let initial_tip = None; let block_verifier = crate::block::init(state_service.clone()); let checkpoint_list = match config.checkpoint_sync { @@ -282,10 +288,7 @@ pub(crate) fn init_from_verifiers( where BV: Service, Response = block::Hash, Error = Error> + Send + Clone + 'static, BV::Future: Send + 'static, - S: Service - + Send - + Clone - + 'static, + S: Service + Send + Clone + 'static, S::Future: Send + 'static, { let max_checkpoint_height = checkpoint_list.clone().map(|c| c.max_height()); diff --git a/zebra-consensus/src/chain/tests.rs b/zebra-consensus/src/chain/tests.rs index a213626ef..60caa9360 100644 --- a/zebra-consensus/src/chain/tests.rs +++ b/zebra-consensus/src/chain/tests.rs @@ -15,6 +15,7 @@ use zebra_chain::{ parameters::Network, serialization::ZcashDeserialize, }; +use zebra_state as zs; use zebra_test::transcript::{TransError, Transcript}; use crate::checkpoint::CheckpointList; @@ -62,15 +63,15 @@ fn verifiers_from_checkpoint_list( + Clone + 'static, impl Service< - zebra_state::Request, - Response = zebra_state::Response, + zs::Request, + Response = zs::Response, Error = Error, - Future = impl Future>, + Future = impl Future>, > + Send + Clone + 'static, ) { - let state_service = zebra_state::init(zebra_state::Config::ephemeral(), network); + let state_service = zs::init(zs::Config::ephemeral(), network); let block_verifier = crate::block::init(state_service.clone()); let chain_verifier = super::init_from_verifiers( network, @@ -97,10 +98,10 @@ fn verifiers_from_network( + Clone + 'static, impl Service< - zebra_state::Request, - Response = zebra_state::Response, + zs::Request, + Response = zs::Response, Error = Error, - Future = impl Future>, + Future = impl Future>, > + Send + Clone + 'static, @@ -154,38 +155,27 @@ static NO_COINBASE_TRANSCRIPT: Lazy, Result, - )>, -> = Lazy::new(|| { - let block = block_no_transactions(); - let hash = block.hash(); +static NO_COINBASE_STATE_TRANSCRIPT: Lazy)>> = + Lazy::new(|| { + let block = block_no_transactions(); + let hash = block.hash(); - vec![( - zebra_state::Request::GetBlock { hash }, - Err(TransError::Any), - )] -}); + vec![(zs::Request::Block(hash.into()), Err(TransError::Any))] + }); -static STATE_VERIFY_TRANSCRIPT_GENESIS: Lazy< - Vec<( - zebra_state::Request, - Result, - )>, -> = Lazy::new(|| { - let block: Arc<_> = - Block::zcash_deserialize(&zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES[..]) - .unwrap() - .into(); - let hash = block.hash(); +static STATE_VERIFY_TRANSCRIPT_GENESIS: Lazy)>> = + Lazy::new(|| { + let block: Arc<_> = + Block::zcash_deserialize(&zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES[..]) + .unwrap() + .into(); + let hash = block.hash(); - vec![( - zebra_state::Request::GetBlock { hash }, - Ok(zebra_state::Response::Block { block }), - )] -}); + vec![( + zs::Request::Block(hash.into()), + Ok(zs::Response::Block(Some(block))), + )] + }); #[tokio::test] async fn verify_block_test() -> Result<(), Report> { @@ -252,7 +242,7 @@ async fn verify_checkpoint(config: Config) -> Result<(), Report> { let chain_verifier = super::init( config.clone(), network, - zebra_state::init(zebra_state::Config::ephemeral(), network), + zs::init(zs::Config::ephemeral(), network), ) .await; @@ -350,6 +340,12 @@ async fn verify_fail_add_block_checkpoint() -> Result<(), Report> { Ok(()) } +/* +// This test is disabled because it doesn't test the right thing: +// the BlockVerifier and CheckpointVerifier make different requests +// and produce different transcripts. + + #[tokio::test] // Temporarily ignore this test, until the state can handle out-of-order blocks #[ignore] @@ -406,7 +402,7 @@ async fn continuous_blockchain(restart_height: Option) -> Result< .collect(); let checkpoint_list = CheckpointList::from_list(checkpoint_list).map_err(|e| eyre!(e))?; - let mut state_service = zebra_state::init(zebra_state::Config::ephemeral(), network); + let mut state_service = zs::init(zs::Config::ephemeral(), network); /// SPANDOC: Add blocks to the state from 0..=restart_height {?restart_height} if restart_height.is_some() { for block in blockchain @@ -418,7 +414,7 @@ async fn continuous_blockchain(restart_height: Option) -> Result< .ready_and() .map_err(|e| eyre!(e)) .await? - .call(zebra_state::Request::AddBlock { + .call(zs::Request::AddBlock { block: block.clone(), }) .map_err(|e| eyre!(e)) @@ -465,3 +461,4 @@ async fn continuous_blockchain(restart_height: Option) -> Result< Ok(()) } +*/ diff --git a/zebra-consensus/src/checkpoint.rs b/zebra-consensus/src/checkpoint.rs index 49af83fad..46a6eb92b 100644 --- a/zebra-consensus/src/checkpoint.rs +++ b/zebra-consensus/src/checkpoint.rs @@ -13,6 +13,27 @@ //! Verification is provided via a `tower::Service`, to support backpressure and batch //! verification. +use std::{ + collections::BTreeMap, + future::Future, + ops::{Bound, Bound::*}, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use futures_util::FutureExt; +use tokio::sync::oneshot; +use tower::{Service, ServiceExt}; + +use zebra_chain::{ + block::{self, Block}, + parameters::Network, +}; +use zebra_state as zs; + +use crate::{parameters, BoxError}; + pub(crate) mod list; mod types; @@ -23,30 +44,6 @@ pub(crate) use list::CheckpointList; use types::{Progress, Progress::*}; use types::{Target, Target::*}; -use crate::parameters; - -use futures_util::FutureExt; -use std::{ - collections::BTreeMap, - error, - future::Future, - ops::{Bound, Bound::*}, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; -use tokio::sync::oneshot; -use tower::{Service, ServiceExt}; - -use zebra_chain::{ - block::{self, Block}, - parameters::Network, -}; - -/// The inner error type for CheckpointVerifier. -// TODO(jlusby): Error = Report ? -type Error = Box; - /// An unverified block, which is in the queue for checkpoint verification. #[derive(Debug)] struct QueuedBlock { @@ -55,7 +52,7 @@ struct QueuedBlock { /// `block`'s cached header hash. hash: block::Hash, /// The transmitting end of the oneshot channel for this block's result. - tx: oneshot::Sender>, + tx: oneshot::Sender>, } /// A list of unverified blocks at a particular height. @@ -89,10 +86,7 @@ pub const MAX_CHECKPOINT_HEIGHT_GAP: usize = 2_000; #[derive(Debug)] pub struct CheckpointVerifier where - S: Service - + Send - + Clone - + 'static, + S: Service + Send + Clone + 'static, S::Future: Send + 'static, { // Inputs @@ -129,10 +123,7 @@ where /// Contains non-service utility functions for CheckpointVerifiers. impl CheckpointVerifier where - S: Service - + Send - + Clone - + 'static, + S: Service + Send + Clone + 'static, S::Future: Send + 'static, { /// Return a checkpoint verification service for `network`, using the @@ -179,7 +170,7 @@ where list: impl IntoIterator, initial_tip: Option>, state_service: S, - ) -> Result { + ) -> Result { Ok(Self::from_checkpoint_list( CheckpointList::from_list(list)?, initial_tip, @@ -365,7 +356,7 @@ where /// - the block's height is less than or equal to the previously verified /// checkpoint /// - verification has finished - fn check_height(&self, height: block::Height) -> Result<(), Error> { + fn check_height(&self, height: block::Height) -> Result<(), BoxError> { if height > self.checkpoint_list.max_height() { Err("block is higher than the maximum checkpoint")?; } @@ -419,7 +410,7 @@ where /// /// Returns an error if the block's height is invalid, see `check_height()` /// for details. - fn check_block(&self, block: &Block) -> Result { + fn check_block(&self, block: &Block) -> Result { let block_height = block .coinbase_height() .ok_or("the block does not have a coinbase height")?; @@ -435,7 +426,10 @@ where /// /// If the block does not have a coinbase height, sends an error on `tx`, /// and does not queue the block. - fn queue_block(&mut self, block: Arc) -> oneshot::Receiver> { + fn queue_block( + &mut self, + block: Arc, + ) -> oneshot::Receiver> { // Set up a oneshot channel to send results let (tx, rx) = oneshot::channel(); @@ -734,10 +728,7 @@ where /// CheckpointVerifier rejects pending futures on drop. impl Drop for CheckpointVerifier where - S: Service - + Send - + Clone - + 'static, + S: Service + Send + Clone + 'static, S::Future: Send + 'static, { /// Send an error on `tx` for any `QueuedBlock`s that haven't been verified. @@ -766,14 +757,11 @@ where /// After verification, the block futures resolve to their hashes. impl Service> for CheckpointVerifier where - S: Service - + Send - + Clone - + 'static, + S: Service + Send + Clone + 'static, S::Future: Send + 'static, { type Response = block::Hash; - type Error = Error; + type Error = BoxError; type Future = Pin> + Send + 'static>>; @@ -805,23 +793,25 @@ where metrics::gauge!("checkpoint.queued_slots", self.queued.len() as i64); async move { - match rx.await.expect( - "unexpected closed receiver: CheckpointVerifier does not leave dangling receivers", - ) { + match rx + .await + .expect("CheckpointVerifier does not leave dangling receivers") + { Ok(hash) => { let verified_hash = match state_service - .oneshot(zebra_state::Request::AddBlock { block }) - .await? { - zebra_state::Response::Added { hash } => hash, - _ => unreachable!("unexpected response type: state service should return the correct response type for each request"), - }; + .oneshot(zs::Request::CommitFinalizedBlock { block }) + .await? + { + zs::Response::Committed(hash) => hash, + _ => unreachable!("wrong response for CommitFinalizedBlock"), + }; assert_eq!( verified_hash, hash, "state service returned wrong hash: hashes must be equal" ); Ok(hash) } - Err(e) => Err(e)?, + Err(e) => Err(e), } } .boxed() diff --git a/zebra-consensus/src/checkpoint/tests.rs b/zebra-consensus/src/checkpoint/tests.rs index afd340cfd..e4f164533 100644 --- a/zebra-consensus/src/checkpoint/tests.rs +++ b/zebra-consensus/src/checkpoint/tests.rs @@ -314,7 +314,7 @@ async fn continuous_blockchain(restart_height: Option) -> Result< /// SPANDOC: Add block to the state {?height} ready_state_service - .call(zebra_state::Request::AddBlock { + .call(zebra_state::Request::CommitFinalizedBlock { block: block.clone(), }) .await diff --git a/zebra-consensus/src/lib.rs b/zebra-consensus/src/lib.rs index f8a4c0278..cbbbc72c0 100644 --- a/zebra-consensus/src/lib.rs +++ b/zebra-consensus/src/lib.rs @@ -28,3 +28,6 @@ mod script; mod transaction; pub use crate::config::Config; + +/// A boxed [`std::error::Error`]. +pub type BoxError = Box;