diff --git a/zebra-consensus/src/block.rs b/zebra-consensus/src/block.rs index 4fd16cc78..d9fc50e94 100644 --- a/zebra-consensus/src/block.rs +++ b/zebra-consensus/src/block.rs @@ -39,9 +39,7 @@ where + 'static, S::Future: Send + 'static, { - /// The underlying `ZebraState`, possibly wrapped in other services. - // TODO: contextual verification - #[allow(dead_code)] + /// The underlying state service, possibly wrapped in other services. state_service: S, } @@ -74,7 +72,7 @@ where } fn call(&mut self, block: Arc) -> Self::Future { - let mut state = self.state_service.clone(); + let mut state_service = self.state_service.clone(); // TODO(jlusby): Error = Report, handle errors from state_service. async move { @@ -147,7 +145,7 @@ where metrics::counter!("block.waiting.count", 1); let previous_block = BlockVerifier::await_block( - &mut state, + &mut state_service, previous_block_hash, expected_height, ) @@ -170,7 +168,20 @@ where ); metrics::counter!("block.verified.block.count", 1); - Ok(hash) + // We need to add the block after the previous block is in the state, + // and before this future returns. Otherwise, blocks could be + // committed out of order. + let ready_state = state_service + .ready_and() + .await?; + + match ready_state.call(zebra_state::Request::AddBlock { block }).await? { + zebra_state::Response::Added { hash: committed_hash } => { + assert_eq!(committed_hash, hash, "state returned wrong hash: hashes must be equal"); + Ok(hash) + } + _ => Err(format!("adding block {:?} {:?} to state failed", height, hash))?, + } } .boxed() } @@ -191,9 +202,12 @@ where /// Get the block for `hash`, using `state`. /// /// If there is no block for that hash, returns `Ok(None)`. - /// Returns an error if `state.poll_ready` errors. - async fn get_block(state: &mut S, hash: block::Hash) -> Result>, Report> { - let block = state + /// Returns an error if `state_service.poll_ready` errors. + async fn get_block( + state_service: &mut S, + hash: block::Hash, + ) -> Result>, Report> { + let block = state_service .ready_and() .await .map_err(|e| eyre!(e))? @@ -208,16 +222,16 @@ where Ok(block) } - /// Wait until a block with `hash` is in `state`. + /// Wait until a block with `hash` is in `state_service`. /// - /// Returns an error if `state.poll_ready` errors. + /// Returns an error if `state_service.poll_ready` errors. async fn await_block( - state: &mut S, + state_service: &mut S, hash: block::Hash, height: block::Height, ) -> Result, Report> { loop { - match BlockVerifier::get_block(state, hash).await? { + match BlockVerifier::get_block(state_service, hash).await? { Some(block) => return Ok(block), // Busy-waiting is only a temporary solution to waiting for blocks. // Replace with the contextual verification RFC design @@ -232,9 +246,9 @@ where /// Return a block verification service, using the provided state service. /// -/// The block verifier holds a state service of type `S`, used as context for -/// block validation. This state is pluggable to allow for testing or -/// instrumentation. +/// The block verifier holds a state service of type `S`, into which newly +/// verified blocks will be committed. This state is pluggable to allow for +/// testing or instrumentation. /// /// The returned type is opaque to allow instrumentation or other wrappers, but /// can be boxed for storage. It is also `Clone` to allow sharing of a diff --git a/zebra-consensus/src/chain.rs b/zebra-consensus/src/chain.rs index 1cf0e87ea..969729c70 100644 --- a/zebra-consensus/src/chain.rs +++ b/zebra-consensus/src/chain.rs @@ -39,10 +39,17 @@ const MAX_EXPECTED_BLOCK_GAP: u32 = 100_000; /// A wrapper type that holds the `ChainVerifier`'s `CheckpointVerifier`, and /// its associated state. #[derive(Clone)] -struct ChainCheckpointVerifier { +struct ChainCheckpointVerifier +where + S: Service + + Send + + Clone + + 'static, + S::Future: Send + 'static, +{ /// The underlying `CheckpointVerifier`, wrapped in a buffer, so we can /// clone and share it with futures. - verifier: Buffer>, + verifier: Buffer, Arc>, /// The maximum checkpoint height for `checkpoint_verifier`. max_height: block::Height, @@ -67,10 +74,7 @@ where /// associated state. /// /// None if all the checkpoints have been verified. - checkpoint: Option, - - /// The underlying `ZebraState`, possibly wrapped in other services. - state_service: S, + checkpoint: Option>, /// The most recent block height that was submitted to the verifier. /// @@ -103,20 +107,19 @@ where Pin> + Send + 'static>>; fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - // We don't expect the state or verifiers to exert backpressure on our - // users, so we don't need to call `state_service.poll_ready()` here. + // We don't expect the verifiers to exert backpressure on our + // users, so we don't need to call the verifier's `poll_ready` here. // (And we don't know which verifier to choose at this point, anyway.) Poll::Ready(Ok(())) } fn call(&mut self, block: Arc) -> Self::Future { - // TODO(jlusby): Error = Report, handle errors from state_service. + // TODO(jlusby): Error = Report let height = block.coinbase_height(); let hash = block.hash(); let span = tracing::debug_span!("block_verify", ?height, ?hash,); let mut block_verifier = self.block_verifier.clone(); - let mut state_service = self.state_service.clone(); let checkpoint_verifier = self.checkpoint.clone().map(|c| c.verifier); let max_checkpoint_height = self.checkpoint.clone().map(|c| c.max_height); @@ -148,18 +151,20 @@ where tracing::debug!("large block height gap: this block or the previous block is out of order"); } - block_verifier + let verified_hash = block_verifier .ready_and() .await? .call(block.clone()) .await?; + assert_eq!(verified_hash, hash, "block verifier returned wrong hash: hashes must be equal"); } else { - checkpoint_verifier + let verified_hash = checkpoint_verifier .expect("missing checkpoint verifier: verifier must be Some if max checkpoint height is Some") .ready_and() .await? .call(block.clone()) .await?; + assert_eq!(verified_hash, hash, "checkpoint verifier returned wrong hash: hashes must be equal"); } tracing::trace!(?height, ?hash, "verified block"); @@ -169,15 +174,7 @@ where ); metrics::counter!("chain.verified.block.count", 1); - let add_block = state_service - .ready_and() - .await? - .call(zebra_state::Request::AddBlock { block }); - - match add_block.await? { - zebra_state::Response::Added { hash } => Ok(hash), - _ => Err("adding block to zebra-state failed".into()), - } + Ok(hash) } .instrument(span) .boxed() @@ -253,10 +250,6 @@ where /// Return a chain verification service, using the provided block verifier, /// checkpoint list, and state service. /// -/// The chain verifier holds a state service of type `S`, used as context for -/// block validation and to which newly verified blocks will be committed. This -/// state is pluggable to allow for testing or instrumentation. -/// /// The returned type is opaque to allow instrumentation or other wrappers, but /// can be boxed for storage. It is also `Clone` to allow sharing of a /// verification service. @@ -320,12 +313,12 @@ where (Some(initial_height), _, Some(max_checkpoint_height)) if (initial_height > max_checkpoint_height) => None, // No list, no checkpoint verifier (_, None, _) => None, - (_, Some(_), None) => panic!("Missing max checkpoint height: height must be Some if verifier is Some"), + // We've done all the checks we need to create a checkpoint verifier (_, Some(list), Some(max_height)) => Some( ChainCheckpointVerifier { - verifier: Buffer::new(CheckpointVerifier::from_checkpoint_list(list, initial_tip), 1), + verifier: Buffer::new(CheckpointVerifier::from_checkpoint_list(list, initial_tip, state_service), 1), max_height, }), }; @@ -334,7 +327,6 @@ where ChainVerifier { block_verifier, checkpoint, - state_service, last_block_height: initial_height, }, 1, diff --git a/zebra-consensus/src/checkpoint.rs b/zebra-consensus/src/checkpoint.rs index cb5dd3148..49af83fad 100644 --- a/zebra-consensus/src/checkpoint.rs +++ b/zebra-consensus/src/checkpoint.rs @@ -36,7 +36,7 @@ use std::{ task::{Context, Poll}, }; use tokio::sync::oneshot; -use tower::Service; +use tower::{Service, ServiceExt}; use zebra_chain::{ block::{self, Block}, @@ -87,7 +87,14 @@ pub const MAX_CHECKPOINT_HEIGHT_GAP: usize = 2_000; /// Verifies blocks using a supplied list of checkpoints. There must be at /// least one checkpoint for the genesis block. #[derive(Debug)] -pub struct CheckpointVerifier { +pub struct CheckpointVerifier +where + S: Service + + Send + + Clone + + 'static, + S::Future: Send + 'static, +{ // Inputs // /// The checkpoint list for this verifier. @@ -96,6 +103,9 @@ pub struct CheckpointVerifier { /// The hash of the initial tip, if any. initial_tip_hash: Option, + /// The underlying state service, possibly wrapped in other services. + state_service: S, + // Queued Blocks // /// A queue of unverified blocks. @@ -117,17 +127,30 @@ pub struct CheckpointVerifier { /// The CheckpointVerifier implementation. /// /// Contains non-service utility functions for CheckpointVerifiers. -impl CheckpointVerifier { +impl CheckpointVerifier +where + S: Service + + Send + + Clone + + 'static, + S::Future: Send + 'static, +{ /// Return a checkpoint verification service for `network`, using the - /// hard-coded checkpoint list. If `initial_tip` is Some(_), the - /// verifier starts at that initial tip, which does not have to be in the - /// hard-coded checkpoint list. + /// hard-coded checkpoint list, and the provided `state_service`. + /// + /// If `initial_tip` is Some(_), the verifier starts at that initial tip. + /// The initial tip can be between the checkpoints in the hard-coded + /// checkpoint list. + /// + /// The checkpoint verifier holds a state service of type `S`, into which newly + /// verified blocks will be committed. This state is pluggable to allow for + /// testing or instrumentation. /// /// This function should be called only once for a particular network, rather /// than constructing multiple verification services for the same network. To /// clone a CheckpointVerifier, you might need to wrap it in a /// `tower::Buffer` service. - pub fn new(network: Network, initial_tip: Option>) -> Self { + pub fn new(network: Network, initial_tip: Option>, state_service: S) -> Self { let checkpoint_list = CheckpointList::new(network); let max_height = checkpoint_list.max_height(); let initial_height = initial_tip.clone().map(|b| b.coinbase_height()).flatten(); @@ -137,38 +160,44 @@ impl CheckpointVerifier { ?initial_height, "initialising CheckpointVerifier" ); - Self::from_checkpoint_list(checkpoint_list, initial_tip) + Self::from_checkpoint_list(checkpoint_list, initial_tip, state_service) } - /// Return a checkpoint verification service using `list` and `initial_tip`. + /// Return a checkpoint verification service using `list`, `initial_tip`, + /// and `state_service`. /// /// Assumes that the provided genesis checkpoint is correct. /// /// Callers should prefer `CheckpointVerifier::new`, which uses the - /// hard-coded checkpoint lists. See `CheckpointVerifier::new` and - /// `CheckpointList::from_list` for more details. - // - // This function is designed for use in tests. + /// hard-coded checkpoint lists, or `CheckpointList::from_list` if you need + /// to specify a custom checkpoint list. See those functions for more + /// details. + /// + /// This function is designed for use in tests. #[allow(dead_code)] pub(crate) fn from_list( list: impl IntoIterator, initial_tip: Option>, + state_service: S, ) -> Result { Ok(Self::from_checkpoint_list( CheckpointList::from_list(list)?, initial_tip, + state_service, )) } - /// Return a checkpoint verification service using `checkpoint_list` and - /// `initial_tip`. + /// Return a checkpoint verification service using `checkpoint_list`, + /// `initial_tip`, and `state_service`. + /// + /// Assumes that the provided genesis checkpoint is correct. /// /// Callers should prefer `CheckpointVerifier::new`, which uses the - /// hard-coded checkpoint lists. See `CheckpointVerifier::new` and - /// `CheckpointList::from_list` for more details. + /// hard-coded checkpoint lists. See that function for more details. pub(crate) fn from_checkpoint_list( checkpoint_list: CheckpointList, initial_tip: Option>, + state_service: S, ) -> Self { // All the initialisers should call this function, so we only have to // change fields or default values in one place. @@ -193,6 +222,7 @@ impl CheckpointVerifier { CheckpointVerifier { checkpoint_list, initial_tip_hash, + state_service, queued: BTreeMap::new(), verifier_progress, } @@ -702,7 +732,14 @@ impl CheckpointVerifier { } /// CheckpointVerifier rejects pending futures on drop. -impl Drop for CheckpointVerifier { +impl Drop for CheckpointVerifier +where + S: Service + + Send + + Clone + + 'static, + S::Future: Send + 'static, +{ /// Send an error on `tx` for any `QueuedBlock`s that haven't been verified. /// /// We can't implement `Drop` on QueuedBlock, because `send()` consumes @@ -727,7 +764,14 @@ impl Drop for CheckpointVerifier { /// The CheckpointVerifier service implementation. /// /// After verification, the block futures resolve to their hashes. -impl Service> for CheckpointVerifier { +impl Service> for CheckpointVerifier +where + S: Service + + Send + + Clone + + 'static, + S::Future: Send + 'static, +{ type Response = block::Hash; type Error = Error; type Future = @@ -741,11 +785,11 @@ impl Service> for CheckpointVerifier { } fn call(&mut self, block: Arc) -> Self::Future { - // TODO(jlusby): Error = Report + let state_service = self.state_service.clone(); // Queue the block for verification, until we receive all the blocks for // the current checkpoint range. - let rx = self.queue_block(block); + let rx = self.queue_block(block.clone()); // Try to verify from the previous checkpoint to a target checkpoint. // @@ -754,15 +798,31 @@ impl Service> for CheckpointVerifier { // on the next call(). Failures always reject a block, so we know // there will be at least one more call(). // - // TODO(teor): retry on failure (low priority, failures should be rare) + // We don't retry with a smaller range on failure, because failures + // should be rare. self.process_checkpoint_range(); metrics::gauge!("checkpoint.queued_slots", self.queued.len() as i64); async move { - // Remove the Result<..., RecvError> wrapper from the channel future - rx.await - .expect("CheckpointVerifier does not leave dangling receivers") + match rx.await.expect( + "unexpected closed receiver: CheckpointVerifier does not leave dangling receivers", + ) { + Ok(hash) => { + let verified_hash = match state_service + .oneshot(zebra_state::Request::AddBlock { block }) + .await? { + zebra_state::Response::Added { hash } => hash, + _ => unreachable!("unexpected response type: state service should return the correct response type for each request"), + }; + assert_eq!( + verified_hash, hash, + "state service returned wrong hash: hashes must be equal" + ); + Ok(hash) + } + Err(e) => Err(e)?, + } } .boxed() } diff --git a/zebra-consensus/src/checkpoint/tests.rs b/zebra-consensus/src/checkpoint/tests.rs index 458eeb5ef..ca8bc4180 100644 --- a/zebra-consensus/src/checkpoint/tests.rs +++ b/zebra-consensus/src/checkpoint/tests.rs @@ -12,6 +12,7 @@ use tokio::{stream::StreamExt, time::timeout}; use tower::{Service, ServiceExt}; use tracing_futures::Instrument; +use zebra_chain::parameters::Network::*; use zebra_chain::serialization::ZcashDeserialize; /// The timeout we apply to each verify future during testing. @@ -44,8 +45,10 @@ async fn single_item_checkpoint_list() -> Result<(), Report> { .cloned() .collect(); + let state_service = zebra_state::init(zebra_state::Config::ephemeral(), Mainnet); let mut checkpoint_verifier = - CheckpointVerifier::from_list(genesis_checkpoint_list, None).map_err(|e| eyre!(e))?; + CheckpointVerifier::from_list(genesis_checkpoint_list, None, state_service) + .map_err(|e| eyre!(e))?; assert_eq!( checkpoint_verifier.previous_checkpoint_height(), @@ -124,8 +127,10 @@ async fn multi_item_checkpoint_list() -> Result<(), Report> { .map(|(_block, height, hash)| (*height, *hash)) .collect(); + let state_service = zebra_state::init(zebra_state::Config::ephemeral(), Mainnet); let mut checkpoint_verifier = - CheckpointVerifier::from_list(checkpoint_list, None).map_err(|e| eyre!(e))?; + CheckpointVerifier::from_list(checkpoint_list, None, state_service) + .map_err(|e| eyre!(e))?; assert_eq!( checkpoint_verifier.previous_checkpoint_height(), @@ -261,8 +266,10 @@ async fn continuous_blockchain(restart_height: Option) -> Result< let initial_tip = restart_height .map(|block::Height(height)| &blockchain[height as usize].0) .cloned(); + let state_service = zebra_state::init(zebra_state::Config::ephemeral(), Mainnet); let mut checkpoint_verifier = - CheckpointVerifier::from_list(checkpoint_list, initial_tip).map_err(|e| eyre!(e))?; + CheckpointVerifier::from_list(checkpoint_list, initial_tip, state_service) + .map_err(|e| eyre!(e))?; // Setup checks if restart_height @@ -382,8 +389,10 @@ async fn block_higher_than_max_checkpoint_fail() -> Result<(), Report> { .cloned() .collect(); + let state_service = zebra_state::init(zebra_state::Config::ephemeral(), Mainnet); let mut checkpoint_verifier = - CheckpointVerifier::from_list(genesis_checkpoint_list, None).map_err(|e| eyre!(e))?; + CheckpointVerifier::from_list(genesis_checkpoint_list, None, state_service) + .map_err(|e| eyre!(e))?; assert_eq!( checkpoint_verifier.previous_checkpoint_height(), @@ -457,8 +466,10 @@ async fn wrong_checkpoint_hash_fail() -> Result<(), Report> { .cloned() .collect(); + let state_service = zebra_state::init(zebra_state::Config::ephemeral(), Mainnet); let mut checkpoint_verifier = - CheckpointVerifier::from_list(genesis_checkpoint_list, None).map_err(|e| eyre!(e))?; + CheckpointVerifier::from_list(genesis_checkpoint_list, None, state_service) + .map_err(|e| eyre!(e))?; assert_eq!( checkpoint_verifier.previous_checkpoint_height(), @@ -637,8 +648,10 @@ async fn checkpoint_drop_cancel() -> Result<(), Report> { .map(|(_block, height, hash)| (*height, *hash)) .collect(); + let state_service = zebra_state::init(zebra_state::Config::ephemeral(), Mainnet); let mut checkpoint_verifier = - CheckpointVerifier::from_list(checkpoint_list, None).map_err(|e| eyre!(e))?; + CheckpointVerifier::from_list(checkpoint_list, None, state_service) + .map_err(|e| eyre!(e))?; assert_eq!( checkpoint_verifier.previous_checkpoint_height(), @@ -721,8 +734,9 @@ async fn hard_coded_mainnet() -> Result<(), Report> { Arc::::zcash_deserialize(&zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES[..])?; let hash0 = block0.hash(); + let state_service = zebra_state::init(zebra_state::Config::ephemeral(), Mainnet); // Use the hard-coded checkpoint list - let mut checkpoint_verifier = CheckpointVerifier::new(Network::Mainnet, None); + let mut checkpoint_verifier = CheckpointVerifier::new(Network::Mainnet, None, state_service); assert_eq!( checkpoint_verifier.previous_checkpoint_height(),