remap the state storage to only store blocks once (#741)
This commit is contained in:
parent
917a4fbdbe
commit
1015db25a8
|
@ -2729,9 +2729,11 @@ dependencies = [
|
|||
"sled",
|
||||
"spandoc",
|
||||
"tempdir",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tower",
|
||||
"tracing",
|
||||
"tracing-error",
|
||||
"tracing-futures",
|
||||
"zebra-chain",
|
||||
"zebra-test",
|
||||
|
|
|
@ -16,8 +16,8 @@ use thiserror::Error;
|
|||
// XXX refine error types -- better to use boxed errors?
|
||||
#[derive(Error, Debug)]
|
||||
pub enum SerializationError {
|
||||
/// An underlying IO error.
|
||||
#[error("io error")]
|
||||
/// An io error that prevented deserialization
|
||||
#[error("unable to deserialize type")]
|
||||
Io(#[from] io::Error),
|
||||
/// The data to be deserialized was malformed.
|
||||
// XXX refine errors
|
||||
|
|
|
@ -21,6 +21,8 @@ futures = "0.3.5"
|
|||
tower = "0.3.1"
|
||||
tracing = "0.1"
|
||||
tracing-futures = "0.2"
|
||||
tracing-error = "0.1.2"
|
||||
thiserror = "1.0.20"
|
||||
|
||||
[dev-dependencies]
|
||||
zebra-test = { path = "../zebra-test/" }
|
||||
|
|
|
@ -98,9 +98,8 @@ impl Service<Request> for InMemoryState {
|
|||
let block_locator = crate::block_locator_heights(tip_height)
|
||||
.map(|height| {
|
||||
self.index
|
||||
.get(height)
|
||||
.get_main_chain_at(height)
|
||||
.expect("there should be no holes in the chain")
|
||||
.hash()
|
||||
})
|
||||
.collect();
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ use zebra_chain::{
|
|||
#[derive(Default)]
|
||||
pub(super) struct BlockIndex {
|
||||
by_hash: HashMap<BlockHeaderHash, Arc<Block>>,
|
||||
by_height: BTreeMap<BlockHeight, Arc<Block>>,
|
||||
height_map: BTreeMap<BlockHeight, BlockHeaderHash>,
|
||||
}
|
||||
|
||||
impl BlockIndex {
|
||||
|
@ -22,9 +22,9 @@ impl BlockIndex {
|
|||
let hash = block.as_ref().into();
|
||||
let height = block.coinbase_height().unwrap();
|
||||
|
||||
match self.by_height.entry(height) {
|
||||
match self.height_map.entry(height) {
|
||||
Entry::Vacant(entry) => {
|
||||
let _ = entry.insert(block.clone());
|
||||
let _ = entry.insert(hash);
|
||||
let _ = self.by_hash.insert(hash, block);
|
||||
Ok(hash)
|
||||
}
|
||||
|
@ -32,35 +32,18 @@ impl BlockIndex {
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) fn get(&mut self, query: impl Into<BlockQuery>) -> Option<Arc<Block>> {
|
||||
match query.into() {
|
||||
BlockQuery::ByHash(hash) => self.by_hash.get(&hash),
|
||||
BlockQuery::ByHeight(height) => self.by_height.get(&height),
|
||||
}
|
||||
.cloned()
|
||||
pub(super) fn get(&self, hash: BlockHeaderHash) -> Option<Arc<Block>> {
|
||||
self.by_hash.get(&hash).cloned()
|
||||
}
|
||||
|
||||
pub(super) fn get_main_chain_at(&self, height: BlockHeight) -> Option<BlockHeaderHash> {
|
||||
self.height_map.get(&height).cloned()
|
||||
}
|
||||
|
||||
pub(super) fn get_tip(&self) -> Option<Arc<Block>> {
|
||||
self.by_height
|
||||
.iter()
|
||||
.next_back()
|
||||
.map(|(_key, value)| value.clone())
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) enum BlockQuery {
|
||||
ByHash(BlockHeaderHash),
|
||||
ByHeight(BlockHeight),
|
||||
}
|
||||
|
||||
impl From<BlockHeaderHash> for BlockQuery {
|
||||
fn from(hash: BlockHeaderHash) -> Self {
|
||||
Self::ByHash(hash)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BlockHeight> for BlockQuery {
|
||||
fn from(height: BlockHeight) -> Self {
|
||||
Self::ByHeight(height)
|
||||
self.height_map.iter().next_back().map(|(_height, &hash)| {
|
||||
self.get(hash)
|
||||
.expect("block must be in pool to be in the height map")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,7 +10,8 @@ use std::{
|
|||
task::{Context, Poll},
|
||||
};
|
||||
use tower::{buffer::Buffer, Service};
|
||||
use zebra_chain::serialization::{ZcashDeserialize, ZcashSerialize};
|
||||
use tracing::instrument;
|
||||
use zebra_chain::serialization::{SerializationError, ZcashDeserialize, ZcashSerialize};
|
||||
use zebra_chain::{
|
||||
block::{Block, BlockHeaderHash},
|
||||
types::BlockHeight,
|
||||
|
@ -23,6 +24,7 @@ struct SledState {
|
|||
}
|
||||
|
||||
impl SledState {
|
||||
#[instrument]
|
||||
pub(crate) fn new(config: &Config, network: Network) -> Self {
|
||||
let config = config.sled_config(network);
|
||||
|
||||
|
@ -31,41 +33,33 @@ impl SledState {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub(super) fn insert(
|
||||
&mut self,
|
||||
block: impl Into<Arc<Block>>,
|
||||
block: impl Into<Arc<Block>> + std::fmt::Debug,
|
||||
) -> Result<BlockHeaderHash, Error> {
|
||||
let block = block.into();
|
||||
let hash: BlockHeaderHash = block.as_ref().into();
|
||||
let height = block.coinbase_height().unwrap();
|
||||
|
||||
let by_height = self.storage.open_tree(b"by_height")?;
|
||||
let height_map = self.storage.open_tree(b"height_map")?;
|
||||
let by_hash = self.storage.open_tree(b"by_hash")?;
|
||||
|
||||
let mut bytes = Vec::new();
|
||||
block.zcash_serialize(&mut bytes)?;
|
||||
|
||||
// TODO(jlusby): make this transactional
|
||||
by_height.insert(&height.0.to_be_bytes(), bytes.as_slice())?;
|
||||
height_map.insert(&height.0.to_be_bytes(), &hash.0)?;
|
||||
by_hash.insert(&hash.0, bytes)?;
|
||||
|
||||
Ok(hash)
|
||||
}
|
||||
|
||||
pub(super) fn get(&self, query: impl Into<BlockQuery>) -> Result<Option<Arc<Block>>, Error> {
|
||||
let query = query.into();
|
||||
let value = match query {
|
||||
BlockQuery::ByHash(hash) => {
|
||||
let by_hash = self.storage.open_tree(b"by_hash")?;
|
||||
let key = &hash.0;
|
||||
by_hash.get(key)?
|
||||
}
|
||||
BlockQuery::ByHeight(height) => {
|
||||
let by_height = self.storage.open_tree(b"by_height")?;
|
||||
let key = height.0.to_be_bytes();
|
||||
by_height.get(key)?
|
||||
}
|
||||
};
|
||||
#[instrument(skip(self))]
|
||||
pub(super) fn get(&self, hash: BlockHeaderHash) -> Result<Option<Arc<Block>>, Error> {
|
||||
let by_hash = self.storage.open_tree(b"by_hash")?;
|
||||
let key = &hash.0;
|
||||
let value = by_hash.get(key)?;
|
||||
|
||||
if let Some(bytes) = value {
|
||||
let bytes = bytes.as_ref();
|
||||
|
@ -76,8 +70,27 @@ impl SledState {
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) fn get_tip(&self) -> Result<Option<Arc<Block>>, Error> {
|
||||
let tree = self.storage.open_tree(b"by_height")?;
|
||||
#[instrument(skip(self))]
|
||||
pub(super) fn get_main_chain_at(
|
||||
&self,
|
||||
height: BlockHeight,
|
||||
) -> Result<Option<BlockHeaderHash>, Error> {
|
||||
let height_map = self.storage.open_tree(b"height_map")?;
|
||||
let key = height.0.to_be_bytes();
|
||||
let value = height_map.get(key)?;
|
||||
|
||||
if let Some(bytes) = value {
|
||||
let bytes = bytes.as_ref();
|
||||
let hash = ZcashDeserialize::zcash_deserialize(bytes)?;
|
||||
Ok(Some(hash))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub(super) fn get_tip(&self) -> Result<Option<BlockHeaderHash>, Error> {
|
||||
let tree = self.storage.open_tree(b"height_map")?;
|
||||
let last_entry = tree.iter().values().next_back();
|
||||
|
||||
match last_entry {
|
||||
|
@ -87,6 +100,7 @@ impl SledState {
|
|||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
fn contains(&self, hash: &BlockHeaderHash) -> Result<bool, Error> {
|
||||
let by_hash = self.storage.open_tree(b"by_hash")?;
|
||||
let key = &hash.0;
|
||||
|
@ -127,7 +141,6 @@ impl Service<Request> for SledState {
|
|||
async move {
|
||||
storage
|
||||
.get_tip()?
|
||||
.map(|block| block.as_ref().into())
|
||||
.map(|hash| Response::Tip { hash })
|
||||
.ok_or_else(|| "zebra-state contains no blocks".into())
|
||||
}
|
||||
|
@ -144,9 +157,12 @@ impl Service<Request> for SledState {
|
|||
let block = storage
|
||||
.get(hash)?
|
||||
.expect("block must be present if contains returned true");
|
||||
let tip = storage
|
||||
let tip_hash = storage
|
||||
.get_tip()?
|
||||
.expect("storage must have a tip if it contains the previous block");
|
||||
let tip = storage
|
||||
.get(tip_hash)?
|
||||
.expect("block must be present if contains returned true");
|
||||
|
||||
let depth =
|
||||
tip.coinbase_height().unwrap().0 - block.coinbase_height().unwrap().0;
|
||||
|
@ -159,7 +175,7 @@ impl Service<Request> for SledState {
|
|||
let storage = self.clone();
|
||||
|
||||
async move {
|
||||
let tip = match storage.get_tip()? {
|
||||
let tip_hash = match storage.get_tip()? {
|
||||
Some(tip) => tip,
|
||||
None => {
|
||||
return Ok(Response::BlockLocator {
|
||||
|
@ -168,6 +184,10 @@ impl Service<Request> for SledState {
|
|||
}
|
||||
};
|
||||
|
||||
let tip = storage
|
||||
.get(tip_hash)?
|
||||
.expect("block must be present if contains returned true");
|
||||
|
||||
let tip_height = tip
|
||||
.coinbase_height()
|
||||
.expect("tip of the current chain will have a coinbase height");
|
||||
|
@ -176,10 +196,8 @@ impl Service<Request> for SledState {
|
|||
|
||||
let block_locator = heights
|
||||
.map(|height| {
|
||||
storage.get(height).map(|block| {
|
||||
block
|
||||
.expect("there should be no holes in the current chain")
|
||||
.hash()
|
||||
storage.get_main_chain_at(height).map(|hash| {
|
||||
hash.expect("there should be no holes in the current chain")
|
||||
})
|
||||
})
|
||||
.collect::<Result<_, _>>()?;
|
||||
|
@ -235,12 +253,53 @@ pub fn init(
|
|||
) -> impl Service<
|
||||
Request,
|
||||
Response = Response,
|
||||
Error = Error,
|
||||
Future = impl Future<Output = Result<Response, Error>>,
|
||||
Error = BoxError,
|
||||
Future = impl Future<Output = Result<Response, BoxError>>,
|
||||
> + Send
|
||||
+ Clone
|
||||
+ 'static {
|
||||
Buffer::new(SledState::new(&config, network), 1)
|
||||
}
|
||||
|
||||
type Error = Box<dyn error::Error + Send + Sync + 'static>;
|
||||
type BoxError = Box<dyn error::Error + Send + Sync + 'static>;
|
||||
|
||||
// these hacks are necessary to capture spantraces that can be extracted again
|
||||
// while still having a nice From impl.
|
||||
//
|
||||
// Please forgive me.
|
||||
|
||||
/// a type that can store any error and implements the Error trait at the cost of
|
||||
/// not implementing From<E: Error>
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[error(transparent)]
|
||||
struct BoxRealError(BoxError);
|
||||
|
||||
/// The TracedError wrapper on a type that implements Error
|
||||
#[derive(Debug)]
|
||||
struct Error(tracing_error::TracedError<BoxRealError>);
|
||||
|
||||
macro_rules! impl_from {
|
||||
($($src:ty,)*) => {$(
|
||||
impl From<$src> for Error {
|
||||
fn from(source: $src) -> Self {
|
||||
let source = BoxRealError(source.into());
|
||||
Self(source.into())
|
||||
}
|
||||
}
|
||||
)*
|
||||
}
|
||||
}
|
||||
|
||||
// The hoops we have to jump through to keep using this like a BoxError
|
||||
impl_from! {
|
||||
&str,
|
||||
SerializationError,
|
||||
std::io::Error,
|
||||
sled::Error,
|
||||
}
|
||||
|
||||
impl Into<BoxError> for Error {
|
||||
fn into(self) -> BoxError {
|
||||
BoxError::from(self.0)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,8 @@ use std::{
|
|||
};
|
||||
use tower::{Service, ServiceExt};
|
||||
|
||||
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
|
||||
pub struct Transcript<R, S, I>
|
||||
where
|
||||
I: Iterator<Item = (R, S)>,
|
||||
|
@ -33,12 +35,20 @@ where
|
|||
pub async fn check<C>(mut self, mut to_check: C) -> Result<(), Report>
|
||||
where
|
||||
C: Service<R, Response = S>,
|
||||
C::Error: Debug,
|
||||
C::Error: Into<BoxError>,
|
||||
{
|
||||
while let Some((req, expected_rsp)) = self.messages.next() {
|
||||
// These unwraps could propagate errors with the correct
|
||||
// bound on C::Error
|
||||
let rsp = to_check.ready_and().await.unwrap().call(req).await.unwrap();
|
||||
let rsp = to_check
|
||||
.ready_and()
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
.map_err(|e| eyre!(e))?
|
||||
.call(req)
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
.map_err(|e| eyre!(e))?;
|
||||
ensure!(
|
||||
rsp == expected_rsp,
|
||||
"Expected {:?}, got {:?}",
|
||||
|
|
Loading…
Reference in New Issue