consensus: partially update to new state API

This disables one test that can't be easily fixed at the moment, because
it tests the wrong thing: the checkpoint and block verifiers will
produce different transcripts.

It also disables the initial_tip logic for now, pending simplification
of the ChainVerifier logic.
This commit is contained in:
Henry de Valence 2020-09-09 18:53:40 -07:00
parent 070013439e
commit 93cc6957b1
7 changed files with 150 additions and 206 deletions

View File

@ -8,57 +8,44 @@
//! Verification is provided via a `tower::Service`, to support backpressure and batch
//! verification.
mod check;
#[cfg(test)]
mod tests;
use chrono::Utc;
use color_eyre::eyre::{eyre, Report};
use futures_util::FutureExt;
use std::{
error,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use chrono::Utc;
use futures_util::FutureExt;
use tower::{buffer::Buffer, Service, ServiceExt};
use zebra_chain::block::{self, Block};
use zebra_state as zs;
use crate::BoxError;
mod check;
#[cfg(test)]
mod tests;
/// A service that verifies blocks.
#[derive(Debug)]
struct BlockVerifier<S>
where
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
/// The underlying state service, possibly wrapped in other services.
state_service: S,
}
/// The error type for the BlockVerifier Service.
// TODO(jlusby): Error = Report ?
type Error = Box<dyn error::Error + Send + Sync + 'static>;
/// The BlockVerifier service implementation.
///
/// The state service is only used for contextual verification.
/// (The `ChainVerifier` updates the state.)
impl<S> Service<Arc<Block>> for BlockVerifier<S>
where
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
type Response = block::Hash;
type Error = Error;
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
@ -76,6 +63,19 @@ where
async move {
let hash = block.hash();
// Check that this block is actually a new block.
match state_service.ready_and().await?.call(zs::Request::Depth(hash)).await? {
zs::Response::Depth(Some(depth)) => {
return Err(format!(
"block {} is already in the chain at depth {:?}",
hash,
depth,
).into())
},
zs::Response::Depth(None) => {},
_ => unreachable!("wrong response to Request::Depth"),
}
// These checks only apply to generated blocks. We check the block
// height for parsed blocks when we deserialize them.
let height = block
@ -89,13 +89,6 @@ where
block::Height::MAX))?;
}
// Check that this block is actually a new block
if BlockVerifier::get_block(&mut state_service, hash).await?.is_some() {
Err(format!("duplicate block {:?} {:?}: block has already been verified",
height,
hash))?;
}
// Do the difficulty checks first, to raise the threshold for
// attacks that use any other fields.
let difficulty_threshold = block
@ -130,60 +123,19 @@ where
);
metrics::counter!("block.verified.block.count", 1);
// Commit the block in the future - the state will handle out of
// order blocks.
let ready_state = state_service
.ready_and()
.await?;
match ready_state.call(zebra_state::Request::AddBlock { block }).await? {
zebra_state::Response::Added { hash: committed_hash } => {
// Finally, submit the block for contextual verification.
match state_service.oneshot(zs::Request::CommitBlock{ block }).await? {
zs::Response::Committed(committed_hash) => {
assert_eq!(committed_hash, hash, "state returned wrong hash: hashes must be equal");
Ok(hash)
}
_ => Err(format!("adding block {:?} {:?} to state failed", height, hash))?,
_ => unreachable!("wrong response to CommitBlock"),
}
}
.boxed()
}
}
/// The BlockVerifier implementation.
///
/// The state service is only used for contextual verification.
/// (The `ChainVerifier` updates the state.)
impl<S> BlockVerifier<S>
where
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S::Future: Send + 'static,
{
/// Get the block for `hash`, using `state`.
///
/// If there is no block for that hash, returns `Ok(None)`.
/// Returns an error if `state_service.poll_ready` errors.
async fn get_block(
state_service: &mut S,
hash: block::Hash,
) -> Result<Option<Arc<Block>>, Report> {
let block = state_service
.ready_and()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::GetBlock { hash })
.await
.map(|response| match response {
zebra_state::Response::Block { block } => block,
_ => unreachable!("GetBlock request can only result in Response::Block"),
})
.ok();
Ok(block)
}
}
/// Return a block verification service, using the provided state service.
///
/// The block verifier holds a state service of type `S`, into which newly
@ -202,16 +154,13 @@ pub fn init<S>(
) -> impl Service<
Arc<Block>,
Response = block::Hash,
Error = Error,
Future = impl Future<Output = Result<block::Hash, Error>>,
Error = BoxError,
Future = impl Future<Output = Result<block::Hash, BoxError>>,
> + Send
+ Clone
+ 'static
where
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
Buffer::new(BlockVerifier { state_service }, 1)

View File

@ -1,12 +1,14 @@
//! Consensus check functions
use super::*;
use chrono::{DateTime, Utc};
use zebra_chain::{
block::{Block, Header},
work::equihash,
};
use crate::BoxError;
/// Check that there is exactly one coinbase transaction in `Block`, and that
/// the coinbase transaction is the first transaction in the block.
///
@ -15,7 +17,7 @@ use zebra_chain::{
/// fees paid by transactions included in this block." [§3.10][3.10]
///
/// [3.10]: https://zips.z.cash/protocol/protocol.pdf#coinbasetransactions
pub fn is_coinbase_first(block: &Block) -> Result<(), Error> {
pub fn is_coinbase_first(block: &Block) -> Result<(), BoxError> {
let first = block
.transactions
.get(0)
@ -49,6 +51,6 @@ pub fn is_equihash_solution_valid(header: &Header) -> Result<(), equihash::Error
/// accepted." [§7.5][7.5]
///
/// [7.5]: https://zips.z.cash/protocol/protocol.pdf#blockheader
pub fn is_time_valid_at(header: &Header, now: DateTime<Utc>) -> Result<(), Error> {
pub fn is_time_valid_at(header: &Header, now: DateTime<Utc>) -> Result<(), BoxError> {
header.is_time_valid_at(now)
}

View File

@ -28,8 +28,12 @@ use std::{
use tower::{buffer::Buffer, Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::block::{self, Block};
use zebra_chain::parameters::{Network, NetworkUpgrade::Sapling};
use zebra_chain::{
block::{self, Block},
parameters::{Network, NetworkUpgrade::Sapling},
};
use zebra_state as zs;
/// The maximum expected gap between blocks.
///
@ -41,10 +45,7 @@ const MAX_EXPECTED_BLOCK_GAP: u32 = 100_000;
#[derive(Clone)]
struct ChainCheckpointVerifier<S>
where
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S: Service<zs::Request, Response = zs::Response, Error = Error> + Send + Clone + 'static,
S::Future: Send + 'static,
{
/// The underlying `CheckpointVerifier`, wrapped in a buffer, so we can
@ -61,10 +62,7 @@ struct ChainVerifier<BV, S>
where
BV: Service<Arc<Block>, Response = block::Hash, Error = Error> + Send + Clone + 'static,
BV::Future: Send + 'static,
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S: Service<zs::Request, Response = zs::Response, Error = Error> + Send + Clone + 'static,
S::Future: Send + 'static,
{
/// The underlying `BlockVerifier`, possibly wrapped in other services.
@ -95,10 +93,7 @@ impl<BV, S> Service<Arc<Block>> for ChainVerifier<BV, S>
where
BV: Service<Arc<Block>, Response = block::Hash, Error = Error> + Send + Clone + 'static,
BV::Future: Send + 'static,
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S: Service<zs::Request, Response = zs::Response, Error = Error> + Send + Clone + 'static,
S::Future: Send + 'static,
{
type Response = block::Hash;
@ -222,15 +217,26 @@ pub async fn init<S>(
+ Clone
+ 'static
where
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S: Service<zs::Request, Response = zs::Response, Error = Error> + Send + Clone + 'static,
S::Future: Send + 'static,
{
let initial_tip = zebra_state::current_tip(state_service.clone())
/*
let initial_tip = if let zs::Response::Tip(tip) = state_service
.ready_and()
.await
.expect("State service poll_ready is Ok");
.unwrap()
.call(zs::Request::Tip)
.await
.unwrap()
{
tip
} else {
unreachable!("wrong response to Request::Tip");
};
*/
// TODO: restore this after figuring out what data is required,
// after simplification of the chainverifier code.
let initial_tip = None;
let block_verifier = crate::block::init(state_service.clone());
let checkpoint_list = match config.checkpoint_sync {
@ -282,10 +288,7 @@ pub(crate) fn init_from_verifiers<BV, S>(
where
BV: Service<Arc<Block>, Response = block::Hash, Error = Error> + Send + Clone + 'static,
BV::Future: Send + 'static,
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S: Service<zs::Request, Response = zs::Response, Error = Error> + Send + Clone + 'static,
S::Future: Send + 'static,
{
let max_checkpoint_height = checkpoint_list.clone().map(|c| c.max_height());

View File

@ -15,6 +15,7 @@ use zebra_chain::{
parameters::Network,
serialization::ZcashDeserialize,
};
use zebra_state as zs;
use zebra_test::transcript::{TransError, Transcript};
use crate::checkpoint::CheckpointList;
@ -62,15 +63,15 @@ fn verifiers_from_checkpoint_list(
+ Clone
+ 'static,
impl Service<
zebra_state::Request,
Response = zebra_state::Response,
zs::Request,
Response = zs::Response,
Error = Error,
Future = impl Future<Output = Result<zebra_state::Response, Error>>,
Future = impl Future<Output = Result<zs::Response, Error>>,
> + Send
+ Clone
+ 'static,
) {
let state_service = zebra_state::init(zebra_state::Config::ephemeral(), network);
let state_service = zs::init(zs::Config::ephemeral(), network);
let block_verifier = crate::block::init(state_service.clone());
let chain_verifier = super::init_from_verifiers(
network,
@ -97,10 +98,10 @@ fn verifiers_from_network(
+ Clone
+ 'static,
impl Service<
zebra_state::Request,
Response = zebra_state::Response,
zs::Request,
Response = zs::Response,
Error = Error,
Future = impl Future<Output = Result<zebra_state::Response, Error>>,
Future = impl Future<Output = Result<zs::Response, Error>>,
> + Send
+ Clone
+ 'static,
@ -154,38 +155,27 @@ static NO_COINBASE_TRANSCRIPT: Lazy<Vec<(Arc<Block>, Result<block::Hash, TransEr
vec![(Arc::new(block), Err(TransError::Any))]
});
static NO_COINBASE_STATE_TRANSCRIPT: Lazy<
Vec<(
zebra_state::Request,
Result<zebra_state::Response, TransError>,
)>,
> = Lazy::new(|| {
let block = block_no_transactions();
let hash = block.hash();
static NO_COINBASE_STATE_TRANSCRIPT: Lazy<Vec<(zs::Request, Result<zs::Response, TransError>)>> =
Lazy::new(|| {
let block = block_no_transactions();
let hash = block.hash();
vec![(
zebra_state::Request::GetBlock { hash },
Err(TransError::Any),
)]
});
vec![(zs::Request::Block(hash.into()), Err(TransError::Any))]
});
static STATE_VERIFY_TRANSCRIPT_GENESIS: Lazy<
Vec<(
zebra_state::Request,
Result<zebra_state::Response, TransError>,
)>,
> = Lazy::new(|| {
let block: Arc<_> =
Block::zcash_deserialize(&zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES[..])
.unwrap()
.into();
let hash = block.hash();
static STATE_VERIFY_TRANSCRIPT_GENESIS: Lazy<Vec<(zs::Request, Result<zs::Response, TransError>)>> =
Lazy::new(|| {
let block: Arc<_> =
Block::zcash_deserialize(&zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES[..])
.unwrap()
.into();
let hash = block.hash();
vec![(
zebra_state::Request::GetBlock { hash },
Ok(zebra_state::Response::Block { block }),
)]
});
vec![(
zs::Request::Block(hash.into()),
Ok(zs::Response::Block(Some(block))),
)]
});
#[tokio::test]
async fn verify_block_test() -> Result<(), Report> {
@ -252,7 +242,7 @@ async fn verify_checkpoint(config: Config) -> Result<(), Report> {
let chain_verifier = super::init(
config.clone(),
network,
zebra_state::init(zebra_state::Config::ephemeral(), network),
zs::init(zs::Config::ephemeral(), network),
)
.await;
@ -350,6 +340,12 @@ async fn verify_fail_add_block_checkpoint() -> Result<(), Report> {
Ok(())
}
/*
// This test is disabled because it doesn't test the right thing:
// the BlockVerifier and CheckpointVerifier make different requests
// and produce different transcripts.
#[tokio::test]
// Temporarily ignore this test, until the state can handle out-of-order blocks
#[ignore]
@ -406,7 +402,7 @@ async fn continuous_blockchain(restart_height: Option<block::Height>) -> Result<
.collect();
let checkpoint_list = CheckpointList::from_list(checkpoint_list).map_err(|e| eyre!(e))?;
let mut state_service = zebra_state::init(zebra_state::Config::ephemeral(), network);
let mut state_service = zs::init(zs::Config::ephemeral(), network);
/// SPANDOC: Add blocks to the state from 0..=restart_height {?restart_height}
if restart_height.is_some() {
for block in blockchain
@ -418,7 +414,7 @@ async fn continuous_blockchain(restart_height: Option<block::Height>) -> Result<
.ready_and()
.map_err(|e| eyre!(e))
.await?
.call(zebra_state::Request::AddBlock {
.call(zs::Request::AddBlock {
block: block.clone(),
})
.map_err(|e| eyre!(e))
@ -465,3 +461,4 @@ async fn continuous_blockchain(restart_height: Option<block::Height>) -> Result<
Ok(())
}
*/

View File

@ -13,6 +13,27 @@
//! Verification is provided via a `tower::Service`, to support backpressure and batch
//! verification.
use std::{
collections::BTreeMap,
future::Future,
ops::{Bound, Bound::*},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures_util::FutureExt;
use tokio::sync::oneshot;
use tower::{Service, ServiceExt};
use zebra_chain::{
block::{self, Block},
parameters::Network,
};
use zebra_state as zs;
use crate::{parameters, BoxError};
pub(crate) mod list;
mod types;
@ -23,30 +44,6 @@ pub(crate) use list::CheckpointList;
use types::{Progress, Progress::*};
use types::{Target, Target::*};
use crate::parameters;
use futures_util::FutureExt;
use std::{
collections::BTreeMap,
error,
future::Future,
ops::{Bound, Bound::*},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::oneshot;
use tower::{Service, ServiceExt};
use zebra_chain::{
block::{self, Block},
parameters::Network,
};
/// The inner error type for CheckpointVerifier.
// TODO(jlusby): Error = Report ?
type Error = Box<dyn error::Error + Send + Sync + 'static>;
/// An unverified block, which is in the queue for checkpoint verification.
#[derive(Debug)]
struct QueuedBlock {
@ -55,7 +52,7 @@ struct QueuedBlock {
/// `block`'s cached header hash.
hash: block::Hash,
/// The transmitting end of the oneshot channel for this block's result.
tx: oneshot::Sender<Result<block::Hash, Error>>,
tx: oneshot::Sender<Result<block::Hash, BoxError>>,
}
/// A list of unverified blocks at a particular height.
@ -89,10 +86,7 @@ pub const MAX_CHECKPOINT_HEIGHT_GAP: usize = 2_000;
#[derive(Debug)]
pub struct CheckpointVerifier<S>
where
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
// Inputs
@ -129,10 +123,7 @@ where
/// Contains non-service utility functions for CheckpointVerifiers.
impl<S> CheckpointVerifier<S>
where
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
/// Return a checkpoint verification service for `network`, using the
@ -179,7 +170,7 @@ where
list: impl IntoIterator<Item = (block::Height, block::Hash)>,
initial_tip: Option<Arc<Block>>,
state_service: S,
) -> Result<Self, Error> {
) -> Result<Self, BoxError> {
Ok(Self::from_checkpoint_list(
CheckpointList::from_list(list)?,
initial_tip,
@ -365,7 +356,7 @@ where
/// - the block's height is less than or equal to the previously verified
/// checkpoint
/// - verification has finished
fn check_height(&self, height: block::Height) -> Result<(), Error> {
fn check_height(&self, height: block::Height) -> Result<(), BoxError> {
if height > self.checkpoint_list.max_height() {
Err("block is higher than the maximum checkpoint")?;
}
@ -419,7 +410,7 @@ where
///
/// Returns an error if the block's height is invalid, see `check_height()`
/// for details.
fn check_block(&self, block: &Block) -> Result<block::Height, Error> {
fn check_block(&self, block: &Block) -> Result<block::Height, BoxError> {
let block_height = block
.coinbase_height()
.ok_or("the block does not have a coinbase height")?;
@ -435,7 +426,10 @@ where
///
/// If the block does not have a coinbase height, sends an error on `tx`,
/// and does not queue the block.
fn queue_block(&mut self, block: Arc<Block>) -> oneshot::Receiver<Result<block::Hash, Error>> {
fn queue_block(
&mut self,
block: Arc<Block>,
) -> oneshot::Receiver<Result<block::Hash, BoxError>> {
// Set up a oneshot channel to send results
let (tx, rx) = oneshot::channel();
@ -734,10 +728,7 @@ where
/// CheckpointVerifier rejects pending futures on drop.
impl<S> Drop for CheckpointVerifier<S>
where
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
/// Send an error on `tx` for any `QueuedBlock`s that haven't been verified.
@ -766,14 +757,11 @@ where
/// After verification, the block futures resolve to their hashes.
impl<S> Service<Arc<Block>> for CheckpointVerifier<S>
where
S: Service<zebra_state::Request, Response = zebra_state::Response, Error = Error>
+ Send
+ Clone
+ 'static,
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
S::Future: Send + 'static,
{
type Response = block::Hash;
type Error = Error;
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
@ -805,23 +793,25 @@ where
metrics::gauge!("checkpoint.queued_slots", self.queued.len() as i64);
async move {
match rx.await.expect(
"unexpected closed receiver: CheckpointVerifier does not leave dangling receivers",
) {
match rx
.await
.expect("CheckpointVerifier does not leave dangling receivers")
{
Ok(hash) => {
let verified_hash = match state_service
.oneshot(zebra_state::Request::AddBlock { block })
.await? {
zebra_state::Response::Added { hash } => hash,
_ => unreachable!("unexpected response type: state service should return the correct response type for each request"),
};
.oneshot(zs::Request::CommitFinalizedBlock { block })
.await?
{
zs::Response::Committed(hash) => hash,
_ => unreachable!("wrong response for CommitFinalizedBlock"),
};
assert_eq!(
verified_hash, hash,
"state service returned wrong hash: hashes must be equal"
);
Ok(hash)
}
Err(e) => Err(e)?,
Err(e) => Err(e),
}
}
.boxed()

View File

@ -314,7 +314,7 @@ async fn continuous_blockchain(restart_height: Option<block::Height>) -> Result<
/// SPANDOC: Add block to the state {?height}
ready_state_service
.call(zebra_state::Request::AddBlock {
.call(zebra_state::Request::CommitFinalizedBlock {
block: block.clone(),
})
.await

View File

@ -28,3 +28,6 @@ mod script;
mod transaction;
pub use crate::config::Config;
/// A boxed [`std::error::Error`].
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;