2020-09-09 17:13:58 -07:00
|
|
|
use std::{
|
|
|
|
future::Future,
|
|
|
|
pin::Pin,
|
2020-09-09 23:07:47 -07:00
|
|
|
sync::Arc,
|
2020-09-09 17:13:58 -07:00
|
|
|
task::{Context, Poll},
|
2020-10-14 14:06:32 -07:00
|
|
|
time::Duration,
|
|
|
|
time::Instant,
|
2020-09-09 17:13:58 -07:00
|
|
|
};
|
2020-09-09 21:15:08 -07:00
|
|
|
|
|
|
|
use futures::future::{FutureExt, TryFutureExt};
|
2020-10-07 20:07:32 -07:00
|
|
|
use memory_state::{NonFinalizedState, QueuedBlocks};
|
2020-10-09 01:37:24 -07:00
|
|
|
use tokio::sync::broadcast;
|
2020-09-09 17:13:58 -07:00
|
|
|
use tower::{buffer::Buffer, util::BoxService, Service};
|
2020-10-07 20:07:32 -07:00
|
|
|
use tracing::instrument;
|
2020-09-09 23:07:47 -07:00
|
|
|
use zebra_chain::{
|
|
|
|
block::{self, Block},
|
|
|
|
parameters::Network,
|
|
|
|
};
|
2020-09-09 17:13:58 -07:00
|
|
|
|
2020-10-09 01:37:24 -07:00
|
|
|
use crate::{
|
|
|
|
BoxError, CloneError, CommitBlockError, Config, FinalizedState, Request, Response,
|
|
|
|
ValidateContextError,
|
|
|
|
};
|
2020-10-07 20:07:32 -07:00
|
|
|
|
|
|
|
mod memory_state;
|
2020-10-14 14:06:32 -07:00
|
|
|
mod utxo;
|
2020-09-09 17:13:58 -07:00
|
|
|
|
2020-09-09 23:07:47 -07:00
|
|
|
// todo: put this somewhere
|
2020-09-24 15:46:04 -07:00
|
|
|
#[derive(Debug)]
|
2020-09-09 23:07:47 -07:00
|
|
|
pub struct QueuedBlock {
|
|
|
|
pub block: Arc<Block>,
|
|
|
|
// TODO: add these parameters when we can compute anchors.
|
|
|
|
// sprout_anchor: sprout::tree::Root,
|
|
|
|
// sapling_anchor: sapling::tree::Root,
|
2020-10-09 01:37:24 -07:00
|
|
|
pub rsp_tx: broadcast::Sender<Result<block::Hash, CloneError>>,
|
2020-09-09 23:07:47 -07:00
|
|
|
}
|
|
|
|
|
2020-09-09 17:13:58 -07:00
|
|
|
struct StateService {
|
|
|
|
/// Holds data relating to finalized chain state.
|
2020-09-24 15:46:04 -07:00
|
|
|
sled: FinalizedState,
|
2020-09-09 17:13:58 -07:00
|
|
|
/// Holds data relating to non-finalized chain state.
|
2020-10-07 20:07:32 -07:00
|
|
|
mem: NonFinalizedState,
|
|
|
|
/// Blocks awaiting their parent blocks for contextual verification.
|
|
|
|
queued_blocks: QueuedBlocks,
|
2020-10-14 14:06:32 -07:00
|
|
|
/// The set of outpoints with pending requests for their associated transparent::Output
|
|
|
|
pending_utxos: utxo::PendingUtxos,
|
|
|
|
/// Instant tracking the last time `pending_utxos` was pruned
|
|
|
|
last_prune: Instant,
|
2020-10-07 20:07:32 -07:00
|
|
|
}
|
|
|
|
|
2020-09-09 17:13:58 -07:00
|
|
|
impl StateService {
|
2020-10-14 14:06:32 -07:00
|
|
|
const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
|
|
|
|
|
2020-09-09 17:13:58 -07:00
|
|
|
pub fn new(config: Config, network: Network) -> Self {
|
2020-09-24 15:46:04 -07:00
|
|
|
let sled = FinalizedState::new(&config, network);
|
2020-10-07 20:07:32 -07:00
|
|
|
let mem = NonFinalizedState::default();
|
|
|
|
let queued_blocks = QueuedBlocks::default();
|
2020-10-14 14:06:32 -07:00
|
|
|
let pending_utxos = utxo::PendingUtxos::default();
|
2020-10-07 20:07:32 -07:00
|
|
|
|
|
|
|
Self {
|
|
|
|
sled,
|
|
|
|
mem,
|
|
|
|
queued_blocks,
|
2020-10-14 14:06:32 -07:00
|
|
|
pending_utxos,
|
|
|
|
last_prune: Instant::now(),
|
2020-10-07 20:07:32 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Queue a non finalized block for verification and check if any queued
|
|
|
|
/// blocks are ready to be verified and committed to the state.
|
|
|
|
///
|
|
|
|
/// This function encodes the logic for [committing non-finalized blocks][1]
|
|
|
|
/// in RFC0005.
|
|
|
|
///
|
|
|
|
/// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks
|
|
|
|
#[instrument(skip(self, new))]
|
|
|
|
fn queue_and_commit_non_finalized_blocks(&mut self, new: QueuedBlock) {
|
|
|
|
let parent_hash = new.block.header.previous_block_hash;
|
|
|
|
|
|
|
|
self.queued_blocks.queue(new);
|
|
|
|
|
|
|
|
if !self.can_fork_chain_at(&parent_hash) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
self.process_queued(parent_hash);
|
|
|
|
|
|
|
|
while self.mem.best_chain_len() > crate::constants::MAX_BLOCK_REORG_HEIGHT {
|
|
|
|
let finalized = self.mem.finalize();
|
|
|
|
self.sled
|
|
|
|
.commit_finalized_direct(finalized)
|
|
|
|
.expect("expected that sled errors would not occur");
|
|
|
|
}
|
|
|
|
|
|
|
|
self.queued_blocks
|
|
|
|
.prune_by_height(self.sled.finalized_tip_height().expect(
|
|
|
|
"Finalized state must have at least one block before committing non-finalized state",
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Run contextual validation on `block` and add it to the non-finalized
|
|
|
|
/// state if it is contextually valid.
|
2020-10-09 01:37:24 -07:00
|
|
|
fn validate_and_commit(&mut self, block: Arc<Block>) -> Result<(), CommitBlockError> {
|
2020-10-07 20:07:32 -07:00
|
|
|
self.check_contextual_validity(&block)?;
|
|
|
|
let parent_hash = block.header.previous_block_hash;
|
|
|
|
|
|
|
|
if self.sled.finalized_tip_hash() == parent_hash {
|
|
|
|
self.mem.commit_new_chain(block);
|
|
|
|
} else {
|
|
|
|
self.mem.commit_block(block);
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
|
|
|
|
fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
|
|
|
|
self.mem.any_chain_contains(hash) || &self.sled.finalized_tip_hash() == hash
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Attempt to validate and commit all queued blocks whose parents have
|
|
|
|
/// recently arrived starting from `new_parent`, in breadth-first ordering.
|
|
|
|
#[instrument(skip(self))]
|
|
|
|
fn process_queued(&mut self, new_parent: block::Hash) {
|
|
|
|
let mut new_parents = vec![new_parent];
|
|
|
|
|
|
|
|
while let Some(parent) = new_parents.pop() {
|
|
|
|
let queued_children = self.queued_blocks.dequeue_children(parent);
|
|
|
|
|
|
|
|
for QueuedBlock { block, rsp_tx } in queued_children {
|
|
|
|
let hash = block.hash();
|
|
|
|
let result = self
|
|
|
|
.validate_and_commit(block)
|
|
|
|
.map(|()| hash)
|
2020-10-09 01:37:24 -07:00
|
|
|
.map_err(CloneError::from);
|
2020-10-07 20:07:32 -07:00
|
|
|
let _ = rsp_tx.send(result);
|
|
|
|
new_parents.push(hash);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Check that `block` is contextually valid based on the committed finalized
|
|
|
|
/// and non-finalized state.
|
|
|
|
fn check_contextual_validity(&mut self, block: &Block) -> Result<(), ValidateContextError> {
|
|
|
|
use ValidateContextError::*;
|
|
|
|
|
|
|
|
if block
|
|
|
|
.coinbase_height()
|
|
|
|
.expect("valid blocks have a coinbase height")
|
|
|
|
<= self.sled.finalized_tip_height().expect(
|
|
|
|
"finalized state must contain at least one block to use the non-finalized state",
|
|
|
|
)
|
|
|
|
{
|
|
|
|
Err(OrphanedBlock)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO: contextual validation design and implementation
|
|
|
|
Ok(())
|
2020-09-09 17:13:58 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Service<Request> for StateService {
|
|
|
|
type Response = Response;
|
|
|
|
type Error = BoxError;
|
|
|
|
type Future =
|
|
|
|
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
|
|
|
|
|
|
|
|
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
2020-10-14 14:06:32 -07:00
|
|
|
let now = Instant::now();
|
|
|
|
|
|
|
|
if self.last_prune + Self::PRUNE_INTERVAL < now {
|
|
|
|
self.pending_utxos.prune();
|
|
|
|
self.last_prune = now;
|
|
|
|
}
|
|
|
|
|
2020-09-09 17:13:58 -07:00
|
|
|
Poll::Ready(Ok(()))
|
|
|
|
}
|
|
|
|
|
|
|
|
fn call(&mut self, req: Request) -> Self::Future {
|
|
|
|
match req {
|
2020-10-07 20:07:32 -07:00
|
|
|
Request::CommitBlock { block } => {
|
2020-10-09 01:37:24 -07:00
|
|
|
let (rsp_tx, mut rsp_rx) = broadcast::channel(1);
|
2020-10-07 20:07:32 -07:00
|
|
|
|
2020-10-14 14:06:32 -07:00
|
|
|
self.pending_utxos.check_block(&block);
|
2020-10-07 20:07:32 -07:00
|
|
|
self.queue_and_commit_non_finalized_blocks(QueuedBlock { block, rsp_tx });
|
|
|
|
|
|
|
|
async move {
|
|
|
|
rsp_rx
|
2020-10-09 01:37:24 -07:00
|
|
|
.recv()
|
2020-10-07 20:07:32 -07:00
|
|
|
.await
|
2020-10-09 01:37:24 -07:00
|
|
|
.expect("sender is not dropped")
|
2020-10-07 20:07:32 -07:00
|
|
|
.map(Response::Committed)
|
2020-10-09 01:37:24 -07:00
|
|
|
.map_err(Into::into)
|
2020-10-07 20:07:32 -07:00
|
|
|
}
|
|
|
|
.boxed()
|
|
|
|
}
|
2020-09-09 21:15:08 -07:00
|
|
|
Request::CommitFinalizedBlock { block } => {
|
2020-10-09 01:37:24 -07:00
|
|
|
let (rsp_tx, mut rsp_rx) = broadcast::channel(1);
|
2020-09-09 23:07:47 -07:00
|
|
|
|
2020-10-14 14:06:32 -07:00
|
|
|
self.pending_utxos.check_block(&block);
|
2020-10-07 20:07:32 -07:00
|
|
|
self.sled
|
|
|
|
.queue_and_commit_finalized_blocks(QueuedBlock { block, rsp_tx });
|
2020-09-09 21:15:08 -07:00
|
|
|
|
2020-09-09 23:07:47 -07:00
|
|
|
async move {
|
|
|
|
rsp_rx
|
2020-10-09 01:37:24 -07:00
|
|
|
.recv()
|
2020-09-09 23:07:47 -07:00
|
|
|
.await
|
2020-10-09 01:37:24 -07:00
|
|
|
.expect("sender is not dropped")
|
2020-09-10 10:52:51 -07:00
|
|
|
.map(Response::Committed)
|
2020-10-09 01:37:24 -07:00
|
|
|
.map_err(Into::into)
|
2020-09-09 23:07:47 -07:00
|
|
|
}
|
|
|
|
.boxed()
|
2020-09-09 21:15:08 -07:00
|
|
|
}
|
|
|
|
Request::Depth(hash) => {
|
|
|
|
// todo: handle in memory and sled
|
2020-09-10 10:52:51 -07:00
|
|
|
self.sled.depth(hash).map_ok(Response::Depth).boxed()
|
2020-09-09 21:15:08 -07:00
|
|
|
}
|
|
|
|
Request::Tip => {
|
|
|
|
// todo: handle in memory and sled
|
2020-09-10 10:52:51 -07:00
|
|
|
self.sled.tip().map_ok(Response::Tip).boxed()
|
2020-09-09 21:15:08 -07:00
|
|
|
}
|
|
|
|
Request::BlockLocator => {
|
|
|
|
// todo: handle in memory and sled
|
|
|
|
self.sled
|
|
|
|
.block_locator()
|
2020-09-10 10:52:51 -07:00
|
|
|
.map_ok(Response::BlockLocator)
|
2020-09-09 21:15:08 -07:00
|
|
|
.boxed()
|
|
|
|
}
|
2020-09-10 10:25:39 -07:00
|
|
|
Request::Transaction(_) => unimplemented!(),
|
2020-09-10 10:19:45 -07:00
|
|
|
Request::Block(hash_or_height) => {
|
|
|
|
//todo: handle in memory and sled
|
|
|
|
self.sled
|
|
|
|
.block(hash_or_height)
|
2020-09-10 10:52:51 -07:00
|
|
|
.map_ok(Response::Block)
|
2020-09-10 10:19:45 -07:00
|
|
|
.boxed()
|
|
|
|
}
|
2020-10-14 14:06:32 -07:00
|
|
|
Request::AwaitUtxo(outpoint) => {
|
|
|
|
let fut = self.pending_utxos.queue(outpoint);
|
|
|
|
|
|
|
|
if let Some(finalized_utxo) = self.sled.utxo(&outpoint).unwrap() {
|
|
|
|
self.pending_utxos.respond(outpoint, finalized_utxo);
|
|
|
|
} else if let Some(non_finalized_utxo) = self.mem.utxo(&outpoint) {
|
|
|
|
self.pending_utxos.respond(outpoint, non_finalized_utxo);
|
|
|
|
}
|
|
|
|
|
|
|
|
fut.boxed()
|
|
|
|
}
|
2020-09-09 17:13:58 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-09 17:51:08 -07:00
|
|
|
/// Initialize a state service from the provided [`Config`].
|
2020-09-09 17:13:58 -07:00
|
|
|
///
|
|
|
|
/// Each `network` has its own separate sled database.
|
|
|
|
///
|
|
|
|
/// The resulting service is clonable, to provide shared access to a common chain
|
|
|
|
/// state. It's possible to construct multiple state services in the same
|
|
|
|
/// application (as long as they, e.g., use different storage locations), but
|
|
|
|
/// doing so is probably not what you want.
|
|
|
|
pub fn init(
|
|
|
|
config: Config,
|
|
|
|
network: Network,
|
|
|
|
) -> Buffer<BoxService<Request, Response, BoxError>, Request> {
|
2020-09-19 23:58:41 -07:00
|
|
|
Buffer::new(BoxService::new(StateService::new(config, network)), 3)
|
2020-09-09 17:13:58 -07:00
|
|
|
}
|