diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index 1b1e9440d..d0035b71b 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -13,7 +13,12 @@ use futures::{ channel::{mpsc, oneshot}, future, FutureExt, SinkExt, StreamExt, }; -use tokio::{net::TcpStream, sync::broadcast, task::JoinError, time::timeout}; +use tokio::{ + net::TcpStream, + sync::{broadcast, watch}, + task::JoinError, + time::timeout, +}; use tokio_util::codec::Framed; use tower::Service; use tracing::{span, Level, Span}; @@ -53,6 +58,7 @@ pub struct Handshake { our_services: PeerServices, relay: bool, parent_span: Span, + best_tip_height: Option>>, } /// The peer address that we are handshaking with. @@ -302,6 +308,7 @@ pub struct Builder { user_agent: Option, relay: Option, inv_collector: Option>, + best_tip_height: Option>>, } impl Builder @@ -361,6 +368,18 @@ where self } + /// Provide a realtime endpoint to obtain the current best chain tip block height. Optional. + /// + /// If this is unset, the minimum accepted protocol version for peer connections is kept + /// constant over network upgrade activations. + pub fn with_best_tip_height( + mut self, + best_tip_height: Option>>, + ) -> Self { + self.best_tip_height = best_tip_height; + self + } + /// Whether to request that peers relay transactions to our node. Optional. /// /// If this is unset, the node will not request transactions. @@ -402,6 +421,7 @@ where our_services, relay, parent_span: Span::current(), + best_tip_height: self.best_tip_height, }) } } @@ -424,6 +444,7 @@ where our_services: None, relay: None, inv_collector: None, + best_tip_height: None, } } } @@ -433,6 +454,7 @@ where /// /// We split `Handshake` into its components before calling this function, /// to avoid infectious `Sync` bounds on the returned future. +#[allow(clippy::too_many_arguments)] pub async fn negotiate_version( peer_conn: &mut Framed, connected_addr: &ConnectedAddr, @@ -441,6 +463,7 @@ pub async fn negotiate_version( user_agent: String, our_services: PeerServices, relay: bool, + best_tip_height: Option>>, ) -> Result<(Version, PeerServices, SocketAddr), HandshakeError> { // Create a random nonce for this connection let local_nonce = Nonce::default(); @@ -552,17 +575,11 @@ pub async fn negotiate_version( Err(HandshakeError::NonceReuse)?; } - // TODO: Reject connections with nodes that don't know about the current network upgrade (#1334) - // Use the latest non-finalized block height, rather than the minimum - if remote_version - < Version::min_remote_for_height( - config.network, - // This code will be replaced in #1334 - constants::INITIAL_MIN_NETWORK_PROTOCOL_VERSION - .activation_height(config.network) - .expect("minimum network protocol network upgrade has an activation height"), - ) - { + // SECURITY: Reject connections to peers on old versions, because they might not know about all + // network upgrades and could lead to chain forks or slower block propagation. + let height = best_tip_height.and_then(|height| *height.borrow()); + let min_version = Version::min_remote_for_height(config.network, height); + if remote_version < min_version { // Disconnect if peer is using an obsolete version. Err(HandshakeError::ObsoleteVersion(remote_version))?; } @@ -617,6 +634,7 @@ where let user_agent = self.user_agent.clone(); let our_services = self.our_services; let relay = self.relay; + let best_tip_height = self.best_tip_height.clone(); let fut = async move { debug!( @@ -647,6 +665,7 @@ where user_agent, our_services, relay, + best_tip_height, ), ) .await??; diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 8b907b3a3..763712037 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -12,7 +12,11 @@ use futures::{ stream::{FuturesUnordered, StreamExt}, TryFutureExt, }; -use tokio::{net::TcpListener, sync::broadcast, time::Instant}; +use tokio::{ + net::TcpListener, + sync::{broadcast, watch}, + time::Instant, +}; use tower::{ buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover, util::BoxService, Service, ServiceExt, @@ -25,7 +29,7 @@ use crate::{ BoxError, Config, Request, Response, }; -use zebra_chain::parameters::Network; +use zebra_chain::{block, parameters::Network}; use super::CandidateSet; use super::PeerSet; @@ -59,6 +63,7 @@ type PeerChange = Result, BoxError>; pub async fn init( config: Config, inbound_service: S, + best_tip_height: Option>>, ) -> ( Buffer, Request>, Arc>, @@ -87,6 +92,7 @@ where .with_timestamp_collector(timestamp_collector) .with_advertised_services(PeerServices::NODE_NETWORK) .with_user_agent(crate::constants::USER_AGENT.to_string()) + .with_best_tip_height(best_tip_height) .want_transactions(true) .finish() .expect("configured all required parameters"); diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs index 7fe372562..405496310 100644 --- a/zebra-network/src/peer_set/initialize/tests/vectors.rs +++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs @@ -110,7 +110,7 @@ async fn local_listener_port_with(listen_addr: SocketAddr, network: Network) { let inbound_service = service_fn(|_| async { unreachable!("inbound service should never be called") }); - let (_peer_service, address_book) = init(config, inbound_service).await; + let (_peer_service, address_book) = init(config, inbound_service, None).await; let local_listener = address_book.lock().unwrap().local_listener_meta_addr(); if listen_addr.port() == 0 { diff --git a/zebra-network/src/protocol/external/types.rs b/zebra-network/src/protocol/external/types.rs index 6eef69873..a69bce09b 100644 --- a/zebra-network/src/protocol/external/types.rs +++ b/zebra-network/src/protocol/external/types.rs @@ -48,7 +48,11 @@ impl Version { /// # Panics /// /// If we are incompatible with our own minimum remote protocol version. - pub fn min_remote_for_height(network: Network, height: block::Height) -> Version { + pub fn min_remote_for_height( + network: Network, + height: impl Into>, + ) -> Version { + let height = height.into().unwrap_or(block::Height(0)); let min_spec = Version::min_specified_for_height(network, height); // shut down if our own version is too old diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 81149e839..b2ddfdf0b 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -6,28 +6,28 @@ use std::{ time::{Duration, Instant}, }; -use check::difficulty::POW_MEDIAN_BLOCK_SPAN; use futures::future::FutureExt; use non_finalized_state::{NonFinalizedState, QueuedBlocks}; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, watch}; #[cfg(any(test, feature = "proptest-impl"))] use tower::buffer::Buffer; use tower::{util::BoxService, Service}; use tracing::instrument; use zebra_chain::{ block::{self, Block}, - parameters::POW_AVERAGING_WINDOW, parameters::{Network, NetworkUpgrade}, transaction, transaction::Transaction, transparent, }; +use self::best_tip_height::BestTipHeight; use crate::{ constants, request::HashOrHeight, BoxError, CloneError, CommitBlockError, Config, FinalizedBlock, PreparedBlock, Request, Response, ValidateContextError, }; +mod best_tip_height; pub(crate) mod check; mod finalized_state; mod non_finalized_state; @@ -63,14 +63,21 @@ pub(crate) struct StateService { network: Network, /// Instant tracking the last time `pending_utxos` was pruned last_prune: Instant, + /// The current best chain tip height. + best_tip_height: BestTipHeight, } impl StateService { const PRUNE_INTERVAL: Duration = Duration::from_secs(30); - pub fn new(config: Config, network: Network) -> Self { + pub fn new(config: Config, network: Network) -> (Self, watch::Receiver>) { + let (mut best_tip_height, best_tip_height_receiver) = BestTipHeight::new(); let disk = FinalizedState::new(&config, network); + if let Some(finalized_height) = disk.finalized_tip_height() { + best_tip_height.set_finalized_height(finalized_height); + } + let mem = NonFinalizedState::new(network); let queued_blocks = QueuedBlocks::default(); let pending_utxos = PendingUtxos::default(); @@ -82,6 +89,7 @@ impl StateService { pending_utxos, network, last_prune: Instant::now(), + best_tip_height, }; tracing::info!("starting legacy chain check"); @@ -108,7 +116,23 @@ impl StateService { } tracing::info!("no legacy chain found"); - state + (state, best_tip_height_receiver) + } + + /// Queue a finalized block for verification and storage in the finalized state. + fn queue_and_commit_finalized( + &mut self, + finalized: FinalizedBlock, + ) -> oneshot::Receiver> { + let (rsp_tx, rsp_rx) = oneshot::channel(); + + self.disk.queue_and_commit_finalized((finalized, rsp_tx)); + + if let Some(finalized_height) = self.disk.finalized_tip_height() { + self.best_tip_height.set_finalized_height(finalized_height); + } + + rsp_rx } /// Queue a non finalized block for verification and check if any queued @@ -165,10 +189,17 @@ impl StateService { ); } - self.queued_blocks - .prune_by_height(self.disk.finalized_tip_height().expect( + let finalized_tip_height = self.disk.finalized_tip_height().expect( "Finalized state must have at least one block before committing non-finalized state", - )); + ); + let non_finalized_tip_height = self.mem.best_tip().map(|(height, _hash)| height); + + self.queued_blocks.prune_by_height(finalized_tip_height); + + self.best_tip_height + .set_finalized_height(finalized_tip_height); + self.best_tip_height + .set_best_non_finalized_height(non_finalized_tip_height); tracing::trace!("finished processing queued block"); rsp_rx @@ -204,23 +235,6 @@ impl StateService { let queued_children = self.queued_blocks.dequeue_children(parent_hash); for (child, rsp_tx) in queued_children { - // required by validate_and_commit, moved here to make testing easier - assert!( - child.height > self.network.mandatory_checkpoint_height(), - "invalid non-finalized block height: the canopy checkpoint is mandatory, \ - pre-canopy blocks, and the canopy activation block, \ - must be committed to the state as finalized blocks" - ); - - // required by check_contextual_validity, moved here to make testing easier - let relevant_chain = - self.any_ancestor_blocks(child.block.header.previous_block_hash); - assert!( - relevant_chain.len() >= POW_AVERAGING_WINDOW + POW_MEDIAN_BLOCK_SPAN, - "contextual validation requires at least \ - 28 (POW_AVERAGING_WINDOW + POW_MEDIAN_BLOCK_SPAN) blocks" - ); - let child_hash = child.hash; let result; @@ -504,6 +518,17 @@ impl StateService { let intersection = self.find_best_chain_intersection(known_blocks); self.collect_best_chain_hashes(intersection, stop, max_len) } + + /// Assert some assumptions about the prepared `block` before it is validated. + fn assert_block_can_be_validated(&self, block: &PreparedBlock) { + // required by validate_and_commit, moved here to make testing easier + assert!( + block.height > self.network.mandatory_checkpoint_height(), + "invalid non-finalized block height: the canopy checkpoint is mandatory, pre-canopy \ + blocks, and the canopy activation block, must be committed to the state as finalized \ + blocks" + ); + } } pub(crate) struct Iter<'a> { @@ -640,6 +665,8 @@ impl Service for StateService { Request::CommitBlock(prepared) => { metrics::counter!("state.requests", 1, "type" => "commit_block"); + self.assert_block_can_be_validated(&prepared); + self.pending_utxos .check_against_ordered(&prepared.new_outputs); let rsp_rx = self.queue_and_commit_non_finalized(prepared); @@ -656,10 +683,8 @@ impl Service for StateService { Request::CommitFinalizedBlock(finalized) => { metrics::counter!("state.requests", 1, "type" => "commit_finalized_block"); - let (rsp_tx, rsp_rx) = oneshot::channel(); - self.pending_utxos.check_against(&finalized.new_outputs); - self.disk.queue_and_commit_finalized((finalized, rsp_tx)); + let rsp_rx = self.queue_and_commit_finalized(finalized); async move { rsp_rx @@ -748,8 +773,16 @@ impl Service for StateService { /// possible to construct multiple state services in the same application (as /// long as they, e.g., use different storage locations), but doing so is /// probably not what you want. -pub fn init(config: Config, network: Network) -> BoxService { - BoxService::new(StateService::new(config, network)) +pub fn init( + config: Config, + network: Network, +) -> ( + BoxService, + watch::Receiver>, +) { + let (state_service, best_tip_height) = StateService::new(config, network); + + (BoxService::new(state_service), best_tip_height) } /// Initialize a state service with an ephemeral [`Config`] and a buffer with a single slot. @@ -757,7 +790,7 @@ pub fn init(config: Config, network: Network) -> BoxService Buffer, Request> { - let state_service = StateService::new(Config::ephemeral(), network); + let (state_service, _) = StateService::new(Config::ephemeral(), network); Buffer::new(BoxService::new(state_service), 1) } diff --git a/zebra-state/src/service/best_tip_height.rs b/zebra-state/src/service/best_tip_height.rs new file mode 100644 index 000000000..34f6f005e --- /dev/null +++ b/zebra-state/src/service/best_tip_height.rs @@ -0,0 +1,70 @@ +use tokio::sync::watch; + +use zebra_chain::block; + +#[cfg(test)] +mod tests; + +/// A helper type to determine the best chain tip block height. +/// +/// The block height is determined based on the current finalized block height and the current best +/// non-finalized chain's tip block height. The height is made available from a [`watch::Receiver`]. +#[derive(Debug)] +pub struct BestTipHeight { + finalized: Option, + non_finalized: Option, + sender: watch::Sender>, + // TODO: Replace with calls to `watch::Sender::borrow` once Tokio is updated to 1.0.0 (#2573) + active_value: Option, +} + +impl BestTipHeight { + /// Create a new instance of [`BestTipHeight`] and the [`watch::Receiver`] endpoint for the + /// current best tip block height. + pub fn new() -> (Self, watch::Receiver>) { + let (sender, receiver) = watch::channel(None); + + ( + BestTipHeight { + finalized: None, + non_finalized: None, + sender, + active_value: None, + }, + receiver, + ) + } + + /// Update the current finalized block height. + /// + /// May trigger an update to best tip height. + pub fn set_finalized_height(&mut self, new_height: block::Height) { + if self.finalized != Some(new_height) { + self.finalized = Some(new_height); + self.update(); + } + } + + /// Update the current non-finalized block height. + /// + /// May trigger an update to the best tip height. + pub fn set_best_non_finalized_height(&mut self, new_height: Option) { + if self.non_finalized != new_height { + self.non_finalized = new_height; + self.update(); + } + } + + /// Possibly send an update to listeners. + /// + /// An update is only sent if the current best tip height is different from the last best tip + /// height that was sent. + fn update(&mut self) { + let new_value = self.non_finalized.max(self.finalized); + + if new_value != self.active_value { + let _ = self.sender.send(new_value); + self.active_value = new_value; + } + } +} diff --git a/zebra-state/src/service/best_tip_height/tests.rs b/zebra-state/src/service/best_tip_height/tests.rs new file mode 100644 index 000000000..cc95d9d45 --- /dev/null +++ b/zebra-state/src/service/best_tip_height/tests.rs @@ -0,0 +1,2 @@ +mod prop; +mod vectors; diff --git a/zebra-state/src/service/best_tip_height/tests/prop.rs b/zebra-state/src/service/best_tip_height/tests/prop.rs new file mode 100644 index 000000000..9228dd80b --- /dev/null +++ b/zebra-state/src/service/best_tip_height/tests/prop.rs @@ -0,0 +1,47 @@ +use proptest::prelude::*; +use proptest_derive::Arbitrary; + +use zebra_chain::block; + +use super::super::BestTipHeight; + +proptest! { + #[test] + fn best_tip_value_is_heighest_of_latest_finalized_and_non_finalized_heights( + height_updates in any::>(), + ) { + let (mut best_tip_height, receiver) = BestTipHeight::new(); + + let mut latest_finalized_height = None; + let mut latest_non_finalized_height = None; + + for update in height_updates { + match update { + HeightUpdate::Finalized(height) => { + best_tip_height.set_finalized_height(height); + latest_finalized_height = Some(height); + } + HeightUpdate::NonFinalized(height) => { + best_tip_height.set_best_non_finalized_height(height); + latest_non_finalized_height = height; + } + } + } + + let expected_height = match (latest_finalized_height, latest_non_finalized_height) { + (Some(finalized_height), Some(non_finalized_height)) => { + Some(finalized_height.max(non_finalized_height)) + } + (finalized_height, None) => finalized_height, + (None, non_finalized_height) => non_finalized_height, + }; + + prop_assert_eq!(*receiver.borrow(), expected_height); + } +} + +#[derive(Arbitrary, Clone, Copy, Debug)] +enum HeightUpdate { + Finalized(block::Height), + NonFinalized(Option), +} diff --git a/zebra-state/src/service/best_tip_height/tests/vectors.rs b/zebra-state/src/service/best_tip_height/tests/vectors.rs new file mode 100644 index 000000000..6231f3905 --- /dev/null +++ b/zebra-state/src/service/best_tip_height/tests/vectors.rs @@ -0,0 +1,8 @@ +use super::super::BestTipHeight; + +#[test] +fn best_tip_value_is_initially_empty() { + let (_best_tip_height, receiver) = BestTipHeight::new(); + + assert_eq!(*receiver.borrow(), None); +} diff --git a/zebra-state/src/service/tests.rs b/zebra-state/src/service/tests.rs index d1b34d4bb..e1d218baf 100644 --- a/zebra-state/src/service/tests.rs +++ b/zebra-state/src/service/tests.rs @@ -2,16 +2,21 @@ use std::{env, sync::Arc}; use futures::stream::FuturesUnordered; use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt}; + use zebra_chain::{ block::Block, parameters::{Network, NetworkUpgrade}, - serialization::ZcashDeserializeInto, + serialization::{ZcashDeserialize, ZcashDeserializeInto}, transaction, transparent, }; use zebra_test::{prelude::*, transcript::Transcript}; use crate::{ - constants, init_test, tests::setup::partial_nu5_chain_strategy, BoxError, Request, Response, + arbitrary::Prepare, + constants, init_test, + service::StateService, + tests::setup::{partial_nu5_chain_strategy, transaction_v4_from_coinbase}, + BoxError, Config, FinalizedBlock, PreparedBlock, Request, Response, }; const LAST_BLOCK_HEIGHT: u32 = 10; @@ -274,4 +279,85 @@ proptest! { prop_assert_eq!(response, Ok(())); } + + /// Test that the best tip height is updated accordingly. + /// + /// 1. Generate a finalized chain and some non-finalized blocks. + /// 2. Check that initially the best tip height is empty. + /// 3. Commit the finalized blocks and check that the best tip height is updated accordingly. + /// 4. Commit the non-finalized blocks and check that the best tip height is also updated + /// accordingly. + #[test] + fn best_tip_height_is_updated( + (network, finalized_blocks, non_finalized_blocks) + in continuous_empty_blocks_from_test_vectors(), + ) { + zebra_test::init(); + + let (mut state_service, best_tip_height) = StateService::new(Config::ephemeral(), network); + + prop_assert_eq!(*best_tip_height.borrow(), None); + + for block in finalized_blocks { + let expected_height = block.height; + + state_service.queue_and_commit_finalized(block); + + prop_assert_eq!(*best_tip_height.borrow(), Some(expected_height)); + } + + for block in non_finalized_blocks { + let expected_height = block.height; + + state_service.queue_and_commit_non_finalized(block); + + prop_assert_eq!(*best_tip_height.borrow(), Some(expected_height)); + } + } +} + +/// Test strategy to generate a chain split in two from the test vectors. +/// +/// Selects either the mainnet or testnet chain test vector and randomly splits the chain in two +/// lists of blocks. The first containing the blocks to be finalized (which always includes at +/// least the genesis block) and the blocks to be stored in the non-finalized state. +fn continuous_empty_blocks_from_test_vectors( +) -> impl Strategy, Vec)> { + any::() + .prop_flat_map(|network| { + // Select the test vector based on the network + let raw_blocks = match network { + Network::Mainnet => &*zebra_test::vectors::CONTINUOUS_MAINNET_BLOCKS, + Network::Testnet => &*zebra_test::vectors::CONTINUOUS_TESTNET_BLOCKS, + }; + + // Transform the test vector's block bytes into a vector of `PreparedBlock`s. + let blocks: Vec<_> = raw_blocks + .iter() + .map(|(_height, &block_bytes)| { + let mut block_reader: &[u8] = block_bytes; + let mut block = Block::zcash_deserialize(&mut block_reader) + .expect("Failed to deserialize block from test vector"); + + let coinbase = transaction_v4_from_coinbase(&block.transactions[0]); + block.transactions = vec![Arc::new(coinbase)]; + + Arc::new(block).prepare() + }) + .collect(); + + // Always finalize the genesis block + let finalized_blocks_count = 1..=blocks.len(); + + (Just(network), Just(blocks), finalized_blocks_count) + }) + .prop_map(|(network, mut blocks, finalized_blocks_count)| { + let non_finalized_blocks = blocks.split_off(finalized_blocks_count); + let finalized_blocks: Vec<_> = blocks + .into_iter() + .map(|prepared_block| FinalizedBlock::from(prepared_block.block)) + .collect(); + + (network, finalized_blocks, non_finalized_blocks) + }) } diff --git a/zebra-state/src/tests/setup.rs b/zebra-state/src/tests/setup.rs index 66cdfb08b..d490a561a 100644 --- a/zebra-state/src/tests/setup.rs +++ b/zebra-state/src/tests/setup.rs @@ -83,7 +83,7 @@ pub(crate) fn new_state_with_mainnet_genesis() -> (StateService, FinalizedBlock) .zcash_deserialize_into::>() .expect("block should deserialize"); - let mut state = StateService::new(Config::ephemeral(), Mainnet); + let (mut state, _) = StateService::new(Config::ephemeral(), Mainnet); assert_eq!(None, state.best_tip()); diff --git a/zebra-state/tests/basic.rs b/zebra-state/tests/basic.rs index 8a1e36325..1462b8dcf 100644 --- a/zebra-state/tests/basic.rs +++ b/zebra-state/tests/basic.rs @@ -7,7 +7,6 @@ use color_eyre::eyre::Report; use once_cell::sync::Lazy; use std::sync::Arc; -use tempdir::TempDir; use zebra_chain::{block::Block, parameters::Network, serialization::ZcashDeserialize}; use zebra_test::transcript::{ExpectedTranscriptError, Transcript}; @@ -76,20 +75,10 @@ async fn check_transcripts(network: Network) -> Result<(), Report> { Network::Mainnet => mainnet_transcript, Network::Testnet => testnet_transcript, } { - let storage_guard = TempDir::new("")?; - let cache_dir = storage_guard.path().to_owned(); - let service = zebra_state::init( - Config { - cache_dir, - ..Config::default() - }, - network, - ); + let service = zebra_state::init_test(network); let transcript = Transcript::from(transcript_data.iter().cloned()); /// SPANDOC: check the on disk service against the transcript transcript.check(service).await?; - // Delete the contents of the temp directory before going to the next case. - std::mem::drop(storage_guard); } Ok(()) diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 3fd2caa68..96138e4ec 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -49,10 +49,9 @@ impl StartCmd { info!(?config); info!("initializing node state"); - let state = ServiceBuilder::new().buffer(20).service(zebra_state::init( - config.state.clone(), - config.network.network, - )); + let (state_service, best_tip_height) = + zebra_state::init(config.state.clone(), config.network.network); + let state = ServiceBuilder::new().buffer(20).service(state_service); info!("initializing verifiers"); let verifier = zebra_consensus::chain::init( @@ -72,7 +71,8 @@ impl StartCmd { .buffer(20) .service(Inbound::new(setup_rx, state.clone(), verifier.clone())); - let (peer_set, address_book) = zebra_network::init(config.network.clone(), inbound).await; + let (peer_set, address_book) = + zebra_network::init(config.network.clone(), inbound, Some(best_tip_height)).await; setup_tx .send((peer_set.clone(), address_book)) .map_err(|_| eyre!("could not send setup data to inbound service"))?;