diff --git a/Cargo.lock b/Cargo.lock index 0a2a8d3de..dfc60d6f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4376,6 +4376,7 @@ dependencies = [ "static_assertions", "subtle", "thiserror", + "tokio", "tracing", "uint", "x25519-dalek", @@ -4564,6 +4565,7 @@ dependencies = [ "futures", "gumdrop", "hyper", + "indexmap", "inferno", "lazy_static", "metrics", diff --git a/zebra-chain/Cargo.toml b/zebra-chain/Cargo.toml index 1d2a742c0..45187d9d4 100644 --- a/zebra-chain/Cargo.toml +++ b/zebra-chain/Cargo.toml @@ -9,7 +9,7 @@ edition = "2018" [features] default = [] -proptest-impl = ["proptest", "proptest-derive", "zebra-test", "rand", "rand_chacha"] +proptest-impl = ["proptest", "proptest-derive", "zebra-test", "rand", "rand_chacha", "tokio"] bench = ["zebra-test"] [dependencies] @@ -60,6 +60,7 @@ proptest-derive = { version = "0.3.0", optional = true } rand = { version = "0.8", optional = true } rand_chacha = { version = "0.3", optional = true } +tokio = { version = "1.15.0", optional = true } # ZF deps ed25519-zebra = "3.0.0" diff --git a/zebra-chain/src/chain_tip.rs b/zebra-chain/src/chain_tip.rs index becf414a8..3ad4624d3 100644 --- a/zebra-chain/src/chain_tip.rs +++ b/zebra-chain/src/chain_tip.rs @@ -4,6 +4,9 @@ use std::sync::Arc; use crate::{block, transaction}; +#[cfg(any(test, feature = "proptest-impl"))] +pub mod mock; + /// An interface for querying the chain tip. /// /// This trait helps avoid dependencies between: diff --git a/zebra-chain/src/chain_tip/mock.rs b/zebra-chain/src/chain_tip/mock.rs new file mode 100644 index 000000000..fe84362c3 --- /dev/null +++ b/zebra-chain/src/chain_tip/mock.rs @@ -0,0 +1,48 @@ +//! Mock [`ChainTip`]s for use in tests. + +use std::sync::Arc; + +use tokio::sync::watch; + +use crate::{block, chain_tip::ChainTip, transaction}; + +/// A sender that sets the `best_tip_height` of a [`MockChainTip`].] +pub type MockChainTipSender = watch::Sender>; + +/// A mock [`ChainTip`] implementation that allows setting the `best_tip_height` externally. +#[derive(Clone, Debug)] +pub struct MockChainTip { + best_tip_height: watch::Receiver>, +} + +impl MockChainTip { + /// Create a new [`MockChainTip`]. + /// + /// Returns the [`MockChainTip`] instance and the endpoint to modiy the current best tip + /// height. + /// + /// Initially, the best tip height is [`None`]. + pub fn new() -> (Self, MockChainTipSender) { + let (sender, receiver) = watch::channel(None); + + let mock_chain_tip = MockChainTip { + best_tip_height: receiver, + }; + + (mock_chain_tip, sender) + } +} + +impl ChainTip for MockChainTip { + fn best_tip_height(&self) -> Option { + *self.best_tip_height.borrow() + } + + fn best_tip_hash(&self) -> Option { + unreachable!("Method not used in tests"); + } + + fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> { + unreachable!("Method not used in tests"); + } +} diff --git a/zebra-network/src/peer/minimum_peer_version/tests.rs b/zebra-network/src/peer/minimum_peer_version/tests.rs index 54fa687ad..ad9fdd22a 100644 --- a/zebra-network/src/peer/minimum_peer_version/tests.rs +++ b/zebra-network/src/peer/minimum_peer_version/tests.rs @@ -1,54 +1,17 @@ -use std::sync::Arc; +//! Test utilities and tests for minimum network peer version requirements. -use tokio::sync::watch; - -use zebra_chain::{block, chain_tip::ChainTip, parameters::Network, transaction}; +use zebra_chain::{ + chain_tip::mock::{MockChainTip, MockChainTipSender}, + parameters::Network, +}; use super::MinimumPeerVersion; #[cfg(test)] mod prop; -/// A mock [`ChainTip`] implementation that allows setting the `best_tip_height` externally. -#[derive(Clone)] -pub struct MockChainTip { - best_tip_height: watch::Receiver>, -} - -impl MockChainTip { - /// Create a new [`MockChainTip`]. - /// - /// Returns the [`MockChainTip`] instance and the endpoint to modiy the current best tip - /// height. - /// - /// Initially, the best tip height is [`None`]. - pub fn new() -> (Self, watch::Sender>) { - let (sender, receiver) = watch::channel(None); - - let mock_chain_tip = MockChainTip { - best_tip_height: receiver, - }; - - (mock_chain_tip, sender) - } -} - -impl ChainTip for MockChainTip { - fn best_tip_height(&self) -> Option { - *self.best_tip_height.borrow() - } - - fn best_tip_hash(&self) -> Option { - unreachable!("Method not used in `MinimumPeerVersion` tests"); - } - - fn best_tip_mined_transaction_ids(&self) -> Arc<[transaction::Hash]> { - unreachable!("Method not used in `MinimumPeerVersion` tests"); - } -} - impl MinimumPeerVersion { - pub fn with_mock_chain_tip(network: Network) -> (Self, watch::Sender>) { + pub fn with_mock_chain_tip(network: Network) -> (Self, MockChainTipSender) { let (chain_tip, best_tip_height) = MockChainTip::new(); let minimum_peer_version = MinimumPeerVersion::new(chain_tip, network); diff --git a/zebra-network/src/policies.rs b/zebra-network/src/policies.rs index 7cb6c48db..025eb2fc4 100644 --- a/zebra-network/src/policies.rs +++ b/zebra-network/src/policies.rs @@ -6,7 +6,7 @@ use tower::retry::Policy; /// A very basic retry policy with a limited number of retry attempts. /// /// XXX Remove this when https://github.com/tower-rs/tower/pull/414 lands. -#[derive(Clone, Debug)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub struct RetryLimit { remaining_tries: usize, } @@ -21,25 +21,23 @@ impl RetryLimit { } impl Policy for RetryLimit { - type Future = Pin + Send + 'static>>; + type Future = Pin + Send + Sync + 'static>>; fn retry(&self, req: &Req, result: Result<&Res, &E>) -> Option { if let Err(e) = result { if self.remaining_tries > 0 { tracing::debug!(?req, ?e, remaining_tries = self.remaining_tries, "retrying"); + let remaining_tries = self.remaining_tries - 1; + let retry_outcome = RetryLimit { remaining_tries }; Some( - async move { - // Let other tasks run, so we're more likely to choose a different peer, - // and so that any notfound inv entries win the race to the PeerSet. - // - // TODO: move syncer retries into the PeerSet, - // so we always choose different peers (#3235) - tokio::task::yield_now().await; - RetryLimit { remaining_tries } - } - .boxed(), + // Let other tasks run, so we're more likely to choose a different peer, + // and so that any notfound inv entries win the race to the PeerSet. + // + // TODO: move syncer retries into the PeerSet, + // so we always choose different peers (#3235) + Box::pin(tokio::task::yield_now().map(move |()| retry_outcome)), ) } else { None diff --git a/zebra-test/src/mock_service.rs b/zebra-test/src/mock_service.rs index 239df850d..078a59226 100644 --- a/zebra-test/src/mock_service.rs +++ b/zebra-test/src/mock_service.rs @@ -306,12 +306,13 @@ impl MockService MockService(), + ); response_sender } @@ -362,13 +369,23 @@ impl MockService bool, - ) -> ResponseSender { + ) -> ResponseSender + where + Request: Debug, + { let response_sender = self.next_request().await; - assert!(condition(&response_sender.request)); + assert!( + condition(&response_sender.request), + "condition was false for request: {:?},\n \ + in {}", + response_sender.request, + std::any::type_name::(), + ); response_sender } @@ -400,14 +417,17 @@ impl MockService(), ); } } @@ -422,10 +442,15 @@ impl MockService ResponseSender { match self.try_next_request().await { Some(request) => request, - None => panic!("Timeout while waiting for a request"), + None => panic!( + "timeout while waiting for a request\n \ + in {}", + std::any::type_name::(), + ), } } } @@ -478,6 +503,7 @@ impl MockService MockService(), + ); Ok(response_sender) } @@ -538,13 +570,23 @@ impl MockService bool, - ) -> Result, TestCaseError> { + ) -> Result, TestCaseError> + where + Request: Debug, + { let response_sender = self.next_request().await?; - prop_assert!(condition(&response_sender.request)); + prop_assert!( + condition(&response_sender.request), + "condition was false for request: {:?},\n \ + in {}", + &response_sender.request, + std::any::type_name::(), + ); Ok(response_sender) } @@ -583,6 +625,7 @@ impl MockService Result<(), TestCaseError> where Request: Debug, @@ -591,8 +634,10 @@ impl MockService { prop_assert!( false, - "Received an unexpected request: {:?}", - response_sender.request + "received an unexpected request: {:?},\n \ + in {}", + response_sender.request, + std::any::type_name::(), ); unreachable!("prop_assert!(false) returns an early error"); } @@ -608,13 +653,19 @@ impl MockService Result, TestCaseError> { match self.try_next_request().await { Some(request) => Ok(request), None => { - prop_assert!(false, "Timeout while waiting for a request"); + prop_assert!( + false, + "timeout while waiting for a request\n \ + in {}", + std::any::type_name::(), + ); unreachable!("prop_assert!(false) returns an early error"); } } @@ -643,7 +694,7 @@ impl MockService continue, - Ok(Err(RecvError::Closed)) => unreachable!("Sender is never closed"), + Ok(Err(RecvError::Closed)) => unreachable!("sender is never closed"), Err(_timeout) => return None, } } @@ -685,7 +736,7 @@ impl ResponseSender { &self.request } - /// Respond to the request. + /// Respond to the request using a fixed response value. /// /// The `response` can be of the `Response` type or a [`Result`]. This allows sending an error /// representing an error while processing the request. @@ -693,7 +744,7 @@ impl ResponseSender { /// This method takes ownership of the [`ResponseSender`] so that only one response can be /// sent. /// - /// If this method is not called, the caller will panic. + /// If `respond` or `respond_with` are not called, the caller will panic. /// /// # Example /// @@ -733,6 +784,60 @@ impl ResponseSender { pub fn respond(self, response: impl ResponseResult) { let _ = self.response_sender.send(response.into_result()); } + + /// Respond to the request by calculating a value from the request. + /// + /// The response can be of the `Response` type or a [`Result`]. This allows sending an error + /// representing an error while processing the request. + /// + /// This method takes ownership of the [`ResponseSender`] so that only one response can be + /// sent. + /// + /// If `respond` or `respond_with` are not called, the caller will panic. + /// + /// # Example + /// + /// ``` + /// # use zebra_test::mock_service::MockService; + /// # use tower::{Service, ServiceExt}; + /// # + /// # let reactor = tokio::runtime::Builder::new_current_thread() + /// # .enable_all() + /// # .build() + /// # .expect("Failed to build Tokio runtime"); + /// # + /// # reactor.block_on(async { + /// // Mock a service with a `String` as the service `Error` type. + /// let mut mock_service: MockService<_, _, _, String> = + /// MockService::build().for_unit_tests(); + /// + /// # let mut service = mock_service.clone(); + /// # let task = tokio::spawn(async move { + /// # let first_call_result = (&mut service).oneshot(1).await; + /// # let second_call_result = service.oneshot(1).await; + /// # + /// # (first_call_result, second_call_result) + /// # }); + /// # + /// mock_service + /// .expect_request(1) + /// .await + /// .respond_with(|req| format!("Received: {}", req)); + /// + /// mock_service + /// .expect_request(1) + /// .await + /// .respond_with(|req| Err(format!("Duplicate request: {}", req))); + /// # }); + /// ``` + pub fn respond_with(self, response_fn: F) + where + F: FnOnce(&Request) -> R, + R: ResponseResult, + { + let response_result = response_fn(self.request()).into_result(); + let _ = self.response_sender.send(response_result); + } } /// A representation of an assertion type. @@ -757,7 +862,7 @@ impl AssertionType for PropTestAssertion {} /// A helper trait to improve ergonomics when sending a response. /// /// This allows the [`ResponseSender::respond`] method to receive either a [`Result`] or just the -/// response type that is wrapped in an `Ok` variant. +/// response type, which it automatically wraps in an `Ok` variant. pub trait ResponseResult { /// Converts the type into a [`Result`] that can be sent as a response. fn into_result(self) -> Result; diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index 0f0f51835..6c47f5039 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -18,6 +18,7 @@ zebra-state = { path = "../zebra-state" } abscissa_core = "0.5" chrono = "0.4" gumdrop = "0.7" +indexmap = "1.7.0" lazy_static = "1.4.0" serde = { version = "1", features = ["serde_derive"] } toml = "0.5" diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 91111ae24..36a47f1ac 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -144,9 +144,9 @@ impl StartCmd { .send(setup_data) .map_err(|_| eyre!("could not send setup data to inbound service"))?; - let syncer_error_future = syncer.sync(); + let syncer_task_handle = tokio::spawn(syncer.sync()); - let mut sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes( + let mut block_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes( sync_status.clone(), chain_tip_change.clone(), peer_set.clone(), @@ -169,11 +169,10 @@ impl StartCmd { info!("spawned initial Zebra tasks"); - // TODO: spawn the syncer task, after making the PeerSet marker::Sync and marker::Send - // turn these tasks into a FuturesUnordered? + // TODO: put tasks into an ongoing FuturesUnordered and a startup FuturesUnordered? - // ongoing futures & tasks - pin!(syncer_error_future); + // ongoing tasks + pin!(syncer_task_handle); pin!(mempool_crawler_task_handle); pin!(mempool_queue_checker_task_handle); pin!(tx_gossip_task_handle); @@ -187,12 +186,11 @@ impl StartCmd { let mut exit_when_task_finishes = true; let result = select! { - // We don't spawn the syncer future into a separate task yet. - // So syncer panics automatically propagate to the main zebrad task. - sync_result = &mut syncer_error_future => sync_result + sync_result = &mut syncer_task_handle => sync_result + .expect("unexpected panic in the syncer task") .map(|_| info!("syncer task exited")), - sync_gossip_result = &mut sync_gossip_task_handle => sync_gossip_result + block_gossip_result = &mut block_gossip_task_handle => block_gossip_result .expect("unexpected panic in the chain tip block gossip task") .map(|_| info!("chain tip block gossip task exited")) .map_err(|e| eyre!(e)), @@ -238,11 +236,9 @@ impl StartCmd { info!("exiting Zebra because an ongoing task exited: stopping other tasks"); - // futures - std::mem::drop(syncer_error_future); - // ongoing tasks - sync_gossip_task_handle.abort(); + syncer_task_handle.abort(); + block_gossip_task_handle.abort(); mempool_crawler_task_handle.abort(); mempool_queue_checker_task_handle.abort(); tx_gossip_task_handle.abort(); diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index 017a6198b..96f78cbad 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -6,6 +6,7 @@ use std::{collections::HashSet, pin::Pin, sync::Arc, task::Poll, time::Duration} use color_eyre::eyre::{eyre, Report}; use futures::stream::{FuturesUnordered, StreamExt}; +use indexmap::IndexSet; use tokio::time::sleep; use tower::{ builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout, @@ -14,6 +15,7 @@ use tower::{ use zebra_chain::{ block::{self, Block}, + chain_tip::ChainTip, parameters::genesis_hash, }; use zebra_consensus::{ @@ -21,7 +23,6 @@ use zebra_consensus::{ }; use zebra_network as zn; use zebra_state as zs; -use zs::LatestChainTip; use crate::{ components::sync::downloads::BlockDownloadVerifyError, config::ZebradConfig, BoxError, @@ -184,14 +185,27 @@ struct CheckedTip { expected_next: block::Hash, } -pub struct ChainSync +pub struct ChainSync where - ZN: Service + Send + Clone + 'static, + ZN: Service + + Send + + Sync + + Clone + + 'static, ZN::Future: Send, - ZS: Service + Send + Clone + 'static, + ZS: Service + + Send + + Sync + + Clone + + 'static, ZS::Future: Send, - ZV: Service, Response = block::Hash, Error = BoxError> + Send + Clone + 'static, + ZV: Service, Response = block::Hash, Error = BoxError> + + Send + + Sync + + Clone + + 'static, ZV::Future: Send, + ZSTip: ChainTip + Clone + Send + 'static, { // Configuration /// The genesis hash for the configured network @@ -214,6 +228,7 @@ where Downloads< Hedge>>, AlwaysHedge>, Timeout, + ZSTip, >, >, >, @@ -235,14 +250,27 @@ where /// This component is used for initial block sync, but the `Inbound` service is /// responsible for participating in the gossip protocols used for block /// diffusion. -impl ChainSync +impl ChainSync where - ZN: Service + Send + Clone + 'static, + ZN: Service + + Send + + Sync + + Clone + + 'static, ZN::Future: Send, - ZS: Service + Send + Clone + 'static, + ZS: Service + + Send + + Sync + + Clone + + 'static, ZS::Future: Send, - ZV: Service, Response = block::Hash, Error = BoxError> + Send + Clone + 'static, + ZV: Service, Response = block::Hash, Error = BoxError> + + Send + + Sync + + Clone + + 'static, ZV::Future: Send, + ZSTip: ChainTip + Clone + Send + 'static, { /// Returns a new syncer instance, using: /// - chain: the zebra-chain `Network` to download (Mainnet or Testnet) @@ -257,7 +285,7 @@ where peers: ZN, verifier: ZV, state: ZS, - latest_chain_tip: LatestChainTip, + latest_chain_tip: ZSTip, ) -> (Self, SyncStatus) { let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT); // The Hedge middleware is the outermost layer, hedging requests @@ -442,17 +470,21 @@ where tokio::task::yield_now().await; } - requests.push(self.tip_network.ready().await.map_err(|e| eyre!(e))?.call( + let ready_tip_network = self.tip_network.ready().await; + requests.push(tokio::spawn(ready_tip_network.map_err(|e| eyre!(e))?.call( zn::Request::FindBlocks { known_blocks: block_locator.clone(), stop: None, }, - )); + ))); } - let mut download_set = HashSet::new(); + let mut download_set = IndexSet::new(); while let Some(res) = requests.next().await { - match res.map_err::(|e| eyre!(e)) { + match res + .expect("panic in spawned obtain tips request") + .map_err::(|e| eyre!(e)) + { Ok(zn::Response::BlockHashes(hashes)) => { tracing::trace!(?hashes); @@ -515,6 +547,9 @@ where ); } + // security: the first response determines our download order + // + // TODO: can we make the download order independent of response order? let prev_download_len = download_set.len(); download_set.extend(unknown_hashes); let new_download_len = download_set.len(); @@ -543,7 +578,7 @@ where metrics::gauge!("sync.obtain.queued.hash.count", new_downloads as f64); // security: use the actual number of new downloads from all peers, - // so a single trailing peer can't toggle our mempool + // so the last peer to respond can't toggle our mempool self.recent_syncs.push_obtain_tips_length(new_downloads); self.request_blocks(download_set).await?; @@ -555,7 +590,7 @@ where async fn extend_tips(&mut self) -> Result<(), Report> { let tips = std::mem::take(&mut self.prospective_tips); - let mut download_set = HashSet::new(); + let mut download_set = IndexSet::new(); tracing::info!(tips = ?tips.len(), "trying to extend chain tips"); for tip in tips { tracing::debug!(?tip, "asking peers to extend chain tip"); @@ -568,15 +603,19 @@ where tokio::task::yield_now().await; } - responses.push(self.tip_network.ready().await.map_err(|e| eyre!(e))?.call( + let ready_tip_network = self.tip_network.ready().await; + responses.push(tokio::spawn(ready_tip_network.map_err(|e| eyre!(e))?.call( zn::Request::FindBlocks { known_blocks: vec![tip.tip], stop: None, }, - )); + ))); } while let Some(res) = responses.next().await { - match res.map_err::(|e| eyre!(e)) { + match res + .expect("panic in spawned extend tips request") + .map_err::(|e| eyre!(e)) + { Ok(zn::Response::BlockHashes(hashes)) => { tracing::debug!(first = ?hashes.first(), len = ?hashes.len()); tracing::trace!(?hashes); @@ -654,6 +693,9 @@ where ); } + // security: the first response determines our download order + // + // TODO: can we make the download order independent of response order? let prev_download_len = download_set.len(); download_set.extend(unknown_hashes); let new_download_len = download_set.len(); @@ -673,7 +715,7 @@ where metrics::gauge!("sync.extend.queued.hash.count", new_downloads as f64); // security: use the actual number of new downloads from all peers, - // so a single trailing peer can't toggle our mempool + // so the last peer to respond can't toggle our mempool self.recent_syncs.push_extend_tips_length(new_downloads); self.request_blocks(download_set).await?; @@ -710,7 +752,7 @@ where } /// Queue download and verify tasks for each block that isn't currently known to our node - async fn request_blocks(&mut self, hashes: HashSet) -> Result<(), Report> { + async fn request_blocks(&mut self, hashes: IndexSet) -> Result<(), Report> { tracing::debug!(hashes.len = hashes.len(), "requesting blocks"); for hash in hashes.into_iter() { self.downloads.download_and_verify(hash).await?; @@ -740,7 +782,7 @@ where } } - fn update_metrics(&self) { + fn update_metrics(&mut self) { metrics::gauge!( "sync.prospective_tips.len", self.prospective_tips.len() as f64 @@ -775,6 +817,14 @@ where tracing::debug!(error = ?e, "block verification was cancelled, continuing"); false } + BlockDownloadVerifyError::BehindTipHeightLimit => { + tracing::debug!( + error = ?e, + "block height is behind the current state tip, \ + assuming the syncer will eventually catch up to the state, continuing" + ); + false + } // String matches BlockDownloadVerifyError::Invalid(VerifyChainError::Block( @@ -806,6 +856,7 @@ where if err_str.contains("AlreadyVerified") || err_str.contains("AlreadyInChain") || err_str.contains("Cancelled") + || err_str.contains("BehindTipHeight") || err_str.contains("block is already committed to the state") || err_str.contains("NotFound") { diff --git a/zebrad/src/components/sync/downloads.rs b/zebrad/src/components/sync/downloads.rs index 9566c3117..7b00fc585 100644 --- a/zebrad/src/components/sync/downloads.rs +++ b/zebrad/src/components/sync/downloads.rs @@ -36,17 +36,20 @@ type BoxError = Box; /// `lookahead_limit / VERIFICATION_PIPELINE_SCALING_DIVISOR`. /// /// For the default lookahead limit, the extra number of blocks is -/// `2 * MAX_TIPS_RESPONSE_HASH_COUNT`. +/// `4 * MAX_TIPS_RESPONSE_HASH_COUNT`. /// -/// This allows the verifier and state queues to hold an extra two tips responses worth of blocks, +/// This allows the verifier and state queues to hold a few extra tips responses worth of blocks, /// even if the syncer queue is full. Any unused capacity is shared between both queues. /// +/// If this capacity is exceeded, the downloader will start failing download blocks with +/// [`BlockDownloadVerifyError::AboveLookaheadHeightLimit`], and the syncer will reset. +/// /// Since the syncer queue is limited to the `lookahead_limit`, /// the rest of the capacity is reserved for the other queues. /// There is no reserved capacity for the syncer queue: /// if the other queues stay full, the syncer will eventually time out and reset. const VERIFICATION_PIPELINE_SCALING_DIVISOR: usize = - DEFAULT_LOOKAHEAD_LIMIT / (2 * MAX_TIPS_RESPONSE_HASH_COUNT); + DEFAULT_LOOKAHEAD_LIMIT / (4 * MAX_TIPS_RESPONSE_HASH_COUNT); #[derive(Copy, Clone, Debug)] pub(super) struct AlwaysHedge; @@ -92,12 +95,17 @@ pub enum BlockDownloadVerifyError { /// Represents a [`Stream`] of download and verification tasks during chain sync. #[pin_project] #[derive(Debug)] -pub struct Downloads +pub struct Downloads where - ZN: Service + Send + 'static, + ZN: Service + Send + Sync + 'static, ZN::Future: Send, - ZV: Service, Response = block::Hash, Error = BoxError> + Send + Clone + 'static, + ZV: Service, Response = block::Hash, Error = BoxError> + + Send + + Sync + + Clone + + 'static, ZV::Future: Send, + ZSTip: ChainTip + Clone + Send + 'static, { // Services /// A service that forwards requests to connected peers, and returns their @@ -108,7 +116,7 @@ where verifier: ZV, /// Allows efficient access to the best tip of the blockchain. - latest_chain_tip: zs::LatestChainTip, + latest_chain_tip: ZSTip, // Configuration /// The configured lookahead limit, after applying the minimum limit. @@ -125,12 +133,17 @@ where cancel_handles: HashMap>, } -impl Stream for Downloads +impl Stream for Downloads where - ZN: Service + Send + 'static, + ZN: Service + Send + Sync + 'static, ZN::Future: Send, - ZV: Service, Response = block::Hash, Error = BoxError> + Send + Clone + 'static, + ZV: Service, Response = block::Hash, Error = BoxError> + + Send + + Sync + + Clone + + 'static, ZV::Future: Send, + ZSTip: ChainTip + Clone + Send + 'static, { type Item = Result; @@ -166,12 +179,17 @@ where } } -impl Downloads +impl Downloads where - ZN: Service + Send + 'static, + ZN: Service + Send + Sync + 'static, ZN::Future: Send, - ZV: Service, Response = block::Hash, Error = BoxError> + Send + Clone + 'static, + ZV: Service, Response = block::Hash, Error = BoxError> + + Send + + Sync + + Clone + + 'static, ZV::Future: Send, + ZSTip: ChainTip + Clone + Send + 'static, { /// Initialize a new download stream with the provided `network` and /// `verifier` services. Uses the `latest_chain_tip` and `lookahead_limit` @@ -180,12 +198,7 @@ where /// The [`Downloads`] stream is agnostic to the network policy, so retry and /// timeout limits should be applied to the `network` service passed into /// this constructor. - pub fn new( - network: ZN, - verifier: ZV, - latest_chain_tip: zs::LatestChainTip, - lookahead_limit: usize, - ) -> Self { + pub fn new(network: ZN, verifier: ZV, latest_chain_tip: ZSTip, lookahead_limit: usize) -> Self { Self { network, verifier, @@ -395,7 +408,7 @@ where } /// Get the number of currently in-flight download tasks. - pub fn in_flight(&self) -> usize { + pub fn in_flight(&mut self) -> usize { self.pending.len() } } diff --git a/zebrad/src/components/sync/tests.rs b/zebrad/src/components/sync/tests.rs index bc99ae6fc..afaa6058d 100644 --- a/zebrad/src/components/sync/tests.rs +++ b/zebrad/src/components/sync/tests.rs @@ -1 +1,4 @@ +//! Syncer tests + mod timing; +mod vectors; diff --git a/zebrad/src/components/sync/tests/vectors.rs b/zebrad/src/components/sync/tests/vectors.rs new file mode 100644 index 000000000..0427b8a59 --- /dev/null +++ b/zebrad/src/components/sync/tests/vectors.rs @@ -0,0 +1,980 @@ +//! Fixed test vectors for the syncer. + +use std::{collections::HashMap, iter, sync::Arc, time::Duration}; + +use color_eyre::Report; +use futures::{Future, FutureExt}; + +use zebra_chain::{ + block::{self, Block}, + chain_tip::mock::{MockChainTip, MockChainTipSender}, + serialization::ZcashDeserializeInto, +}; +use zebra_consensus::Config as ConsensusConfig; +use zebra_state::Config as StateConfig; +use zebra_test::mock_service::{MockService, PanicAssertion}; + +use zebra_network as zn; +use zebra_state as zs; + +use crate::{ + components::{ + sync::{self, SyncStatus}, + ChainSync, + }, + config::ZebradConfig, +}; + +/// Maximum time to wait for a request to any test service. +/// +/// The default [`MockService`] value can be too short for some of these tests that take a little +/// longer than expected to actually send the request. +/// +/// Increasing this value causes the tests to take longer to complete, so it can't be too large. +const MAX_SERVICE_REQUEST_DELAY: Duration = Duration::from_millis(500); + +/// Test that the syncer downloads genesis, blocks 1-2 using obtain_tips, and blocks 3-4 using extend_tips. +/// +/// This test also makes sure that the syncer downloads blocks in order. +#[tokio::test(flavor = "multi_thread")] +async fn sync_blocks_ok() -> Result<(), crate::BoxError> { + // Get services + let ( + chain_sync_future, + _sync_status, + mut chain_verifier, + mut peer_set, + mut state_service, + _mock_chain_tip_sender, + ) = setup(); + + // Get blocks + let block0: Arc = + zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES.zcash_deserialize_into()?; + let block0_hash = block0.hash(); + + let block1: Arc = zebra_test::vectors::BLOCK_MAINNET_1_BYTES.zcash_deserialize_into()?; + let block1_hash = block1.hash(); + + let block2: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES.zcash_deserialize_into()?; + let block2_hash = block2.hash(); + + let block3: Arc = zebra_test::vectors::BLOCK_MAINNET_3_BYTES.zcash_deserialize_into()?; + let block3_hash = block3.hash(); + + let block4: Arc = zebra_test::vectors::BLOCK_MAINNET_4_BYTES.zcash_deserialize_into()?; + let block4_hash = block4.hash(); + + let block5: Arc = zebra_test::vectors::BLOCK_MAINNET_5_BYTES.zcash_deserialize_into()?; + let block5_hash = block5.hash(); + + // Start the syncer + let chain_sync_task_handle = tokio::spawn(chain_sync_future); + + // ChainSync::request_genesis + + // State is checked for genesis + state_service + .expect_request(zs::Request::Depth(block0_hash)) + .await + .respond(zs::Response::Depth(None)); + + // Block 0 is fetched and committed to the state + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block0.clone()])); + + chain_verifier + .expect_request(block0) + .await + .respond(block0_hash); + + // Check that nothing unexpected happened. + // We expect more requests to the state service, because the syncer keeps on running. + peer_set.expect_no_requests().await; + chain_verifier.expect_no_requests().await; + + // State is checked for genesis again + state_service + .expect_request(zs::Request::Depth(block0_hash)) + .await + .respond(zs::Response::Depth(Some(0))); + + // ChainSync::obtain_tips + + // State is asked for a block locator. + state_service + .expect_request(zs::Request::BlockLocator) + .await + .respond(zs::Response::BlockLocator(vec![block0_hash])); + + // Network is sent the block locator + peer_set + .expect_request(zn::Request::FindBlocks { + known_blocks: vec![block0_hash], + stop: None, + }) + .await + .respond(zn::Response::BlockHashes(vec![ + block1_hash, // tip + block2_hash, // expected_next + block3_hash, // (discarded - last hash, possibly incorrect) + ])); + + // State is checked for the first unknown block (block 1) + state_service + .expect_request(zs::Request::Depth(block1_hash)) + .await + .respond(zs::Response::Depth(None)); + + // Clear remaining block locator requests + for _ in 0..(sync::FANOUT - 1) { + peer_set + .expect_request(zn::Request::FindBlocks { + known_blocks: vec![block0_hash], + stop: None, + }) + .await + .respond(Err(zn::BoxError::from("synthetic test obtain tips error"))); + } + + // Check that nothing unexpected happened. + peer_set.expect_no_requests().await; + chain_verifier.expect_no_requests().await; + + // State is checked for all non-tip blocks (blocks 1 & 2) in response order + state_service + .expect_request(zs::Request::Depth(block1_hash)) + .await + .respond(zs::Response::Depth(None)); + state_service + .expect_request(zs::Request::Depth(block2_hash)) + .await + .respond(zs::Response::Depth(None)); + + // Blocks 1 & 2 are fetched in order, then verified concurrently + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block1.clone()])); + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block2.clone()])); + + // We can't guarantee the verification request order + let mut remaining_blocks: HashMap> = + [(block1_hash, block1), (block2_hash, block2)] + .iter() + .cloned() + .collect(); + + for _ in 1..=2 { + chain_verifier + .expect_request_that(|req| remaining_blocks.remove(&req.hash()).is_some()) + .await + .respond_with(|req| req.hash()); + } + assert_eq!( + remaining_blocks, + HashMap::new(), + "expected all non-tip blocks to be verified by obtain tips" + ); + + // Check that nothing unexpected happened. + chain_verifier.expect_no_requests().await; + state_service.expect_no_requests().await; + + // ChainSync::extend_tips + + // Network is sent a block locator based on the tip + peer_set + .expect_request(zn::Request::FindBlocks { + known_blocks: vec![block1_hash], + stop: None, + }) + .await + .respond(zn::Response::BlockHashes(vec![ + block2_hash, // tip (discarded - already fetched) + block3_hash, // expected_next + block4_hash, + block5_hash, // (discarded - last hash, possibly incorrect) + ])); + + // Clear remaining block locator requests + for _ in 0..(sync::FANOUT - 1) { + peer_set + .expect_request(zn::Request::FindBlocks { + known_blocks: vec![block1_hash], + stop: None, + }) + .await + .respond(Err(zn::BoxError::from("synthetic test extend tips error"))); + } + + // Check that nothing unexpected happened. + chain_verifier.expect_no_requests().await; + state_service.expect_no_requests().await; + + // Blocks 3 & 4 are fetched in order, then verified concurrently + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block3_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block3.clone()])); + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block4_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block4.clone()])); + + // We can't guarantee the verification request order + let mut remaining_blocks: HashMap> = + [(block3_hash, block3), (block4_hash, block4)] + .iter() + .cloned() + .collect(); + + for _ in 3..=4 { + chain_verifier + .expect_request_that(|req| remaining_blocks.remove(&req.hash()).is_some()) + .await + .respond_with(|req| req.hash()); + } + assert_eq!( + remaining_blocks, + HashMap::new(), + "expected all non-tip blocks to be verified by extend tips" + ); + + // Check that nothing unexpected happened. + chain_verifier.expect_no_requests().await; + state_service.expect_no_requests().await; + + let chain_sync_result = chain_sync_task_handle.now_or_never(); + assert!( + matches!(chain_sync_result, None), + "unexpected error or panic in chain sync task: {:?}", + chain_sync_result, + ); + + Ok(()) +} + +/// Test that the syncer downloads genesis, blocks 1-2 using obtain_tips, and blocks 3-4 using extend_tips, +/// with duplicate block hashes. +/// +/// This test also makes sure that the syncer downloads blocks in order. +#[tokio::test(flavor = "multi_thread")] +async fn sync_blocks_duplicate_hashes_ok() -> Result<(), crate::BoxError> { + // Get services + let ( + chain_sync_future, + _sync_status, + mut chain_verifier, + mut peer_set, + mut state_service, + _mock_chain_tip_sender, + ) = setup(); + + // Get blocks + let block0: Arc = + zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES.zcash_deserialize_into()?; + let block0_hash = block0.hash(); + + let block1: Arc = zebra_test::vectors::BLOCK_MAINNET_1_BYTES.zcash_deserialize_into()?; + let block1_hash = block1.hash(); + + let block2: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES.zcash_deserialize_into()?; + let block2_hash = block2.hash(); + + let block3: Arc = zebra_test::vectors::BLOCK_MAINNET_3_BYTES.zcash_deserialize_into()?; + let block3_hash = block3.hash(); + + let block4: Arc = zebra_test::vectors::BLOCK_MAINNET_4_BYTES.zcash_deserialize_into()?; + let block4_hash = block4.hash(); + + let block5: Arc = zebra_test::vectors::BLOCK_MAINNET_5_BYTES.zcash_deserialize_into()?; + let block5_hash = block5.hash(); + + // Start the syncer + let chain_sync_task_handle = tokio::spawn(chain_sync_future); + + // ChainSync::request_genesis + + // State is checked for genesis + state_service + .expect_request(zs::Request::Depth(block0_hash)) + .await + .respond(zs::Response::Depth(None)); + + // Block 0 is fetched and committed to the state + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block0.clone()])); + + chain_verifier + .expect_request(block0) + .await + .respond(block0_hash); + + // Check that nothing unexpected happened. + // We expect more requests to the state service, because the syncer keeps on running. + peer_set.expect_no_requests().await; + chain_verifier.expect_no_requests().await; + + // State is checked for genesis again + state_service + .expect_request(zs::Request::Depth(block0_hash)) + .await + .respond(zs::Response::Depth(Some(0))); + + // ChainSync::obtain_tips + + // State is asked for a block locator. + state_service + .expect_request(zs::Request::BlockLocator) + .await + .respond(zs::Response::BlockLocator(vec![block0_hash])); + + // Network is sent the block locator + peer_set + .expect_request(zn::Request::FindBlocks { + known_blocks: vec![block0_hash], + stop: None, + }) + .await + .respond(zn::Response::BlockHashes(vec![ + block1_hash, + block1_hash, + block1_hash, // tip + block2_hash, // expected_next + block3_hash, // (discarded - last hash, possibly incorrect) + ])); + + // State is checked for the first unknown block (block 1) + state_service + .expect_request(zs::Request::Depth(block1_hash)) + .await + .respond(zs::Response::Depth(None)); + + // Clear remaining block locator requests + for _ in 0..(sync::FANOUT - 1) { + peer_set + .expect_request(zn::Request::FindBlocks { + known_blocks: vec![block0_hash], + stop: None, + }) + .await + .respond(Err(zn::BoxError::from("synthetic test obtain tips error"))); + } + + // Check that nothing unexpected happened. + peer_set.expect_no_requests().await; + chain_verifier.expect_no_requests().await; + + // State is checked for all non-tip blocks (blocks 1 & 2) in response order + state_service + .expect_request(zs::Request::Depth(block1_hash)) + .await + .respond(zs::Response::Depth(None)); + state_service + .expect_request(zs::Request::Depth(block2_hash)) + .await + .respond(zs::Response::Depth(None)); + + // Blocks 1 & 2 are fetched in order, then verified concurrently + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block1.clone()])); + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block2.clone()])); + + // We can't guarantee the verification request order + let mut remaining_blocks: HashMap> = + [(block1_hash, block1), (block2_hash, block2)] + .iter() + .cloned() + .collect(); + + for _ in 1..=2 { + chain_verifier + .expect_request_that(|req| remaining_blocks.remove(&req.hash()).is_some()) + .await + .respond_with(|req| req.hash()); + } + assert_eq!( + remaining_blocks, + HashMap::new(), + "expected all non-tip blocks to be verified by obtain tips" + ); + + // Check that nothing unexpected happened. + chain_verifier.expect_no_requests().await; + state_service.expect_no_requests().await; + + // ChainSync::extend_tips + + // Network is sent a block locator based on the tip + peer_set + .expect_request(zn::Request::FindBlocks { + known_blocks: vec![block1_hash], + stop: None, + }) + .await + .respond(zn::Response::BlockHashes(vec![ + block2_hash, // tip (discarded - already fetched) + block3_hash, // expected_next + block4_hash, + block3_hash, + block4_hash, + block5_hash, // (discarded - last hash, possibly incorrect) + ])); + + // Clear remaining block locator requests + for _ in 0..(sync::FANOUT - 1) { + peer_set + .expect_request(zn::Request::FindBlocks { + known_blocks: vec![block1_hash], + stop: None, + }) + .await + .respond(Err(zn::BoxError::from("synthetic test extend tips error"))); + } + + // Check that nothing unexpected happened. + chain_verifier.expect_no_requests().await; + state_service.expect_no_requests().await; + + // Blocks 3 & 4 are fetched in order, then verified concurrently + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block3_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block3.clone()])); + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block4_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block4.clone()])); + + // We can't guarantee the verification request order + let mut remaining_blocks: HashMap> = + [(block3_hash, block3), (block4_hash, block4)] + .iter() + .cloned() + .collect(); + + for _ in 3..=4 { + chain_verifier + .expect_request_that(|req| remaining_blocks.remove(&req.hash()).is_some()) + .await + .respond_with(|req| req.hash()); + } + assert_eq!( + remaining_blocks, + HashMap::new(), + "expected all non-tip blocks to be verified by extend tips" + ); + + // Check that nothing unexpected happened. + chain_verifier.expect_no_requests().await; + state_service.expect_no_requests().await; + + let chain_sync_result = chain_sync_task_handle.now_or_never(); + assert!( + matches!(chain_sync_result, None), + "unexpected error or panic in chain sync task: {:?}", + chain_sync_result, + ); + + Ok(()) +} + +/// Test that zebra-network rejects blocks with the wrong hash. +#[tokio::test] +async fn sync_block_wrong_hash() -> Result<(), crate::BoxError> { + // Get services + let ( + chain_sync_future, + _sync_status, + mut chain_verifier, + mut peer_set, + mut state_service, + _mock_chain_tip_sender, + ) = setup(); + + // Get blocks + let block0: Arc = + zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES.zcash_deserialize_into()?; + let block0_hash = block0.hash(); + + // Get a block that is a long way away from genesis + let block982k: Arc = + zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?; + + // Start the syncer + let chain_sync_task_handle = tokio::spawn(chain_sync_future); + + // State is checked for genesis + state_service + .expect_request(zs::Request::Depth(block0_hash)) + .await + .respond(zs::Response::Depth(None)); + + // Block 0 is fetched, but the peer returns a much higher block + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block982k.clone()])); + + // Block is dropped because it has the wrong hash. + // We expect more requests to the state service, because the syncer keeps on running. + peer_set.expect_no_requests().await; + chain_verifier.expect_no_requests().await; + + let chain_sync_result = chain_sync_task_handle.now_or_never(); + assert!( + matches!(chain_sync_result, None), + "unexpected error or panic in chain sync task: {:?}", + chain_sync_result, + ); + + Ok(()) +} + +/// Test that the sync downloader rejects blocks that are too high in obtain_tips. +/// +/// TODO: also test that it rejects blocks behind the tip limit. (Needs ~100 fake blocks.) +#[tokio::test(flavor = "multi_thread")] +async fn sync_block_too_high_obtain_tips() -> Result<(), crate::BoxError> { + // Get services + let ( + chain_sync_future, + _sync_status, + mut chain_verifier, + mut peer_set, + mut state_service, + _mock_chain_tip_sender, + ) = setup(); + + // Get blocks + let block0: Arc = + zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES.zcash_deserialize_into()?; + let block0_hash = block0.hash(); + + let block1: Arc = zebra_test::vectors::BLOCK_MAINNET_1_BYTES.zcash_deserialize_into()?; + let block1_hash = block1.hash(); + + let block2: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES.zcash_deserialize_into()?; + let block2_hash = block2.hash(); + + let block3: Arc = zebra_test::vectors::BLOCK_MAINNET_3_BYTES.zcash_deserialize_into()?; + let block3_hash = block3.hash(); + + // Also get a block that is a long way away from genesis + let block982k: Arc = + zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?; + let block982k_hash = block982k.hash(); + + // Start the syncer + let chain_sync_task_handle = tokio::spawn(chain_sync_future); + + // ChainSync::request_genesis + + // State is checked for genesis + state_service + .expect_request(zs::Request::Depth(block0_hash)) + .await + .respond(zs::Response::Depth(None)); + + // Block 0 is fetched and committed to the state + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block0.clone()])); + + chain_verifier + .expect_request(block0) + .await + .respond(block0_hash); + + // Check that nothing unexpected happened. + // We expect more requests to the state service, because the syncer keeps on running. + peer_set.expect_no_requests().await; + chain_verifier.expect_no_requests().await; + + // State is checked for genesis again + state_service + .expect_request(zs::Request::Depth(block0_hash)) + .await + .respond(zs::Response::Depth(Some(0))); + + // ChainSync::obtain_tips + + // State is asked for a block locator. + state_service + .expect_request(zs::Request::BlockLocator) + .await + .respond(zs::Response::BlockLocator(vec![block0_hash])); + + // Network is sent the block locator + peer_set + .expect_request(zn::Request::FindBlocks { + known_blocks: vec![block0_hash], + stop: None, + }) + .await + .respond(zn::Response::BlockHashes(vec![ + block982k_hash, + block1_hash, // tip + block2_hash, // expected_next + block3_hash, // (discarded - last hash, possibly incorrect) + ])); + + // State is checked for the first unknown block (block 982k) + state_service + .expect_request(zs::Request::Depth(block982k_hash)) + .await + .respond(zs::Response::Depth(None)); + + // Clear remaining block locator requests + for _ in 0..(sync::FANOUT - 1) { + peer_set + .expect_request(zn::Request::FindBlocks { + known_blocks: vec![block0_hash], + stop: None, + }) + .await + .respond(Err(zn::BoxError::from("synthetic test obtain tips error"))); + } + + // Check that nothing unexpected happened. + peer_set.expect_no_requests().await; + chain_verifier.expect_no_requests().await; + + // State is checked for all non-tip blocks (blocks 982k, 1, 2) in response order + state_service + .expect_request(zs::Request::Depth(block982k_hash)) + .await + .respond(zs::Response::Depth(None)); + state_service + .expect_request(zs::Request::Depth(block1_hash)) + .await + .respond(zs::Response::Depth(None)); + state_service + .expect_request(zs::Request::Depth(block2_hash)) + .await + .respond(zs::Response::Depth(None)); + + // Blocks 982k, 1, 2 are fetched in order, then verified concurrently, + // but block 982k verification is skipped because it is too high. + peer_set + .expect_request(zn::Request::BlocksByHash( + iter::once(block982k_hash).collect(), + )) + .await + .respond(zn::Response::Blocks(vec![block982k.clone()])); + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block1.clone()])); + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block2.clone()])); + + // At this point, the following tasks race: + // - The valid chain verifier requests + // - The block too high error, which causes a syncer reset and ChainSync::obtain_tips + // - ChainSync::extend_tips for the next tip + + let chain_sync_result = chain_sync_task_handle.now_or_never(); + assert!( + matches!(chain_sync_result, None), + "unexpected error or panic in chain sync task: {:?}", + chain_sync_result, + ); + + Ok(()) +} + +/// Test that the sync downloader rejects blocks that are too high in extend_tips. +/// +/// TODO: also test that it rejects blocks behind the tip limit. (Needs ~100 fake blocks.) +#[tokio::test(flavor = "multi_thread")] +async fn sync_block_too_high_extend_tips() -> Result<(), crate::BoxError> { + // Get services + let ( + chain_sync_future, + _sync_status, + mut chain_verifier, + mut peer_set, + mut state_service, + _mock_chain_tip_sender, + ) = setup(); + + // Get blocks + let block0: Arc = + zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES.zcash_deserialize_into()?; + let block0_hash = block0.hash(); + + let block1: Arc = zebra_test::vectors::BLOCK_MAINNET_1_BYTES.zcash_deserialize_into()?; + let block1_hash = block1.hash(); + + let block2: Arc = zebra_test::vectors::BLOCK_MAINNET_2_BYTES.zcash_deserialize_into()?; + let block2_hash = block2.hash(); + + let block3: Arc = zebra_test::vectors::BLOCK_MAINNET_3_BYTES.zcash_deserialize_into()?; + let block3_hash = block3.hash(); + + let block4: Arc = zebra_test::vectors::BLOCK_MAINNET_4_BYTES.zcash_deserialize_into()?; + let block4_hash = block4.hash(); + + let block5: Arc = zebra_test::vectors::BLOCK_MAINNET_5_BYTES.zcash_deserialize_into()?; + let block5_hash = block5.hash(); + + // Also get a block that is a long way away from genesis + let block982k: Arc = + zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?; + let block982k_hash = block982k.hash(); + + // Start the syncer + let chain_sync_task_handle = tokio::spawn(chain_sync_future); + + // ChainSync::request_genesis + + // State is checked for genesis + state_service + .expect_request(zs::Request::Depth(block0_hash)) + .await + .respond(zs::Response::Depth(None)); + + // Block 0 is fetched and committed to the state + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block0.clone()])); + + chain_verifier + .expect_request(block0) + .await + .respond(block0_hash); + + // Check that nothing unexpected happened. + // We expect more requests to the state service, because the syncer keeps on running. + peer_set.expect_no_requests().await; + chain_verifier.expect_no_requests().await; + + // State is checked for genesis again + state_service + .expect_request(zs::Request::Depth(block0_hash)) + .await + .respond(zs::Response::Depth(Some(0))); + + // ChainSync::obtain_tips + + // State is asked for a block locator. + state_service + .expect_request(zs::Request::BlockLocator) + .await + .respond(zs::Response::BlockLocator(vec![block0_hash])); + + // Network is sent the block locator + peer_set + .expect_request(zn::Request::FindBlocks { + known_blocks: vec![block0_hash], + stop: None, + }) + .await + .respond(zn::Response::BlockHashes(vec![ + block1_hash, // tip + block2_hash, // expected_next + block3_hash, // (discarded - last hash, possibly incorrect) + ])); + + // State is checked for the first unknown block (block 1) + state_service + .expect_request(zs::Request::Depth(block1_hash)) + .await + .respond(zs::Response::Depth(None)); + + // Clear remaining block locator requests + for _ in 0..(sync::FANOUT - 1) { + peer_set + .expect_request(zn::Request::FindBlocks { + known_blocks: vec![block0_hash], + stop: None, + }) + .await + .respond(Err(zn::BoxError::from("synthetic test obtain tips error"))); + } + + // Check that nothing unexpected happened. + peer_set.expect_no_requests().await; + chain_verifier.expect_no_requests().await; + + // State is checked for all non-tip blocks (blocks 1 & 2) in response order + state_service + .expect_request(zs::Request::Depth(block1_hash)) + .await + .respond(zs::Response::Depth(None)); + state_service + .expect_request(zs::Request::Depth(block2_hash)) + .await + .respond(zs::Response::Depth(None)); + + // Blocks 1 & 2 are fetched in order, then verified concurrently + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block1_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block1.clone()])); + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block2_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block2.clone()])); + + // We can't guarantee the verification request order + let mut remaining_blocks: HashMap> = + [(block1_hash, block1), (block2_hash, block2)] + .iter() + .cloned() + .collect(); + + for _ in 1..=2 { + chain_verifier + .expect_request_that(|req| remaining_blocks.remove(&req.hash()).is_some()) + .await + .respond_with(|req| req.hash()); + } + assert_eq!( + remaining_blocks, + HashMap::new(), + "expected all non-tip blocks to be verified by obtain tips" + ); + + // Check that nothing unexpected happened. + chain_verifier.expect_no_requests().await; + state_service.expect_no_requests().await; + + // ChainSync::extend_tips + + // Network is sent a block locator based on the tip + peer_set + .expect_request(zn::Request::FindBlocks { + known_blocks: vec![block1_hash], + stop: None, + }) + .await + .respond(zn::Response::BlockHashes(vec![ + block2_hash, // tip (discarded - already fetched) + block3_hash, // expected_next + block4_hash, + block982k_hash, + block5_hash, // (discarded - last hash, possibly incorrect) + ])); + + // Clear remaining block locator requests + for _ in 0..(sync::FANOUT - 1) { + peer_set + .expect_request(zn::Request::FindBlocks { + known_blocks: vec![block1_hash], + stop: None, + }) + .await + .respond(Err(zn::BoxError::from("synthetic test extend tips error"))); + } + + // Check that nothing unexpected happened. + chain_verifier.expect_no_requests().await; + state_service.expect_no_requests().await; + + // Blocks 3, 4, 982k are fetched in order, then verified concurrently, + // but block 982k verification is skipped because it is too high. + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block3_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block3.clone()])); + peer_set + .expect_request(zn::Request::BlocksByHash(iter::once(block4_hash).collect())) + .await + .respond(zn::Response::Blocks(vec![block4.clone()])); + peer_set + .expect_request(zn::Request::BlocksByHash( + iter::once(block982k_hash).collect(), + )) + .await + .respond(zn::Response::Blocks(vec![block982k.clone()])); + + // At this point, the following tasks race: + // - The valid chain verifier requests + // - The block too high error, which causes a syncer reset and ChainSync::obtain_tips + // - ChainSync::extend_tips for the next tip + + let chain_sync_result = chain_sync_task_handle.now_or_never(); + assert!( + matches!(chain_sync_result, None), + "unexpected error or panic in chain sync task: {:?}", + chain_sync_result, + ); + + Ok(()) +} + +fn setup() -> ( + // ChainSync + impl Future> + Send, + SyncStatus, + // ChainVerifier + MockService, block::Hash, PanicAssertion>, + // PeerSet + MockService, + // StateService + MockService, + MockChainTipSender, +) { + let consensus_config = ConsensusConfig::default(); + let state_config = StateConfig::ephemeral(); + let config = ZebradConfig { + consensus: consensus_config, + state: state_config, + ..Default::default() + }; + + // These tests run multiple tasks in parallel. + // So machines under heavy load need a longer delay. + // (For example, CI machines with limited cores.) + let peer_set = MockService::build() + .with_max_request_delay(MAX_SERVICE_REQUEST_DELAY) + .for_unit_tests(); + + let chain_verifier = MockService::build() + .with_max_request_delay(MAX_SERVICE_REQUEST_DELAY) + .for_unit_tests(); + + let state_service = MockService::build() + .with_max_request_delay(MAX_SERVICE_REQUEST_DELAY) + .for_unit_tests(); + + let (mock_chain_tip, mock_chain_tip_sender) = MockChainTip::new(); + + let (chain_sync, sync_status) = ChainSync::new( + &config, + peer_set.clone(), + chain_verifier.clone(), + state_service.clone(), + mock_chain_tip, + ); + + let chain_sync_future = chain_sync.sync(); + + ( + chain_sync_future, + sync_status, + chain_verifier, + peer_set, + state_service, + mock_chain_tip_sender, + ) +}