Make the state service use broadcast channels (#1137)

And refactor error handling
This commit is contained in:
Jane Lusby 2020-10-09 01:37:24 -07:00 committed by GitHub
parent 76e7e3d714
commit b3634fa3e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 60 additions and 24 deletions

41
zebra-state/src/error.rs Normal file
View File

@ -0,0 +1,41 @@
use std::sync::Arc;
use thiserror::Error;
/// A wrapper for type erased errors that is itself clonable and implements the
/// Error trait
#[derive(Debug, Error, Clone)]
#[error(transparent)]
pub struct CloneError {
source: Arc<dyn std::error::Error + Send + Sync + 'static>,
}
impl From<CommitBlockError> for CloneError {
fn from(source: CommitBlockError) -> Self {
let source = Arc::new(source);
Self { source }
}
}
impl From<BoxError> for CloneError {
fn from(source: BoxError) -> Self {
let source = Arc::from(source);
Self { source }
}
}
/// A boxed [`std::error::Error`].
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// An error describing the reason a block could not be committed to the state.
#[derive(Debug, Error)]
#[error("block is not contextually valid")]
pub struct CommitBlockError(#[from] ValidateContextError);
/// An error describing why a block failed contextual validation.
#[derive(displaydoc::Display, Debug, Error)]
#[non_exhaustive]
pub enum ValidateContextError {
/// block.height is lower than the current finalized height
#[non_exhaustive]
OrphanedBlock,
}

View File

@ -8,6 +8,7 @@
mod config;
mod constants;
mod error;
mod request;
mod response;
mod service;
@ -22,9 +23,7 @@ use service::QueuedBlock;
use sled_state::FinalizedState;
pub use config::Config;
pub use error::{BoxError, CloneError, CommitBlockError, ValidateContextError};
pub use request::{HashOrHeight, Request};
pub use response::Response;
pub use service::init;
/// A boxed [`std::error::Error`].
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

View File

@ -7,8 +7,7 @@ use std::{
use futures::future::{FutureExt, TryFutureExt};
use memory_state::{NonFinalizedState, QueuedBlocks};
use thiserror::Error;
use tokio::sync::oneshot;
use tokio::sync::broadcast;
use tower::{buffer::Buffer, util::BoxService, Service};
use tracing::instrument;
use zebra_chain::{
@ -16,7 +15,10 @@ use zebra_chain::{
parameters::Network,
};
use crate::{BoxError, Config, FinalizedState, Request, Response};
use crate::{
BoxError, CloneError, CommitBlockError, Config, FinalizedState, Request, Response,
ValidateContextError,
};
mod memory_state;
@ -27,7 +29,7 @@ pub struct QueuedBlock {
// TODO: add these parameters when we can compute anchors.
// sprout_anchor: sprout::tree::Root,
// sapling_anchor: sapling::tree::Root,
pub rsp_tx: oneshot::Sender<Result<block::Hash, BoxError>>,
pub rsp_tx: broadcast::Sender<Result<block::Hash, CloneError>>,
}
struct StateService {
@ -39,16 +41,6 @@ struct StateService {
queued_blocks: QueuedBlocks,
}
#[derive(Debug, Error)]
#[error("block is not contextually valid")]
struct CommitError(#[from] ValidateContextError);
#[derive(displaydoc::Display, Debug, Error)]
enum ValidateContextError {
/// block.height is lower than the current finalized height
OrphanedBlock,
}
impl StateService {
pub fn new(config: Config, network: Network) -> Self {
let sled = FinalizedState::new(&config, network);
@ -96,7 +88,7 @@ impl StateService {
/// Run contextual validation on `block` and add it to the non-finalized
/// state if it is contextually valid.
fn validate_and_commit(&mut self, block: Arc<Block>) -> Result<(), CommitError> {
fn validate_and_commit(&mut self, block: Arc<Block>) -> Result<(), CommitBlockError> {
self.check_contextual_validity(&block)?;
let parent_hash = block.header.previous_block_hash;
@ -128,7 +120,7 @@ impl StateService {
let result = self
.validate_and_commit(block)
.map(|()| hash)
.map_err(Into::into);
.map_err(CloneError::from);
let _ = rsp_tx.send(result);
new_parents.push(hash);
}
@ -168,29 +160,33 @@ impl Service<Request> for StateService {
fn call(&mut self, req: Request) -> Self::Future {
match req {
Request::CommitBlock { block } => {
let (rsp_tx, rsp_rx) = oneshot::channel();
let (rsp_tx, mut rsp_rx) = broadcast::channel(1);
self.queue_and_commit_non_finalized_blocks(QueuedBlock { block, rsp_tx });
async move {
rsp_rx
.recv()
.await
.expect("sender oneshot is not dropped")
.expect("sender is not dropped")
.map(Response::Committed)
.map_err(Into::into)
}
.boxed()
}
Request::CommitFinalizedBlock { block } => {
let (rsp_tx, rsp_rx) = oneshot::channel();
let (rsp_tx, mut rsp_rx) = broadcast::channel(1);
self.sled
.queue_and_commit_finalized_blocks(QueuedBlock { block, rsp_tx });
async move {
rsp_rx
.recv()
.await
.expect("sender oneshot is not dropped")
.expect("sender is not dropped")
.map(Response::Committed)
.map_err(Into::into)
}
.boxed()
}

View File

@ -133,7 +133,7 @@ impl FinalizedState {
fn commit_finalized(&mut self, queued_block: QueuedBlock) {
let QueuedBlock { block, rsp_tx } = queued_block;
let result = self.commit_finalized_direct(block);
let _ = rsp_tx.send(result);
let _ = rsp_tx.send(result.map_err(Into::into));
}
// TODO: this impl works only during checkpointing, it needs to be rewritten