zebra/zebra-state/src/service.rs

110 lines
3.4 KiB
Rust
Raw Normal View History

use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures::future::{FutureExt, TryFutureExt};
use tokio::sync::oneshot;
use tower::{buffer::Buffer, util::BoxService, Service};
use zebra_chain::{
block::{self, Block},
parameters::Network,
};
use crate::{BoxError, Config, FinalizedState, NonFinalizedState, Request, Response};
// todo: put this somewhere
#[derive(Debug)]
pub struct QueuedBlock {
pub block: Arc<Block>,
// TODO: add these parameters when we can compute anchors.
// sprout_anchor: sprout::tree::Root,
// sapling_anchor: sapling::tree::Root,
pub rsp_tx: oneshot::Sender<Result<block::Hash, BoxError>>,
}
struct StateService {
/// Holds data relating to finalized chain state.
sled: FinalizedState,
/// Holds data relating to non-finalized chain state.
_mem: NonFinalizedState,
}
impl StateService {
pub fn new(config: Config, network: Network) -> Self {
let sled = FinalizedState::new(&config, network);
let _mem = NonFinalizedState::default();
2020-09-10 10:25:39 -07:00
Self { sled, _mem }
}
}
impl Service<Request> for StateService {
type Response = Response;
type Error = BoxError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request) -> Self::Future {
match req {
2020-09-10 10:25:39 -07:00
Request::CommitBlock { .. } => unimplemented!(),
Request::CommitFinalizedBlock { block } => {
let (rsp_tx, rsp_rx) = oneshot::channel();
self.sled.queue(QueuedBlock { block, rsp_tx });
async move {
rsp_rx
.await
.expect("sender oneshot is not dropped")
2020-09-10 10:52:51 -07:00
.map(Response::Committed)
}
.boxed()
}
Request::Depth(hash) => {
// todo: handle in memory and sled
2020-09-10 10:52:51 -07:00
self.sled.depth(hash).map_ok(Response::Depth).boxed()
}
Request::Tip => {
// todo: handle in memory and sled
2020-09-10 10:52:51 -07:00
self.sled.tip().map_ok(Response::Tip).boxed()
}
Request::BlockLocator => {
// todo: handle in memory and sled
self.sled
.block_locator()
2020-09-10 10:52:51 -07:00
.map_ok(Response::BlockLocator)
.boxed()
}
2020-09-10 10:25:39 -07:00
Request::Transaction(_) => unimplemented!(),
Request::Block(hash_or_height) => {
//todo: handle in memory and sled
self.sled
.block(hash_or_height)
2020-09-10 10:52:51 -07:00
.map_ok(Response::Block)
.boxed()
}
}
}
}
2020-09-09 17:51:08 -07:00
/// Initialize a state service from the provided [`Config`].
///
/// Each `network` has its own separate sled database.
///
/// The resulting service is clonable, to provide shared access to a common chain
/// state. It's possible to construct multiple state services in the same
/// application (as long as they, e.g., use different storage locations), but
/// doing so is probably not what you want.
pub fn init(
config: Config,
network: Network,
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
Buffer::new(BoxService::new(StateService::new(config, network)), 3)
}