diff --git a/zebra-state/src/error.rs b/zebra-state/src/error.rs new file mode 100644 index 000000000..8bdf83769 --- /dev/null +++ b/zebra-state/src/error.rs @@ -0,0 +1,41 @@ +use std::sync::Arc; +use thiserror::Error; + +/// A wrapper for type erased errors that is itself clonable and implements the +/// Error trait +#[derive(Debug, Error, Clone)] +#[error(transparent)] +pub struct CloneError { + source: Arc, +} + +impl From for CloneError { + fn from(source: CommitBlockError) -> Self { + let source = Arc::new(source); + Self { source } + } +} + +impl From for CloneError { + fn from(source: BoxError) -> Self { + let source = Arc::from(source); + Self { source } + } +} + +/// A boxed [`std::error::Error`]. +pub type BoxError = Box; + +/// An error describing the reason a block could not be committed to the state. +#[derive(Debug, Error)] +#[error("block is not contextually valid")] +pub struct CommitBlockError(#[from] ValidateContextError); + +/// An error describing why a block failed contextual validation. +#[derive(displaydoc::Display, Debug, Error)] +#[non_exhaustive] +pub enum ValidateContextError { + /// block.height is lower than the current finalized height + #[non_exhaustive] + OrphanedBlock, +} diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs index aa076fd85..88ac2dab2 100644 --- a/zebra-state/src/lib.rs +++ b/zebra-state/src/lib.rs @@ -8,6 +8,7 @@ mod config; mod constants; +mod error; mod request; mod response; mod service; @@ -22,9 +23,7 @@ use service::QueuedBlock; use sled_state::FinalizedState; pub use config::Config; +pub use error::{BoxError, CloneError, CommitBlockError, ValidateContextError}; pub use request::{HashOrHeight, Request}; pub use response::Response; pub use service::init; - -/// A boxed [`std::error::Error`]. -pub type BoxError = Box; diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 0eb487515..196b9e1ed 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -7,8 +7,7 @@ use std::{ use futures::future::{FutureExt, TryFutureExt}; use memory_state::{NonFinalizedState, QueuedBlocks}; -use thiserror::Error; -use tokio::sync::oneshot; +use tokio::sync::broadcast; use tower::{buffer::Buffer, util::BoxService, Service}; use tracing::instrument; use zebra_chain::{ @@ -16,7 +15,10 @@ use zebra_chain::{ parameters::Network, }; -use crate::{BoxError, Config, FinalizedState, Request, Response}; +use crate::{ + BoxError, CloneError, CommitBlockError, Config, FinalizedState, Request, Response, + ValidateContextError, +}; mod memory_state; @@ -27,7 +29,7 @@ pub struct QueuedBlock { // TODO: add these parameters when we can compute anchors. // sprout_anchor: sprout::tree::Root, // sapling_anchor: sapling::tree::Root, - pub rsp_tx: oneshot::Sender>, + pub rsp_tx: broadcast::Sender>, } struct StateService { @@ -39,16 +41,6 @@ struct StateService { queued_blocks: QueuedBlocks, } -#[derive(Debug, Error)] -#[error("block is not contextually valid")] -struct CommitError(#[from] ValidateContextError); - -#[derive(displaydoc::Display, Debug, Error)] -enum ValidateContextError { - /// block.height is lower than the current finalized height - OrphanedBlock, -} - impl StateService { pub fn new(config: Config, network: Network) -> Self { let sled = FinalizedState::new(&config, network); @@ -96,7 +88,7 @@ impl StateService { /// Run contextual validation on `block` and add it to the non-finalized /// state if it is contextually valid. - fn validate_and_commit(&mut self, block: Arc) -> Result<(), CommitError> { + fn validate_and_commit(&mut self, block: Arc) -> Result<(), CommitBlockError> { self.check_contextual_validity(&block)?; let parent_hash = block.header.previous_block_hash; @@ -128,7 +120,7 @@ impl StateService { let result = self .validate_and_commit(block) .map(|()| hash) - .map_err(Into::into); + .map_err(CloneError::from); let _ = rsp_tx.send(result); new_parents.push(hash); } @@ -168,29 +160,33 @@ impl Service for StateService { fn call(&mut self, req: Request) -> Self::Future { match req { Request::CommitBlock { block } => { - let (rsp_tx, rsp_rx) = oneshot::channel(); + let (rsp_tx, mut rsp_rx) = broadcast::channel(1); self.queue_and_commit_non_finalized_blocks(QueuedBlock { block, rsp_tx }); async move { rsp_rx + .recv() .await - .expect("sender oneshot is not dropped") + .expect("sender is not dropped") .map(Response::Committed) + .map_err(Into::into) } .boxed() } Request::CommitFinalizedBlock { block } => { - let (rsp_tx, rsp_rx) = oneshot::channel(); + let (rsp_tx, mut rsp_rx) = broadcast::channel(1); self.sled .queue_and_commit_finalized_blocks(QueuedBlock { block, rsp_tx }); async move { rsp_rx + .recv() .await - .expect("sender oneshot is not dropped") + .expect("sender is not dropped") .map(Response::Committed) + .map_err(Into::into) } .boxed() } diff --git a/zebra-state/src/sled_state.rs b/zebra-state/src/sled_state.rs index a1ba1e67e..73b593956 100644 --- a/zebra-state/src/sled_state.rs +++ b/zebra-state/src/sled_state.rs @@ -133,7 +133,7 @@ impl FinalizedState { fn commit_finalized(&mut self, queued_block: QueuedBlock) { let QueuedBlock { block, rsp_tx } = queued_block; let result = self.commit_finalized_direct(block); - let _ = rsp_tx.send(result); + let _ = rsp_tx.send(result.map_err(Into::into)); } // TODO: this impl works only during checkpointing, it needs to be rewritten