From b6fe8164732649305293734f3ecf62da60de1f26 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 2 Sep 2021 08:31:16 +1000 Subject: [PATCH] Add a `ChainTipChange` type to `await` chain tip changes (#2715) * Rename ChainTipReceiver to CurrentChainTip `fastmod ChainTipReceiver CurrentChainTip zebra*` * Update chain tip documentation and variable names * Basic chain tip change implementation, without resets Also includes the following name changes: ``` fastmod CurrentChainTip LatestChainTip zebra* fastmod chain_tip_receiver latest_chain_tip zebra* ``` * Clarify the difference between `LatestChainTip` and `ChainTipChange` --- zebra-chain/src/chain_tip.rs | 2 +- zebra-network/src/isolated.rs | 2 +- zebra-network/src/peer/handshake.rs | 20 +- zebra-network/src/peer_set/initialize.rs | 6 +- zebra-state/src/arbitrary.rs | 28 ++- zebra-state/src/lib.rs | 2 +- zebra-state/src/service.rs | 27 ++- zebra-state/src/service/chain_tip.rs | 215 ++++++++++++++++-- .../src/service/chain_tip/tests/prop.rs | 31 ++- .../src/service/chain_tip/tests/vectors.rs | 55 ++++- zebra-state/src/service/tests.rs | 42 +++- zebra-state/src/tests/setup.rs | 2 +- zebra-state/tests/basic.rs | 2 +- zebrad/src/commands/start.rs | 5 +- 14 files changed, 362 insertions(+), 77 deletions(-) diff --git a/zebra-chain/src/chain_tip.rs b/zebra-chain/src/chain_tip.rs index 77e5d47d9..becf414a8 100644 --- a/zebra-chain/src/chain_tip.rs +++ b/zebra-chain/src/chain_tip.rs @@ -1,4 +1,4 @@ -//! Chain tip interfaces. +//! Zebra interfaces for access to chain tip information. use std::sync::Arc; diff --git a/zebra-network/src/isolated.rs b/zebra-network/src/isolated.rs index e3c1df981..0fb5d2a09 100644 --- a/zebra-network/src/isolated.rs +++ b/zebra-network/src/isolated.rs @@ -58,7 +58,7 @@ pub fn connect_isolated( Ok::>(Response::Nil) })) .with_user_agent(user_agent) - .with_chain_tip_receiver(NoChainTip) + .with_latest_chain_tip(NoChainTip) .finish() .expect("provided mandatory builder parameters"); diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 33a8f7691..ce8482936 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -57,7 +57,7 @@ pub struct Handshake { our_services: PeerServices, relay: bool, parent_span: Span, - chain_tip_receiver: C, + latest_chain_tip: C, } /// The peer address that we are handshaking with. @@ -307,7 +307,7 @@ pub struct Builder { user_agent: Option, relay: Option, inv_collector: Option>, - chain_tip_receiver: C, + latest_chain_tip: C, } impl Builder @@ -374,9 +374,9 @@ where /// constant over network upgrade activations. /// /// Use [`NoChainTip`] to explicitly provide no chain tip. - pub fn with_chain_tip_receiver(self, chain_tip_receiver: NewC) -> Builder { + pub fn with_latest_chain_tip(self, latest_chain_tip: NewC) -> Builder { Builder { - chain_tip_receiver, + latest_chain_tip, // TODO: Until Rust RFC 2528 reaches stable, we can't do `..self` config: self.config, inbound_service: self.inbound_service, @@ -429,7 +429,7 @@ where our_services, relay, parent_span: Span::current(), - chain_tip_receiver: self.chain_tip_receiver, + latest_chain_tip: self.latest_chain_tip, }) } } @@ -452,7 +452,7 @@ where our_services: None, relay: None, inv_collector: None, - chain_tip_receiver: NoChainTip, + latest_chain_tip: NoChainTip, } } } @@ -471,7 +471,7 @@ pub async fn negotiate_version( user_agent: String, our_services: PeerServices, relay: bool, - chain_tip_receiver: impl ChainTip, + latest_chain_tip: impl ChainTip, ) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> { // Create a random nonce for this connection let local_nonce = Nonce::default(); @@ -585,7 +585,7 @@ pub async fn negotiate_version( // SECURITY: Reject connections to peers on old versions, because they might not know about all // network upgrades and could lead to chain forks or slower block propagation. - let height = chain_tip_receiver.best_tip_height(); + let height = latest_chain_tip.best_tip_height(); let min_version = Version::min_remote_for_height(config.network, height); if remote_version < min_version { // Disconnect if peer is using an obsolete version. @@ -643,7 +643,7 @@ where let user_agent = self.user_agent.clone(); let our_services = self.our_services; let relay = self.relay; - let chain_tip_receiver = self.chain_tip_receiver.clone(); + let latest_chain_tip = self.latest_chain_tip.clone(); let fut = async move { debug!( @@ -674,7 +674,7 @@ where user_agent, our_services, relay, - chain_tip_receiver, + latest_chain_tip, ), ) .await??; diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index ee8c06920..835d9dfa6 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -37,7 +37,7 @@ mod tests; type PeerChange = Result, BoxError>; /// Initialize a peer set, using a network `config`, `inbound_service`, -/// and `chain_tip_receiver`. +/// and `latest_chain_tip`. /// /// The peer set abstracts away peer management to provide a /// [`tower::Service`] representing "the network" that load-balances requests @@ -62,7 +62,7 @@ type PeerChange = Result, BoxError>; pub async fn init( config: Config, inbound_service: S, - chain_tip_receiver: C, + latest_chain_tip: C, ) -> ( Buffer, Request>, Arc>, @@ -92,7 +92,7 @@ where .with_timestamp_collector(timestamp_collector) .with_advertised_services(PeerServices::NODE_NETWORK) .with_user_agent(crate::constants::USER_AGENT.to_string()) - .with_chain_tip_receiver(chain_tip_receiver) + .with_latest_chain_tip(latest_chain_tip) .want_transactions(true) .finish() .expect("configured all required parameters"); diff --git a/zebra-state/src/arbitrary.rs b/zebra-state/src/arbitrary.rs index 38657aa54..d43e76f1d 100644 --- a/zebra-state/src/arbitrary.rs +++ b/zebra-state/src/arbitrary.rs @@ -8,7 +8,7 @@ use zebra_chain::{ value_balance::ValueBalance, }; -use crate::{request::ContextuallyValidBlock, PreparedBlock}; +use crate::{request::ContextuallyValidBlock, service::chain_tip::ChainTipBlock, PreparedBlock}; /// Mocks computation done during semantic validation pub trait Prepare { @@ -33,6 +33,32 @@ impl Prepare for Arc { } } +impl From for ChainTipBlock +where + T: Prepare, +{ + fn from(block: T) -> Self { + block.prepare().into() + } +} + +impl From for ChainTipBlock { + fn from(prepared: PreparedBlock) -> Self { + let PreparedBlock { + block: _, + hash, + height, + new_outputs: _, + transaction_hashes, + } = prepared; + Self { + hash, + height, + transaction_hashes, + } + } +} + impl PreparedBlock { /// Returns a [`ContextuallyValidBlock`] created from this block, /// with fake zero-valued spent UTXOs. diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs index b6b712642..9e2da095f 100644 --- a/zebra-state/src/lib.rs +++ b/zebra-state/src/lib.rs @@ -35,7 +35,7 @@ pub use constants::MAX_BLOCK_REORG_HEIGHT; pub use error::{BoxError, CloneError, CommitBlockError, ValidateContextError}; pub use request::{FinalizedBlock, HashOrHeight, PreparedBlock, Request}; pub use response::Response; -pub use service::{chain_tip::ChainTipReceiver, init}; +pub use service::{chain_tip::LatestChainTip, init}; #[cfg(any(test, feature = "proptest-impl"))] pub use service::init_test; diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index a752e57bc..234192143 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -29,7 +29,7 @@ use crate::{ }; use self::{ - chain_tip::{ChainTipReceiver, ChainTipSender}, + chain_tip::{ChainTipChange, ChainTipSender, LatestChainTip}, non_finalized_state::{NonFinalizedState, QueuedBlocks}, }; @@ -76,13 +76,14 @@ pub(crate) struct StateService { impl StateService { const PRUNE_INTERVAL: Duration = Duration::from_secs(30); - pub fn new(config: Config, network: Network) -> (Self, ChainTipReceiver) { + pub fn new(config: Config, network: Network) -> (Self, LatestChainTip, ChainTipChange) { let disk = FinalizedState::new(&config, network); let initial_tip = disk .tip_block() .map(FinalizedBlock::from) .map(ChainTipBlock::from); - let (chain_tip_sender, chain_tip_receiver) = ChainTipSender::new(initial_tip); + let (chain_tip_sender, latest_chain_tip, chain_tip_change) = + ChainTipSender::new(initial_tip); let mem = NonFinalizedState::new(network); let queued_blocks = QueuedBlocks::default(); @@ -122,7 +123,7 @@ impl StateService { } tracing::info!("no legacy chain found"); - (state, chain_tip_receiver) + (state, latest_chain_tip, chain_tip_change) } /// Queue a finalized block for verification and storage in the finalized state. @@ -769,7 +770,7 @@ impl Service for StateService { } /// Initialize a state service from the provided [`Config`]. -/// Returns a boxed state service, and a receiver for state chain tip updates. +/// Returns a boxed state service, and receivers for state chain tip updates. /// /// Each `network` has its own separate on-disk database. /// @@ -780,10 +781,18 @@ impl Service for StateService { pub fn init( config: Config, network: Network, -) -> (BoxService, ChainTipReceiver) { - let (state_service, chain_tip_receiver) = StateService::new(config, network); +) -> ( + BoxService, + LatestChainTip, + ChainTipChange, +) { + let (state_service, latest_chain_tip, chain_tip_change) = StateService::new(config, network); - (BoxService::new(state_service), chain_tip_receiver) + ( + BoxService::new(state_service), + latest_chain_tip, + chain_tip_change, + ) } /// Initialize a state service with an ephemeral [`Config`] and a buffer with a single slot. @@ -791,7 +800,7 @@ pub fn init( /// This can be used to create a state service for testing. See also [`init`]. #[cfg(any(test, feature = "proptest-impl"))] pub fn init_test(network: Network) -> Buffer, Request> { - let (state_service, _) = StateService::new(Config::ephemeral(), network); + let (state_service, _, _) = StateService::new(Config::ephemeral(), network); Buffer::new(BoxService::new(state_service), 1) } diff --git a/zebra-state/src/service/chain_tip.rs b/zebra-state/src/service/chain_tip.rs index 04bd2f39b..f86f63a82 100644 --- a/zebra-state/src/service/chain_tip.rs +++ b/zebra-state/src/service/chain_tip.rs @@ -1,3 +1,10 @@ +//! Access to Zebra chain tip information. +//! +//! Zebra has 3 different interfaces for access to chain tip information: +//! * [zebra_state::Request](crate::request): [tower::Service] requests about chain state, +//! * [LatestChainTip] for efficient access to the current best tip, and +//! * [ChainTipChange] to `await` specific changes to the chain tip. + use std::sync::Arc; use tokio::sync::watch; @@ -6,23 +13,30 @@ use zebra_chain::{block, chain_tip::ChainTip, transaction}; use crate::{request::ContextuallyValidBlock, FinalizedBlock}; +use TipAction::*; + #[cfg(test)] mod tests; -/// The internal watch channel data type for [`ChainTipSender`] and [`ChainTipReceiver`]. +/// The internal watch channel data type for [`ChainTipSender`], [`LatestChainTip`], +/// and [`ChainTipChange`]. type ChainTipData = Option; /// A chain tip block, with precalculated block data. /// -/// Used to efficiently update the [`ChainTipSender`]. +/// Used to efficiently update [`ChainTipSender`], [`LatestChainTip`], +/// and [`ChainTipChange`]. #[derive(Clone, Debug, PartialEq, Eq)] pub struct ChainTipBlock { - pub(crate) hash: block::Hash, - pub(crate) height: block::Height, + /// The hash of the best chain tip block. + pub hash: block::Hash, + + /// The height of the best chain tip block. + pub height: block::Height, /// The mined transaction IDs of the transactions in `block`, /// in the same order as `block.transactions`. - pub(crate) transaction_hashes: Arc<[transaction::Hash]>, + pub transaction_hashes: Arc<[transaction::Hash]>, } impl From for ChainTipBlock { @@ -60,7 +74,7 @@ impl From for ChainTipBlock { } } -/// A sender for recent changes to the non-finalized and finalized chain tips. +/// A sender for changes to the non-finalized and finalized chain tips. #[derive(Debug)] pub struct ChainTipSender { /// Have we got any chain tips from the non-finalized state? @@ -78,23 +92,28 @@ pub struct ChainTipSender { } impl ChainTipSender { - /// Create new linked instances of [`ChainTipSender`] and [`ChainTipReceiver`], + /// Create new linked instances of [`ChainTipSender`], [`LatestChainTip`], and [`ChainTipChange`], /// using `initial_tip` as the tip. - pub fn new(initial_tip: impl Into>) -> (Self, ChainTipReceiver) { + pub fn new( + initial_tip: impl Into>, + ) -> (Self, LatestChainTip, ChainTipChange) { let (sender, receiver) = watch::channel(None); + let mut sender = ChainTipSender { non_finalized_tip: false, sender, active_value: None, }; - let receiver = ChainTipReceiver::new(receiver); + + let current = LatestChainTip::new(receiver.clone()); + let change = ChainTipChange::new(receiver); sender.update(initial_tip); - (sender, receiver) + (sender, current, change) } - /// Update the current finalized tip. + /// Update the latest finalized tip. /// /// May trigger an update to the best tip. pub fn set_finalized_tip(&mut self, new_tip: impl Into>) { @@ -103,7 +122,7 @@ impl ChainTipSender { } } - /// Update the current non-finalized tip. + /// Update the latest non-finalized tip. /// /// May trigger an update to the best tip. pub fn set_best_non_finalized_tip(&mut self, new_tip: impl Into>) { @@ -139,26 +158,35 @@ impl ChainTipSender { } } -/// A receiver for recent changes to the non-finalized and finalized chain tips. +/// Efficient access to the state's current best chain tip. /// -/// The latest changes are available from all cloned instances of this type. +/// Each method returns data from the latest tip, +/// regardless of how many times you call it. +/// +/// Cloned instances provide identical tip data. /// /// The chain tip data is based on: /// * the best non-finalized chain tip, if available, or /// * the finalized tip. +/// +/// ## Note +/// +/// If a lot of blocks are committed at the same time, +/// the latest tip will skip some blocks in the chain. #[derive(Clone, Debug)] -pub struct ChainTipReceiver { +pub struct LatestChainTip { + /// The receiver for the current chain tip's data. receiver: watch::Receiver, } -impl ChainTipReceiver { - /// Create a new chain tip receiver from a watch channel receiver. +impl LatestChainTip { + /// Create a new [`LatestChainTip`] from a watch channel receiver. fn new(receiver: watch::Receiver) -> Self { Self { receiver } } } -impl ChainTip for ChainTipReceiver { +impl ChainTip for LatestChainTip { /// Return the height of the best chain tip. fn best_tip_height(&self) -> Option { self.receiver.borrow().as_ref().map(|block| block.height) @@ -181,3 +209,154 @@ impl ChainTip for ChainTipReceiver { .unwrap_or_else(|| Arc::new([])) } } + +/// A chain tip change monitor. +/// +/// Awaits changes and resets of the state's best chain tip, +/// returning the latest [`TipAction`] once the state is updated. +/// +/// Each cloned instance separately tracks the last block data it provided. +/// If the best chain fork has changed since the last [`tip_change`] on that instance, +/// it returns a [`Reset`]. +/// +/// The chain tip data is based on: +/// * the best non-finalized chain tip, if available, or +/// * the finalized tip. +#[derive(Debug)] +pub struct ChainTipChange { + /// The receiver for the current chain tip's data. + receiver: watch::Receiver, + + /// The most recent hash provided by this instance. + previous_change_hash: Option, +} + +/// Actions that we can take in response to a [`ChainTipChange`]. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum TipAction { + /// The chain tip was updated continuously, + /// using a child `block` of the previous block. + /// + /// The genesis block action is a `Grow`. + Grow { + /// Information about the block used to grow the chain. + block: ChainTipBlock, + }, + + /// The chain tip was reset to a block with `height` and `hash`. + /// + /// Resets can happen for different reasons: + /// * a newly created or cloned [`ChainTipChange`], which is behind the current tip, + /// * extending the chain with a network upgrade activation block, + /// * switching to a different best [`Chain`], also known as a rollback, and + /// * receiving multiple blocks since the previous change. + /// + /// To keep the code and tests simple, Zebra performs the same reset actions, + /// regardless of the reset reason. + /// + /// `Reset`s do not have the transaction hashes from the tip block, + /// because all transactions should be cleared by a reset. + Reset { + /// The block height of the tip, after the chain reset. + height: block::Height, + + /// The block hash of the tip, after the chain reset. + /// + /// Mainly useful for logging and debugging. + hash: block::Hash, + }, +} + +impl ChainTipChange { + /// Wait until the tip has changed, then return the corresponding [`TipAction`]. + /// + /// The returned action describes how the tip has changed + /// since the last call to this method. + /// + /// If there have been no changes since the last time this method was called, + /// it waits for the next tip change before returning. + /// + /// If there have been multiple changes since the last time this method was called, + /// they are combined into a single [`TipAction::Reset`]. + /// + /// Returns an error if communication with the state is lost. + /// + /// ## Note + /// + /// If a lot of blocks are committed at the same time, + /// the change will skip some blocks, and return a [`Reset`]. + pub async fn tip_change(&mut self) -> Result { + let block = self.tip_block_change().await?; + + // TODO: handle resets here + + self.previous_change_hash = Some(block.hash); + + Ok(Grow { block }) + } + + /// Create a new [`ChainTipChange`] from a watch channel receiver. + fn new(receiver: watch::Receiver) -> Self { + Self { + receiver, + previous_change_hash: None, + } + } + + /// Wait until the next chain tip change, then return the corresponding [`ChainTipBlock`]. + /// + /// Returns an error if communication with the state is lost. + async fn tip_block_change(&mut self) -> Result { + loop { + // If there are multiple changes while this code is executing, + // we don't rely on getting the first block or the latest block + // after the change notification. + // Any block update after the change will do, + // we'll catch up with the tip after the next change. + self.receiver.changed().await?; + + // Wait until there is actually Some block, + // so we don't have `Option`s inside `TipAction`s. + if let Some(block) = self.best_tip_block() { + assert!( + Some(block.hash) != self.previous_change_hash, + "ChainTipSender must ignore unchanged tips" + ); + + return Ok(block); + } + } + } + + /// Return the current best [`ChainTipBlock`], + /// or `None` if no block has been committed yet. + fn best_tip_block(&self) -> Option { + self.receiver.borrow().clone() + } +} + +impl Clone for ChainTipChange { + fn clone(&self) -> Self { + Self { + receiver: self.receiver.clone(), + // clear the previous change hash, so the first action is a reset + previous_change_hash: None, + } + } +} + +impl TipAction { + /// Is this tip action a [`Reset`]? + pub fn is_reset(&self) -> bool { + matches!(self, Reset { .. }) + } + + /// Returns the block hash of this tip action, + /// regardless of the underlying variant. + pub fn best_tip_hash(&self) -> block::Hash { + match self { + Grow { block } => block.hash, + Reset { hash, .. } => *hash, + } + } +} diff --git a/zebra-state/src/service/chain_tip/tests/prop.rs b/zebra-state/src/service/chain_tip/tests/prop.rs index 4a3f14137..f94f8e730 100644 --- a/zebra-state/src/service/chain_tip/tests/prop.rs +++ b/zebra-state/src/service/chain_tip/tests/prop.rs @@ -1,13 +1,15 @@ use std::{env, sync::Arc}; +use futures::FutureExt; use proptest::prelude::*; use proptest_derive::Arbitrary; use zebra_chain::{block::Block, chain_tip::ChainTip}; -use crate::{service::chain_tip::ChainTipBlock, FinalizedBlock}; - -use super::super::ChainTipSender; +use crate::{ + service::chain_tip::{ChainTipBlock, ChainTipSender, TipAction::*}, + FinalizedBlock, +}; const DEFAULT_BLOCK_VEC_PROPTEST_CASES: u32 = 4; @@ -25,7 +27,7 @@ proptest! { fn best_tip_is_latest_non_finalized_then_latest_finalized( tip_updates in any::>(), ) { - let (mut chain_tip_sender, chain_tip_receiver) = ChainTipSender::new(None); + let (mut chain_tip_sender, latest_chain_tip, mut chain_tip_change) = ChainTipSender::new(None); let mut latest_finalized_tip = None; let mut latest_non_finalized_tip = None; @@ -62,16 +64,16 @@ proptest! { .and_then(|(chain_tip, _block)| chain_tip.as_ref()) .map(|chain_tip| chain_tip.height); let expected_height = expected_tip.as_ref().and_then(|(_chain_tip, block)| block.coinbase_height()); - prop_assert_eq!(chain_tip_receiver.best_tip_height(), chain_tip_height); - prop_assert_eq!(chain_tip_receiver.best_tip_height(), expected_height); + prop_assert_eq!(latest_chain_tip.best_tip_height(), chain_tip_height); + prop_assert_eq!(latest_chain_tip.best_tip_height(), expected_height); let chain_tip_hash = expected_tip .as_ref() .and_then(|(chain_tip, _block)| chain_tip.as_ref()) .map(|chain_tip| chain_tip.hash); let expected_hash = expected_tip.as_ref().map(|(_chain_tip, block)| block.hash()); - prop_assert_eq!(chain_tip_receiver.best_tip_hash(), chain_tip_hash); - prop_assert_eq!(chain_tip_receiver.best_tip_hash(), expected_hash); + prop_assert_eq!(latest_chain_tip.best_tip_hash(), chain_tip_hash); + prop_assert_eq!(latest_chain_tip.best_tip_hash(), expected_hash); let chain_tip_transaction_ids = expected_tip .as_ref() @@ -85,13 +87,22 @@ proptest! { .map(|transaction| transaction.hash()) .collect(); prop_assert_eq!( - chain_tip_receiver.best_tip_mined_transaction_ids(), + latest_chain_tip.best_tip_mined_transaction_ids(), chain_tip_transaction_ids ); prop_assert_eq!( - chain_tip_receiver.best_tip_mined_transaction_ids(), + latest_chain_tip.best_tip_mined_transaction_ids(), expected_transaction_ids ); + + prop_assert_eq!( + chain_tip_change + .tip_change() + .now_or_never() + .transpose() + .expect("watch sender is not dropped"), + expected_tip.map(|(_chain_tip, block)| Grow { block: block.into() }) + ); } } diff --git a/zebra-state/src/service/chain_tip/tests/vectors.rs b/zebra-state/src/service/chain_tip/tests/vectors.rs index b02c3c873..1efcfc439 100644 --- a/zebra-state/src/service/chain_tip/tests/vectors.rs +++ b/zebra-state/src/service/chain_tip/tests/vectors.rs @@ -1,29 +1,64 @@ use std::iter; +use futures::FutureExt; + use zebra_chain::chain_tip::{ChainTip, NoChainTip}; use super::super::ChainTipSender; #[test] -fn best_tip_is_initially_empty() { - let (_chain_tip_sender, chain_tip_receiver) = ChainTipSender::new(None); +fn current_best_tip_is_initially_empty() { + let (_chain_tip_sender, latest_chain_tip, _chain_tip_change) = ChainTipSender::new(None); - assert_eq!(chain_tip_receiver.best_tip_height(), None); - assert_eq!(chain_tip_receiver.best_tip_hash(), None); + assert_eq!(latest_chain_tip.best_tip_height(), None); + assert_eq!(latest_chain_tip.best_tip_hash(), None); assert_eq!( - chain_tip_receiver.best_tip_mined_transaction_ids(), + latest_chain_tip.best_tip_mined_transaction_ids(), iter::empty().collect() ); } #[test] -fn empty_chain_tip_is_empty() { - let chain_tip_receiver = NoChainTip; +fn empty_latest_chain_tip_is_empty() { + let latest_chain_tip = NoChainTip; - assert_eq!(chain_tip_receiver.best_tip_height(), None); - assert_eq!(chain_tip_receiver.best_tip_hash(), None); + assert_eq!(latest_chain_tip.best_tip_height(), None); + assert_eq!(latest_chain_tip.best_tip_hash(), None); assert_eq!( - chain_tip_receiver.best_tip_mined_transaction_ids(), + latest_chain_tip.best_tip_mined_transaction_ids(), iter::empty().collect() ); } + +#[test] +fn chain_tip_change_is_initially_not_ready() { + let (_chain_tip_sender, _latest_chain_tip, mut chain_tip_change) = ChainTipSender::new(None); + + let first = chain_tip_change + .tip_change() + .now_or_never() + .transpose() + .expect("watch sender is not dropped"); + + assert_eq!(first, None); + + // try again, just to be sure + let first = chain_tip_change + .tip_change() + .now_or_never() + .transpose() + .expect("watch sender is not dropped"); + + assert_eq!(first, None); + + // also test our manual `Clone` impl + #[allow(clippy::redundant_clone)] + let first_clone = chain_tip_change + .clone() + .tip_change() + .now_or_never() + .transpose() + .expect("watch sender is not dropped"); + + assert_eq!(first_clone, None); +} diff --git a/zebra-state/src/service/tests.rs b/zebra-state/src/service/tests.rs index b567f3105..2f041d0df 100644 --- a/zebra-state/src/service/tests.rs +++ b/zebra-state/src/service/tests.rs @@ -1,6 +1,6 @@ use std::{convert::TryInto, env, sync::Arc}; -use futures::stream::FuturesUnordered; +use futures::{stream::FuturesUnordered, FutureExt}; use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt}; use zebra_chain::{ @@ -17,7 +17,7 @@ use zebra_test::{prelude::*, transcript::Transcript}; use crate::{ arbitrary::Prepare, constants, init_test, - service::StateService, + service::{chain_tip::TipAction::*, StateService}, tests::setup::{partial_nu5_chain_strategy, transaction_v4_from_coinbase}, BoxError, Config, FinalizedBlock, PreparedBlock, Request, Response, }; @@ -297,24 +297,48 @@ proptest! { ) { zebra_test::init(); - let (mut state_service, chain_tip_receiver) = StateService::new(Config::ephemeral(), network); + let (mut state_service, latest_chain_tip, mut chain_tip_change) = StateService::new(Config::ephemeral(), network); - prop_assert_eq!(chain_tip_receiver.best_tip_height(), None); + prop_assert_eq!(latest_chain_tip.best_tip_height(), None); + prop_assert_eq!( + chain_tip_change + .tip_change() + .now_or_never() + .transpose() + .expect("watch sender is not dropped"), + None + ); for block in finalized_blocks { - let expected_height = block.height; + let expected_block = block.clone(); state_service.queue_and_commit_finalized(block); - prop_assert_eq!(chain_tip_receiver.best_tip_height(), Some(expected_height)); + prop_assert_eq!(latest_chain_tip.best_tip_height(), Some(expected_block.height)); + prop_assert_eq!( + chain_tip_change + .tip_change() + .now_or_never() + .transpose() + .expect("watch sender is not dropped"), + Some(Grow { block: expected_block.into() }) + ); } for block in non_finalized_blocks { - let expected_height = block.height; + let expected_block = block.clone(); state_service.queue_and_commit_non_finalized(block); - prop_assert_eq!(chain_tip_receiver.best_tip_height(), Some(expected_height)); + prop_assert_eq!(latest_chain_tip.best_tip_height(), Some(expected_block.height)); + prop_assert_eq!( + chain_tip_change + .tip_change() + .now_or_never() + .transpose() + .expect("watch sender is not dropped"), + Some(Grow { block: expected_block.into() }) + ); } } @@ -332,7 +356,7 @@ proptest! { ) { zebra_test::init(); - let (mut state_service, _) = StateService::new(Config::ephemeral(), network); + let (mut state_service, _, _) = StateService::new(Config::ephemeral(), network); prop_assert_eq!(state_service.disk.current_value_pool(), ValueBalance::zero()); prop_assert_eq!( diff --git a/zebra-state/src/tests/setup.rs b/zebra-state/src/tests/setup.rs index d61d88d05..a31354dd7 100644 --- a/zebra-state/src/tests/setup.rs +++ b/zebra-state/src/tests/setup.rs @@ -84,7 +84,7 @@ pub(crate) fn new_state_with_mainnet_genesis() -> (StateService, FinalizedBlock) .zcash_deserialize_into::>() .expect("block should deserialize"); - let (mut state, _) = StateService::new(Config::ephemeral(), Mainnet); + let (mut state, _, _) = StateService::new(Config::ephemeral(), Mainnet); assert_eq!(None, state.best_tip()); diff --git a/zebra-state/tests/basic.rs b/zebra-state/tests/basic.rs index a32ea43f2..0c8048c85 100644 --- a/zebra-state/tests/basic.rs +++ b/zebra-state/tests/basic.rs @@ -75,7 +75,7 @@ async fn check_transcripts(network: Network) -> Result<(), Report> { Network::Mainnet => mainnet_transcript, Network::Testnet => testnet_transcript, } { - let (service, _) = zebra_state::init(Config::ephemeral(), network); + let (service, _, _) = zebra_state::init(Config::ephemeral(), network); let transcript = Transcript::from(transcript_data.iter().cloned()); /// SPANDOC: check the on disk service against the transcript transcript.check(service).await?; diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 145a15299..8212d1271 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -50,7 +50,8 @@ impl StartCmd { info!(?config); info!("initializing node state"); - let (state_service, chain_tip_receiver) = + // TODO: use ChainTipChange to get tip changes (#2374, #2710, #2711, #2712, #2713, #2714) + let (state_service, latest_chain_tip, _chain_tip_change) = zebra_state::init(config.state.clone(), config.network.network); let state = ServiceBuilder::new().buffer(20).service(state_service); @@ -78,7 +79,7 @@ impl StartCmd { )); let (peer_set, address_book) = - zebra_network::init(config.network.clone(), inbound, chain_tip_receiver).await; + zebra_network::init(config.network.clone(), inbound, latest_chain_tip).await; setup_tx .send((peer_set.clone(), address_book)) .map_err(|_| eyre!("could not send setup data to inbound service"))?;