state: partial implementation of new sled layout

This commit is contained in:
Henry de Valence 2020-09-09 21:15:08 -07:00
parent f1f0b331ac
commit b27ace87eb
4 changed files with 138 additions and 197 deletions

View File

@ -3,6 +3,8 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use futures::future::{FutureExt, TryFutureExt};
use tower::{buffer::Buffer, util::BoxService, Service};
use zebra_chain::parameters::Network;
@ -36,10 +38,32 @@ impl Service<Request> for StateService {
fn call(&mut self, req: Request) -> Self::Future {
match req {
Request::CommitBlock { block } => unimplemented!(),
Request::CommitFinalizedBlock { block } => unimplemented!(),
Request::Depth(hash) => unimplemented!(),
Request::Tip => unimplemented!(),
Request::BlockLocator => unimplemented!(),
Request::CommitFinalizedBlock { block } => {
let rsp = self
.sled
.commit_finalized(block)
.map(|hash| Response::Committed(hash));
async move { rsp }.boxed()
}
Request::Depth(hash) => {
// todo: handle in memory and sled
self.sled
.depth(hash)
.map_ok(|depth| Response::Depth(depth))
.boxed()
}
Request::Tip => {
// todo: handle in memory and sled
self.sled.tip().map_ok(|tip| Response::Tip(tip)).boxed()
}
Request::BlockLocator => {
// todo: handle in memory and sled
self.sled
.block_locator()
.map_ok(|locator| Response::BlockLocator(locator))
.boxed()
}
Request::Transaction(hash) => unimplemented!(),
Request::Block(HashOrHeight::Hash(hash)) => unimplemented!(),
Request::Block(HashOrHeight::Height(height)) => unimplemented!(),

View File

@ -1,221 +1,139 @@
//! The primary implementation of the `zebra_state::Service` built upon sled
use crate::Config;
use std::error;
use std::sync::Arc;
use tracing::instrument;
use zebra_chain::serialization::{SerializationError, ZcashDeserialize, ZcashSerialize};
use std::{convert::TryInto, future::Future, sync::Arc};
use zebra_chain::serialization::ZcashSerialize;
use zebra_chain::{
block::{self, Block},
parameters::Network,
};
use crate::BoxError;
#[derive(Clone)]
pub struct SledState {
storage: sled::Db,
hash_by_height: sled::Tree,
height_by_hash: sled::Tree,
block_by_height: sled::Tree,
// tx_by_hash: sled::Tree,
// utxo_by_outpoint: sled::Tree,
// sprout_nullifiers: sled::Tree,
// sapling_nullifiers: sled::Tree,
// sprout_anchors: sled::Tree,
// sapling_anchors: sled::Tree,
}
impl SledState {
#[instrument]
pub(crate) fn new(config: &Config, network: Network) -> Self {
let config = config.sled_config(network);
pub fn new(config: &Config, network: Network) -> Self {
let db = config.sled_config(network).open().unwrap();
Self {
storage: config.open().unwrap(),
hash_by_height: db.open_tree(b"hash_by_height").unwrap(),
height_by_hash: db.open_tree(b"height_by_hash").unwrap(),
block_by_height: db.open_tree(b"block_by_height").unwrap(),
// tx_by_hash: db.open_tree(b"tx_by_hash").unwrap(),
// utxo_by_outpoint: db.open_tree(b"utxo_by_outpoint").unwrap(),
// sprout_nullifiers: db.open_tree(b"sprout_nullifiers").unwrap(),
// sapling_nullifiers: db.open_tree(b"sapling_nullifiers").unwrap(),
}
}
#[instrument(skip(self))]
pub(super) fn insert(
&mut self,
block: impl Into<Arc<Block>> + std::fmt::Debug,
) -> Result<block::Hash, Error> {
use sled::Transactional;
let block = block.into();
/// Commit a finalized block to the state. It's the caller's responsibility
/// to ensure that blocks are committed in order.
pub fn commit_finalized(&self, block: Arc<Block>) -> Result<block::Hash, BoxError> {
// The only valid block without a coinbase height is the genesis
// block. By this point the block has been validated, so if
// there's no coinbase height, it must be the genesis block.
let height = block.coinbase_height().unwrap_or(block::Height(0));
let height_bytes = height.0.to_be_bytes();
let hash = block.hash();
let height = block
.coinbase_height()
.expect("missing height: valid blocks must have a height");
// Make sure blocks are inserted in order, as a defence in depth.
// See the state design RFC #0005 for details.
//
// TODO: handle multiple chains
match self.get_tip()? {
None => {
// This is a defence in depth - there is no need to check the
// genesis hash or previous block hash here.
assert_eq!(
height,
block::Height(0),
"out of order block: the first block must be at the genesis height"
);
}
Some(tip_hash) => {
assert_eq!(
block.header.previous_block_hash, tip_hash,
"out of order block: the next block must be a child of the current tip"
);
let tip_block = self
.get(tip_hash)?
.expect("missing tip block: tip hashes must have a corresponding block");
let tip_height = tip_block
.coinbase_height()
.expect("missing height: valid blocks must have a height");
assert_eq!(
height,
block::Height(tip_height.0 + 1),
"out of order block: the next height must be 1 greater than the tip height"
);
}
};
use sled::Transactional;
(
&self.hash_by_height,
&self.height_by_hash,
&self.block_by_height,
)
.transaction(|(hash_by_height, height_by_hash, block_by_height)| {
// TODO: do serialization above
// for some reason this wouldn't move into the closure (??)
let block_bytes = block
.zcash_serialize_to_vec()
.expect("zcash_serialize_to_vec has wrong return type");
let height_map = self.storage.open_tree(b"height_map")?;
let by_hash = self.storage.open_tree(b"by_hash")?;
// TODO: check highest entry of hash_by_height as in RFC
let bytes = block.zcash_serialize_to_vec()?;
hash_by_height.insert(&height_bytes, &hash.0)?;
height_by_hash.insert(&hash.0, &height_bytes)?;
block_by_height.insert(&height_bytes, block_bytes)?;
(&height_map, &by_hash).transaction(|(height_map, by_hash)| {
height_map.insert(&height.0.to_be_bytes(), &hash.0)?;
by_hash.insert(&hash.0, bytes.clone())?;
Ok(())
})?;
tracing::trace!(?height, ?hash, "Committed block");
metrics::gauge!("state.committed.block.height", height.0 as _);
metrics::counter!("state.committed.block.count", 1);
// for some reason type inference fails here
Ok::<_, sled::transaction::ConflictableTransactionError>(())
})?;
Ok(hash)
}
#[instrument(skip(self))]
pub(super) fn get(&self, hash: block::Hash) -> Result<Option<Arc<Block>>, Error> {
let by_hash = self.storage.open_tree(b"by_hash")?;
let key = &hash.0;
let value = by_hash.get(key)?;
// TODO: this impl works only during checkpointing, it needs to be rewritten
pub fn block_locator(&self) -> impl Future<Output = Result<Vec<block::Hash>, BoxError>> {
let hash_by_height = self.hash_by_height.clone();
if let Some(bytes) = value {
let bytes = bytes.as_ref();
let block = ZcashDeserialize::zcash_deserialize(bytes)?;
Ok(Some(block))
} else {
Ok(None)
}
}
let tip = self.tip();
#[instrument(skip(self))]
pub(super) fn get_main_chain_at(
&self,
height: block::Height,
) -> Result<Option<block::Hash>, Error> {
let height_map = self.storage.open_tree(b"height_map")?;
let key = height.0.to_be_bytes();
let value = height_map.get(key)?;
async move {
let (tip_height, _) = match tip.await? {
Some(height) => height,
None => return Ok(Vec::new()),
};
if let Some(bytes) = value {
let bytes = bytes.as_ref();
let hash = ZcashDeserialize::zcash_deserialize(bytes)?;
Ok(Some(hash))
} else {
Ok(None)
}
}
#[instrument(skip(self))]
pub(super) fn get_tip(&self) -> Result<Option<block::Hash>, Error> {
let tree = self.storage.open_tree(b"height_map")?;
let last_entry = tree.iter().values().next_back();
match last_entry {
Some(Ok(bytes)) => Ok(Some(ZcashDeserialize::zcash_deserialize(bytes.as_ref())?)),
Some(Err(e)) => Err(e)?,
None => Ok(None),
}
}
#[instrument(skip(self))]
fn contains(&self, hash: &block::Hash) -> Result<bool, Error> {
let by_hash = self.storage.open_tree(b"by_hash")?;
let key = &hash.0;
Ok(by_hash.contains_key(key)?)
}
}
/// An alternate repr for `block::Height` that implements `AsRef<[u8]>` for usage
/// with sled
struct BytesHeight(u32, [u8; 4]);
impl From<block::Height> for BytesHeight {
fn from(height: block::Height) -> Self {
let bytes = height.0.to_be_bytes();
Self(height.0, bytes)
}
}
impl AsRef<[u8]> for BytesHeight {
fn as_ref(&self) -> &[u8] {
&self.1[..]
}
}
pub(super) enum BlockQuery {
ByHash(block::Hash),
ByHeight(block::Height),
}
impl From<block::Hash> for BlockQuery {
fn from(hash: block::Hash) -> Self {
Self::ByHash(hash)
}
}
impl From<block::Height> for BlockQuery {
fn from(height: block::Height) -> Self {
Self::ByHeight(height)
}
}
type BoxError = Box<dyn error::Error + Send + Sync + 'static>;
// these hacks are necessary to capture spantraces that can be extracted again
// while still having a nice From impl.
//
// Please forgive me.
/// a type that can store any error and implements the Error trait at the cost of
/// not implementing From<E: Error>
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
struct BoxRealError(BoxError);
/// The TracedError wrapper on a type that implements Error
#[derive(Debug)]
pub struct Error(tracing_error::TracedError<BoxRealError>);
macro_rules! impl_from {
($($src:ty,)*) => {$(
impl From<$src> for Error {
fn from(source: $src) -> Self {
let source = BoxRealError(source.into());
Self(source.into())
let heights = crate::util::block_locator_heights(tip_height);
let mut hashes = Vec::with_capacity(heights.len());
for height in heights {
if let Some(bytes) = hash_by_height.get(&height.0.to_be_bytes())? {
let hash = block::Hash(bytes.as_ref().try_into().unwrap());
hashes.push(hash)
}
}
Ok(hashes)
}
}
pub fn tip(
&self,
) -> impl Future<Output = Result<Option<(block::Height, block::Hash)>, BoxError>> {
let hash_by_height = self.hash_by_height.clone();
async move {
Ok(hash_by_height
.iter()
.rev()
.next()
.transpose()?
.map(|(height_bytes, hash_bytes)| {
let height = block::Height(u32::from_be_bytes(
height_bytes.as_ref().try_into().unwrap(),
));
let hash = block::Hash(hash_bytes.as_ref().try_into().unwrap());
(height, hash)
}))
}
}
pub fn depth(&self, hash: block::Hash) -> impl Future<Output = Result<Option<u32>, BoxError>> {
let height_by_hash = self.height_by_hash.clone();
// TODO: this impl works only during checkpointing, it needs to be rewritten
let tip = self.tip();
async move {
let height = match height_by_hash.get(&hash.0)? {
Some(bytes) => {
block::Height(u32::from_be_bytes(bytes.as_ref().try_into().unwrap()))
}
None => return Ok(None),
};
let (tip_height, _) = tip.await?.expect("tip must exist");
Ok(Some(tip_height.0 - height.0))
}
)*
}
}
// The hoops we have to jump through to keep using this like a BoxError
impl_from! {
&str,
SerializationError,
std::io::Error,
sled::Error,
sled::transaction::TransactionError,
}
impl Into<BoxError> for Error {
fn into(self) -> BoxError {
BoxError::from(self.0)
}
}

View File

@ -48,7 +48,7 @@ static BLOCK_LOCATOR_CASES: &[(u32, u32)] = &[
#[test]
fn test_block_locator_heights() {
for (height, min_height) in BLOCK_LOCATOR_CASES.iter().cloned() {
let locator = util::block_locator_heights(block::Height(height)).collect::<Vec<_>>();
let locator = util::block_locator_heights(block::Height(height));
assert!(!locator.is_empty(), "locators must not be empty");
if (height - min_height) > 1 {

View File

@ -4,7 +4,7 @@ use zebra_chain::block;
use crate::constants;
/// Get the heights of the blocks for constructing a block_locator list
pub fn block_locator_heights(tip_height: block::Height) -> impl Iterator<Item = block::Height> {
pub fn block_locator_heights(tip_height: block::Height) -> Vec<block::Height> {
// Stop at the reorg limit, or the genesis block.
let min_locator_height = tip_height
.0
@ -17,13 +17,12 @@ pub fn block_locator_heights(tip_height: block::Height) -> impl Iterator<Item =
.chain(iter::once(min_locator_height))
.map(block::Height);
let locators: Vec<_> = locators.collect();
let locators = locators.collect();
tracing::info!(
?tip_height,
?min_locator_height,
?locators,
"created block locator"
);
locators.into_iter()
locators
}