Fix poll_ready usage in ChainVerifier (#1700)
* change `poll_ready()` and `call()` of `ChainVerifier` * add bound, move max_checkpoint_height * add buffers to the checkpoint and block verifiers And rename the chain verifier fields so `block` means `Arc<Block>`, and `block_verifier` means `Buffer<BlockVerifier, ...>`. * Fix the error types * Use `ServiceExt::oneshot` in `ChainVerifier::call` And: * make the code look like the `main` branch as much as possible * document the `poll_ready`/`call` invariant * Use `ServiceExt::oneshot` in `chain::init` Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
parent
a91006afa7
commit
0723ac5be1
|
@ -22,11 +22,23 @@ use zebra_state as zs;
|
|||
|
||||
use crate::{
|
||||
block::BlockVerifier,
|
||||
block::VerifyBlockError,
|
||||
checkpoint::{CheckpointList, CheckpointVerifier, VerifyCheckpointError},
|
||||
checkpoint::{CheckpointList, CheckpointVerifier},
|
||||
BoxError, Config,
|
||||
};
|
||||
|
||||
/// The bound for each verifier's buffer.
|
||||
///
|
||||
/// We choose the verifier buffer bound based on the maximum number of
|
||||
/// concurrent verifier users, to avoid contention:
|
||||
/// - the `ChainSync` component
|
||||
/// - the `Inbound` service
|
||||
/// - a miner component, which we might add in future, and
|
||||
/// - 1 extra slot to avoid contention.
|
||||
///
|
||||
/// We deliberately add extra slots, because they only cost a small amount of
|
||||
/// memory, but missing slots can significantly slow down Zebra.
|
||||
const VERIFIER_BUFFER_BOUND: usize = 4;
|
||||
|
||||
/// The chain verifier routes requests to either the checkpoint verifier or the
|
||||
/// block verifier, depending on the maximum checkpoint height.
|
||||
struct ChainVerifier<S>
|
||||
|
@ -34,17 +46,20 @@ where
|
|||
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
||||
S::Future: Send + 'static,
|
||||
{
|
||||
block: BlockVerifier<S>,
|
||||
checkpoint: CheckpointVerifier<S>,
|
||||
// Normally, we erase the types on buffer-wrapped services.
|
||||
// But if we did that here, the block and checkpoint services would be
|
||||
// type-indistinguishable, risking future substitution errors.
|
||||
block_verifier: Buffer<BlockVerifier<S>, Arc<block::Block>>,
|
||||
checkpoint_verifier: Buffer<CheckpointVerifier<S>, Arc<block::Block>>,
|
||||
max_checkpoint_height: block::Height,
|
||||
}
|
||||
|
||||
#[derive(Debug, Display, Error)]
|
||||
pub enum VerifyChainError {
|
||||
/// block could not be checkpointed
|
||||
Checkpoint(#[source] VerifyCheckpointError),
|
||||
Checkpoint(#[source] BoxError),
|
||||
/// block could not be verified
|
||||
Block(#[source] VerifyBlockError),
|
||||
Block(#[source] BoxError),
|
||||
}
|
||||
|
||||
impl<S> Service<Arc<Block>> for ChainVerifier<S>
|
||||
|
@ -57,30 +72,34 @@ where
|
|||
type Future =
|
||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
match (self.checkpoint.poll_ready(cx), self.block.poll_ready(cx)) {
|
||||
// First, fail if either service fails.
|
||||
(Poll::Ready(Err(e)), _) => Poll::Ready(Err(VerifyChainError::Checkpoint(e))),
|
||||
(_, Poll::Ready(Err(e))) => Poll::Ready(Err(VerifyChainError::Block(e))),
|
||||
// Second, we're unready if either service is unready.
|
||||
(Poll::Pending, _) | (_, Poll::Pending) => Poll::Pending,
|
||||
// Finally, we're ready if both services are ready and OK.
|
||||
(Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())),
|
||||
}
|
||||
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
// Correctness:
|
||||
//
|
||||
// We can't call `poll_ready` on the block and checkpoint verifiers here,
|
||||
// because each `poll_ready` must be followed by a `call`, and we don't
|
||||
// know which verifier we're going to choose yet.
|
||||
// See #1593 for details.
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, block: Arc<Block>) -> Self::Future {
|
||||
match block.coinbase_height() {
|
||||
// Correctness:
|
||||
//
|
||||
// We use `ServiceExt::oneshot` to make sure every `poll_ready` has
|
||||
// a matching `call`. See #1593 for details.
|
||||
Some(height) if height <= self.max_checkpoint_height => self
|
||||
.checkpoint
|
||||
.call(block)
|
||||
.checkpoint_verifier
|
||||
.clone()
|
||||
.oneshot(block)
|
||||
.map_err(VerifyChainError::Checkpoint)
|
||||
.boxed(),
|
||||
// This also covers blocks with no height, which the block verifier
|
||||
// will reject immediately.
|
||||
_ => self
|
||||
.block
|
||||
.call(block)
|
||||
.block_verifier
|
||||
.clone()
|
||||
.oneshot(block)
|
||||
.map_err(VerifyChainError::Block)
|
||||
.boxed(),
|
||||
}
|
||||
|
@ -103,7 +122,7 @@ where
|
|||
pub async fn init<S>(
|
||||
config: Config,
|
||||
network: Network,
|
||||
mut state_service: S,
|
||||
state_service: S,
|
||||
) -> Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>
|
||||
where
|
||||
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
|
||||
|
@ -118,11 +137,13 @@ where
|
|||
.expect("hardcoded checkpoint list extends past sapling activation")
|
||||
};
|
||||
|
||||
// Correctness:
|
||||
//
|
||||
// We use `ServiceExt::oneshot` to make sure every `poll_ready` has a
|
||||
// matching `call`. See #1593 for details.
|
||||
let tip = match state_service
|
||||
.ready_and()
|
||||
.await
|
||||
.unwrap()
|
||||
.call(zs::Request::Tip)
|
||||
.clone()
|
||||
.oneshot(zs::Request::Tip)
|
||||
.await
|
||||
.unwrap()
|
||||
{
|
||||
|
@ -131,15 +152,18 @@ where
|
|||
};
|
||||
tracing::info!(?tip, ?max_checkpoint_height, "initializing chain verifier");
|
||||
|
||||
let block = BlockVerifier::new(network, state_service.clone());
|
||||
let checkpoint = CheckpointVerifier::from_checkpoint_list(list, tip, state_service);
|
||||
let block_verifier = BlockVerifier::new(network, state_service.clone());
|
||||
let checkpoint_verifier = CheckpointVerifier::from_checkpoint_list(list, tip, state_service);
|
||||
|
||||
let block_verifier = Buffer::new(block_verifier, VERIFIER_BUFFER_BOUND);
|
||||
let checkpoint_verifier = Buffer::new(checkpoint_verifier, VERIFIER_BUFFER_BOUND);
|
||||
|
||||
Buffer::new(
|
||||
BoxService::new(ChainVerifier {
|
||||
block,
|
||||
checkpoint,
|
||||
block_verifier,
|
||||
checkpoint_verifier,
|
||||
max_checkpoint_height,
|
||||
}),
|
||||
3,
|
||||
VERIFIER_BUFFER_BOUND,
|
||||
)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue