Implement best tip block hashes for ChainTip receivers (#2677)

* Always prefer the non-finalized tip in ChainTipSender

This significantly simplifies the internal implementation of ChainTipSender.

Also make the methods and types a bit more generic.

* Update ChainTipSender with blocks, not heights

Also fix a bug where queued non-finalized blocks would clear the chain tip.

* Provide a best tip hash in ChainTip receivers

* Skip finalized blocks once the non-finalized state is active

* Add tip hash and NoChainTip tests

* Remove a redundant finalized tip update

* Skip `None` updates to the finalized tip

The finalized and non-finalized tips never update to `None`
once they have added at least one block.

* Stop committing finalized queued blocks if there is an error

Also return the highest committed queued block.

Co-authored-by: Janito Vaqueiro Ferreira Filho <janito.vff@gmail.com>
This commit is contained in:
teor 2021-08-28 05:18:47 +10:00 committed by GitHub
parent 6a9a8dfc38
commit f9c90b3d86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 198 additions and 88 deletions

View File

@ -10,6 +10,9 @@ use crate::block;
pub trait ChainTip { pub trait ChainTip {
/// Return the height of the best chain tip. /// Return the height of the best chain tip.
fn best_tip_height(&self) -> Option<block::Height>; fn best_tip_height(&self) -> Option<block::Height>;
/// Return the block hash of the best chain tip.
fn best_tip_hash(&self) -> Option<block::Hash>;
} }
/// A chain tip that is always empty. /// A chain tip that is always empty.
@ -20,4 +23,8 @@ impl ChainTip for NoChainTip {
fn best_tip_height(&self) -> Option<block::Height> { fn best_tip_height(&self) -> Option<block::Height> {
None None
} }
fn best_tip_hash(&self) -> Option<block::Hash> {
None
}
} }

View File

@ -76,12 +76,8 @@ impl StateService {
const PRUNE_INTERVAL: Duration = Duration::from_secs(30); const PRUNE_INTERVAL: Duration = Duration::from_secs(30);
pub fn new(config: Config, network: Network) -> (Self, ChainTipReceiver) { pub fn new(config: Config, network: Network) -> (Self, ChainTipReceiver) {
let (mut chain_tip_sender, chain_tip_receiver) = ChainTipSender::new();
let disk = FinalizedState::new(&config, network); let disk = FinalizedState::new(&config, network);
let (chain_tip_sender, chain_tip_receiver) = ChainTipSender::new(disk.tip_block());
if let Some(finalized_height) = disk.finalized_tip_height() {
chain_tip_sender.set_finalized_height(finalized_height);
}
let mem = NonFinalizedState::new(network); let mem = NonFinalizedState::new(network);
let queued_blocks = QueuedBlocks::default(); let queued_blocks = QueuedBlocks::default();
@ -132,10 +128,10 @@ impl StateService {
let (rsp_tx, rsp_rx) = oneshot::channel(); let (rsp_tx, rsp_rx) = oneshot::channel();
self.disk.queue_and_commit_finalized((finalized, rsp_tx)); self.disk.queue_and_commit_finalized((finalized, rsp_tx));
// TODO: move into the finalized state,
if let Some(finalized_height) = self.disk.finalized_tip_height() { // so we can clone committed `Arc<Block>`s before they get dropped
self.chain_tip_sender.set_finalized_height(finalized_height); self.chain_tip_sender
} .set_finalized_tip(self.disk.tip_block());
rsp_rx rsp_rx
} }
@ -197,14 +193,10 @@ impl StateService {
let finalized_tip_height = self.disk.finalized_tip_height().expect( let finalized_tip_height = self.disk.finalized_tip_height().expect(
"Finalized state must have at least one block before committing non-finalized state", "Finalized state must have at least one block before committing non-finalized state",
); );
let non_finalized_tip_height = self.mem.best_tip().map(|(height, _hash)| height);
self.queued_blocks.prune_by_height(finalized_tip_height); self.queued_blocks.prune_by_height(finalized_tip_height);
self.chain_tip_sender self.chain_tip_sender
.set_finalized_height(finalized_tip_height); .set_best_non_finalized_tip(self.mem.best_tip_block());
self.chain_tip_sender
.set_best_non_finalized_height(non_finalized_tip_height);
tracing::trace!("finished processing queued block"); tracing::trace!("finished processing queued block");
rsp_rx rsp_rx

View File

@ -1,66 +1,89 @@
use std::sync::Arc;
use tokio::sync::watch; use tokio::sync::watch;
use zebra_chain::{block, chain_tip::ChainTip}; use zebra_chain::{
block::{self, Block},
chain_tip::ChainTip,
};
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
/// The internal watch channel data type for [`ChainTipSender`] and [`ChainTipReceiver`].
type ChainTipData = Option<Arc<Block>>;
/// A sender for recent changes to the non-finalized and finalized chain tips. /// A sender for recent changes to the non-finalized and finalized chain tips.
#[derive(Debug)] #[derive(Debug)]
pub struct ChainTipSender { pub struct ChainTipSender {
finalized: Option<block::Height>, /// Have we got any chain tips from the non-finalized state?
non_finalized: Option<block::Height>, ///
sender: watch::Sender<Option<block::Height>>, /// Once this flag is set, we ignore the finalized state.
/// `None` tips don't set this flag.
non_finalized_tip: bool,
/// The sender channel for chain tip data.
sender: watch::Sender<ChainTipData>,
/// A copy of the data in `sender`.
// TODO: Replace with calls to `watch::Sender::borrow` once Tokio is updated to 1.0.0 (#2573) // TODO: Replace with calls to `watch::Sender::borrow` once Tokio is updated to 1.0.0 (#2573)
active_value: Option<block::Height>, active_value: ChainTipData,
} }
impl ChainTipSender { impl ChainTipSender {
/// Create new linked instances of [`ChainTipSender`] and [`ChainTipReceiver`]. /// Create new linked instances of [`ChainTipSender`] and [`ChainTipReceiver`],
pub fn new() -> (Self, ChainTipReceiver) { /// using `initial_tip` as the tip.
pub fn new(initial_tip: impl Into<Option<Arc<Block>>>) -> (Self, ChainTipReceiver) {
let (sender, receiver) = watch::channel(None); let (sender, receiver) = watch::channel(None);
let mut sender = ChainTipSender {
non_finalized_tip: false,
sender,
active_value: None,
};
let receiver = ChainTipReceiver::new(receiver);
( sender.update(initial_tip);
ChainTipSender {
finalized: None, (sender, receiver)
non_finalized: None,
sender,
active_value: None,
},
ChainTipReceiver::new(receiver),
)
} }
/// Update the current finalized block height. /// Update the current finalized tip.
/// ///
/// May trigger an update to best tip height. /// May trigger an update to the best tip.
pub fn set_finalized_height(&mut self, new_height: block::Height) { pub fn set_finalized_tip(&mut self, new_tip: impl Into<Option<Arc<Block>>>) {
if self.finalized != Some(new_height) { if !self.non_finalized_tip {
self.finalized = Some(new_height); self.update(new_tip);
self.update();
} }
} }
/// Update the current non-finalized block height. /// Update the current non-finalized tip.
/// ///
/// May trigger an update to the best tip height. /// May trigger an update to the best tip.
pub fn set_best_non_finalized_height(&mut self, new_height: Option<block::Height>) { pub fn set_best_non_finalized_tip(&mut self, new_tip: impl Into<Option<Arc<Block>>>) {
if self.non_finalized != new_height { let new_tip = new_tip.into();
self.non_finalized = new_height;
self.update(); // once the non-finalized state becomes active, it is always populated
// but ignoring `None`s makes the tests easier
if new_tip.is_some() {
self.non_finalized_tip = true;
self.update(new_tip)
} }
} }
/// Possibly send an update to listeners. /// Possibly send an update to listeners.
/// ///
/// An update is only sent if the current best tip height is different from the last best tip /// An update is only sent if the current best tip is different from the last best tip
/// height that was sent. /// that was sent.
fn update(&mut self) { fn update(&mut self, new_tip: impl Into<Option<Arc<Block>>>) {
let new_value = self.non_finalized.max(self.finalized); let new_tip = new_tip.into();
if new_value != self.active_value { if new_tip.is_none() {
let _ = self.sender.send(new_value); return;
self.active_value = new_value; }
if new_tip != self.active_value {
let _ = self.sender.send(new_tip.clone());
self.active_value = new_tip;
} }
} }
} }
@ -68,25 +91,35 @@ impl ChainTipSender {
/// A receiver for recent changes to the non-finalized and finalized chain tips. /// A receiver for recent changes to the non-finalized and finalized chain tips.
/// ///
/// The latest changes are available from all cloned instances of this type. /// The latest changes are available from all cloned instances of this type.
///
/// The chain tip data is based on:
/// * the best non-finalized chain tip, if available, or
/// * the finalized tip.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct ChainTipReceiver { pub struct ChainTipReceiver {
receiver: watch::Receiver<Option<block::Height>>, receiver: watch::Receiver<ChainTipData>,
} }
impl ChainTipReceiver { impl ChainTipReceiver {
/// Create a new chain tip receiver from a watch channel receiver. /// Create a new chain tip receiver from a watch channel receiver.
fn new(receiver: watch::Receiver<Option<block::Height>>) -> Self { fn new(receiver: watch::Receiver<ChainTipData>) -> Self {
Self { receiver } Self { receiver }
} }
} }
impl ChainTip for ChainTipReceiver { impl ChainTip for ChainTipReceiver {
/// Return the height of the best chain tip. /// Return the height of the best chain tip.
///
/// The returned block height comes from:
/// * the best non-finalized chain tip, if available, or
/// * the finalized tip.
fn best_tip_height(&self) -> Option<block::Height> { fn best_tip_height(&self) -> Option<block::Height> {
*self.receiver.borrow() self.receiver
.borrow()
.as_ref()
.and_then(|block| block.coinbase_height())
}
/// Return the block hash of the best chain tip.
fn best_tip_hash(&self) -> Option<block::Hash> {
// TODO: get the hash from the state and store it in the sender,
// so we don't have to recalculate it every time
self.receiver.borrow().as_ref().map(|block| block.hash())
} }
} }

View File

@ -1,47 +1,68 @@
use std::{env, sync::Arc};
use proptest::prelude::*; use proptest::prelude::*;
use proptest_derive::Arbitrary; use proptest_derive::Arbitrary;
use zebra_chain::{block, chain_tip::ChainTip}; use zebra_chain::{block::Block, chain_tip::ChainTip};
use super::super::ChainTipSender; use super::super::ChainTipSender;
const DEFAULT_BLOCK_VEC_PROPTEST_CASES: u32 = 4;
proptest! { proptest! {
#![proptest_config(
proptest::test_runner::Config::with_cases(env::var("PROPTEST_CASES")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_BLOCK_VEC_PROPTEST_CASES))
)]
/// Check that the best tip uses the non-finalized tip if available,
/// or otherwise the finalized tip.
#[test] #[test]
fn best_tip_is_highest_of_latest_finalized_and_non_finalized_heights( fn best_tip_is_latest_non_finalized_then_latest_finalized(
height_updates in any::<Vec<HeightUpdate>>(), tip_updates in any::<Vec<BlockUpdate>>(),
) { ) {
let (mut chain_tip_sender, chain_tip_receiver) = ChainTipSender::new(); let (mut chain_tip_sender, chain_tip_receiver) = ChainTipSender::new(None);
let mut latest_finalized_height = None; let mut latest_finalized_tip = None;
let mut latest_non_finalized_height = None; let mut latest_non_finalized_tip = None;
let mut seen_non_finalized_tip = false;
for update in height_updates { for update in tip_updates {
match update { match update {
HeightUpdate::Finalized(height) => { BlockUpdate::Finalized(block) => {
chain_tip_sender.set_finalized_height(height); chain_tip_sender.set_finalized_tip(block.clone());
latest_finalized_height = Some(height); if block.is_some() {
latest_finalized_tip = block;
}
} }
HeightUpdate::NonFinalized(height) => { BlockUpdate::NonFinalized(block) => {
chain_tip_sender.set_best_non_finalized_height(height); chain_tip_sender.set_best_non_finalized_tip(block.clone());
latest_non_finalized_height = height; if block.is_some() {
latest_non_finalized_tip = block;
seen_non_finalized_tip = true;
}
} }
} }
} }
let expected_height = match (latest_finalized_height, latest_non_finalized_height) { let expected_tip = if seen_non_finalized_tip {
(Some(finalized_height), Some(non_finalized_height)) => { latest_non_finalized_tip
Some(finalized_height.max(non_finalized_height)) } else {
} latest_finalized_tip
(finalized_height, None) => finalized_height,
(None, non_finalized_height) => non_finalized_height,
}; };
let expected_height = expected_tip.as_ref().and_then(|block| block.coinbase_height());
prop_assert_eq!(chain_tip_receiver.best_tip_height(), expected_height); prop_assert_eq!(chain_tip_receiver.best_tip_height(), expected_height);
let expected_hash = expected_tip.as_ref().map(|block| block.hash());
prop_assert_eq!(chain_tip_receiver.best_tip_hash(), expected_hash);
} }
} }
#[derive(Arbitrary, Clone, Copy, Debug)] #[derive(Arbitrary, Clone, Debug)]
enum HeightUpdate { enum BlockUpdate {
Finalized(block::Height), Finalized(Option<Arc<Block>>),
NonFinalized(Option<block::Height>), NonFinalized(Option<Arc<Block>>),
} }

View File

@ -1,10 +1,19 @@
use zebra_chain::chain_tip::ChainTip; use zebra_chain::chain_tip::{ChainTip, NoChainTip};
use super::super::ChainTipSender; use super::super::ChainTipSender;
#[test] #[test]
fn best_tip_height_is_initially_empty() { fn best_tip_is_initially_empty() {
let (_chain_tip_sender, chain_tip_receiver) = ChainTipSender::new(); let (_chain_tip_sender, chain_tip_receiver) = ChainTipSender::new(None);
assert_eq!(chain_tip_receiver.best_tip_height(), None); assert_eq!(chain_tip_receiver.best_tip_height(), None);
assert_eq!(chain_tip_receiver.best_tip_hash(), None);
}
#[test]
fn empty_chain_tip_is_empty() {
let chain_tip_receiver = NoChainTip;
assert_eq!(chain_tip_receiver.best_tip_height(), None);
assert_eq!(chain_tip_receiver.best_tip_hash(), None);
} }

View File

@ -146,15 +146,27 @@ impl FinalizedState {
/// ///
/// After queueing a finalized block, this method checks whether the newly /// After queueing a finalized block, this method checks whether the newly
/// queued block (and any of its descendants) can be committed to the state. /// queued block (and any of its descendants) can be committed to the state.
pub fn queue_and_commit_finalized(&mut self, queued: QueuedFinalized) { ///
/// Returns the highest finalized tip block committed from the queue,
/// or `None` if no blocks were committed in this call.
/// (Use [`tip_block`] to get the finalized tip, regardless of when it was committed.)
pub fn queue_and_commit_finalized(
&mut self,
queued: QueuedFinalized,
) -> Option<FinalizedBlock> {
let mut highest_queue_commit = None;
let prev_hash = queued.0.block.header.previous_block_hash; let prev_hash = queued.0.block.header.previous_block_hash;
let height = queued.0.height; let height = queued.0.height;
self.queued_by_prev_hash.insert(prev_hash, queued); self.queued_by_prev_hash.insert(prev_hash, queued);
while let Some(queued_block) = self.queued_by_prev_hash.remove(&self.finalized_tip_hash()) { while let Some(queued_block) = self.queued_by_prev_hash.remove(&self.finalized_tip_hash()) {
self.commit_finalized(queued_block); if let Ok(finalized) = self.commit_finalized(queued_block) {
metrics::counter!("state.finalized.committed.block.count", 1); highest_queue_commit = Some(finalized);
metrics::gauge!("state.finalized.committed.block.height", height.0 as _); } else {
// the last block in the queue failed, so we can't commit the next block
break;
}
} }
if self.queued_by_prev_hash.is_empty() { if self.queued_by_prev_hash.is_empty() {
@ -173,6 +185,8 @@ impl FinalizedState {
"state.finalized.queued.block.count", "state.finalized.queued.block.count",
self.queued_by_prev_hash.len() as f64 self.queued_by_prev_hash.len() as f64
); );
highest_queue_commit
} }
/// Returns the hash of the current finalized tip block. /// Returns the hash of the current finalized tip block.
@ -453,10 +467,32 @@ impl FinalizedState {
/// order. This function is called by [`queue`], which ensures order. /// order. This function is called by [`queue`], which ensures order.
/// It is intentionally not exposed as part of the public API of the /// It is intentionally not exposed as part of the public API of the
/// [`FinalizedState`]. /// [`FinalizedState`].
fn commit_finalized(&mut self, queued_block: QueuedFinalized) { fn commit_finalized(&mut self, queued_block: QueuedFinalized) -> Result<FinalizedBlock, ()> {
let (finalized, rsp_tx) = queued_block; let (finalized, rsp_tx) = queued_block;
let result = self.commit_finalized_direct(finalized, "CommitFinalized request"); let result = self.commit_finalized_direct(finalized.clone(), "CommitFinalized request");
let block_result;
if result.is_ok() {
metrics::counter!("state.finalized.committed.block.count", 1);
metrics::gauge!(
"state.finalized.committed.block.height",
finalized.height.0 as _
);
block_result = Ok(finalized);
} else {
metrics::counter!("state.finalized.error.block.count", 1);
metrics::gauge!(
"state.finalized.error.block.height",
finalized.height.0 as _
);
block_result = Err(());
}
let _ = rsp_tx.send(result.map_err(Into::into)); let _ = rsp_tx.send(result.map_err(Into::into));
block_result
} }
/// Returns the tip height and hash if there is one. /// Returns the tip height and hash if there is one.
@ -473,6 +509,12 @@ impl FinalizedState {
}) })
} }
/// Returns the tip block, if there is one.
pub fn tip_block(&self) -> Option<Arc<Block>> {
let (height, _hash) = self.tip()?;
self.block(height.into())
}
/// Returns the height of the given block if it exists. /// Returns the height of the given block if it exists.
pub fn height(&self, hash: block::Hash) -> Option<block::Height> { pub fn height(&self, hash: block::Hash) -> Option<block::Height> {
let height_by_hash = self.db.cf_handle("height_by_hash").unwrap(); let height_by_hash = self.db.cf_handle("height_by_hash").unwrap();

View File

@ -318,6 +318,12 @@ impl NonFinalizedState {
Some((height, hash)) Some((height, hash))
} }
/// Returns the block at the tip of the best chain.
pub fn best_tip_block(&self) -> Option<Arc<Block>> {
let (height, _hash) = self.best_tip()?;
self.best_block(height.into())
}
/// Returns the height of `hash` in the best chain. /// Returns the height of `hash` in the best chain.
pub fn best_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> { pub fn best_height_by_hash(&self, hash: block::Hash) -> Option<block::Height> {
let best_chain = self.best_chain()?; let best_chain = self.best_chain()?;