Stop panicking when using sync and async methods on the same ChainTipChange (#2800)

* Instrument chain tip methods

* Expand tests to cover last_tip_change and multiple change checks

* Expand tests to cover Grow as well as Reset

* Support sync and async methods on the same ChainTipChange

* Add a Tokio 1.0 TODO

* Clarify a comment

* Manual rustfmt inside a proptest

* Remove tracing clones, and instrument ChainTipSender::new

* Add the tokio issue number to a TODO comment
This commit is contained in:
teor 2021-09-28 20:48:19 +10:00 committed by GitHub
parent a0d45c38f3
commit 1601c9fbb3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 334 additions and 81 deletions

View File

@ -8,6 +8,7 @@
use std::sync::Arc;
use tokio::sync::watch;
use tracing::instrument;
use zebra_chain::{
block,
@ -97,7 +98,7 @@ pub struct ChainTipSender {
///
/// Once this flag is set, we ignore the finalized state.
/// `None` tips don't set this flag.
non_finalized_tip: bool,
use_non_finalized_tip: bool,
/// The sender channel for chain tip data.
sender: watch::Sender<ChainTipData>,
@ -110,14 +111,18 @@ pub struct ChainTipSender {
impl ChainTipSender {
/// Create new linked instances of [`ChainTipSender`], [`LatestChainTip`], and [`ChainTipChange`],
/// using an `initial_tip` and a [`Network`].
#[instrument(skip(initial_tip), fields(new_height, new_hash))]
pub fn new(
initial_tip: impl Into<Option<ChainTipBlock>>,
network: Network,
) -> (Self, LatestChainTip, ChainTipChange) {
let initial_tip = initial_tip.into();
ChainTipSender::record_new_tip(&initial_tip);
let (sender, receiver) = watch::channel(None);
let mut sender = ChainTipSender {
non_finalized_tip: false,
use_non_finalized_tip: false,
sender,
active_value: None,
};
@ -133,8 +138,20 @@ impl ChainTipSender {
/// Update the latest finalized tip.
///
/// May trigger an update to the best tip.
pub fn set_finalized_tip(&mut self, new_tip: impl Into<Option<ChainTipBlock>>) {
if !self.non_finalized_tip {
#[instrument(
skip(self, new_tip),
fields(
old_use_non_finalized_tip = ?self.use_non_finalized_tip,
old_height = ?self.active_value.as_ref().map(|block| block.height),
old_hash = ?self.active_value.as_ref().map(|block| block.hash),
new_height,
new_hash,
))]
pub fn set_finalized_tip(&mut self, new_tip: impl Into<Option<ChainTipBlock>> + Clone) {
let new_tip = new_tip.into();
ChainTipSender::record_new_tip(&new_tip);
if !self.use_non_finalized_tip {
self.update(new_tip);
}
}
@ -142,13 +159,26 @@ impl ChainTipSender {
/// Update the latest non-finalized tip.
///
/// May trigger an update to the best tip.
pub fn set_best_non_finalized_tip(&mut self, new_tip: impl Into<Option<ChainTipBlock>>) {
#[instrument(
skip(self, new_tip),
fields(
old_use_non_finalized_tip = ?self.use_non_finalized_tip,
old_height = ?self.active_value.as_ref().map(|block| block.height),
old_hash = ?self.active_value.as_ref().map(|block| block.hash),
new_height,
new_hash,
))]
pub fn set_best_non_finalized_tip(
&mut self,
new_tip: impl Into<Option<ChainTipBlock>> + Clone,
) {
let new_tip = new_tip.into();
ChainTipSender::record_new_tip(&new_tip);
// once the non-finalized state becomes active, it is always populated
// but ignoring `None`s makes the tests easier
if new_tip.is_some() {
self.non_finalized_tip = true;
self.use_non_finalized_tip = true;
self.update(new_tip)
}
}
@ -157,9 +187,7 @@ impl ChainTipSender {
///
/// An update is only sent if the current best tip is different from the last best tip
/// that was sent.
fn update(&mut self, new_tip: impl Into<Option<ChainTipBlock>>) {
let new_tip = new_tip.into();
fn update(&mut self, new_tip: Option<ChainTipBlock>) {
let needs_update = match (new_tip.as_ref(), self.active_value.as_ref()) {
// since the blocks have been contextually validated,
// we know their hashes cover all the block data
@ -173,6 +201,19 @@ impl ChainTipSender {
self.active_value = new_tip;
}
}
/// Record `new_tip` in the current span.
///
/// Callers should create a new span with empty `new_height` and `new_hash` fields.
fn record_new_tip(new_tip: &Option<ChainTipBlock>) {
let span = tracing::Span::current();
let new_height = new_tip.as_ref().map(|block| block.height);
let new_hash = new_tip.as_ref().map(|block| block.hash);
span.record("new_height", &tracing::field::debug(new_height));
span.record("new_hash", &tracing::field::debug(new_hash));
}
}
/// Efficient access to the state's current best chain tip.
@ -205,11 +246,23 @@ impl LatestChainTip {
impl ChainTip for LatestChainTip {
/// Return the height of the best chain tip.
#[instrument(
skip(self),
fields(
height = ?self.receiver.borrow().as_ref().map(|block| block.height),
hash = ?self.receiver.borrow().as_ref().map(|block| block.hash),
))]
fn best_tip_height(&self) -> Option<block::Height> {
self.receiver.borrow().as_ref().map(|block| block.height)
}
/// Return the block hash of the best chain tip.
#[instrument(
skip(self),
fields(
height = ?self.receiver.borrow().as_ref().map(|block| block.height),
hash = ?self.receiver.borrow().as_ref().map(|block| block.hash),
))]
fn best_tip_hash(&self) -> Option<block::Hash> {
self.receiver.borrow().as_ref().map(|block| block.hash)
}
@ -218,6 +271,13 @@ impl ChainTip for LatestChainTip {
///
/// All transactions with these mined IDs should be rejected from the mempool,
/// even if their authorizing data is different.
#[instrument(
skip(self),
fields(
height = ?self.receiver.borrow().as_ref().map(|block| block.height),
hash = ?self.receiver.borrow().as_ref().map(|block| block.hash),
transaction_count = ?self.receiver.borrow().as_ref().map(|block| block.transaction_hashes.len()),
))]
fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> {
self.receiver
.borrow()
@ -310,6 +370,14 @@ impl ChainTipChange {
///
/// If a lot of blocks are committed at the same time,
/// the change will skip some blocks, and return a [`Reset`].
#[instrument(
skip(self),
fields(
current_height = ?self.receiver.borrow().as_ref().map(|block| block.height),
current_hash = ?self.receiver.borrow().as_ref().map(|block| block.hash),
last_change_hash = ?self.last_change_hash,
network = ?self.network,
))]
pub async fn wait_for_tip_change(&mut self) -> Result<TipAction, watch::error::RecvError> {
let block = self.tip_block_change().await?;
@ -325,6 +393,14 @@ impl ChainTipChange {
/// - `None` if there has been no change.
///
/// See [`wait_for_tip_change`] for details.
#[instrument(
skip(self),
fields(
current_height = ?self.receiver.borrow().as_ref().map(|block| block.height),
current_hash = ?self.receiver.borrow().as_ref().map(|block| block.hash),
last_change_hash = ?self.last_change_hash,
network = ?self.network,
))]
pub fn last_tip_change(&mut self) -> Option<TipAction> {
// Obtain the tip block.
let block = self.best_tip_block()?;
@ -346,7 +422,7 @@ impl ChainTipChange {
// check for an edge case that's dealt with by other code
assert!(
Some(block.hash) != self.last_change_hash,
"ChainTipSender ignores unchanged tips"
"ChainTipSender and ChainTipChange ignore unchanged tips"
);
// If the previous block hash doesn't match, reset.
@ -410,7 +486,17 @@ impl ChainTipChange {
// Wait until there is actually Some block,
// so we don't have `Option`s inside `TipAction`s.
if let Some(block) = self.best_tip_block() {
return Ok(block);
// Wait until we have a new block
//
// last_tip_change() updates last_change_hash, but it doesn't call receiver.changed().
// So code that uses both sync and async methods can have spurious pending changes.
//
// TODO: use `receiver.borrow_and_update()` in `best_tip_block()`,
// once we upgrade to tokio 1.0 (#2200)
// and remove this extra check
if Some(block.hash) != self.last_change_hash {
return Ok(block);
}
}
}
}
@ -462,4 +548,18 @@ impl TipAction {
hash: block.hash,
}
}
/// Converts this [`TipAction`] into a [`Reset`].
///
/// Designed for use in tests.
#[cfg(test)]
pub(crate) fn into_reset(self) -> Self {
match self {
Grow { block } => Reset {
height: block.height,
hash: block.hash,
},
reset @ Reset { .. } => reset,
}
}
}

View File

@ -1,4 +1,4 @@
use std::{env, sync::Arc};
use std::{collections::HashSet, env, sync::Arc};
use futures::FutureExt;
use proptest::prelude::*;
@ -8,12 +8,18 @@ use zebra_chain::{
block::Block,
chain_tip::ChainTip,
fmt::{DisplayToDebug, SummaryDebug},
parameters::Network,
parameters::{Network, NetworkUpgrade},
};
use crate::service::chain_tip::{ChainTipBlock, ChainTipSender, TipAction};
const DEFAULT_BLOCK_VEC_PROPTEST_CASES: u32 = 4;
use TipChangeCheck::*;
/// The default number of proptest cases for these tests.
///
/// Currently, there are 24 different test case combinations,
/// and each test `Vec` has an average of 50 blocks.
const DEFAULT_BLOCK_VEC_PROPTEST_CASES: u32 = 8;
proptest! {
#![proptest_config(
@ -27,7 +33,7 @@ proptest! {
/// or otherwise the finalized tip.
#[test]
fn best_tip_is_latest_non_finalized_then_latest_finalized(
tip_updates in any::<SummaryDebug<Vec<BlockUpdate>>>(),
tip_updates in any::<SummaryDebug<Vec<(BlockUpdate, BlockConnection, TipChangeCheck)>>>(),
network in any::<Network>(),
) {
let (mut chain_tip_sender, latest_chain_tip, mut chain_tip_change) = ChainTipSender::new(None, network);
@ -36,81 +42,215 @@ proptest! {
let mut latest_non_finalized_tip = None;
let mut seen_non_finalized_tip = false;
for update in tip_updates {
match update {
BlockUpdate::Finalized(block) => {
let chain_tip = block.clone().map(|block| ChainTipBlock::from(block.0));
chain_tip_sender.set_finalized_tip(chain_tip.clone());
if let Some(block) = block {
latest_finalized_tip = Some((chain_tip, block));
}
let mut pending_action = None;
let mut last_block_hash = None;
let mut chain_hashes = HashSet::new();
for (mut update, connection, tip_change_check) in tip_updates {
// prepare the update
if connection.is_grow() {
if let (Some(mut block), Some(last_block_hash)) = (update.block(), last_block_hash) {
Arc::make_mut(&mut block).header.previous_block_hash = last_block_hash;
*update.block_mut() = Some(block);
}
BlockUpdate::NonFinalized(block) => {
let chain_tip = block.clone().map(|block| ChainTipBlock::from(block.0));
chain_tip_sender.set_best_non_finalized_tip(chain_tip.clone());
if let Some(block) = block {
latest_non_finalized_tip = Some((chain_tip, block));
seen_non_finalized_tip = true;
}
}
let block = update.block();
let chain_tip = block.clone().map(|block| ChainTipBlock::from(block.0));
if let Some(chain_tip) = chain_tip.clone() {
if chain_hashes.contains(&chain_tip.hash) {
// skip duplicate blocks - they are rejected by zebra-state
continue;
}
last_block_hash = Some(chain_tip.hash);
chain_hashes.insert(chain_tip.hash);
}
// do the update
if update.is_finalized() {
chain_tip_sender.set_finalized_tip(chain_tip.clone());
if let Some(block) = block {
latest_finalized_tip = Some((chain_tip.unwrap(), block));
}
} else {
chain_tip_sender.set_best_non_finalized_tip(chain_tip.clone());
if let Some(block) = block {
latest_non_finalized_tip = Some((chain_tip.unwrap(), block));
seen_non_finalized_tip = true;
}
}
// check the results
let expected_tip = if seen_non_finalized_tip {
latest_non_finalized_tip.clone()
} else {
latest_finalized_tip.clone()
};
let chain_tip_height = expected_tip
.as_ref()
.map(|(chain_tip, _block)| chain_tip.height);
let expected_height = expected_tip.as_ref().and_then(|(_chain_tip, block)| block.coinbase_height());
prop_assert_eq!(latest_chain_tip.best_tip_height(), chain_tip_height);
prop_assert_eq!(latest_chain_tip.best_tip_height(), expected_height);
let chain_tip_hash = expected_tip
.as_ref()
.map(|(chain_tip, _block)| chain_tip.hash);
let expected_hash = expected_tip.as_ref().map(|(_chain_tip, block)| block.hash());
prop_assert_eq!(latest_chain_tip.best_tip_hash(), chain_tip_hash);
prop_assert_eq!(latest_chain_tip.best_tip_hash(), expected_hash);
let chain_tip_transaction_ids = expected_tip
.as_ref()
.map(|(chain_tip, _block)| chain_tip.transaction_hashes.clone())
.unwrap_or_else(|| Arc::new([]));
let expected_transaction_ids = expected_tip
.as_ref()
.iter()
.flat_map(|(_chain_tip, block)| block.transactions.clone())
.map(|transaction| transaction.hash())
.collect();
prop_assert_eq!(
latest_chain_tip.best_tip_mined_transaction_ids(),
chain_tip_transaction_ids
);
prop_assert_eq!(
latest_chain_tip.best_tip_mined_transaction_ids(),
expected_transaction_ids
);
let old_last_change_hash = chain_tip_change.last_change_hash;
let new_action = expected_tip.and_then(|(chain_tip, block)| {
if Some(chain_tip.hash) == old_last_change_hash {
// some updates don't do anything, so there's no new action
None
} else if Some(chain_tip.previous_block_hash) != old_last_change_hash
|| NetworkUpgrade::is_activation_height(network, chain_tip.height)
{
Some(TipAction::reset_with(block.0.into()))
} else {
Some(TipAction::grow_with(block.0.into()))
}
});
let expected_action = match (pending_action.clone(), new_action.clone()) {
(Some(pending_action), Some(new_action)) if pending_action == new_action => Some(new_action),
(Some(_pending_action), Some(new_action)) => Some(new_action.into_reset()),
(None, new_action) => new_action,
(pending_action, None) => pending_action,
};
match tip_change_check {
WaitFor => {
// TODO: use `unconstrained` to avoid spurious cooperative multitasking waits
// (needs a recent tokio version)
// See:
// https://github.com/ZcashFoundation/zebra/pull/2777#discussion_r712488817
// https://docs.rs/tokio/1.11.0/tokio/task/index.html#cooperative-scheduling
// https://tokio.rs/blog/2020-04-preemption
prop_assert_eq!(
chain_tip_change
.wait_for_tip_change()
.now_or_never()
.transpose()
.expect("watch sender is not dropped"),
expected_action,
"\n\
unexpected wait_for_tip_change TipAction\n\
new_action: {:?}\n\
pending_action: {:?}\n\
old last_change_hash: {:?}\n\
new last_change_hash: {:?}",
new_action,
pending_action,
old_last_change_hash,
chain_tip_change.last_change_hash
);
pending_action = None;
}
Last => {
prop_assert_eq!(
chain_tip_change.last_tip_change(),
expected_action,
"\n\
unexpected last_tip_change TipAction\n\
new_action: {:?}\n\
pending_action: {:?}\n\
old last_change_hash: {:?}\n\
new last_change_hash: {:?}",
new_action,
pending_action,
old_last_change_hash,
chain_tip_change.last_change_hash
);
pending_action = None;
}
Skip => {
pending_action = expected_action;
}
}
}
let expected_tip = if seen_non_finalized_tip {
latest_non_finalized_tip
} else {
latest_finalized_tip
};
let chain_tip_height = expected_tip
.as_ref()
.and_then(|(chain_tip, _block)| chain_tip.as_ref())
.map(|chain_tip| chain_tip.height);
let expected_height = expected_tip.as_ref().and_then(|(_chain_tip, block)| block.coinbase_height());
prop_assert_eq!(latest_chain_tip.best_tip_height(), chain_tip_height);
prop_assert_eq!(latest_chain_tip.best_tip_height(), expected_height);
let chain_tip_hash = expected_tip
.as_ref()
.and_then(|(chain_tip, _block)| chain_tip.as_ref())
.map(|chain_tip| chain_tip.hash);
let expected_hash = expected_tip.as_ref().map(|(_chain_tip, block)| block.hash());
prop_assert_eq!(latest_chain_tip.best_tip_hash(), chain_tip_hash);
prop_assert_eq!(latest_chain_tip.best_tip_hash(), expected_hash);
let chain_tip_transaction_ids = expected_tip
.as_ref()
.and_then(|(chain_tip, _block)| chain_tip.as_ref())
.map(|chain_tip| chain_tip.transaction_hashes.clone())
.unwrap_or_else(|| Arc::new([]));
let expected_transaction_ids = expected_tip
.as_ref()
.iter()
.flat_map(|(_chain_tip, block)| block.transactions.clone())
.map(|transaction| transaction.hash())
.collect();
prop_assert_eq!(
latest_chain_tip.best_tip_mined_transaction_ids(),
chain_tip_transaction_ids
);
prop_assert_eq!(
latest_chain_tip.best_tip_mined_transaction_ids(),
expected_transaction_ids
);
prop_assert_eq!(
chain_tip_change
.wait_for_tip_change()
.now_or_never()
.transpose()
.expect("watch sender is not dropped"),
expected_tip.map(|(_chain_tip, block)| TipAction::reset_with(block.0.into()))
);
}
}
/// Block update test cases for [`ChainTipSender`]
#[derive(Arbitrary, Clone, Debug)]
enum BlockUpdate {
Finalized(Option<DisplayToDebug<Arc<Block>>>),
NonFinalized(Option<DisplayToDebug<Arc<Block>>>),
}
impl BlockUpdate {
/// Returns the inner block, regardless of variant.
pub fn block(&self) -> Option<DisplayToDebug<Arc<Block>>> {
match self {
BlockUpdate::Finalized(block) => block.clone(),
BlockUpdate::NonFinalized(block) => block.clone(),
}
}
/// Returns a mutable reference to the inner block, regardless of variant.
pub fn block_mut(&mut self) -> &mut Option<DisplayToDebug<Arc<Block>>> {
match self {
BlockUpdate::Finalized(block) => block,
BlockUpdate::NonFinalized(block) => block,
}
}
/// Is it finalized?
pub fn is_finalized(&self) -> bool {
matches!(self, BlockUpdate::Finalized(_))
}
}
/// Block update test case variants for [`ChainTipChange`]
#[derive(Arbitrary, Copy, Clone, Debug, Eq, PartialEq)]
enum BlockConnection {
Reset,
Grow,
}
impl BlockConnection {
/// Is this a grow?
pub fn is_grow(&self) -> bool {
*self == BlockConnection::Grow
}
}
/// Block update checks for [`ChainTipChange`]
#[derive(Arbitrary, Copy, Clone, Debug, Eq, PartialEq)]
enum TipChangeCheck {
/// Check that `wait_for_tip_change` returns the correct result
WaitFor,
/// Check that `last_tip_change` returns the correct result
Last,
/// Don't check this case (causes a `TipAction::Reset` in the next check)
Skip,
}

View File

@ -39,6 +39,13 @@ fn chain_tip_change_is_initially_not_ready() {
let (_chain_tip_sender, _latest_chain_tip, mut chain_tip_change) =
ChainTipSender::new(None, Mainnet);
// TODO: use `tokio::task::unconstrained` to avoid spurious waits from tokio's cooperative multitasking
// (needs a recent tokio version)
// See:
// https://github.com/ZcashFoundation/zebra/pull/2777#discussion_r712488817
// https://docs.rs/tokio/1.11.0/tokio/task/index.html#cooperative-scheduling
// https://tokio.rs/blog/2020-04-preemption
let first = chain_tip_change
.wait_for_tip_change()
.now_or_never()
@ -47,6 +54,8 @@ fn chain_tip_change_is_initially_not_ready() {
assert_eq!(first, None);
assert_eq!(chain_tip_change.last_tip_change(), None);
// try again, just to be sure
let first = chain_tip_change
.wait_for_tip_change()
@ -56,6 +65,8 @@ fn chain_tip_change_is_initially_not_ready() {
assert_eq!(first, None);
assert_eq!(chain_tip_change.last_tip_change(), None);
// also test our manual `Clone` impl
#[allow(clippy::redundant_clone)]
let first_clone = chain_tip_change
@ -66,4 +77,6 @@ fn chain_tip_change_is_initially_not_ready() {
.expect("watch sender is not dropped");
assert_eq!(first_clone, None);
assert_eq!(chain_tip_change.last_tip_change(), None);
}