state: perform sled reads synchronously

We already use an actor model for the state service, so we get an
ordered sequence of state queries by message-passing.  Instead of
performing reads in the futures we return, this commit performs them
synchronously.  This means that all sled access is done from the same
task, which

(1) might reduce contention
(2) allows us to avoid using sled transactions when writing to the
state.

Co-authored-by: Jane Lusby <jane@zfnd.org>


Co-authored-by: Jane Lusby <jane@zfnd.org>
Co-authored-by: Deirdre Connolly <deirdre@zfnd.org>
This commit is contained in:
Henry de Valence 2020-10-24 17:09:50 -07:00
parent 253bab042e
commit 6f8f8a56d4
2 changed files with 52 additions and 88 deletions

View File

@ -7,7 +7,7 @@ use std::{
time::Instant,
};
use futures::future::{FutureExt, TryFutureExt};
use futures::future::FutureExt;
use memory_state::{NonFinalizedState, QueuedBlocks};
use tokio::sync::broadcast;
use tower::{util::BoxService, Service};
@ -213,26 +213,24 @@ impl Service<Request> for StateService {
}
Request::Depth(hash) => {
// todo: handle in memory and sled
self.sled.depth(hash).map_ok(Response::Depth).boxed()
let rsp = self.sled.depth(hash).map(Response::Depth);
async move { rsp }.boxed()
}
Request::Tip => {
// todo: handle in memory and sled
self.sled.tip().map_ok(Response::Tip).boxed()
let rsp = self.sled.tip().map(Response::Tip);
async move { rsp }.boxed()
}
Request::BlockLocator => {
// todo: handle in memory and sled
self.sled
.block_locator()
.map_ok(Response::BlockLocator)
.boxed()
let rsp = self.sled.block_locator().map(Response::BlockLocator);
async move { rsp }.boxed()
}
Request::Transaction(_) => unimplemented!(),
Request::Block(hash_or_height) => {
//todo: handle in memory and sled
self.sled
.block(hash_or_height)
.map_ok(Response::Block)
.boxed()
let rsp = self.sled.block(hash_or_height).map(Response::Block);
async move { rsp }.boxed()
}
Request::AwaitUtxo(outpoint) => {
let fut = self.pending_utxos.queue(outpoint);

View File

@ -1,6 +1,6 @@
//! The primary implementation of the `zebra_state::Service` built upon sled
use std::{collections::HashMap, convert::TryInto, future::Future, sync::Arc};
use std::{collections::HashMap, convert::TryInto, sync::Arc};
use tracing::trace;
use zebra_chain::{
@ -158,7 +158,7 @@ impl FinalizedState {
/// Returns the hash of the current finalized tip block.
pub fn finalized_tip_hash(&self) -> block::Hash {
read_tip(&self.hash_by_height)
self.tip()
.expect("inability to look up tip is unrecoverable")
.map(|(_, hash)| hash)
// if the state is empty, return the genesis previous block hash
@ -167,7 +167,7 @@ impl FinalizedState {
/// Returns the height of the current finalized tip block.
pub fn finalized_tip_height(&self) -> Option<block::Height> {
read_tip(&self.hash_by_height)
self.tip()
.expect("inability to look up tip is unrecoverable")
.map(|(height, _)| height)
}
@ -239,78 +239,60 @@ impl FinalizedState {
}
// TODO: this impl works only during checkpointing, it needs to be rewritten
pub fn block_locator(&self) -> impl Future<Output = Result<Vec<block::Hash>, BoxError>> {
let hash_by_height = self.hash_by_height.clone();
pub fn block_locator(&self) -> Result<Vec<block::Hash>, BoxError> {
let (tip_height, _) = match self.tip()? {
Some(height) => height,
None => return Ok(Vec::new()),
};
let tip = self.tip();
async move {
let (tip_height, _) = match tip.await? {
Some(height) => height,
None => return Ok(Vec::new()),
};
let heights = crate::util::block_locator_heights(tip_height);
let mut hashes = Vec::with_capacity(heights.len());
for height in heights {
if let Some(bytes) = hash_by_height.get(&height.0.to_be_bytes())? {
let hash = block::Hash(bytes.as_ref().try_into().unwrap());
hashes.push(hash)
}
let heights = crate::util::block_locator_heights(tip_height);
let mut hashes = Vec::with_capacity(heights.len());
for height in heights {
if let Some(bytes) = self.hash_by_height.get(&height.0.to_be_bytes())? {
let hash = block::Hash(bytes.as_ref().try_into().unwrap());
hashes.push(hash)
}
Ok(hashes)
}
Ok(hashes)
}
pub fn tip(
&self,
) -> impl Future<Output = Result<Option<(block::Height, block::Hash)>, BoxError>> {
let hash_by_height = self.hash_by_height.clone();
async move { read_tip(&hash_by_height) }
pub fn tip(&self) -> Result<Option<(block::Height, block::Hash)>, BoxError> {
Ok(self.hash_by_height.iter().rev().next().transpose()?.map(
|(height_bytes, hash_bytes)| {
let height = block::Height(u32::from_be_bytes(
height_bytes.as_ref().try_into().unwrap(),
));
let hash = block::Hash(hash_bytes.as_ref().try_into().unwrap());
(height, hash)
},
))
}
pub fn depth(&self, hash: block::Hash) -> impl Future<Output = Result<Option<u32>, BoxError>> {
let height_by_hash = self.height_by_hash.clone();
pub fn depth(&self, hash: block::Hash) -> Result<Option<u32>, BoxError> {
let height = match self.height_by_hash.get(&hash.0)? {
Some(bytes) => block::Height(u32::from_be_bytes(bytes.as_ref().try_into().unwrap())),
None => return Ok(None),
};
// TODO: this impl works only during checkpointing, it needs to be rewritten
let tip = self.tip();
let (tip_height, _) = self.tip()?.expect("tip must exist");
async move {
let height = match height_by_hash.get(&hash.0)? {
Ok(Some(tip_height.0 - height.0))
}
pub fn block(&self, hash_or_height: HashOrHeight) -> Result<Option<Arc<Block>>, BoxError> {
let height = match hash_or_height {
HashOrHeight::Height(height) => height,
HashOrHeight::Hash(hash) => match self.height_by_hash.get(&hash.0)? {
Some(bytes) => {
block::Height(u32::from_be_bytes(bytes.as_ref().try_into().unwrap()))
}
None => return Ok(None),
};
},
};
let (tip_height, _) = tip.await?.expect("tip must exist");
Ok(Some(tip_height.0 - height.0))
}
}
pub fn block(
&self,
hash_or_height: HashOrHeight,
) -> impl Future<Output = Result<Option<Arc<Block>>, BoxError>> {
let height_by_hash = self.height_by_hash.clone();
let block_by_height = self.block_by_height.clone();
async move {
let height = match hash_or_height {
HashOrHeight::Height(height) => height,
HashOrHeight::Hash(hash) => match height_by_hash.get(&hash.0)? {
Some(bytes) => {
block::Height(u32::from_be_bytes(bytes.as_ref().try_into().unwrap()))
}
None => return Ok(None),
},
};
match block_by_height.get(&height.0.to_be_bytes())? {
Some(bytes) => Ok(Some(Arc::<Block>::zcash_deserialize(bytes.as_ref())?)),
None => Ok(None),
}
match self.block_by_height.get(&height.0.to_be_bytes())? {
Some(bytes) => Ok(Some(Arc::<Block>::zcash_deserialize(bytes.as_ref())?)),
None => Ok(None),
}
}
@ -323,19 +305,3 @@ impl FinalizedState {
self.utxo_by_outpoint.zs_get(outpoint)
}
}
// Split into a helper function to be called synchronously or asynchronously.
fn read_tip(hash_by_height: &sled::Tree) -> Result<Option<(block::Height, block::Hash)>, BoxError> {
Ok(hash_by_height
.iter()
.rev()
.next()
.transpose()?
.map(|(height_bytes, hash_bytes)| {
let height = block::Height(u32::from_be_bytes(
height_bytes.as_ref().try_into().unwrap(),
));
let hash = block::Hash(hash_bytes.as_ref().try_into().unwrap());
(height, hash)
}))
}