fix(sync): Temporarily set full verification concurrency to 30 blocks (#4726)

* Return the maximum checkpoint height from the chain verifier

* Return the verified block height from the sync downloader

* Track the verified height in the syncer

* Use a lower concurrency limit during full verification

* Get the tip from the state before the first verified block

* Limit the number of submitted download and verify blocks in a batch

* Adjust lookahead limits when transitioning to full verification

* Keep unused extra hashes and submit them to the downloader later

* Remove redundant verified_height and state_tip()

* Split the checkpoint and full verify concurrency configs

* Decrease full verification concurrency to 5 blocks

10 concurrent blocks cause 3-minute stalls on some blocks on my machine.
(And it has about 4x as many cores as a standard machine.)

* cargo +stable fmt --all

* Remove a log that's verbose with smaller lookahead limits

* Apply the full verify concurrency limit to the inbound service

* Add a summary of the config changes to the CHANGELOG

* Increase the default full verify concurrency limit to 30
teor 2022-07-07 00:13:57 +10:00 committed by GitHub
parent 383f83e5d9
commit 87f4308caf
16 changed files with 363 additions and 161 deletions


@@ -7,8 +7,18 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## Next Release (Draft)
(Draft notes for the next release can be added here.)
This release improves Zebra's sync and verification performance under heavy load.
(TODO - complete the summary.)
### Configuration Changes
- Split the checkpoint and full verification [`sync` concurrency options](https://doc.zebra.zfnd.org/zebrad/config/struct.SyncSection.html) (#4726):
- Add a new `full_verify_concurrency_limit`
- Rename `max_concurrent_block_requests` to `download_concurrency_limit`
- Rename `lookahead_limit` to `checkpoint_verify_concurrency_limit`
For backwards compatibility, the old names are still accepted as aliases.
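As a sketch, this is how the renamed options might look in the `[sync]` section of `zebrad.toml` (the table name and the values here are illustrative assumptions, not recommendations):

```toml
# Hypothetical zebrad.toml excerpt, assuming the [sync] table name.
[sync]
# Renamed from `max_concurrent_block_requests`; the old key is still accepted.
download_concurrency_limit = 50
# Renamed from `lookahead_limit`; the old key is still accepted.
checkpoint_verify_concurrency_limit = 1200
# New option introduced by this change.
full_verify_concurrency_limit = 30
```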
(TODO - insert changelog here)
## [Zebra 1.0.0-beta.12](https://github.com/ZcashFoundation/zebra/releases/tag/v1.0.0-beta.12) - 2022-06-29


@@ -27,7 +27,7 @@ use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
use tracing::{instrument, Span};
use zebra_chain::{
block::{self, Block},
block::{self, Block, Height},
parameters::Network,
};
@@ -163,7 +163,8 @@ where
/// config parameter and if the download is not already started.
///
/// Returns a block verifier, transaction verifier,
/// and the Groth16 parameter download task [`JoinHandle`].
/// the Groth16 parameter download task [`JoinHandle`],
/// and the maximum configured checkpoint verification height.
///
/// The consensus configuration is specified by `config`, and the Zcash network
/// to verify blocks for is specified by `network`.
@@ -203,6 +204,7 @@ pub async fn init<S>(
transaction::Request,
>,
JoinHandle<()>,
Height,
)
where
S: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
@@ -266,5 +268,10 @@ where
let chain = Buffer::new(BoxService::new(chain), VERIFIER_BUFFER_BOUND);
(chain, transaction, groth16_download_handle)
(
chain,
transaction,
groth16_download_handle,
max_checkpoint_height,
)
}


@@ -64,7 +64,7 @@ async fn verifiers_from_network(
+ 'static,
) {
let state_service = zs::init_test(network);
let (chain_verifier, _transaction_verifier, _groth16_download_handle) =
let (chain_verifier, _transaction_verifier, _groth16_download_handle, _max_checkpoint_height) =
crate::chain::init(Config::default(), network, state_service.clone(), true).await;
// We can drop the download task handle here, because:
@@ -161,7 +161,7 @@ async fn verify_checkpoint(config: Config) -> Result<(), Report> {
// init_from_verifiers.
//
// Download task panics and timeouts are propagated to the tests that use Groth16 verifiers.
let (chain_verifier, _transaction_verifier, _groth16_download_handle) =
let (chain_verifier, _transaction_verifier, _groth16_download_handle, _max_checkpoint_height) =
super::init(config.clone(), network, zs::init_test(network), true).await;
// Add a timeout layer


@@ -67,8 +67,6 @@
//!
//! Some of the diagnostic features are optional, and need to be enabled at compile-time.
use std::cmp::max;
use abscissa_core::{config, Command, FrameworkError, Options, Runnable};
use color_eyre::eyre::{eyre, Report};
use futures::FutureExt;
@@ -119,13 +117,16 @@ impl StartCmd {
let inbound = ServiceBuilder::new()
.load_shed()
.buffer(inbound::downloads::MAX_INBOUND_CONCURRENCY)
.service(Inbound::new(setup_rx));
.service(Inbound::new(
config.sync.full_verify_concurrency_limit,
setup_rx,
));
let (peer_set, address_book) =
zebra_network::init(config.network.clone(), inbound, latest_chain_tip.clone()).await;
info!("initializing verifiers");
let (chain_verifier, tx_verifier, mut groth16_download_handle) =
let (chain_verifier, tx_verifier, mut groth16_download_handle, max_checkpoint_height) =
zebra_consensus::chain::init(
config.consensus.clone(),
config.network.network,
@@ -137,6 +138,7 @@ impl StartCmd {
info!("initializing syncer");
let (syncer, sync_status) = ChainSync::new(
&config,
max_checkpoint_height,
peer_set.clone(),
chain_verifier.clone(),
state.clone(),
@@ -342,15 +344,19 @@ impl StartCmd {
fn state_buffer_bound() -> usize {
let config = app_config().clone();
// Ignore the checkpoint verify limit, because it is very large.
//
// TODO: do we also need to account for concurrent use across services?
// we could multiply the maximum by 3/2, or add a fixed constant
max(
config.sync.max_concurrent_block_requests,
max(
inbound::downloads::MAX_INBOUND_CONCURRENCY,
mempool::downloads::MAX_INBOUND_CONCURRENCY,
),
)
[
config.sync.download_concurrency_limit,
config.sync.full_verify_concurrency_limit,
inbound::downloads::MAX_INBOUND_CONCURRENCY,
mempool::downloads::MAX_INBOUND_CONCURRENCY,
]
.into_iter()
.max()
.unwrap()
}
}
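The rewrite above replaces nested `max` calls with a single iterator over all four limits. A minimal sketch of the idiom, with illustrative numbers standing in for the real config values:

```rust
fn main() {
    let limits = [50_usize, 30, 20, 10];

    // Taking `.max()` over the whole array replaces nested `max(a, max(b, c))` calls.
    // `.unwrap()` is safe here because the array is never empty.
    let bound = limits.into_iter().max().unwrap();
    assert_eq!(bound, 50);
}
```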


@@ -35,9 +35,10 @@ use zebra_network::{
};
use zebra_node_services::mempool;
use crate::BoxError;
// Re-use the syncer timeouts for consistency.
use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
use crate::BoxError;
use InventoryResponse::*;
@@ -85,6 +86,13 @@ pub enum Setup {
///
/// All requests are ignored.
Pending {
// Configuration
//
/// The configured full verification concurrency limit.
full_verify_concurrency_limit: usize,
// Services
//
/// A oneshot channel used to receive required services,
/// after they are set up.
setup: oneshot::Receiver<InboundSetupData>,
@@ -94,6 +102,8 @@ pub enum Setup {
///
/// All requests are answered.
Initialized {
// Services
//
/// A shared list of peer addresses.
address_book: Arc<std::sync::Mutex<zn::AddressBook>>,
@@ -169,9 +179,15 @@ impl Inbound {
/// Create a new inbound service.
///
/// Dependent services are sent via the `setup` channel after initialization.
pub fn new(setup: oneshot::Receiver<InboundSetupData>) -> Inbound {
pub fn new(
full_verify_concurrency_limit: usize,
setup: oneshot::Receiver<InboundSetupData>,
) -> Inbound {
Inbound {
setup: Setup::Pending { setup },
setup: Setup::Pending {
full_verify_concurrency_limit,
setup,
},
}
}
@@ -200,7 +216,10 @@ impl Service<zn::Request> for Inbound {
let result;
self.setup = match self.take_setup() {
Setup::Pending { mut setup } => match setup.try_recv() {
Setup::Pending {
full_verify_concurrency_limit,
mut setup,
} => match setup.try_recv() {
Ok(setup_data) => {
let InboundSetupData {
address_book,
@ -212,6 +231,7 @@ impl Service<zn::Request> for Inbound {
} = setup_data;
let block_downloads = Box::pin(BlockDownloads::new(
full_verify_concurrency_limit,
Timeout::new(block_download_peer_set.clone(), BLOCK_DOWNLOAD_TIMEOUT),
Timeout::new(block_verifier, BLOCK_VERIFY_TIMEOUT),
state.clone(),
@@ -229,7 +249,10 @@
Err(TryRecvError::Empty) => {
// There's no setup data yet, so keep waiting for it
result = Ok(());
Setup::Pending { setup }
Setup::Pending {
full_verify_concurrency_limit,
setup,
}
}
Err(error @ TryRecvError::Closed) => {
// Mark the service as failed, because setup failed
@@ -256,6 +279,7 @@
while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {}
result = Ok(());
Setup::Initialized {
address_book,
block_downloads,


@@ -25,6 +25,8 @@ use zebra_chain::{
use zebra_network as zn;
use zebra_state as zs;
use crate::components::sync::MIN_CONCURRENCY_LIMIT;
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// The maximum number of concurrent inbound download and verify tasks.
@@ -64,7 +66,7 @@ pub enum DownloadAction {
/// The queue is at capacity, so this request was ignored.
///
/// The sync service should discover this block later, when we are closer
/// to the tip. The queue's capacity is [`MAX_INBOUND_CONCURRENCY`].
/// to the tip. The queue's capacity is [`Downloads.full_verify_concurrency_limit`].
FullQueue,
}
@@ -80,7 +82,13 @@ where
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
// Configuration
//
/// The configured full verification concurrency limit, after applying the minimum limit.
full_verify_concurrency_limit: usize,
// Services
//
/// A service that forwards requests to connected peers, and returns their
/// responses.
network: ZN,
@@ -95,6 +103,7 @@ where
latest_chain_tip: zs::LatestChainTip,
// Internal downloads state
//
/// A list of pending block download and verify tasks.
#[pin]
pending: FuturesUnordered<JoinHandle<Result<block::Hash, (BoxError, block::Hash)>>>,
@@ -162,8 +171,19 @@ where
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
/// timeout limits should be applied to the `network` service passed into
/// this constructor.
pub fn new(network: ZN, verifier: ZV, state: ZS, latest_chain_tip: zs::LatestChainTip) -> Self {
pub fn new(
full_verify_concurrency_limit: usize,
network: ZN,
verifier: ZV,
state: ZS,
latest_chain_tip: zs::LatestChainTip,
) -> Self {
// The syncer already warns about the minimum.
let full_verify_concurrency_limit =
full_verify_concurrency_limit.clamp(MIN_CONCURRENCY_LIMIT, MAX_INBOUND_CONCURRENCY);
Self {
full_verify_concurrency_limit,
network,
verifier,
state,
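The `clamp` call above keeps the configured limit inside the supported range. A minimal sketch of that behavior, using the diff's `MIN_CONCURRENCY_LIMIT` of 1 and a placeholder value for `MAX_INBOUND_CONCURRENCY`:

```rust
// `MIN_CONCURRENCY_LIMIT` is 1 in the diff; the inbound maximum here is a
// placeholder, not the real constant.
const MIN_CONCURRENCY_LIMIT: usize = 1;
const MAX_INBOUND_CONCURRENCY: usize = 20;

fn main() {
    // Values below the minimum are raised, values above the maximum are lowered.
    assert_eq!(0_usize.clamp(MIN_CONCURRENCY_LIMIT, MAX_INBOUND_CONCURRENCY), 1);
    assert_eq!(30_usize.clamp(MIN_CONCURRENCY_LIMIT, MAX_INBOUND_CONCURRENCY), 20);
    assert_eq!(5_usize.clamp(MIN_CONCURRENCY_LIMIT, MAX_INBOUND_CONCURRENCY), 5);
}
```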
@@ -182,8 +202,8 @@ where
debug!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"block hash already queued for inbound download: ignored block"
concurrency_limit = self.full_verify_concurrency_limit,
"block hash already queued for inbound download: ignored block",
);
metrics::gauge!("gossip.queued.block.count", self.pending.len() as f64);
@@ -192,12 +212,12 @@ where
return DownloadAction::AlreadyQueued;
}
if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
if self.pending.len() >= self.full_verify_concurrency_limit {
debug!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"too many blocks queued for inbound download: ignored block"
concurrency_limit = self.full_verify_concurrency_limit,
"too many blocks queued for inbound download: ignored block",
);
metrics::gauge!("gossip.queued.block.count", self.pending.len() as f64);
@@ -213,6 +233,7 @@ where
let network = self.network.clone();
let verifier = self.verifier.clone();
let latest_chain_tip = self.latest_chain_tip.clone();
let full_verify_concurrency_limit = self.full_verify_concurrency_limit;
let fut = async move {
// Check if the block is already in the state.
@@ -232,7 +253,7 @@ where
assert_eq!(
blocks.len(),
1,
"wrong number of blocks in response to a single hash"
"wrong number of blocks in response to a single hash",
);
blocks
@@ -257,11 +278,11 @@ where
let tip_height = latest_chain_tip.best_tip_height();
let max_lookahead_height = if let Some(tip_height) = tip_height {
let lookahead = i32::try_from(MAX_INBOUND_CONCURRENCY).expect("fits in i32");
let lookahead = i32::try_from(full_verify_concurrency_limit).expect("fits in i32");
(tip_height + lookahead).expect("tip is much lower than Height::MAX")
} else {
let genesis_lookahead =
u32::try_from(MAX_INBOUND_CONCURRENCY - 1).expect("fits in u32");
u32::try_from(full_verify_concurrency_limit - 1).expect("fits in u32");
block::Height(genesis_lookahead)
};
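To make the window concrete: with a hypothetical tip at height 1_000 and a limit of 30, the inbound service accepts gossiped blocks up to height 1_030, or up to height 29 when the state is empty. A sketch under those assumptions, using plain `u32` arithmetic instead of `zebra_chain`'s checked `Height` operations:

```rust
fn max_lookahead_height(tip_height: Option<u32>, full_verify_concurrency_limit: u32) -> u32 {
    match tip_height {
        // Accept gossiped blocks up to one concurrency window past the tip.
        Some(tip) => tip + full_verify_concurrency_limit,
        // With an empty state, only accept blocks near genesis (height 0).
        None => full_verify_concurrency_limit - 1,
    }
}

fn main() {
    assert_eq!(max_lookahead_height(Some(1_000), 30), 1_030);
    assert_eq!(max_lookahead_height(None, 30), 29);
}
```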
@@ -296,8 +317,8 @@ where
?block_height,
?tip_height,
?max_lookahead_height,
lookahead_limit = ?MAX_INBOUND_CONCURRENCY,
"gossiped block height too far ahead of the tip: dropped downloaded block"
lookahead_limit = full_verify_concurrency_limit,
"gossiped block height too far ahead of the tip: dropped downloaded block",
);
metrics::counter!("gossip.max.height.limit.dropped.block.count", 1);
@@ -309,7 +330,7 @@ where
?tip_height,
?min_accepted_height,
behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
"gossiped block height behind the finalized tip: dropped downloaded block"
"gossiped block height behind the finalized tip: dropped downloaded block",
);
metrics::counter!("gossip.min.height.limit.dropped.block.count", 1);
@@ -353,8 +374,8 @@ where
debug!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"queued hash for download"
concurrency_limit = self.full_verify_concurrency_limit,
"queued hash for download",
);
metrics::gauge!("gossip.queued.block.count", self.pending.len() as f64);


@@ -29,7 +29,7 @@ use zebra_test::mock_service::{MockService, PanicAssertion};
use crate::{
components::{
inbound::{Inbound, InboundSetupData},
inbound::{downloads::MAX_INBOUND_CONCURRENCY, Inbound, InboundSetupData},
mempool::{
gossip_mempool_transaction_id, unmined_transactions_in_blocks, Config as MempoolConfig,
Mempool, MempoolError, SameEffectsChainRejectionError, UnboxMempoolError,
@@ -708,7 +708,7 @@ async fn setup(
let mut state_service = ServiceBuilder::new().buffer(1).service(state);
// Download task panics and timeouts are propagated to the tests that use Groth16 verifiers.
let (block_verifier, _transaction_verifier, _groth16_download_handle) =
let (block_verifier, _transaction_verifier, _groth16_download_handle, _max_checkpoint_height) =
zebra_consensus::chain::init(
consensus_config.clone(),
network,
@@ -785,7 +785,7 @@ async fn setup(
let inbound_service = ServiceBuilder::new()
.load_shed()
.service(Inbound::new(setup_rx));
.service(Inbound::new(MAX_INBOUND_CONCURRENCY, setup_rx));
let inbound_service = BoxService::new(inbound_service);
let inbound_service = ServiceBuilder::new().buffer(1).service(inbound_service);


@@ -29,7 +29,7 @@ use zebra_test::mock_service::{MockService, PanicAssertion};
use crate::{
components::{
inbound::{Inbound, InboundSetupData},
inbound::{downloads::MAX_INBOUND_CONCURRENCY, Inbound, InboundSetupData},
mempool::{gossip_mempool_transaction_id, Config as MempoolConfig, Mempool},
sync::{self, BlockGossipError, SyncStatus},
},
@@ -637,7 +637,7 @@ async fn setup(
// Inbound
let (setup_tx, setup_rx) = oneshot::channel();
let inbound_service = Inbound::new(setup_rx);
let inbound_service = Inbound::new(MAX_INBOUND_CONCURRENCY, setup_rx);
let inbound_service = ServiceBuilder::new()
.boxed_clone()
.load_shed()


@@ -2,7 +2,7 @@
//!
//! It is used when Zebra is a long way behind the current chain tip.
use std::{collections::HashSet, pin::Pin, sync::Arc, task::Poll, time::Duration};
use std::{cmp::max, collections::HashSet, pin::Pin, sync::Arc, task::Poll, time::Duration};
use color_eyre::eyre::{eyre, Report};
use futures::stream::{FuturesUnordered, StreamExt};
@@ -14,7 +14,7 @@ use tower::{
};
use zebra_chain::{
block::{self, Block},
block::{self, Block, Height},
chain_tip::ChainTip,
parameters::genesis_hash,
};
@@ -57,7 +57,7 @@ const FANOUT: usize = 3;
/// retries may be concurrent, inner retries are sequential.
const BLOCK_DOWNLOAD_RETRY_LIMIT: usize = 3;
/// A lower bound on the user-specified lookahead limit.
/// A lower bound on the user-specified checkpoint verification concurrency limit.
///
/// Set to the maximum checkpoint interval, so the pipeline holds around a checkpoint's
/// worth of blocks.
@@ -76,14 +76,20 @@ const BLOCK_DOWNLOAD_RETRY_LIMIT: usize = 3;
/// Once these malicious blocks start failing validation, the syncer will cancel all
/// the pending download and verify tasks, drop all the blocks, and start a new
/// ObtainTips with a new set of peers.
pub const MIN_LOOKAHEAD_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP;
pub const MIN_CHECKPOINT_CONCURRENCY_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP;
/// The default for the user-specified lookahead limit.
///
/// See [`MIN_LOOKAHEAD_LIMIT`] for details.
/// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
///
/// TODO: increase to `MAX_CHECKPOINT_HEIGHT_GAP * 5`, after we implement orchard batching
pub const DEFAULT_LOOKAHEAD_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 3;
pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize =
zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 3;
/// A lower bound on the user-specified concurrency limit.
///
/// If the concurrency limit is 0, Zebra can't download or verify any blocks.
pub const MIN_CONCURRENCY_LIMIT: usize = 1;
/// The expected maximum number of hashes in an ObtainTips or ExtendTips response.
///
@@ -91,7 +97,7 @@ pub const DEFAULT_LOOKAHEAD_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGH
/// but still limit the number of blocks in the pipeline between the downloader and
/// the state.
///
/// See [`MIN_LOOKAHEAD_LIMIT`] for details.
/// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
pub const MAX_TIPS_RESPONSE_HASH_COUNT: usize = 500;
/// Controls how long we wait for a tips response to return.
@@ -214,13 +220,21 @@ where
ZSTip: ChainTip + Clone + Send + 'static,
{
// Configuration
//
/// The genesis hash for the configured network
genesis_hash: block::Hash,
/// The configured lookahead limit, after applying the minimum limit.
lookahead_limit: usize,
/// The largest block height for the checkpoint verifier, based on the current config.
max_checkpoint_height: Height,
/// The configured checkpoint verification concurrency limit, after applying the minimum limit.
checkpoint_verify_concurrency_limit: usize,
/// The configured full verification concurrency limit, after applying the minimum limit.
full_verify_concurrency_limit: usize,
// Services
//
/// A network service which is used to perform ObtainTips and ExtendTips
/// requests.
///
@@ -246,6 +260,7 @@ where
latest_chain_tip: ZSTip,
// Internal sync state
//
/// The tips that the syncer is currently following.
prospective_tips: HashSet<CheckedTip>,
@@ -291,11 +306,44 @@ where
/// Also returns a [`SyncStatus`] to check if the syncer has likely reached the chain tip.
pub fn new(
config: &ZebradConfig,
max_checkpoint_height: Height,
peers: ZN,
verifier: ZV,
state: ZS,
latest_chain_tip: ZSTip,
) -> (Self, SyncStatus) {
let mut download_concurrency_limit = config.sync.download_concurrency_limit;
let mut checkpoint_verify_concurrency_limit =
config.sync.checkpoint_verify_concurrency_limit;
let mut full_verify_concurrency_limit = config.sync.full_verify_concurrency_limit;
if download_concurrency_limit < MIN_CONCURRENCY_LIMIT {
warn!(
"configured download concurrency limit {} too low, increasing to {}",
config.sync.download_concurrency_limit, MIN_CONCURRENCY_LIMIT,
);
download_concurrency_limit = MIN_CONCURRENCY_LIMIT;
}
if checkpoint_verify_concurrency_limit < MIN_CHECKPOINT_CONCURRENCY_LIMIT {
warn!(
"configured checkpoint verify concurrency limit {} too low, increasing to {}",
config.sync.checkpoint_verify_concurrency_limit, MIN_CHECKPOINT_CONCURRENCY_LIMIT,
);
checkpoint_verify_concurrency_limit = MIN_CHECKPOINT_CONCURRENCY_LIMIT;
}
if full_verify_concurrency_limit < MIN_CONCURRENCY_LIMIT {
warn!(
"configured full verify concurrency limit {} too low, increasing to {}",
config.sync.full_verify_concurrency_limit, MIN_CONCURRENCY_LIMIT,
);
full_verify_concurrency_limit = MIN_CONCURRENCY_LIMIT;
}
let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);
// The Hedge middleware is the outermost layer, hedging requests
// between two retry-wrapped networks. The innermost timeout
@@ -306,12 +354,9 @@ where
// abstracts away spurious failures from individual peers
// making a less-fallible network service, and the Hedge layer
// tries to reduce latency of that less-fallible service.
//
// XXX add ServiceBuilder::hedge() so this becomes
// ServiceBuilder::new().hedge(...).retry(...)...
let block_network = Hedge::new(
ServiceBuilder::new()
.concurrency_limit(config.sync.max_concurrent_block_requests)
.concurrency_limit(download_concurrency_limit)
.retry(zn::RetryLimit::new(BLOCK_DOWNLOAD_RETRY_LIMIT))
.timeout(BLOCK_DOWNLOAD_TIMEOUT)
.service(peers),
@@ -324,27 +369,23 @@ where
// We apply a timeout to the verifier to avoid hangs due to missing earlier blocks.
let verifier = Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT);
let mut lookahead_limit = config.sync.lookahead_limit;
if lookahead_limit < MIN_LOOKAHEAD_LIMIT {
warn!(
"configured lookahead limit {} too low, increasing to {}",
config.sync.lookahead_limit, MIN_LOOKAHEAD_LIMIT,
);
lookahead_limit = MIN_LOOKAHEAD_LIMIT;
}
let (sync_status, recent_syncs) = SyncStatus::new();
let new_syncer = Self {
genesis_hash: genesis_hash(config.network.network),
lookahead_limit,
max_checkpoint_height,
checkpoint_verify_concurrency_limit,
full_verify_concurrency_limit,
tip_network,
downloads: Box::pin(Downloads::new(
block_network,
verifier,
latest_chain_tip.clone(),
lookahead_limit,
// TODO: change the download lookahead for full verification?
max(
checkpoint_verify_concurrency_limit,
full_verify_concurrency_limit,
),
)),
state,
latest_chain_tip,
@@ -397,58 +438,62 @@ where
state_tip = ?self.latest_chain_tip.best_tip_height(),
"starting sync, obtaining new tips"
);
if let Err(e) = self.obtain_tips().await {
let mut extra_hashes = self.obtain_tips().await.map_err(|e| {
info!("temporary error obtaining tips: {:#}", e);
return Err(e);
}
e
})?;
self.update_metrics();
while !self.prospective_tips.is_empty() {
while !self.prospective_tips.is_empty() || !extra_hashes.is_empty() {
// Check whether any block tasks are currently ready:
while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
Self::handle_block_response(rsp)?;
self.handle_block_response(rsp)?;
}
self.update_metrics();
// If we have too many pending tasks, wait for some to finish.
//
// Starting to wait is interesting, but logging each wait can be
// very verbose.
if self.downloads.in_flight() > self.lookahead_limit {
tracing::info!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
lookahead_limit = self.lookahead_limit,
"waiting for pending blocks",
);
}
while self.downloads.in_flight() > self.lookahead_limit {
while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) {
trace!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
lookahead_limit = self.lookahead_limit,
extra_hashes = extra_hashes.len(),
lookahead_limit = self.lookahead_limit(extra_hashes.len()),
state_tip = ?self.latest_chain_tip.best_tip_height(),
"waiting for pending blocks",
);
let response = self.downloads.next().await.expect("downloads is nonempty");
Self::handle_block_response(response)?;
self.handle_block_response(response)?;
self.update_metrics();
}
// Once we're below the lookahead limit, we can keep extending the tips.
info!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
lookahead_limit = self.lookahead_limit,
state_tip = ?self.latest_chain_tip.best_tip_height(),
"extending tips",
);
// Once we're below the lookahead limit, we can request more blocks or hashes.
if !extra_hashes.is_empty() {
debug!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
extra_hashes = extra_hashes.len(),
lookahead_limit = self.lookahead_limit(extra_hashes.len()),
state_tip = ?self.latest_chain_tip.best_tip_height(),
"requesting more blocks",
);
if let Err(e) = self.extend_tips().await {
info!("temporary error extending tips: {:#}", e);
return Err(e);
let response = self.request_blocks(extra_hashes).await;
extra_hashes = Self::handle_hash_response(response)?;
} else {
info!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
extra_hashes = extra_hashes.len(),
lookahead_limit = self.lookahead_limit(extra_hashes.len()),
state_tip = ?self.latest_chain_tip.best_tip_height(),
"extending tips",
);
extra_hashes = self.extend_tips().await.map_err(|e| {
info!("temporary error extending tips: {:#}", e);
e
})?;
}
self.update_metrics();
}
@@ -461,7 +506,7 @@ where
/// Given a block_locator list fan out request for subsequent hashes to
/// multiple peers
#[instrument(skip(self))]
async fn obtain_tips(&mut self) -> Result<(), Report> {
async fn obtain_tips(&mut self) -> Result<IndexSet<block::Hash>, Report> {
let block_locator = self
.state
.ready()
@@ -604,13 +649,12 @@ where
self.recent_syncs.push_obtain_tips_length(new_downloads);
let response = self.request_blocks(download_set).await;
Self::handle_response(response)?;
Ok(())
Self::handle_hash_response(response).map_err(Into::into)
}
#[instrument(skip(self))]
async fn extend_tips(&mut self) -> Result<(), Report> {
async fn extend_tips(&mut self) -> Result<IndexSet<block::Hash>, Report> {
let tips = std::mem::take(&mut self.prospective_tips);
let mut download_set = IndexSet::new();
@@ -742,9 +786,8 @@ where
self.recent_syncs.push_extend_tips_length(new_downloads);
let response = self.request_blocks(download_set).await;
Self::handle_response(response)?;
Ok(())
Self::handle_hash_response(response).map_err(Into::into)
}
/// Download and verify the genesis block, if it isn't currently known to
@@ -766,7 +809,9 @@ where
let response = self.downloads.next().await.expect("downloads is nonempty");
match response {
Ok(hash) => trace!(?hash, "verified and committed block to state"),
Ok(response) => self
.handle_block_response(Ok(response))
.expect("never returns Err for Ok"),
Err(error) => {
// TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
if Self::should_restart_sync(&error) {
@@ -789,34 +834,92 @@ where
Ok(())
}
/// Queue download and verify tasks for each block that isn't currently known to our node
/// Queue download and verify tasks for each block that isn't currently known to our node.
///
/// TODO: turn obtain and extend tips into a separate task, which sends hashes via a channel?
async fn request_blocks(
&mut self,
hashes: IndexSet<block::Hash>,
) -> Result<(), BlockDownloadVerifyError> {
debug!(hashes.len = hashes.len(), "requesting blocks");
mut hashes: IndexSet<block::Hash>,
) -> Result<IndexSet<block::Hash>, BlockDownloadVerifyError> {
let lookahead_limit = self.lookahead_limit(hashes.len());
debug!(
hashes.len = hashes.len(),
?lookahead_limit,
"requesting blocks",
);
let extra_hashes = if hashes.len() > lookahead_limit {
hashes.split_off(lookahead_limit)
} else {
IndexSet::new()
};
for hash in hashes.into_iter() {
self.downloads.download_and_verify(hash).await?;
}
Ok(())
Ok(extra_hashes)
}
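`IndexSet::split_off(n)` keeps the first `n` hashes in insertion order and returns the rest, so the syncer can submit one batch now and hold the extras for later. A small sketch of that behavior, assuming the `indexmap` crate and integer stand-ins for block hashes:

```rust
use indexmap::IndexSet;

fn main() {
    let mut hashes: IndexSet<u32> = (0..10).collect();
    let lookahead_limit = 4;

    // Keep the first `lookahead_limit` hashes; everything after them is split off.
    let extra_hashes = if hashes.len() > lookahead_limit {
        hashes.split_off(lookahead_limit)
    } else {
        IndexSet::new()
    };

    assert_eq!(hashes.into_iter().collect::<Vec<_>>(), vec![0, 1, 2, 3]);
    assert_eq!(extra_hashes.into_iter().collect::<Vec<_>>(), vec![4, 5, 6, 7, 8, 9]);
}
```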
/// The configured lookahead limit, based on the currently verified height,
/// and the number of hashes we haven't queued yet.
fn lookahead_limit(&self, new_hashes: usize) -> usize {
let max_checkpoint_height: usize = self
.max_checkpoint_height
.0
.try_into()
.expect("fits in usize");
// When the state is empty, we want to verify using checkpoints
let verified_height: usize = self
.latest_chain_tip
.best_tip_height()
.unwrap_or(Height(0))
.0
.try_into()
.expect("fits in usize");
if verified_height >= max_checkpoint_height {
self.full_verify_concurrency_limit
} else if (verified_height + new_hashes) >= max_checkpoint_height {
// If we're just about to start full verification, allow enough for the remaining checkpoint,
// and also enough for a separate full verification lookahead.
let checkpoint_hashes = verified_height + new_hashes - max_checkpoint_height;
self.full_verify_concurrency_limit + checkpoint_hashes
} else {
self.checkpoint_verify_concurrency_limit
}
}
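A worked example of the three regimes in `lookahead_limit`, as a standalone sketch with hypothetical numbers (a max checkpoint height of 1_700_000, a checkpoint limit of 1_200, and a full-verify limit of 30, not the real constants):

```rust
fn lookahead_limit(
    verified_height: usize,
    new_hashes: usize,
    max_checkpoint_height: usize,
    checkpoint_limit: usize,
    full_verify_limit: usize,
) -> usize {
    if verified_height >= max_checkpoint_height {
        // Fully past the checkpoints: a small limit avoids verification stalls.
        full_verify_limit
    } else if verified_height + new_hashes >= max_checkpoint_height {
        // Transition batch: allow the full-verify window plus the hashes
        // that land past the final checkpoint.
        full_verify_limit + (verified_height + new_hashes - max_checkpoint_height)
    } else {
        // Still checkpointing: a large limit keeps the pipeline full.
        checkpoint_limit
    }
}

fn main() {
    let (max_cp, cp_limit, fv_limit) = (1_700_000, 1_200, 30);
    // Deep in the checkpoint range: use the checkpoint limit.
    assert_eq!(lookahead_limit(1_000_000, 500, max_cp, cp_limit, fv_limit), 1_200);
    // A batch that crosses the last checkpoint: 100 hashes land past it.
    assert_eq!(lookahead_limit(1_699_900, 200, max_cp, cp_limit, fv_limit), 130);
    // Past all checkpoints: use the full-verify limit.
    assert_eq!(lookahead_limit(1_800_000, 500, max_cp, cp_limit, fv_limit), 30);
}
```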
/// Handles a response for a requested block.
///
/// Returns `Ok` if the block was successfully verified and committed to the state, or if an
/// expected error occurred, so that the synchronization can continue normally.
///
/// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart.
/// See [`Self::handle_response`] for more details.
fn handle_block_response(
response: Result<block::Hash, BlockDownloadVerifyError>,
&mut self,
response: Result<(Height, block::Hash), BlockDownloadVerifyError>,
) -> Result<(), BlockDownloadVerifyError> {
match response {
Ok(hash) => trace!(?hash, "verified and committed block to state"),
Err(_) => return Self::handle_response(response.map(|_| ())),
}
Ok((height, hash)) => {
trace!(?height, ?hash, "verified and committed block to state");
Ok(())
Ok(())
}
Err(_) => Self::handle_response(response),
}
}
/// Handles a response to block hash submission, passing through any extra hashes.
///
/// See [`Self::handle_response`] for more details.
fn handle_hash_response(
response: Result<IndexSet<block::Hash>, BlockDownloadVerifyError>,
) -> Result<IndexSet<block::Hash>, BlockDownloadVerifyError> {
match response {
Ok(extra_hashes) => Ok(extra_hashes),
Err(_) => Self::handle_response(response).map(|()| IndexSet::new()),
}
}
/// Handles a response to a syncer request.
@@ -825,23 +928,26 @@ where
/// so that the synchronization can continue normally.
///
/// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart.
fn handle_response(
response: Result<(), BlockDownloadVerifyError>,
fn handle_response<T>(
response: Result<T, BlockDownloadVerifyError>,
) -> Result<(), BlockDownloadVerifyError> {
if let Err(error) = response {
// TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
if Self::should_restart_sync(&error) {
return Err(error);
match response {
Ok(_t) => Ok(()),
Err(error) => {
// TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
if Self::should_restart_sync(&error) {
Err(error)
} else {
Ok(())
}
}
}
Ok(())
}
/// Returns `true` if the hash is present in the state, and `false`
/// if the hash is not present in the state.
///
/// BUG: check if the hash is in any chain (#862)
/// TODO BUG: check if the hash is in any chain (#862)
/// Depth only checks the main chain.
async fn state_contains(&mut self, hash: block::Hash) -> Result<bool, Report> {
match self


@@ -20,7 +20,7 @@ use tower::{hedge, Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::{
block::{self, Block},
block::{self, Block, Height},
chain_tip::ChainTip,
};
use zebra_network as zn;
@@ -169,8 +169,9 @@ where
// Internal downloads state
/// A list of pending block download and verify tasks.
#[pin]
pending:
FuturesUnordered<JoinHandle<Result<block::Hash, (BlockDownloadVerifyError, block::Hash)>>>,
pending: FuturesUnordered<
JoinHandle<Result<(Height, block::Hash), (BlockDownloadVerifyError, block::Hash)>>,
>,
/// A list of channels that can be used to cancel pending block download and
/// verify tasks.
@@ -189,7 +190,7 @@ where
ZV::Future: Send,
ZSTip: ChainTip + Clone + Send + 'static,
{
type Item = Result<block::Hash, BlockDownloadVerifyError>;
type Item = Result<(Height, block::Hash), BlockDownloadVerifyError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
@@ -204,9 +205,10 @@ where
// TODO: this would be cleaner with poll_map (#2693)
if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
match join_result.expect("block download and verify tasks must not panic") {
Ok(hash) => {
Ok((height, hash)) => {
this.cancel_handles.remove(&hash);
Poll::Ready(Some(Ok(hash)))
Poll::Ready(Some(Ok((height, hash))))
}
Err((e, hash)) => {
this.cancel_handles.remove(&hash);
@@ -325,6 +327,7 @@ where
// that will timeout before being verified.
let tip_height = latest_chain_tip.best_tip_height();
// TODO: don't use VERIFICATION_PIPELINE_SCALING_MULTIPLIER for full verification?
let max_lookahead_height = if let Some(tip_height) = tip_height {
// Scale the height limit with the lookahead limit,
// so users with low capacity or under DoS can reduce them both.
@@ -373,9 +376,7 @@ where
?tip_height,
?max_lookahead_height,
lookahead_limit = ?lookahead_limit,
"synced block height too far ahead of the tip: dropped downloaded block. \
Hint: Try increasing the value of the lookahead_limit field \
in the sync section of the configuration file."
"synced block height too far ahead of the tip: dropped downloaded block",
);
metrics::counter!("sync.max.height.limit.dropped.block.count", 1);
@@ -435,12 +436,14 @@ where
metrics::counter!("sync.verified.block.count", 1);
}
verification.map_err(|err| {
match err.downcast::<zebra_consensus::chain::VerifyChainError>() {
Ok(error) => BlockDownloadVerifyError::Invalid { error: *error, height: block_height, hash },
Err(error) => BlockDownloadVerifyError::ValidationRequestError { error, height: block_height, hash },
}
})
verification
.map(|hash| (block_height, hash))
.map_err(|err| {
match err.downcast::<zebra_consensus::chain::VerifyChainError>() {
Ok(error) => BlockDownloadVerifyError::Invalid { error: *error, height: block_height, hash },
Err(error) => BlockDownloadVerifyError::ValidationRequestError { error, height: block_height, hash },
}
})
}
.in_current_span()
// Tack the hash onto the error so we can remove the cancel handle


@@ -11,7 +11,10 @@ use std::{
use futures::future;
use tokio::time::{timeout, Duration};
use zebra_chain::parameters::{Network, POST_BLOSSOM_POW_TARGET_SPACING};
use zebra_chain::{
block::Height,
parameters::{Network, POST_BLOSSOM_POW_TARGET_SPACING},
};
use zebra_network::constants::{
DEFAULT_CRAWL_NEW_PEER_INTERVAL, HANDSHAKE_TIMEOUT, INVENTORY_ROTATION_INTERVAL,
};
@@ -163,6 +166,7 @@ fn request_genesis_is_rate_limited() {
// start the sync
let (mut chain_sync, _) = ChainSync::new(
&ZebradConfig::default(),
Height(0),
peer_service,
verifier_service,
state_service,


@@ -6,7 +6,7 @@ use color_eyre::Report;
use futures::{Future, FutureExt};
use zebra_chain::{
block::{self, Block},
block::{self, Block, Height},
chain_tip::mock::{MockChainTip, MockChainTipSender},
serialization::ZcashDeserializeInto,
};
@@ -966,6 +966,7 @@ fn setup() -> (
let (chain_sync, sync_status) = ChainSync::new(
&config,
Height(0),
peer_set.clone(),
chain_verifier.clone(),
state_service.clone(),


@@ -174,16 +174,15 @@ impl Default for MetricsSection {
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct SyncSection {
/// The maximum number of concurrent block download requests during sync.
/// The number of parallel block download requests.
///
/// This is set to a low value by default, to avoid task and
/// network contention. Increasing this value may improve
/// performance on machines with many cores and a fast network
/// connection.
pub max_concurrent_block_requests: usize,
/// performance on machines with a fast network connection.
#[serde(alias = "max_concurrent_block_requests")]
pub download_concurrency_limit: usize,
/// Controls how far ahead of the chain tip the syncer tries to
/// download before waiting for queued verifications to complete.
/// The number of blocks submitted in parallel to the checkpoint verifier.
///
/// Increasing this limit increases the buffer size, so it reduces
/// the impact of an individual block request failing. However, it
@@ -198,16 +197,35 @@ pub struct SyncSection {
/// astray.
///
/// For reliable checkpoint syncing, Zebra enforces a
/// [`MIN_LOOKAHEAD_LIMIT`](sync::MIN_LOOKAHEAD_LIMIT).
pub lookahead_limit: usize,
/// [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`](sync::MIN_CHECKPOINT_CONCURRENCY_LIMIT).
///
/// This is set to a high value by default, to avoid verification pipeline stalls.
/// Decreasing this value reduces RAM usage.
#[serde(alias = "lookahead_limit")]
pub checkpoint_verify_concurrency_limit: usize,
/// The number of blocks submitted in parallel to the full verifier.
///
/// This is set to a low value by default, to avoid verification timeouts on large blocks.
/// Increasing this value may improve performance on machines with many cores.
pub full_verify_concurrency_limit: usize,
}
impl Default for SyncSection {
fn default() -> Self {
Self {
// TODO: increase to 50, after we implement orchard batching
max_concurrent_block_requests: 40,
lookahead_limit: sync::DEFAULT_LOOKAHEAD_LIMIT,
// 2/3 of the default outbound peer limit.
download_concurrency_limit: 50,
// A few max-length checkpoints.
checkpoint_verify_concurrency_limit: sync::DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT,
// Guaranteed to verify a bunch of large blocks in under 60 seconds
// (so that the committed block height changes in every progress log).
//
// TODO: when we implement orchard proof batching, try increasing to 100 or more
// limit full verification concurrency based on block transaction counts?
full_verify_concurrency_limit: 30,
}
}
}
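The `#[serde(alias = ...)]` attributes above are what keep old config files parsing after the rename. A self-contained sketch of the pattern, assuming `serde` with the derive feature and the `toml` crate (simplified to one field, with an illustrative default):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields, default)]
struct SyncSection {
    // The old `lookahead_limit` key deserializes into the renamed field.
    #[serde(alias = "lookahead_limit")]
    checkpoint_verify_concurrency_limit: usize,
}

impl Default for SyncSection {
    fn default() -> Self {
        Self {
            checkpoint_verify_concurrency_limit: 1_200,
        }
    }
}

fn main() {
    // An old-style config still parses after the rename.
    let old: SyncSection = toml::from_str("lookahead_limit = 2000").unwrap();
    assert_eq!(old.checkpoint_verify_concurrency_limit, 2000);

    // Missing keys fall back to the default.
    let empty: SyncSection = toml::from_str("").unwrap();
    assert_eq!(empty.checkpoint_verify_concurrency_limit, 1_200);
}
```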


@@ -36,7 +36,7 @@ pub fn default_test_config() -> Result<ZebradConfig> {
let sync = SyncSection {
// Avoid downloading unnecessary blocks.
lookahead_limit: sync::MIN_LOOKAHEAD_LIMIT,
checkpoint_verify_concurrency_limit: sync::MIN_CHECKPOINT_CONCURRENCY_LIMIT,
..SyncSection::default()
};


@@ -298,7 +298,8 @@ impl LightwalletdTestType {
Err(error) => return Some(Err(error)),
};
config.sync.lookahead_limit = zebrad::components::sync::DEFAULT_LOOKAHEAD_LIMIT;
config.sync.checkpoint_verify_concurrency_limit =
zebrad::components::sync::DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT;
config.state.ephemeral = false;
config.state.cache_dir = zebra_state_path;


@@ -75,7 +75,7 @@ pub const FINISH_PARTIAL_SYNC_TIMEOUT: Duration = Duration::from_secs(60 * 60);
/// But if we're going to be downloading lots of blocks, we use the default lookahead limit,
/// so that the sync is faster. This can increase the RAM needed for tests.
pub const MIN_HEIGHT_FOR_DEFAULT_LOOKAHEAD: Height =
Height(3 * sync::DEFAULT_LOOKAHEAD_LIMIT as u32);
Height(3 * sync::DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT as u32);
/// What the expected behavior of the mempool is for a test that uses [`sync_until`].
pub enum MempoolBehavior {
@@ -204,7 +204,8 @@ pub fn sync_until(
// Use the default lookahead limit if we're syncing lots of blocks.
// (Most tests use a smaller limit to minimise redundant block downloads.)
if height > MIN_HEIGHT_FOR_DEFAULT_LOOKAHEAD {
config.sync.lookahead_limit = sync::DEFAULT_LOOKAHEAD_LIMIT;
config.sync.checkpoint_verify_concurrency_limit =
sync::DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT;
}
let tempdir = if let Some(reuse_tempdir) = reuse_tempdir {
@@ -317,7 +318,7 @@ pub fn cached_mandatory_checkpoint_test_config() -> Result<ZebradConfig> {
// If we're syncing past the checkpoint with cached state, we don't need the extra lookahead.
// But the extra downloaded blocks shouldn't slow down the test that much,
// and testing with the defaults gives us better test coverage.
config.sync.lookahead_limit = sync::DEFAULT_LOOKAHEAD_LIMIT;
config.sync.checkpoint_verify_concurrency_limit = sync::DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT;
Ok(config)
}