diff --git a/zebra-consensus/src/chain.rs b/zebra-consensus/src/chain.rs index 06dea6e69..5c7e1c6a4 100644 --- a/zebra-consensus/src/chain.rs +++ b/zebra-consensus/src/chain.rs @@ -22,11 +22,23 @@ use zebra_state as zs; use crate::{ block::BlockVerifier, - block::VerifyBlockError, - checkpoint::{CheckpointList, CheckpointVerifier, VerifyCheckpointError}, + checkpoint::{CheckpointList, CheckpointVerifier}, BoxError, Config, }; +/// The bound for each verifier's buffer. +/// +/// We choose the verifier buffer bound based on the maximum number of +/// concurrent verifier users, to avoid contention: +/// - the `ChainSync` component +/// - the `Inbound` service +/// - a miner component, which we might add in future, and +/// - 1 extra slot to avoid contention. +/// +/// We deliberately add extra slots, because they only cost a small amount of +/// memory, but missing slots can significantly slow down Zebra. +const VERIFIER_BUFFER_BOUND: usize = 4; + /// The chain verifier routes requests to either the checkpoint verifier or the /// block verifier, depending on the maximum checkpoint height. struct ChainVerifier @@ -34,17 +46,20 @@ where S: Service + Send + Clone + 'static, S::Future: Send + 'static, { - block: BlockVerifier, - checkpoint: CheckpointVerifier, + // Normally, we erase the types on buffer-wrapped services. + // But if we did that here, the block and checkpoint services would be + // type-indistinguishable, risking future substitution errors. + block_verifier: Buffer, Arc>, + checkpoint_verifier: Buffer, Arc>, max_checkpoint_height: block::Height, } #[derive(Debug, Display, Error)] pub enum VerifyChainError { /// block could not be checkpointed - Checkpoint(#[source] VerifyCheckpointError), + Checkpoint(#[source] BoxError), /// block could not be verified - Block(#[source] VerifyBlockError), + Block(#[source] BoxError), } impl Service> for ChainVerifier @@ -57,30 +72,34 @@ where type Future = Pin> + Send + 'static>>; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - match (self.checkpoint.poll_ready(cx), self.block.poll_ready(cx)) { - // First, fail if either service fails. - (Poll::Ready(Err(e)), _) => Poll::Ready(Err(VerifyChainError::Checkpoint(e))), - (_, Poll::Ready(Err(e))) => Poll::Ready(Err(VerifyChainError::Block(e))), - // Second, we're unready if either service is unready. - (Poll::Pending, _) | (_, Poll::Pending) => Poll::Pending, - // Finally, we're ready if both services are ready and OK. - (Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())), - } + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + // Correctness: + // + // We can't call `poll_ready` on the block and checkpoint verifiers here, + // because each `poll_ready` must be followed by a `call`, and we don't + // know which verifier we're going to choose yet. + // See #1593 for details. + Poll::Ready(Ok(())) } fn call(&mut self, block: Arc) -> Self::Future { match block.coinbase_height() { + // Correctness: + // + // We use `ServiceExt::oneshot` to make sure every `poll_ready` has + // a matching `call`. See #1593 for details. Some(height) if height <= self.max_checkpoint_height => self - .checkpoint - .call(block) + .checkpoint_verifier + .clone() + .oneshot(block) .map_err(VerifyChainError::Checkpoint) .boxed(), // This also covers blocks with no height, which the block verifier // will reject immediately. _ => self - .block - .call(block) + .block_verifier + .clone() + .oneshot(block) .map_err(VerifyChainError::Block) .boxed(), } @@ -103,7 +122,7 @@ where pub async fn init( config: Config, network: Network, - mut state_service: S, + state_service: S, ) -> Buffer, block::Hash, VerifyChainError>, Arc> where S: Service + Send + Clone + 'static, @@ -118,11 +137,13 @@ where .expect("hardcoded checkpoint list extends past sapling activation") }; + // Correctness: + // + // We use `ServiceExt::oneshot` to make sure every `poll_ready` has a + // matching `call`. See #1593 for details. let tip = match state_service - .ready_and() - .await - .unwrap() - .call(zs::Request::Tip) + .clone() + .oneshot(zs::Request::Tip) .await .unwrap() { @@ -131,15 +152,18 @@ where }; tracing::info!(?tip, ?max_checkpoint_height, "initializing chain verifier"); - let block = BlockVerifier::new(network, state_service.clone()); - let checkpoint = CheckpointVerifier::from_checkpoint_list(list, tip, state_service); + let block_verifier = BlockVerifier::new(network, state_service.clone()); + let checkpoint_verifier = CheckpointVerifier::from_checkpoint_list(list, tip, state_service); + + let block_verifier = Buffer::new(block_verifier, VERIFIER_BUFFER_BOUND); + let checkpoint_verifier = Buffer::new(checkpoint_verifier, VERIFIER_BUFFER_BOUND); Buffer::new( BoxService::new(ChainVerifier { - block, - checkpoint, + block_verifier, + checkpoint_verifier, max_checkpoint_height, }), - 3, + VERIFIER_BUFFER_BOUND, ) }