Actually use `VerifyCheckpointError::CommitFinalized` (#1706)

Also:
* use `ServiceExt::oneshot` in the checkpoint verifier
* make some error messages more specific
* clean up the `std::future`/`futures` imports
This commit is contained in:
teor 2021-02-09 07:16:36 +10:00 committed by GitHub
parent 132c7fe4a5
commit a91006afa7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 11 additions and 9 deletions

View File

@ -15,19 +15,18 @@
use std::{ use std::{
collections::{BTreeMap, HashSet}, collections::{BTreeMap, HashSet},
future::Future,
ops::{Bound, Bound::*}, ops::{Bound, Bound::*},
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures_util::FutureExt; use futures::{Future, FutureExt, TryFutureExt};
use thiserror::Error; use thiserror::Error;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tower::{Service, ServiceExt}; use tower::{Service, ServiceExt};
use tracing::instrument; use tracing::instrument;
use zebra_chain::{ use zebra_chain::{
block::{self, Block}, block::{self, Block},
parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH}, parameters::{Network, GENESIS_PREVIOUS_BLOCK_HASH},
@ -881,22 +880,25 @@ where
// commit-if-verified logic. This task will always execute, except if // commit-if-verified logic. This task will always execute, except if
// the program is interrupted, in which case there is no longer a // the program is interrupted, in which case there is no longer a
// checkpoint verifier to keep in sync with the state. // checkpoint verifier to keep in sync with the state.
let mut state_service = self.state_service.clone(); let state_service = self.state_service.clone();
let commit_finalized_block = tokio::spawn(async move { let commit_finalized_block = tokio::spawn(async move {
let hash = rx let hash = rx
.await .await
.map_err(Into::into)
.map_err(VerifyCheckpointError::CommitFinalized)
.expect("CheckpointVerifier does not leave dangling receivers")?; .expect("CheckpointVerifier does not leave dangling receivers")?;
// Once we get a verified hash, we must commit it to the chain state // Once we get a verified hash, we must commit it to the chain state
// as a finalized block, or exit the program, so .expect rather than // as a finalized block, or exit the program, so .expect rather than
// propagate errors from the state service. // propagate errors from the state service.
//
// We use a `ServiceExt::oneshot`, so that every state service
// `poll_ready` has a corresponding `call`. See #1593.
match state_service match state_service
.ready_and() .oneshot(zs::Request::CommitFinalizedBlock(block.into()))
.map_err(VerifyCheckpointError::CommitFinalized)
.await .await
.expect("Verified checkpoints must be committed transactionally") .expect("state service commit block failed: verified checkpoints must be committed transactionally")
.call(zs::Request::CommitFinalizedBlock(block.into()))
.await
.expect("Verified checkpoints must be committed transactionally")
{ {
zs::Response::Committed(committed_hash) => { zs::Response::Committed(committed_hash) => {
assert_eq!(committed_hash, hash, "state must commit correct hash"); assert_eq!(committed_hash, hash, "state must commit correct hash");