diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2e0c855bd..4c0ff790c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,8 +7,18 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 
 ## Next Release (Draft)
 
-(Draft notes for the next release can be added here.)
+This release improves Zebra's sync and verification performance under heavy load.
+(TODO - complete the summary.)
 
+### Configuration Changes
+
+- Split the checkpoint and full verification [`sync` concurrency options](https://doc.zebra.zfnd.org/zebrad/config/struct.SyncSection.html) (#4726):
+  - Add a new `full_verify_concurrency_limit`
+  - Rename `max_concurrent_block_requests` to `download_concurrency_limit`
+  - Rename `lookahead_limit` to `checkpoint_verify_concurrency_limit`
+  For backwards compatibility, the old names are still accepted as aliases.
+
+(TODO - insert changelog here)
 
 ## [Zebra 1.0.0-beta.12](https://github.com/ZcashFoundation/zebra/releases/tag/v1.0.0-beta.12) - 2022-06-29
 
diff --git a/zebra-consensus/src/chain.rs b/zebra-consensus/src/chain.rs
index c244184e6..a2a413a89 100644
--- a/zebra-consensus/src/chain.rs
+++ b/zebra-consensus/src/chain.rs
@@ -27,7 +27,7 @@ use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
 use tracing::{instrument, Span};
 
 use zebra_chain::{
-    block::{self, Block},
+    block::{self, Block, Height},
     parameters::Network,
 };
 
@@ -163,7 +163,8 @@ where
 /// config parameter and if the download is not already started.
 ///
 /// Returns a block verifier, transaction verifier,
-/// and the Groth16 parameter download task [`JoinHandle`].
+/// the Groth16 parameter download task [`JoinHandle`],
+/// and the maximum configured checkpoint verification height.
 ///
 /// The consensus configuration is specified by `config`, and the Zcash network
 /// to verify blocks for is specified by `network`.
@@ -203,6 +204,7 @@ pub async fn init<S>(
         transaction::Request,
     >,
     JoinHandle<()>,
+    Height,
 )
 where
     S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
@@ -266,5 +268,10 @@ where
 
     let chain = Buffer::new(BoxService::new(chain), VERIFIER_BUFFER_BOUND);
 
-    (chain, transaction, groth16_download_handle)
+    (
+        chain,
+        transaction,
+        groth16_download_handle,
+        max_checkpoint_height,
+    )
 }
diff --git a/zebra-consensus/src/chain/tests.rs b/zebra-consensus/src/chain/tests.rs
index ad3f7b41a..305b65445 100644
--- a/zebra-consensus/src/chain/tests.rs
+++ b/zebra-consensus/src/chain/tests.rs
@@ -64,7 +64,7 @@ async fn verifiers_from_network(
         + 'static,
 ) {
     let state_service = zs::init_test(network);
-    let (chain_verifier, _transaction_verifier, _groth16_download_handle) =
+    let (chain_verifier, _transaction_verifier, _groth16_download_handle, _max_checkpoint_height) =
         crate::chain::init(Config::default(), network, state_service.clone(), true).await;
 
     // We can drop the download task handle here, because:
@@ -161,7 +161,7 @@ async fn verify_checkpoint(config: Config) -> Result<(), Report> {
     // init_from_verifiers.
     //
     // Download task panics and timeouts are propagated to the tests that use Groth16 verifiers.
-    let (chain_verifier, _transaction_verifier, _groth16_download_handle) =
+    let (chain_verifier, _transaction_verifier, _groth16_download_handle, _max_checkpoint_height) =
         super::init(config.clone(), network, zs::init_test(network), true).await;
 
     // Add a timeout layer
diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs
index d4b48d4ee..9c792e340 100644
--- a/zebrad/src/commands/start.rs
+++ b/zebrad/src/commands/start.rs
@@ -67,8 +67,6 @@
 //!
 //! 
Some of the diagnostic features are optional, and need to be enabled at compile-time.
 
-use std::cmp::max;
-
 use abscissa_core::{config, Command, FrameworkError, Options, Runnable};
 use color_eyre::eyre::{eyre, Report};
 use futures::FutureExt;
@@ -119,13 +117,16 @@ impl StartCmd {
         let inbound = ServiceBuilder::new()
             .load_shed()
             .buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY)
-            .service(Inbound::new(setup_rx));
+            .service(Inbound::new(
+                config.sync.full_verify_concurrency_limit,
+                setup_rx,
+            ));
 
         let (peer_set, address_book) =
             zebra_network::init(config.network.clone(), inbound, latest_chain_tip.clone()).await;
 
         info!("initializing verifiers");
-        let (chain_verifier, tx_verifier, mut groth16_download_handle) =
+        let (chain_verifier, tx_verifier, mut groth16_download_handle, max_checkpoint_height) =
             zebra_consensus::chain::init(
                 config.consensus.clone(),
                 config.network.network,
@@ -137,6 +138,7 @@ impl StartCmd {
         info!("initializing syncer");
         let (syncer, sync_status) = ChainSync::new(
             &config,
+            max_checkpoint_height,
             peer_set.clone(),
             chain_verifier.clone(),
             state.clone(),
@@ -342,15 +344,19 @@ impl StartCmd {
     fn state_buffer_bound() -> usize {
         let config = app_config().clone();
 
+        // Ignore the checkpoint verify limit, because it is very large.
+        //
         // TODO: do we also need to account for concurrent use across services?
         //       we could multiply the maximum by 3/2, or add a fixed constant
-        max(
-            config.sync.max_concurrent_block_requests,
-            max(
-                inbound::downloads::MAX_INBOUND_CONCURRENCY,
-                mempool::downloads::MAX_INBOUND_CONCURRENCY,
-            ),
-        )
+        [
+            config.sync.download_concurrency_limit,
+            config.sync.full_verify_concurrency_limit,
+            inbound::downloads::MAX_INBOUND_CONCURRENCY,
+            mempool::downloads::MAX_INBOUND_CONCURRENCY,
+        ]
+        .into_iter()
+        .max()
+        .unwrap()
     }
 }
diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs
index 0eb5a34d3..363b6411a 100644
--- a/zebrad/src/components/inbound.rs
+++ b/zebrad/src/components/inbound.rs
@@ -35,9 +35,10 @@ use zebra_network::{
 };
 use zebra_node_services::mempool;
 
+use crate::BoxError;
+
 // Re-use the syncer timeouts for consistency.
 use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
-use crate::BoxError;
 
 use InventoryResponse::*;
 
@@ -85,6 +86,13 @@ pub enum Setup {
     ///
     /// All requests are ignored.
     Pending {
+        // Configuration
+        //
+        /// The configured full verification concurrency limit.
+        full_verify_concurrency_limit: usize,
+
+        // Services
+        //
         /// A oneshot channel used to receive required services,
         /// after they are set up.
         setup: oneshot::Receiver<InboundSetupData>,
@@ -94,6 +102,8 @@ pub enum Setup {
     ///
     /// All requests are answered.
     Initialized {
+        // Services
+        //
         /// A shared list of peer addresses.
         address_book: Arc<std::sync::Mutex<zn::AddressBook>>,
 
@@ -169,9 +179,15 @@ impl Inbound {
     /// Create a new inbound service.
     ///
     /// Dependent services are sent via the `setup` channel after initialization. 
-    pub fn new(setup: oneshot::Receiver<InboundSetupData>) -> Inbound {
+    pub fn new(
+        full_verify_concurrency_limit: usize,
+        setup: oneshot::Receiver<InboundSetupData>,
+    ) -> Inbound {
         Inbound {
-            setup: Setup::Pending { setup },
+            setup: Setup::Pending {
+                full_verify_concurrency_limit,
+                setup,
+            },
         }
     }
 
@@ -200,7 +216,10 @@ impl Service<zn::Request> for Inbound {
         let result;
 
         self.setup = match self.take_setup() {
-            Setup::Pending { mut setup } => match setup.try_recv() {
+            Setup::Pending {
+                full_verify_concurrency_limit,
+                mut setup,
+            } => match setup.try_recv() {
                 Ok(setup_data) => {
                     let InboundSetupData {
                         address_book,
@@ -212,6 +231,7 @@
                     } = setup_data;
 
                     let block_downloads = Box::pin(BlockDownloads::new(
+                        full_verify_concurrency_limit,
                        Timeout::new(block_download_peer_set.clone(), BLOCK_DOWNLOAD_TIMEOUT),
                         Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT),
                         state.clone(),
@@ -229,7 +249,10 @@
                 Err(TryRecvError::Empty) => {
                     // There's no setup data yet, so keep waiting for it
                     result = Ok(());
-                    Setup::Pending { setup }
+                    Setup::Pending {
+                        full_verify_concurrency_limit,
+                        setup,
+                    }
                 }
                 Err(error @ TryRecvError::Closed) => {
                     // Mark the service as failed, because setup failed
@@ -256,6 +279,7 @@
                     while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {}
 
                     result = Ok(());
+
                     Setup::Initialized {
                         address_book,
                         block_downloads,
diff --git a/zebrad/src/components/inbound/downloads.rs b/zebrad/src/components/inbound/downloads.rs
index 89b031ab7..250b90483 100644
--- a/zebrad/src/components/inbound/downloads.rs
+++ b/zebrad/src/components/inbound/downloads.rs
@@ -25,6 +25,8 @@ use zebra_chain::{
 use zebra_network as zn;
 use zebra_state as zs;
 
+use crate::components::sync::MIN_CONCURRENCY_LIMIT;
+
 type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
 
 /// The maximum number of concurrent inbound download and verify tasks.
@@ -64,7 +66,7 @@ pub enum DownloadAction {
     /// The queue is at capacity, so this request was ignored.
     ///
     /// The sync service should discover this block later, when we are closer
-    /// to the tip. The queue's capacity is [`MAX_INBOUND_CONCURRENCY`].
+    /// to the tip. The queue's capacity is [`Downloads::full_verify_concurrency_limit`].
     FullQueue,
 }
 
@@ -80,7 +82,13 @@ where
     ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
     ZS::Future: Send,
 {
+    // Configuration
+    //
+    /// The configured full verification concurrency limit, after applying the minimum limit.
+    full_verify_concurrency_limit: usize,
+
     // Services
+    //
     /// A service that forwards requests to connected peers, and returns their
     /// responses.
     network: ZN,
@@ -95,6 +103,7 @@ where
     latest_chain_tip: zs::LatestChainTip,
 
     // Internal downloads state
+    //
     /// A list of pending block download and verify tasks.
     #[pin]
     pending: FuturesUnordered<JoinHandle<Result<block::Hash, (BoxError, block::Hash)>>>,
@@ -162,8 +171,19 @@ where
     /// The [`Downloads`] stream is agnostic to the network policy, so retry and
     /// timeout limits should be applied to the `network` service passed into
     /// this constructor.
-    pub fn new(network: ZN, verifier: ZV, state: ZS, latest_chain_tip: zs::LatestChainTip) -> Self {
+    pub fn new(
+        full_verify_concurrency_limit: usize,
+        network: ZN,
+        verifier: ZV,
+        state: ZS,
+        latest_chain_tip: zs::LatestChainTip,
+    ) -> Self {
+        // The syncer already warns about the minimum. 
+        let full_verify_concurrency_limit =
+            full_verify_concurrency_limit.clamp(MIN_CONCURRENCY_LIMIT, MAX_INBOUND_CONCURRENCY);
+
         Self {
+            full_verify_concurrency_limit,
             network,
             verifier,
             state,
@@ -182,8 +202,8 @@ where
             debug!(
                 ?hash,
                 queue_len = self.pending.len(),
-                ?MAX_INBOUND_CONCURRENCY,
-                "block hash already queued for inbound download: ignored block"
+                concurrency_limit = self.full_verify_concurrency_limit,
+                "block hash already queued for inbound download: ignored block",
             );
 
             metrics::gauge!("gossip.queued.block.count", self.pending.len() as f64);
@@ -192,12 +212,12 @@ where
             return DownloadAction::AlreadyQueued;
         }
 
-        if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
+        if self.pending.len() >= self.full_verify_concurrency_limit {
             debug!(
                 ?hash,
                 queue_len = self.pending.len(),
-                ?MAX_INBOUND_CONCURRENCY,
-                "too many blocks queued for inbound download: ignored block"
+                concurrency_limit = self.full_verify_concurrency_limit,
+                "too many blocks queued for inbound download: ignored block",
             );
 
             metrics::gauge!("gossip.queued.block.count", self.pending.len() as f64);
@@ -213,6 +233,7 @@ where
         let network = self.network.clone();
         let verifier = self.verifier.clone();
         let latest_chain_tip = self.latest_chain_tip.clone();
+        let full_verify_concurrency_limit = self.full_verify_concurrency_limit;
 
         let fut = async move {
             // Check if the block is already in the state.
@@ -232,7 +253,7 @@ where
                     assert_eq!(
                         blocks.len(),
                         1,
-                        "wrong number of blocks in response to a single hash"
+                        "wrong number of blocks in response to a single hash",
                     );
 
                     blocks
@@ -257,11 +278,11 @@ where
             let tip_height = latest_chain_tip.best_tip_height();
 
             let max_lookahead_height = if let Some(tip_height) = tip_height {
-                let lookahead = i32::try_from(MAX_INBOUND_CONCURRENCY).expect("fits in i32");
+                let lookahead = i32::try_from(full_verify_concurrency_limit).expect("fits in i32");
                 (tip_height + lookahead).expect("tip is much lower than Height::MAX")
             } else {
                 let genesis_lookahead =
-                    u32::try_from(MAX_INBOUND_CONCURRENCY - 1).expect("fits in u32");
+                    u32::try_from(full_verify_concurrency_limit - 1).expect("fits in u32");
                 block::Height(genesis_lookahead)
             };
 
@@ -296,8 +317,8 @@ where
                         ?block_height,
                         ?tip_height,
                         ?max_lookahead_height,
-                        lookahead_limit = ?MAX_INBOUND_CONCURRENCY,
-                        "gossiped block height too far ahead of the tip: dropped downloaded block"
+                        lookahead_limit = full_verify_concurrency_limit,
+                        "gossiped block height too far ahead of the tip: dropped downloaded block",
                     );
                     metrics::counter!("gossip.max.height.limit.dropped.block.count", 1);
 
@@ -309,7 +330,7 @@ where
                         ?tip_height,
                         ?min_accepted_height,
                         behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
-                        "gossiped block height behind the finalized tip: dropped downloaded block"
+                        "gossiped block height behind the finalized tip: dropped downloaded block",
                     );
                     metrics::counter!("gossip.min.height.limit.dropped.block.count", 1);
 
@@ -353,8 +374,8 @@ where
         debug!(
             ?hash,
             queue_len = self.pending.len(),
-            ?MAX_INBOUND_CONCURRENCY,
-            "queued hash for download"
+            concurrency_limit = self.full_verify_concurrency_limit,
+            "queued hash for download",
         );
 
         metrics::gauge!("gossip.queued.block.count", self.pending.len() as f64);
diff --git a/zebrad/src/components/inbound/tests/fake_peer_set.rs b/zebrad/src/components/inbound/tests/fake_peer_set.rs
index afdca7880..b39ade688 100644
--- a/zebrad/src/components/inbound/tests/fake_peer_set.rs
+++ b/zebrad/src/components/inbound/tests/fake_peer_set.rs
@@ -29,7 +29,7 @@ use zebra_test::mock_service::{MockService, PanicAssertion};
 
 use crate::{ 
     components::{
-        inbound::{Inbound, InboundSetupData},
+        inbound::{downloads::MAX_INBOUND_CONCURRENCY, Inbound, InboundSetupData},
         mempool::{
             gossip_mempool_transaction_id, unmined_transactions_in_blocks, Config as MempoolConfig,
             Mempool, MempoolError, SameEffectsChainRejectionError, UnboxMempoolError,
@@ -708,7 +708,7 @@ async fn setup(
     let mut state_service = ServiceBuilder::new().buffer(1).service(state);
 
     // Download task panics and timeouts are propagated to the tests that use Groth16 verifiers.
-    let (block_verifier, _transaction_verifier, _groth16_download_handle) =
+    let (block_verifier, _transaction_verifier, _groth16_download_handle, _max_checkpoint_height) =
         zebra_consensus::chain::init(
             consensus_config.clone(),
             network,
@@ -785,7 +785,7 @@ async fn setup(
 
     let inbound_service = ServiceBuilder::new()
         .load_shed()
-        .service(Inbound::new(setup_rx));
+        .service(Inbound::new(MAX_INBOUND_CONCURRENCY, setup_rx));
     let inbound_service = BoxService::new(inbound_service);
     let inbound_service = ServiceBuilder::new().buffer(1).service(inbound_service);
 
diff --git a/zebrad/src/components/inbound/tests/real_peer_set.rs b/zebrad/src/components/inbound/tests/real_peer_set.rs
index c6dbb46f2..dc5ef0096 100644
--- a/zebrad/src/components/inbound/tests/real_peer_set.rs
+++ b/zebrad/src/components/inbound/tests/real_peer_set.rs
@@ -29,7 +29,7 @@ use zebra_test::mock_service::{MockService, PanicAssertion};
 
 use crate::{
     components::{
-        inbound::{Inbound, InboundSetupData},
+        inbound::{downloads::MAX_INBOUND_CONCURRENCY, Inbound, InboundSetupData},
         mempool::{gossip_mempool_transaction_id, Config as MempoolConfig, Mempool},
         sync::{self, BlockGossipError, SyncStatus},
     },
@@ -637,7 +637,7 @@ async fn setup(
     // Inbound
     let (setup_tx, setup_rx) = oneshot::channel();
-    let inbound_service = Inbound::new(setup_rx);
+    let inbound_service = Inbound::new(MAX_INBOUND_CONCURRENCY, setup_rx);
     let inbound_service = ServiceBuilder::new()
         .boxed_clone()
         .load_shed()
diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs
index 0d96276d6..f3c0a4b91 100644
--- a/zebrad/src/components/sync.rs
+++ b/zebrad/src/components/sync.rs
@@ -2,7 +2,7 @@
 //!
 //! It is used when Zebra is a long way behind the current chain tip.
 
-use std::{collections::HashSet, pin::Pin, sync::Arc, task::Poll, time::Duration};
+use std::{cmp::max, collections::HashSet, pin::Pin, sync::Arc, task::Poll, time::Duration};
 
 use color_eyre::eyre::{eyre, Report};
 use futures::stream::{FuturesUnordered, StreamExt};
@@ -14,7 +14,7 @@ use tower::{
 };
 
 use zebra_chain::{
-    block::{self, Block},
+    block::{self, Block, Height},
     chain_tip::ChainTip,
     parameters::genesis_hash,
 };
@@ -57,7 +57,7 @@ const FANOUT: usize = 3;
 /// retries may be concurrent, inner retries are sequential.
 const BLOCK_DOWNLOAD_RETRY_LIMIT: usize = 3;
 
-/// A lower bound on the user-specified lookahead limit.
+/// A lower bound on the user-specified checkpoint verification concurrency limit.
 ///
 /// Set to the maximum checkpoint interval, so the pipeline holds around a checkpoint's
 /// worth of blocks.
@@ -76,14 +76,20 @@ const BLOCK_DOWNLOAD_RETRY_LIMIT: usize = 3;
 /// Once these malicious blocks start failing validation, the syncer will cancel all
 /// the pending download and verify tasks, drop all the blocks, and start a new
 /// ObtainTips with a new set of peers. 
-pub const MIN_LOOKAHEAD_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP;
+pub const MIN_CHECKPOINT_CONCURRENCY_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP;
 
 /// The default for the user-specified lookahead limit.
 ///
-/// See [`MIN_LOOKAHEAD_LIMIT`] for details.
+/// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
 ///
 /// TODO: increase to `MAX_CHECKPOINT_HEIGHT_GAP * 5`, after we implement orchard batching
-pub const DEFAULT_LOOKAHEAD_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 3;
+pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize =
+    zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 3;
+
+/// A lower bound on the user-specified concurrency limit.
+///
+/// If the concurrency limit is 0, Zebra can't download or verify any blocks.
+pub const MIN_CONCURRENCY_LIMIT: usize = 1;
 
 /// The expected maximum number of hashes in an ObtainTips or ExtendTips response.
 ///
@@ -91,7 +97,7 @@ pub const DEFAULT_LOOKAHEAD_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGH
 /// but still limit the number of blocks in the pipeline between the downloader and
 /// the state.
 ///
-/// See [`MIN_LOOKAHEAD_LIMIT`] for details.
+/// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
 pub const MAX_TIPS_RESPONSE_HASH_COUNT: usize = 500;
 
 /// Controls how long we wait for a tips response to return.
@@ -214,13 +220,21 @@ where
     ZSTip: ChainTip + Clone + Send + 'static,
 {
     // Configuration
+    //
     /// The genesis hash for the configured network
     genesis_hash: block::Hash,
 
-    /// The configured lookahead limit, after applying the minimum limit.
-    lookahead_limit: usize,
+    /// The largest block height for the checkpoint verifier, based on the current config.
+    max_checkpoint_height: Height,
+
+    /// The configured checkpoint verification concurrency limit, after applying the minimum limit.
+    checkpoint_verify_concurrency_limit: usize,
+
+    /// The configured full verification concurrency limit, after applying the minimum limit.
+    full_verify_concurrency_limit: usize,
 
     // Services
+    //
     /// A network service which is used to perform ObtainTips and ExtendTips
     /// requests.
     ///
@@ -246,6 +260,7 @@ where
     latest_chain_tip: ZSTip,
 
     // Internal sync state
+    //
     /// The tips that the syncer is currently following.
     prospective_tips: HashSet<CheckedTip>,
 
@@ -291,11 +306,44 @@ where
     /// Also returns a [`SyncStatus`] to check if the syncer has likely reached the chain tip. 
     pub fn new(
         config: &ZebradConfig,
+        max_checkpoint_height: Height,
         peers: ZN,
         verifier: ZV,
         state: ZS,
         latest_chain_tip: ZSTip,
     ) -> (Self, SyncStatus) {
+        let mut download_concurrency_limit = config.sync.download_concurrency_limit;
+        let mut checkpoint_verify_concurrency_limit =
+            config.sync.checkpoint_verify_concurrency_limit;
+        let mut full_verify_concurrency_limit = config.sync.full_verify_concurrency_limit;
+
+        if download_concurrency_limit < MIN_CONCURRENCY_LIMIT {
+            warn!(
+                "configured download concurrency limit {} too low, increasing to {}",
+                config.sync.download_concurrency_limit, MIN_CONCURRENCY_LIMIT,
+            );
+
+            download_concurrency_limit = MIN_CONCURRENCY_LIMIT;
+        }
+
+        if checkpoint_verify_concurrency_limit < MIN_CHECKPOINT_CONCURRENCY_LIMIT {
+            warn!(
+                "configured checkpoint verify concurrency limit {} too low, increasing to {}",
+                config.sync.checkpoint_verify_concurrency_limit, MIN_CHECKPOINT_CONCURRENCY_LIMIT,
+            );
+
+            checkpoint_verify_concurrency_limit = MIN_CHECKPOINT_CONCURRENCY_LIMIT;
+        }
+
+        if full_verify_concurrency_limit < MIN_CONCURRENCY_LIMIT {
+            warn!(
+                "configured full verify concurrency limit {} too low, increasing to {}",
+                config.sync.full_verify_concurrency_limit, MIN_CONCURRENCY_LIMIT,
+            );
+
+            full_verify_concurrency_limit = MIN_CONCURRENCY_LIMIT;
+        }
+
         let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);
         // The Hedge middleware is the outermost layer, hedging requests
         // between two retry-wrapped networks. The innermost timeout
@@ -306,12 +354,9 @@ where
         // abstracts away spurious failures from individual peers
         // making a less-fallible network service, and the Hedge layer
         // tries to reduce latency of that less-fallible service.
-        //
-        // XXX add ServiceBuilder::hedge() so this becomes
-        // ServiceBuilder::new().hedge(...).retry(...)...
         let block_network = Hedge::new(
             ServiceBuilder::new()
-                .concurrency_limit(config.sync.max_concurrent_block_requests)
+                .concurrency_limit(download_concurrency_limit)
                 .retry(zn::RetryLimit::new(BLOCK_DOWNLOAD_RETRY_LIMIT))
                 .timeout(BLOCK_DOWNLOAD_TIMEOUT)
                 .service(peers),
@@ -324,27 +369,23 @@ where
         // We apply a timeout to the verifier to avoid hangs due to missing earlier blocks.
         let verifier = Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT);
 
-        let mut lookahead_limit = config.sync.lookahead_limit;
-        if lookahead_limit < MIN_LOOKAHEAD_LIMIT {
-            warn!(
-                "configured lookahead limit {} too low, increasing to {}",
-                config.sync.lookahead_limit, MIN_LOOKAHEAD_LIMIT,
-            );
-
-            lookahead_limit = MIN_LOOKAHEAD_LIMIT;
-        }
-
         let (sync_status, recent_syncs) = SyncStatus::new();
 
         let new_syncer = Self {
             genesis_hash: genesis_hash(config.network.network),
-            lookahead_limit,
+            max_checkpoint_height,
+            checkpoint_verify_concurrency_limit,
+            full_verify_concurrency_limit,
             tip_network,
             downloads: Box::pin(Downloads::new(
                 block_network,
                 verifier,
                 latest_chain_tip.clone(),
-                lookahead_limit,
+                // TODO: change the download lookahead for full verification? 
+                max(
+                    checkpoint_verify_concurrency_limit,
+                    full_verify_concurrency_limit,
+                ),
             )),
             state,
             latest_chain_tip,
@@ -397,58 +438,62 @@ where
             state_tip = ?self.latest_chain_tip.best_tip_height(),
             "starting sync, obtaining new tips"
         );
-        if let Err(e) = self.obtain_tips().await {
+        let mut extra_hashes = self.obtain_tips().await.map_err(|e| {
             info!("temporary error obtaining tips: {:#}", e);
-            return Err(e);
-        }
+            e
+        })?;
         self.update_metrics();
 
-        while !self.prospective_tips.is_empty() {
+        while !self.prospective_tips.is_empty() || !extra_hashes.is_empty() {
             // Check whether any block tasks are currently ready:
             while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
-                Self::handle_block_response(rsp)?;
+                self.handle_block_response(rsp)?;
             }
             self.update_metrics();
 
-            // If we have too many pending tasks, wait for some to finish.
-            //
-            // Starting to wait is interesting, but logging each wait can be
-            // very verbose.
-            if self.downloads.in_flight() > self.lookahead_limit {
-                tracing::info!(
-                    tips.len = self.prospective_tips.len(),
-                    in_flight = self.downloads.in_flight(),
-                    lookahead_limit = self.lookahead_limit,
-                    "waiting for pending blocks",
-                );
-            }
-            while self.downloads.in_flight() > self.lookahead_limit {
+            while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) {
                 trace!(
                     tips.len = self.prospective_tips.len(),
                     in_flight = self.downloads.in_flight(),
-                    lookahead_limit = self.lookahead_limit,
+                    extra_hashes = extra_hashes.len(),
+                    lookahead_limit = self.lookahead_limit(extra_hashes.len()),
                     state_tip = ?self.latest_chain_tip.best_tip_height(),
                     "waiting for pending blocks",
                 );
 
                 let response = self.downloads.next().await.expect("downloads is nonempty");
 
-                Self::handle_block_response(response)?;
+                self.handle_block_response(response)?;
                 self.update_metrics();
             }
 
-            // Once we're below the lookahead limit, we can keep extending the tips.
-            info!(
-                tips.len = self.prospective_tips.len(),
-                in_flight = self.downloads.in_flight(),
-                lookahead_limit = self.lookahead_limit,
-                state_tip = ?self.latest_chain_tip.best_tip_height(),
-                "extending tips",
-            );
+            // Once we're below the lookahead limit, we can request more blocks or hashes. 
+            if !extra_hashes.is_empty() {
+                debug!(
+                    tips.len = self.prospective_tips.len(),
+                    in_flight = self.downloads.in_flight(),
+                    extra_hashes = extra_hashes.len(),
+                    lookahead_limit = self.lookahead_limit(extra_hashes.len()),
+                    state_tip = ?self.latest_chain_tip.best_tip_height(),
+                    "requesting more blocks",
+                );
 
-            if let Err(e) = self.extend_tips().await {
-                info!("temporary error extending tips: {:#}", e);
-                return Err(e);
+                let response = self.request_blocks(extra_hashes).await;
+                extra_hashes = Self::handle_hash_response(response)?;
+            } else {
+                info!(
+                    tips.len = self.prospective_tips.len(),
+                    in_flight = self.downloads.in_flight(),
+                    extra_hashes = extra_hashes.len(),
+                    lookahead_limit = self.lookahead_limit(extra_hashes.len()),
+                    state_tip = ?self.latest_chain_tip.best_tip_height(),
+                    "extending tips",
+                );
+
+                extra_hashes = self.extend_tips().await.map_err(|e| {
+                    info!("temporary error extending tips: {:#}", e);
+                    e
+                })?;
             }
+
             self.update_metrics();
         }
 
@@ -461,7 +506,7 @@ where
     /// Given a block_locator list fan out request for subsequent hashes to
     /// multiple peers
     #[instrument(skip(self))]
-    async fn obtain_tips(&mut self) -> Result<(), Report> {
+    async fn obtain_tips(&mut self) -> Result<IndexSet<block::Hash>, Report> {
         let block_locator = self
             .state
             .ready()
@@ -604,13 +649,12 @@ where
         self.recent_syncs.push_obtain_tips_length(new_downloads);
 
         let response = self.request_blocks(download_set).await;
-        Self::handle_response(response)?;
 
-        Ok(())
+        Self::handle_hash_response(response).map_err(Into::into)
     }
 
     #[instrument(skip(self))]
-    async fn extend_tips(&mut self) -> Result<(), Report> {
+    async fn extend_tips(&mut self) -> Result<IndexSet<block::Hash>, Report> {
         let tips = std::mem::take(&mut self.prospective_tips);
 
         let mut download_set = IndexSet::new();
@@ -742,9 +786,8 @@ where
         self.recent_syncs.push_extend_tips_length(new_downloads);
 
         let response = self.request_blocks(download_set).await;
-        Self::handle_response(response)?;
 
-        Ok(())
+        Self::handle_hash_response(response).map_err(Into::into)
     }
 
     /// Download and verify the genesis block, if it isn't currently known to
@@ -766,7 +809,9 @@ where
         let response = self.downloads.next().await.expect("downloads is nonempty");
 
         match response {
-            Ok(hash) => trace!(?hash, "verified and committed block to state"),
+            Ok(response) => self
+                .handle_block_response(Ok(response))
+                .expect("never returns Err for Ok"),
             Err(error) => {
                 // TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
                 if Self::should_restart_sync(&error) {
@@ -789,34 +834,92 @@ where
         Ok(())
     }
 
-    /// Queue download and verify tasks for each block that isn't currently known to our node
+    /// Queue download and verify tasks for each block that isn't currently known to our node.
+    ///
+    /// TODO: turn obtain and extend tips into a separate task, which sends hashes via a channel? 
    async fn request_blocks(
         &mut self,
-        hashes: IndexSet<block::Hash>,
-    ) -> Result<(), BlockDownloadVerifyError> {
-        debug!(hashes.len = hashes.len(), "requesting blocks");
+        mut hashes: IndexSet<block::Hash>,
+    ) -> Result<IndexSet<block::Hash>, BlockDownloadVerifyError> {
+        let lookahead_limit = self.lookahead_limit(hashes.len());
+
+        debug!(
+            hashes.len = hashes.len(),
+            ?lookahead_limit,
+            "requesting blocks",
+        );
+
+        let extra_hashes = if hashes.len() > lookahead_limit {
+            hashes.split_off(lookahead_limit)
+        } else {
+            IndexSet::new()
+        };
+
         for hash in hashes.into_iter() {
             self.downloads.download_and_verify(hash).await?;
         }
 
-        Ok(())
+        Ok(extra_hashes)
+    }
+
+    /// The configured lookahead limit, based on the currently verified height
+    /// and the number of hashes we haven't queued yet.
+    fn lookahead_limit(&self, new_hashes: usize) -> usize {
+        let max_checkpoint_height: usize = self
+            .max_checkpoint_height
+            .0
+            .try_into()
+            .expect("fits in usize");
+
+        // When the state is empty, we want to verify using checkpoints
+        let verified_height: usize = self
+            .latest_chain_tip
+            .best_tip_height()
+            .unwrap_or(Height(0))
+            .0
+            .try_into()
+            .expect("fits in usize");
+
+        if verified_height >= max_checkpoint_height {
+            self.full_verify_concurrency_limit
+        } else if (verified_height + new_hashes) >= max_checkpoint_height {
+            // If we're just about to start full verification, allow enough for the remaining checkpoint,
+            // and also enough for a separate full verification lookahead.
+            let checkpoint_hashes = verified_height + new_hashes - max_checkpoint_height;
+
+            self.full_verify_concurrency_limit + checkpoint_hashes
+        } else {
+            self.checkpoint_verify_concurrency_limit
+        }
     }
 
     /// Handles a response for a requested block.
     ///
-    /// Returns `Ok` if the block was successfully verified and committed to the state, or if an
-    /// expected error occurred, so that the synchronization can continue normally.
-    ///
-    /// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart.
+    /// See [`Self::handle_response`] for more details.
     fn handle_block_response(
-        response: Result<block::Hash, BlockDownloadVerifyError>,
+        &mut self,
+        response: Result<(Height, block::Hash), BlockDownloadVerifyError>,
     ) -> Result<(), BlockDownloadVerifyError> {
         match response {
-            Ok(hash) => trace!(?hash, "verified and committed block to state"),
-            Err(_) => return Self::handle_response(response.map(|_| ())),
-        }
+            Ok((height, hash)) => {
+                trace!(?height, ?hash, "verified and committed block to state");
 
-        Ok(())
+                Ok(())
+            }
+            Err(_) => Self::handle_response(response),
+        }
+    }
+
+    /// Handles a response to block hash submission, passing through any extra hashes.
+    ///
+    /// See [`Self::handle_response`] for more details.
+    fn handle_hash_response(
+        response: Result<IndexSet<block::Hash>, BlockDownloadVerifyError>,
+    ) -> Result<IndexSet<block::Hash>, BlockDownloadVerifyError> {
+        match response {
+            Ok(extra_hashes) => Ok(extra_hashes),
+            Err(_) => Self::handle_response(response).map(|()| IndexSet::new()),
+        }
     }
 
     /// Handles a response to a syncer request.
     ///
@@ -825,23 +928,26 @@ where
     /// so that the synchronization can continue normally.
     ///
     /// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart. 
-    fn handle_response(
-        response: Result<(), BlockDownloadVerifyError>,
+    fn handle_response<T>(
+        response: Result<T, BlockDownloadVerifyError>,
     ) -> Result<(), BlockDownloadVerifyError> {
-        if let Err(error) = response {
-            // TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
-            if Self::should_restart_sync(&error) {
-                return Err(error);
+        match response {
+            Ok(_t) => Ok(()),
+            Err(error) => {
+                // TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
+                if Self::should_restart_sync(&error) {
+                    Err(error)
+                } else {
+                    Ok(())
+                }
             }
         }
-
-        Ok(())
     }
 
     /// Returns `true` if the hash is present in the state, and `false`
     /// if the hash is not present in the state.
     ///
-    /// BUG: check if the hash is in any chain (#862)
+    /// TODO BUG: check if the hash is in any chain (#862)
     /// Depth only checks the main chain.
     async fn state_contains(&mut self, hash: block::Hash) -> Result<bool, Report> {
         match self
diff --git a/zebrad/src/components/sync/downloads.rs b/zebrad/src/components/sync/downloads.rs
index 02cf168fd..4c65ae92a 100644
--- a/zebrad/src/components/sync/downloads.rs
+++ b/zebrad/src/components/sync/downloads.rs
@@ -20,7 +20,7 @@ use tower::{hedge, Service, ServiceExt};
 use tracing_futures::Instrument;
 
 use zebra_chain::{
-    block::{self, Block},
+    block::{self, Block, Height},
     chain_tip::ChainTip,
 };
 use zebra_network as zn;
@@ -169,8 +169,9 @@ where
     // Internal downloads state
     /// A list of pending block download and verify tasks.
     #[pin]
-    pending:
-        FuturesUnordered<JoinHandle<Result<block::Hash, (BlockDownloadVerifyError, block::Hash)>>>,
+    pending: FuturesUnordered<
+        JoinHandle<Result<(Height, block::Hash), (BlockDownloadVerifyError, block::Hash)>>,
+    >,
 
     /// A list of channels that can be used to cancel pending block download and
     /// verify tasks.
@@ -189,7 +190,7 @@ where
     ZV::Future: Send,
     ZSTip: ChainTip + Clone + Send + 'static,
 {
-    type Item = Result<block::Hash, BlockDownloadVerifyError>;
+    type Item = Result<(Height, block::Hash), BlockDownloadVerifyError>;
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let this = self.project();
@@ -204,9 +205,10 @@ where
         // TODO: this would be cleaner with poll_map (#2693)
         if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
             match join_result.expect("block download and verify tasks must not panic") {
-                Ok(hash) => {
+                Ok((height, hash)) => {
                     this.cancel_handles.remove(&hash);
-                    Poll::Ready(Some(Ok(hash)))
+
+                    Poll::Ready(Some(Ok((height, hash))))
                 }
                 Err((e, hash)) => {
                     this.cancel_handles.remove(&hash);
@@ -325,6 +327,7 @@ where
         // that will timeout before being verified.
         let tip_height = latest_chain_tip.best_tip_height();
 
+        // TODO: don't use VERIFICATION_PIPELINE_SCALING_MULTIPLIER for full verification?
         let max_lookahead_height = if let Some(tip_height) = tip_height {
             // Scale the height limit with the lookahead limit,
             // so users with low capacity or under DoS can reduce them both.
@@ -373,9 +376,7 @@ where
                         ?tip_height,
                         ?max_lookahead_height,
                         lookahead_limit = ?lookahead_limit,
-                        "synced block height too far ahead of the tip: dropped downloaded block. \
-                         Hint: Try increasing the value of the lookahead_limit field \
-                         in the sync section of the configuration file." 
+ "synced block height too far ahead of the tip: dropped downloaded block", ); metrics::counter!("sync.max.height.limit.dropped.block.count", 1); @@ -435,12 +436,14 @@ where metrics::counter!("sync.verified.block.count", 1); } - verification.map_err(|err| { - match err.downcast::() { - Ok(error) => BlockDownloadVerifyError::Invalid { error: *error, height: block_height, hash }, - Err(error) => BlockDownloadVerifyError::ValidationRequestError { error, height: block_height, hash }, - } - }) + verification + .map(|hash| (block_height, hash)) + .map_err(|err| { + match err.downcast::() { + Ok(error) => BlockDownloadVerifyError::Invalid { error: *error, height: block_height, hash }, + Err(error) => BlockDownloadVerifyError::ValidationRequestError { error, height: block_height, hash }, + } + }) } .in_current_span() // Tack the hash onto the error so we can remove the cancel handle diff --git a/zebrad/src/components/sync/tests/timing.rs b/zebrad/src/components/sync/tests/timing.rs index b2b45e6d3..ba2fe6485 100644 --- a/zebrad/src/components/sync/tests/timing.rs +++ b/zebrad/src/components/sync/tests/timing.rs @@ -11,7 +11,10 @@ use std::{ use futures::future; use tokio::time::{timeout, Duration}; -use zebra_chain::parameters::{Network, POST_BLOSSOM_POW_TARGET_SPACING}; +use zebra_chain::{ + block::Height, + parameters::{Network, POST_BLOSSOM_POW_TARGET_SPACING}, +}; use zebra_network::constants::{ DEFAULT_CRAWL_NEW_PEER_INTERVAL, HANDSHAKE_TIMEOUT, INVENTORY_ROTATION_INTERVAL, }; @@ -163,6 +166,7 @@ fn request_genesis_is_rate_limited() { // start the sync let (mut chain_sync, _) = ChainSync::new( &ZebradConfig::default(), + Height(0), peer_service, verifier_service, state_service, diff --git a/zebrad/src/components/sync/tests/vectors.rs b/zebrad/src/components/sync/tests/vectors.rs index 871ffdfb4..3e7ab2b1d 100644 --- a/zebrad/src/components/sync/tests/vectors.rs +++ b/zebrad/src/components/sync/tests/vectors.rs @@ -6,7 +6,7 @@ use color_eyre::Report; use futures::{Future, FutureExt}; use zebra_chain::{ - block::{self, Block}, + block::{self, Block, Height}, chain_tip::mock::{MockChainTip, MockChainTipSender}, serialization::ZcashDeserializeInto, }; @@ -966,6 +966,7 @@ fn setup() -> ( let (chain_sync, sync_status) = ChainSync::new( &config, + Height(0), peer_set.clone(), chain_verifier.clone(), state_service.clone(), diff --git a/zebrad/src/config.rs b/zebrad/src/config.rs index 4dfb77599..c8c4592e1 100644 --- a/zebrad/src/config.rs +++ b/zebrad/src/config.rs @@ -174,16 +174,15 @@ impl Default for MetricsSection { #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields, default)] pub struct SyncSection { - /// The maximum number of concurrent block download requests during sync. + /// The number of parallel block download requests. /// /// This is set to a low value by default, to avoid task and /// network contention. Increasing this value may improve - /// performance on machines with many cores and a fast network - /// connection. - pub max_concurrent_block_requests: usize, + /// performance on machines with a fast network connection. + #[serde(alias = "max_concurrent_block_requests")] + pub download_concurrency_limit: usize, - /// Controls how far ahead of the chain tip the syncer tries to - /// download before waiting for queued verifications to complete. + /// The number of blocks submitted in parallel to the checkpoint verifier. /// /// Increasing this limit increases the buffer size, so it reduces /// the impact of an individual block request failing. 
However, it
@@ -198,16 +197,35 @@ pub struct SyncSection {
     /// astray.
     ///
     /// For reliable checkpoint syncing, Zebra enforces a
-    /// [`MIN_LOOKAHEAD_LIMIT`](sync::MIN_LOOKAHEAD_LIMIT).
-    pub lookahead_limit: usize,
+    /// [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`](sync::MIN_CHECKPOINT_CONCURRENCY_LIMIT).
+    ///
+    /// This is set to a high value by default, to avoid verification pipeline stalls.
+    /// Decreasing this value reduces RAM usage.
+    #[serde(alias = "lookahead_limit")]
+    pub checkpoint_verify_concurrency_limit: usize,
+
+    /// The number of blocks submitted in parallel to the full verifier.
+    ///
+    /// This is set to a low value by default, to avoid verification timeouts on large blocks.
+    /// Increasing this value may improve performance on machines with many cores.
+    pub full_verify_concurrency_limit: usize,
 }
 
 impl Default for SyncSection {
     fn default() -> Self {
         Self {
-            // TODO: increase to 50, after we implement orchard batching
-            max_concurrent_block_requests: 40,
-            lookahead_limit: sync::DEFAULT_LOOKAHEAD_LIMIT,
+            // 2/3 of the default outbound peer limit.
+            download_concurrency_limit: 50,
+
+            // A few max-length checkpoints.
+            checkpoint_verify_concurrency_limit: sync::DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT,
+
+            // Guaranteed to verify a bunch of large blocks in under 60 seconds
+            // (so that the committed block height changes in every progress log).
+            //
+            // TODO: when we implement orchard proof batching, try increasing to 100 or more,
+            //       or limit full verification concurrency based on block transaction counts?
+            full_verify_concurrency_limit: 30,
         }
     }
 }
diff --git a/zebrad/tests/common/config.rs b/zebrad/tests/common/config.rs
index 25afa16db..3a22e4ee4 100644
--- a/zebrad/tests/common/config.rs
+++ b/zebrad/tests/common/config.rs
@@ -36,7 +36,7 @@ pub fn default_test_config() -> Result<ZebradConfig> {
     let sync = SyncSection {
         // Avoid downloading unnecessary blocks.
-        lookahead_limit: sync::MIN_LOOKAHEAD_LIMIT,
+        checkpoint_verify_concurrency_limit: sync::MIN_CHECKPOINT_CONCURRENCY_LIMIT,
         ..SyncSection::default()
     };
 
diff --git a/zebrad/tests/common/lightwalletd.rs b/zebrad/tests/common/lightwalletd.rs
index a50de8934..8581da6fe 100644
--- a/zebrad/tests/common/lightwalletd.rs
+++ b/zebrad/tests/common/lightwalletd.rs
@@ -298,7 +298,8 @@ impl LightwalletdTestType {
             Err(error) => return Some(Err(error)),
         };
 
-        config.sync.lookahead_limit = zebrad::components::sync::DEFAULT_LOOKAHEAD_LIMIT;
+        config.sync.checkpoint_verify_concurrency_limit =
+            zebrad::components::sync::DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT;
         config.state.ephemeral = false;
         config.state.cache_dir = zebra_state_path;
 
diff --git a/zebrad/tests/common/sync.rs b/zebrad/tests/common/sync.rs
index f0e6fca16..b43cfe116 100644
--- a/zebrad/tests/common/sync.rs
+++ b/zebrad/tests/common/sync.rs
@@ -75,7 +75,7 @@ pub const FINISH_PARTIAL_SYNC_TIMEOUT: Duration = Duration::from_secs(60 * 60);
 /// But if we're going to be downloading lots of blocks, we use the default lookahead limit,
 /// so that the sync is faster. This can increase the RAM needed for tests.
 pub const MIN_HEIGHT_FOR_DEFAULT_LOOKAHEAD: Height =
-    Height(3 * sync::DEFAULT_LOOKAHEAD_LIMIT as u32);
+    Height(3 * sync::DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT as u32);
 
 /// What the expected behavior of the mempool is for a test that uses [`sync_until`].
 pub enum MempoolBehavior {
@@ -204,7 +204,8 @@ pub fn sync_until(
     // Use the default lookahead limit if we're syncing lots of blocks.
     // (Most tests use a smaller limit to minimise redundant block downloads.) 
    if height > MIN_HEIGHT_FOR_DEFAULT_LOOKAHEAD {
-        config.sync.lookahead_limit = sync::DEFAULT_LOOKAHEAD_LIMIT;
+        config.sync.checkpoint_verify_concurrency_limit =
+            sync::DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT;
     }
 
     let tempdir = if let Some(reuse_tempdir) = reuse_tempdir {
@@ -317,7 +318,7 @@ pub fn cached_mandatory_checkpoint_test_config() -> Result<ZebradConfig> {
     // If we're syncing past the checkpoint with cached state, we don't need the extra lookahead.
     // But the extra downloaded blocks shouldn't slow down the test that much,
    // and testing with the defaults gives us better test coverage.
-    config.sync.lookahead_limit = sync::DEFAULT_LOOKAHEAD_LIMIT;
+    config.sync.checkpoint_verify_concurrency_limit = sync::DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT;
 
     Ok(config)
 }
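
A note on the backwards-compatible rename in `zebrad/src/config.rs`: the `#[serde(alias = "...")]` attributes are what keep old config files working, even with `deny_unknown_fields` enabled, because serde treats an alias as another known name for the same field. A minimal standalone sketch of that behavior (a stand-in struct with placeholder defaults, not zebrad's actual type; assumes the `serde` derive feature and the `toml` crate as dependencies):

```rust
use serde::Deserialize;

// A stand-in for zebrad's `SyncSection`, reduced to the renamed fields.
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields, default)]
struct SyncSection {
    // The old key is accepted as an alias for the new one.
    #[serde(alias = "max_concurrent_block_requests")]
    download_concurrency_limit: usize,

    #[serde(alias = "lookahead_limit")]
    checkpoint_verify_concurrency_limit: usize,

    full_verify_concurrency_limit: usize,
}

impl Default for SyncSection {
    fn default() -> Self {
        // Placeholder defaults for this sketch; the real values come from
        // `impl Default for SyncSection` in zebrad/src/config.rs.
        Self {
            download_concurrency_limit: 50,
            checkpoint_verify_concurrency_limit: 1200,
            full_verify_concurrency_limit: 30,
        }
    }
}

fn main() {
    // A config file written for the old field names still parses.
    let old_style = r#"
        max_concurrent_block_requests = 12
        lookahead_limit = 400
    "#;

    let sync: SyncSection = toml::from_str(old_style).expect("old keys parse via aliases");
    assert_eq!(sync.download_concurrency_limit, 12);
    assert_eq!(sync.checkpoint_verify_concurrency_limit, 400);
    // Fields missing from the file fall back to the section defaults.
    assert_eq!(sync.full_verify_concurrency_limit, 30);
}
```

One caveat with this pattern: serde aliases only affect deserialization, so any config Zebra writes back out will use the new field names.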
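The other behavior worth a worked example is the phase-dependent lookahead in `ChainSync::lookahead_limit`, since the middle branch's arithmetic is easy to misread: deep in the checkpoint range the syncer uses the large checkpoint limit, past `max_checkpoint_height` it drops to the small full-verification limit, and when a batch of hashes straddles the final checkpoint it allows the full-verification limit plus however many hashes spill past the checkpoint. A standalone sketch with the struct fields passed as plain arguments, using assumed example numbers (a made-up final checkpoint height of 1,700,000, and 1200/30 for the limits, which is what the new defaults work out to if `MAX_CHECKPOINT_HEIGHT_GAP` is 400):

```rust
// Mirrors the decision logic of `ChainSync::lookahead_limit` so it can run on its own.
fn lookahead_limit(
    verified_height: usize,
    new_hashes: usize,
    max_checkpoint_height: usize,
    checkpoint_verify_concurrency_limit: usize,
    full_verify_concurrency_limit: usize,
) -> usize {
    if verified_height >= max_checkpoint_height {
        // Past the final checkpoint: full verification only.
        full_verify_concurrency_limit
    } else if (verified_height + new_hashes) >= max_checkpoint_height {
        // The new hashes straddle the final checkpoint: allow the hashes that
        // spill past it, plus a separate full verification lookahead.
        let spill_over = verified_height + new_hashes - max_checkpoint_height;

        full_verify_concurrency_limit + spill_over
    } else {
        // Still deep inside the checkpoint range: use the large checkpoint limit.
        checkpoint_verify_concurrency_limit
    }
}

fn main() {
    // Assumed example numbers, see the lead-in above.
    let (max_cp, cp_limit, full_limit) = (1_700_000, 1200, 30);

    // Deep in the checkpoint range: large checkpoint lookahead.
    assert_eq!(lookahead_limit(500_000, 500, max_cp, cp_limit, full_limit), 1200);
    // 100 hashes finish the checkpoints, 400 spill past them: 30 + 400.
    assert_eq!(lookahead_limit(1_699_900, 500, max_cp, cp_limit, full_limit), 430);
    // Past the final checkpoint: small full verification lookahead.
    assert_eq!(lookahead_limit(1_750_000, 500, max_cp, cp_limit, full_limit), 30);
}
```

Note that the variable the patch names `checkpoint_hashes` is this spill-over count (the hashes past the final checkpoint), not the number of hashes still inside the checkpoint range.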