state: process CommitFinalizedBlock out of order
This commit is contained in:
parent
b27ace87eb
commit
5d8decd224
|
@ -23,6 +23,7 @@ tower = "0.3.1"
|
|||
tracing = "0.1"
|
||||
tracing-error = "0.1.2"
|
||||
thiserror = "1.0.20"
|
||||
tokio = { version = "0.2.22", features = ["sync"] }
|
||||
|
||||
[dev-dependencies]
|
||||
zebra-test = { path = "../zebra-test/" }
|
||||
|
|
|
@ -20,6 +20,7 @@ mod util;
|
|||
mod tests;
|
||||
|
||||
use memory_state::MemoryState;
|
||||
use service::QueuedBlock;
|
||||
use sled_state::SledState;
|
||||
|
||||
pub use config::Config;
|
||||
|
|
|
@ -1,15 +1,29 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use futures::future::{FutureExt, TryFutureExt};
|
||||
use tokio::sync::oneshot;
|
||||
use tower::{buffer::Buffer, util::BoxService, Service};
|
||||
use zebra_chain::parameters::Network;
|
||||
use zebra_chain::{
|
||||
block::{self, Block},
|
||||
parameters::Network,
|
||||
};
|
||||
|
||||
use crate::{BoxError, Config, HashOrHeight, MemoryState, Request, Response, SledState};
|
||||
|
||||
// todo: put this somewhere
|
||||
pub struct QueuedBlock {
|
||||
pub block: Arc<Block>,
|
||||
// TODO: add these parameters when we can compute anchors.
|
||||
// sprout_anchor: sprout::tree::Root,
|
||||
// sapling_anchor: sapling::tree::Root,
|
||||
pub rsp_tx: oneshot::Sender<Result<block::Hash, BoxError>>,
|
||||
}
|
||||
|
||||
struct StateService {
|
||||
/// Holds data relating to finalized chain state.
|
||||
sled: SledState,
|
||||
|
@ -39,12 +53,18 @@ impl Service<Request> for StateService {
|
|||
match req {
|
||||
Request::CommitBlock { block } => unimplemented!(),
|
||||
Request::CommitFinalizedBlock { block } => {
|
||||
let rsp = self
|
||||
.sled
|
||||
.commit_finalized(block)
|
||||
.map(|hash| Response::Committed(hash));
|
||||
let (rsp_tx, rsp_rx) = oneshot::channel();
|
||||
|
||||
async move { rsp }.boxed()
|
||||
self.sled.queue(QueuedBlock { block, rsp_tx });
|
||||
self.sled.process_queue();
|
||||
|
||||
async move {
|
||||
rsp_rx
|
||||
.await
|
||||
.expect("sender oneshot is not dropped")
|
||||
.map(|hash| Response::Committed(hash))
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
Request::Depth(hash) => {
|
||||
// todo: handle in memory and sled
|
||||
|
|
|
@ -1,16 +1,19 @@
|
|||
//! The primary implementation of the `zebra_state::Service` built upon sled
|
||||
use crate::Config;
|
||||
use std::{convert::TryInto, future::Future, sync::Arc};
|
||||
|
||||
use std::{collections::HashMap, convert::TryInto, future::Future};
|
||||
|
||||
use zebra_chain::serialization::ZcashSerialize;
|
||||
use zebra_chain::{
|
||||
block::{self, Block},
|
||||
block::{self},
|
||||
parameters::Network,
|
||||
};
|
||||
|
||||
use crate::BoxError;
|
||||
use crate::{BoxError, Config, QueuedBlock};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SledState {
|
||||
/// Queued blocks that arrived out of order, indexed by their parent block hash.
|
||||
queued_by_prev_hash: HashMap<block::Hash, QueuedBlock>,
|
||||
|
||||
hash_by_height: sled::Tree,
|
||||
height_by_hash: sled::Tree,
|
||||
block_by_height: sled::Tree,
|
||||
|
@ -27,6 +30,7 @@ impl SledState {
|
|||
let db = config.sled_config(network).open().unwrap();
|
||||
|
||||
Self {
|
||||
queued_by_prev_hash: HashMap::new(),
|
||||
hash_by_height: db.open_tree(b"hash_by_height").unwrap(),
|
||||
height_by_hash: db.open_tree(b"height_by_hash").unwrap(),
|
||||
block_by_height: db.open_tree(b"block_by_height").unwrap(),
|
||||
|
@ -37,9 +41,33 @@ impl SledState {
|
|||
}
|
||||
}
|
||||
|
||||
/// Queue a finalized block to be committed to the state.
|
||||
pub fn queue(&mut self, queued_block: QueuedBlock) {
|
||||
let prev_hash = queued_block.block.header.previous_block_hash;
|
||||
self.queued_by_prev_hash.insert(prev_hash, queued_block);
|
||||
}
|
||||
|
||||
pub fn process_queue(&mut self) {
|
||||
// Cloning means the closure doesn't hold a borrow of &self,
|
||||
// conflicting with mutable access in the loop below.
|
||||
let hash_by_height = self.hash_by_height.clone();
|
||||
let tip_hash = || {
|
||||
read_tip(&hash_by_height)
|
||||
.expect("inability to look up tip is unrecoverable")
|
||||
.map(|(_height, hash)| hash)
|
||||
.unwrap_or(block::Hash([0; 32]))
|
||||
};
|
||||
|
||||
while let Some(queued_block) = self.queued_by_prev_hash.remove(&tip_hash()) {
|
||||
self.commit_finalized(queued_block)
|
||||
}
|
||||
}
|
||||
|
||||
/// Commit a finalized block to the state. It's the caller's responsibility
|
||||
/// to ensure that blocks are committed in order.
|
||||
pub fn commit_finalized(&self, block: Arc<Block>) -> Result<block::Hash, BoxError> {
|
||||
fn commit_finalized(&mut self, queued_block: QueuedBlock) {
|
||||
let QueuedBlock { block, rsp_tx } = queued_block;
|
||||
|
||||
// The only valid block without a coinbase height is the genesis
|
||||
// block. By this point the block has been validated, so if
|
||||
// there's no coinbase height, it must be the genesis block.
|
||||
|
@ -48,7 +76,7 @@ impl SledState {
|
|||
let hash = block.hash();
|
||||
|
||||
use sled::Transactional;
|
||||
(
|
||||
let transaction_result = (
|
||||
&self.hash_by_height,
|
||||
&self.height_by_hash,
|
||||
&self.block_by_height,
|
||||
|
@ -68,9 +96,9 @@ impl SledState {
|
|||
|
||||
// for some reason type inference fails here
|
||||
Ok::<_, sled::transaction::ConflictableTransactionError>(())
|
||||
})?;
|
||||
});
|
||||
|
||||
Ok(hash)
|
||||
let _ = rsp_tx.send(transaction_result.map(|_| hash).map_err(Into::into));
|
||||
}
|
||||
|
||||
// TODO: this impl works only during checkpointing, it needs to be rewritten
|
||||
|
@ -101,20 +129,7 @@ impl SledState {
|
|||
&self,
|
||||
) -> impl Future<Output = Result<Option<(block::Height, block::Hash)>, BoxError>> {
|
||||
let hash_by_height = self.hash_by_height.clone();
|
||||
async move {
|
||||
Ok(hash_by_height
|
||||
.iter()
|
||||
.rev()
|
||||
.next()
|
||||
.transpose()?
|
||||
.map(|(height_bytes, hash_bytes)| {
|
||||
let height = block::Height(u32::from_be_bytes(
|
||||
height_bytes.as_ref().try_into().unwrap(),
|
||||
));
|
||||
let hash = block::Hash(hash_bytes.as_ref().try_into().unwrap());
|
||||
(height, hash)
|
||||
}))
|
||||
}
|
||||
async move { read_tip(&hash_by_height) }
|
||||
}
|
||||
|
||||
pub fn depth(&self, hash: block::Hash) -> impl Future<Output = Result<Option<u32>, BoxError>> {
|
||||
|
@ -137,3 +152,19 @@ impl SledState {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Split into a helper function to be called synchronously or asynchronously.
|
||||
fn read_tip(hash_by_height: &sled::Tree) -> Result<Option<(block::Height, block::Hash)>, BoxError> {
|
||||
Ok(hash_by_height
|
||||
.iter()
|
||||
.rev()
|
||||
.next()
|
||||
.transpose()?
|
||||
.map(|(height_bytes, hash_bytes)| {
|
||||
let height = block::Height(u32::from_be_bytes(
|
||||
height_bytes.as_ref().try_into().unwrap(),
|
||||
));
|
||||
let hash = block::Hash(hash_bytes.as_ref().try_into().unwrap());
|
||||
(height, hash)
|
||||
}))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue