fix(sync): Pause new downloads when Zebra reaches the lookahead limit (#5561)

* Use correct release for getblocktemplate config

* Include at least 2 full checkpoints in the lookahead limit

* Increase full sync timeout to 36 hours

* Only log "synced block height too far ahead of the tip" once

* Replace AboveLookaheadHeightLimit error with pausing the syncer

* Use AboveLookaheadHeightLimit for blocks a very long way from the tip

* Also add the getblocktemplate config, and fix the test message

* Remove an outdated TODO comment

* Allow syncing again when a small number of blocks are in the queue

* Allow some dead code
teor 2022-11-09 14:42:04 +10:00 committed by GitHub
parent 1a6222337e
commit c4fad29824
10 changed files with 323 additions and 62 deletions
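Taken together, these changes replace the old "fail blocks above the lookahead height" behaviour with a pause signal carried over a `tokio::sync::watch` channel from the block downloader to the syncer. The sketch below shows the shape of that mechanism with simplified names and hypothetical numbers; it illustrates the pattern, not the exact Zebra code.

```rust
use tokio::sync::watch;

fn main() {
    // Hypothetical numbers, for illustration only.
    let lookahead_limit: usize = 400;
    let lookahead_pause_height: u32 = 1_001_200;

    // The downloader owns the sender; the syncer holds the receiver.
    let (pause_tx, pause_rx) = watch::channel(false);

    // Downloader side: a block arrived too far ahead of the state tip,
    // so flip the shared flag instead of returning an error.
    let block_height: u32 = 1_005_000;
    if block_height > lookahead_pause_height {
        let _ = pause_tx.send(true);
    }

    // Syncer side: pause new download requests while the queue is full,
    // or while the flag is set and the queue is still at least half full.
    let in_flight: usize = 250;
    let paused = *pause_rx.borrow();
    if in_flight >= lookahead_limit || (in_flight >= lookahead_limit / 2 && paused) {
        println!("pausing new downloads: in_flight={in_flight}, paused={paused}");
    }
}
```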


@@ -377,7 +377,8 @@ jobs:
app_name: zebrad
test_id: full-sync-to-tip
test_description: Test a full sync up to the tip
test_variables: '-e TEST_FULL_SYNC=1 -e ZEBRA_FORCE_USE_COLOR=1 -e FULL_SYNC_MAINNET_TIMEOUT_MINUTES=600'
# The value of FULL_SYNC_MAINNET_TIMEOUT_MINUTES is currently ignored.
test_variables: '-e TEST_FULL_SYNC=1 -e ZEBRA_FORCE_USE_COLOR=1 -e FULL_SYNC_MAINNET_TIMEOUT_MINUTES=0'
# This test runs for longer than 6 hours, so it needs multiple jobs
is_long_test: true
needs_zebra_state: false


@@ -35,7 +35,9 @@ pub use request::{FinalizedBlock, HashOrHeight, PreparedBlock, ReadRequest, Requ
pub use response::{ReadResponse, Response};
pub use service::{
chain_tip::{ChainTipChange, LatestChainTip, TipAction},
init, spawn_init, OutputIndex, OutputLocation, TransactionLocation,
init, spawn_init,
watch_receiver::WatchReceiver,
OutputIndex, OutputLocation, TransactionLocation,
};
#[cfg(any(test, feature = "proptest-impl"))]
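This hunk newly exports `WatchReceiver` from zebra-state; the rest of the diff only shows its call sites (`zs::WatchReceiver::new(...)` and `cloned_watch_data()`). A minimal definition consistent with those call sites might look like the following sketch; the real zebra-state type may differ in detail.

```rust
use tokio::sync::watch;

/// A cloneable wrapper around a `tokio::sync::watch::Receiver`.
///
/// A plausible minimal sketch, inferred from the call sites in this diff;
/// not the actual zebra-state definition.
#[derive(Clone, Debug)]
pub struct WatchReceiver<T> {
    receiver: watch::Receiver<T>,
}

impl<T: Clone> WatchReceiver<T> {
    pub fn new(receiver: watch::Receiver<T>) -> Self {
        Self { receiver }
    }

    /// Returns a clone of the current value in the channel.
    pub fn cloned_watch_data(&self) -> T {
        self.receiver.borrow().clone()
    }
}
```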


@@ -8,7 +8,7 @@ use color_eyre::eyre::{eyre, Report};
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::IndexSet;
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use tokio::{sync::watch, time::sleep};
use tower::{
builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout,
Service, ServiceExt,
@@ -83,8 +83,7 @@ pub const MIN_CHECKPOINT_CONCURRENCY_LIMIT: usize = zebra_consensus::MAX_CHECKPO
/// The default for the user-specified lookahead limit.
///
/// See [`MIN_CHECKPOINT_CONCURRENCY_LIMIT`] for details.
pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize =
zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 2;
pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize = MAX_TIPS_RESPONSE_HASH_COUNT * 2;
/// A lower bound on the user-specified concurrency limit.
///
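Per the commit message, the new default keeps "at least 2 full checkpoints in the lookahead limit". With the usual Zebra values of 400 blocks per checkpoint gap and 500 hashes per tips response (both assumptions here, since neither constant's value appears in this diff), that property can be checked at compile time:

```rust
// Hypothetical values, for illustration only; the real constants live in
// zebra-consensus and zebrad's sync module.
const MAX_CHECKPOINT_HEIGHT_GAP: usize = 400;
const MAX_TIPS_RESPONSE_HASH_COUNT: usize = 500;

const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize = MAX_TIPS_RESPONSE_HASH_COUNT * 2;

// "Include at least 2 full checkpoints in the lookahead limit":
const _: () = assert!(DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT >= 2 * MAX_CHECKPOINT_HEIGHT_GAP);

fn main() {
    println!("default lookahead limit: {DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT} blocks");
}
```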
@@ -359,6 +358,10 @@ where
/// The lengths of recent sync responses.
recent_syncs: RecentSyncLengths,
/// Receiver that is `true` when the downloader is past the lookahead limit.
/// This is based on the downloaded block height and the state tip height.
past_lookahead_limit_receiver: zs::WatchReceiver<bool>,
}
/// Polls the network to determine whether further blocks are available and
@@ -438,6 +441,7 @@ where
}
let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);
// The Hedge middleware is the outermost layer, hedging requests
// between two retry-wrapped networks. The innermost timeout
// layer is relatively unimportant, because slow requests will
@@ -464,27 +468,33 @@ where
let (sync_status, recent_syncs) = SyncStatus::new();
let (past_lookahead_limit_sender, past_lookahead_limit_receiver) = watch::channel(false);
let past_lookahead_limit_receiver = zs::WatchReceiver::new(past_lookahead_limit_receiver);
let downloads = Box::pin(Downloads::new(
block_network,
verifier,
latest_chain_tip.clone(),
past_lookahead_limit_sender,
max(
checkpoint_verify_concurrency_limit,
full_verify_concurrency_limit,
),
max_checkpoint_height,
));
let new_syncer = Self {
genesis_hash: genesis_hash(config.network.network),
max_checkpoint_height,
checkpoint_verify_concurrency_limit,
full_verify_concurrency_limit,
tip_network,
downloads: Box::pin(Downloads::new(
block_network,
verifier,
latest_chain_tip.clone(),
// TODO: change the download lookahead for full verification?
max(
checkpoint_verify_concurrency_limit,
full_verify_concurrency_limit,
),
max_checkpoint_height,
)),
downloads,
state,
latest_chain_tip,
prospective_tips: HashSet::new(),
recent_syncs,
past_lookahead_limit_receiver,
};
(new_syncer, sync_status)
@@ -545,7 +555,14 @@ where
}
self.update_metrics();
while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) {
// Pause new downloads while the syncer or downloader are past their lookahead limits.
//
// To avoid a deadlock or long waits for blocks to expire, we ignore the download
// lookahead limit when there are only a small number of blocks waiting.
while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len())
|| (self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) / 2
&& self.past_lookahead_limit_receiver.cloned_watch_data())
{
trace!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
@@ -957,7 +974,7 @@ where
}
/// The configured lookahead limit, based on the currently verified height,
/// and the number of hashes we haven't queued yet..
/// and the number of hashes we haven't queued yet.
fn lookahead_limit(&self, new_hashes: usize) -> usize {
let max_checkpoint_height: usize = self
.max_checkpoint_height
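The `while` condition above pauses on either of two triggers. Extracted as a standalone predicate (a sketch using the diff's names; the asserted values are hypothetical):

```rust
/// Returns true when the syncer should pause new download requests.
fn should_pause_downloads(
    in_flight: usize,
    lookahead_limit: usize,
    past_lookahead_limit: bool,
) -> bool {
    // Pause when the syncer's own queue is full...
    in_flight >= lookahead_limit
        // ...or when the downloader signalled it is past its limit, unless
        // only a small number of blocks remain queued (avoiding deadlocks
        // and long waits for queued blocks to expire).
        || (in_flight >= lookahead_limit / 2 && past_lookahead_limit)
}

fn main() {
    assert!(should_pause_downloads(400, 400, false));
    assert!(should_pause_downloads(200, 400, true));
    assert!(!should_pause_downloads(199, 400, true));
}
```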


@@ -4,7 +4,7 @@ use std::{
collections::HashMap,
convert::{self, TryFrom},
pin::Pin,
sync::Arc,
sync::{Arc, TryLockError},
task::{Context, Poll},
};
@@ -15,7 +15,11 @@ use futures::{
};
use pin_project::pin_project;
use thiserror::Error;
use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
use tokio::{
sync::{oneshot, watch},
task::JoinHandle,
time::timeout,
};
use tower::{hedge, Service, ServiceExt};
use tracing_futures::Instrument;
@@ -42,14 +46,17 @@ type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// to hold a few extra tips responses worth of blocks,
/// even if the syncer queue is full. Any unused capacity is shared between both queues.
///
/// If this capacity is exceeded, the downloader will start failing block downloads with
/// [`BlockDownloadVerifyError::AboveLookaheadHeightLimit`], and the syncer will reset.
/// If this capacity is exceeded, the downloader will tell the syncer to pause new downloads.
///
/// Since the syncer queue is limited to the `lookahead_limit`,
/// the rest of the capacity is reserved for the other queues.
/// There is no reserved capacity for the syncer queue:
/// if the other queues stay full, the syncer will eventually time out and reset.
pub const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 5;
pub const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 2;
/// The maximum height difference between Zebra's state tip and a downloaded block.
/// Blocks higher than this limit are dropped, and their downloads return an error.
pub const VERIFICATION_PIPELINE_DROP_LIMIT: i32 = 50_000;
#[derive(Copy, Clone, Debug)]
pub(super) struct AlwaysHedge;
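The new constant joins the scaled pause limit to give three thresholds, computed per-block later in this diff: a drop height (reset the syncer), a pause height (stop new downloads), and a reset height (resume downloads). A worked example with a hypothetical tip at height 1_000_000 and a 400-block lookahead limit, using plain `i32` in place of `block::Height`:

```rust
const VERIFICATION_PIPELINE_SCALING_MULTIPLIER: usize = 2;
const VERIFICATION_PIPELINE_DROP_LIMIT: i32 = 50_000;

fn main() {
    let tip_height: i32 = 1_000_000;
    let lookahead_limit: usize = 400;

    let lookahead_pause = i32::try_from(
        lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER,
    )
    .expect("fits in i32");

    // Blocks above this height fail with AboveLookaheadHeightLimit,
    // which resets the syncer.
    let lookahead_drop_height = tip_height + VERIFICATION_PIPELINE_DROP_LIMIT; // 1_050_000
    // Blocks above this height pause new downloads instead of failing.
    let lookahead_pause_height = tip_height + lookahead_pause; // 1_001_200
    // Downloads resume once queued blocks fall back below this height.
    let lookahead_reset_height = tip_height + lookahead_pause / 2; // 1_000_600

    println!("drop: {lookahead_drop_height}, pause: {lookahead_pause_height}, reset: {lookahead_reset_height}");
}
```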
@@ -89,6 +96,14 @@ pub enum BlockDownloadVerifyError {
hash: block::Hash,
},
/// A downloaded block was a long way ahead of the state chain tip.
/// This error should be very rare during normal operation.
///
/// We need to reset the syncer on this error, to allow the verifier and state to catch up,
/// or prevent it following a bad chain.
///
/// If we don't reset the syncer on this error, it will continue downloading blocks from a bad
/// chain, or blocks far ahead of the current state tip.
#[error("downloaded block was too far ahead of the chain tip: {height:?} {hash:?}")]
AboveLookaheadHeightLimit {
height: block::Height,
@@ -157,6 +172,7 @@ where
ZSTip: ChainTip + Clone + Send + 'static,
{
// Services
//
/// A service that forwards requests to connected peers, and returns their
/// responses.
network: ZN,
@@ -168,13 +184,24 @@ where
latest_chain_tip: ZSTip,
// Configuration
//
/// The configured lookahead limit, after applying the minimum limit.
lookahead_limit: usize,
/// The largest block height for the checkpoint verifier, based on the current config.
max_checkpoint_height: Height,
// Shared syncer state
//
/// Sender that is set to `true` when the downloader is past the lookahead limit.
/// This is based on the downloaded block height and the state tip height.
past_lookahead_limit_sender: Arc<std::sync::Mutex<watch::Sender<bool>>>,
/// Receiver for `past_lookahead_limit_sender`, which is used to avoid accessing the mutex.
past_lookahead_limit_receiver: zs::WatchReceiver<bool>,
// Internal downloads state
//
/// A list of pending block download and verify tasks.
#[pin]
pending: FuturesUnordered<
@@ -259,15 +286,23 @@ where
network: ZN,
verifier: ZV,
latest_chain_tip: ZSTip,
past_lookahead_limit_sender: watch::Sender<bool>,
lookahead_limit: usize,
max_checkpoint_height: Height,
) -> Self {
let past_lookahead_limit_receiver =
zs::WatchReceiver::new(past_lookahead_limit_sender.subscribe());
Self {
network,
verifier,
latest_chain_tip,
lookahead_limit,
max_checkpoint_height,
past_lookahead_limit_sender: Arc::new(std::sync::Mutex::new(
past_lookahead_limit_sender,
)),
past_lookahead_limit_receiver,
pending: FuturesUnordered::new(),
cancel_handles: HashMap::new(),
}
@@ -307,9 +342,13 @@ where
let mut verifier = self.verifier.clone();
let latest_chain_tip = self.latest_chain_tip.clone();
let lookahead_limit = self.lookahead_limit;
let max_checkpoint_height = self.max_checkpoint_height;
let past_lookahead_limit_sender = self.past_lookahead_limit_sender.clone();
let past_lookahead_limit_receiver = self.past_lookahead_limit_receiver.clone();
let task = tokio::spawn(
async move {
// Download the block.
@@ -346,19 +385,26 @@ where
// that will timeout before being verified.
let tip_height = latest_chain_tip.best_tip_height();
// TODO: don't use VERIFICATION_PIPELINE_SCALING_MULTIPLIER for full verification?
let max_lookahead_height = if let Some(tip_height) = tip_height {
let (lookahead_drop_height, lookahead_pause_height, lookahead_reset_height) = if let Some(tip_height) = tip_height {
// Scale the height limit with the lookahead limit,
// so users with low capacity or under DoS can reduce them both.
let lookahead = i32::try_from(
let lookahead_pause = i32::try_from(
lookahead_limit + lookahead_limit * VERIFICATION_PIPELINE_SCALING_MULTIPLIER,
)
.expect("fits in i32");
(tip_height + lookahead).expect("tip is much lower than Height::MAX")
.expect("fits in i32");
((tip_height + VERIFICATION_PIPELINE_DROP_LIMIT).expect("tip is much lower than Height::MAX"),
(tip_height + lookahead_pause).expect("tip is much lower than Height::MAX"),
(tip_height + lookahead_pause/2).expect("tip is much lower than Height::MAX"))
} else {
let genesis_drop = VERIFICATION_PIPELINE_DROP_LIMIT.try_into().expect("fits in u32");
let genesis_lookahead =
u32::try_from(lookahead_limit - 1).expect("fits in u32");
block::Height(genesis_lookahead)
(block::Height(genesis_drop),
block::Height(genesis_lookahead),
block::Height(genesis_lookahead/2))
};
// Get the finalized tip height, assuming we're using the non-finalized state.
@@ -388,28 +434,59 @@ where
return Err(BlockDownloadVerifyError::InvalidHeight { hash });
};
if block_height > max_lookahead_height {
info!(
?hash,
?block_height,
?tip_height,
?max_lookahead_height,
lookahead_limit = ?lookahead_limit,
"synced block height too far ahead of the tip: dropped downloaded block",
);
metrics::counter!("sync.max.height.limit.dropped.block.count", 1);
// This error should be very rare during normal operation.
//
// We need to reset the syncer on this error,
// to allow the verifier and state to catch up,
// or prevent it following a bad chain.
//
// If we don't reset the syncer on this error,
// it will continue downloading blocks from a bad chain,
// (or blocks far ahead of the current state tip).
if block_height > lookahead_drop_height {
Err(BlockDownloadVerifyError::AboveLookaheadHeightLimit { height: block_height, hash })?;
} else if block_height < min_accepted_height {
} else if block_height > lookahead_pause_height {
// This log can be very verbose, usually hundreds of blocks are dropped.
// So we only log at info level for the first above-height block.
if !past_lookahead_limit_receiver.cloned_watch_data() {
info!(
?hash,
?block_height,
?tip_height,
?lookahead_pause_height,
?lookahead_reset_height,
lookahead_limit = ?lookahead_limit,
"synced block height too far ahead of the tip: \
waiting for downloaded blocks to commit to the state",
);
// Set the watched value to true, since we're over the limit.
//
// It is ok to block here, because we're going to pause new downloads anyway.
// But if Zebra is shutting down, ignore the send error.
let _ = past_lookahead_limit_sender.lock().expect("thread panicked while holding the past_lookahead_limit_sender mutex guard").send(true);
} else {
debug!(
?hash,
?block_height,
?tip_height,
?lookahead_pause_height,
?lookahead_reset_height,
lookahead_limit = ?lookahead_limit,
"synced block height too far ahead of the tip: \
waiting for downloaded blocks to commit to the state",
);
}
metrics::counter!("sync.max.height.limit.paused.count", 1);
} else if block_height <= lookahead_reset_height && past_lookahead_limit_receiver.cloned_watch_data() {
// Try to reset the watched value to false, since we're well under the limit.
match past_lookahead_limit_sender.try_lock() {
Ok(watch_sender_guard) => {
// If Zebra is shutting down, ignore the send error.
let _ = watch_sender_guard.send(false);
metrics::counter!("sync.max.height.limit.reset.count", 1);
},
Err(TryLockError::Poisoned(_)) => panic!("thread panicked while holding the past_lookahead_limit_sender mutex guard"),
// We'll try allowing new downloads when we get the next block
Err(TryLockError::WouldBlock) => {}
}
metrics::counter!("sync.max.height.limit.reset.attempt.count", 1);
}
if block_height < min_accepted_height {
debug!(
?hash,
?block_height,
@@ -504,8 +581,14 @@ where
assert!(self.cancel_handles.is_empty());
}
/// Get the number of currently in-flight download tasks.
/// Get the number of currently in-flight download and verify tasks.
pub fn in_flight(&mut self) -> usize {
self.pending.len()
}
/// Returns true if there are no in-flight download and verify tasks.
#[allow(dead_code)]
pub fn is_empty(&mut self) -> bool {
self.pending.is_empty()
}
}
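One design note on the fields above: the `watch::Sender` is shared with every spawned download task as `Arc<std::sync::Mutex<...>>`, presumably because `watch::Sender` was not cloneable in the tokio version in use, and a separate `WatchReceiver` is kept so reads never touch the mutex. Setting the flag takes a blocking `lock()` (acceptable, since downloads are about to pause anyway), while clearing it uses `try_lock()` so a download task never blocks just to resume. A minimal sketch of that pattern:

```rust
use std::sync::{Arc, Mutex, TryLockError};
use tokio::sync::watch;

fn main() {
    let (sender, receiver) = watch::channel(false);
    let shared_sender = Arc::new(Mutex::new(sender));

    // Pause path: blocking is acceptable, downloads are pausing anyway.
    // If the receiver side is gone (shutdown), ignore the send error.
    let _ = shared_sender
        .lock()
        .expect("no thread panicked while holding the lock")
        .send(true);

    // Resume path: never block a download task just to clear the flag.
    match shared_sender.try_lock() {
        Ok(guard) => {
            let _ = guard.send(false);
        }
        Err(TryLockError::Poisoned(_)) => panic!("a thread panicked while holding the lock"),
        // Another task holds the lock; try again when the next block arrives.
        Err(TryLockError::WouldBlock) => {}
    }

    assert!(!*receiver.borrow());
}
```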


@@ -495,9 +495,9 @@ async fn sync_blocks_duplicate_hashes_ok() -> Result<(), crate::BoxError> {
Ok(())
}
/// Test that zebra-network rejects blocks with the wrong hash.
/// Test that zebra-network rejects blocks that are a long way ahead of the state tip.
#[tokio::test]
async fn sync_block_wrong_hash() -> Result<(), crate::BoxError> {
async fn sync_block_lookahead_drop() -> Result<(), crate::BoxError> {
// Get services
let (
chain_sync_future,
@@ -526,13 +526,15 @@ async fn sync_block_wrong_hash() -> Result<(), crate::BoxError> {
.await
.respond(zs::Response::Depth(None));
// Block 0 is fetched, but the peer returns a much higher block
// Block 0 is fetched, but the peer returns a much higher block.
// (Mismatching hashes are usually ignored by the network service,
// but we use them here to test the syncer lookahead.)
peer_set
.expect_request(zn::Request::BlocksByHash(iter::once(block0_hash).collect()))
.await
.respond(zn::Response::Blocks(vec![Available(block982k.clone())]));
// Block is dropped because it has the wrong hash.
// Block is dropped because it is too far ahead of the tip.
// We expect more requests to the state service, because the syncer keeps on running.
peer_set.expect_no_requests().await;
chain_verifier.expect_no_requests().await;


@@ -169,6 +169,9 @@ use common::{
/// This limit only applies to some tests.
pub const MAX_ASYNC_BLOCKING_TIME: Duration = zebra_test::mock_service::DEFAULT_MAX_REQUEST_DELAY;
/// The test config file prefix for `--feature getblocktemplate-rpcs` configs.
pub const GET_BLOCK_TEMPLATE_CONFIG_PREFIX: &str = "getblocktemplate-";
#[test]
fn generate_no_args() -> Result<()> {
let _init_guard = zebra_test::init();
@@ -702,11 +705,22 @@ fn last_config_is_stored() -> Result<()> {
Err(eyre!(
"latest zebrad config is not being tested for compatibility.\n\
Run:\n\
zebrad generate |\n\
sed \"s/cache_dir = '.*'/cache_dir = 'cache_dir'/\" >\n\
zebrad/tests/common/configs/<next-release-tag>.toml\n\
and commit the latest config to Zebra's git repository"
Run: \n\
cargo build {}--bin zebrad && \n\
zebrad generate | \n\
sed \"s/cache_dir = '.*'/cache_dir = 'cache_dir'/\" > \n\
zebrad/tests/common/configs/{}<next-release-tag>.toml \n\
and commit the latest config to Zebra's git repository",
if cfg!(feature = "getblocktemplate-rpcs") {
"--features=getblocktemplate-rpcs "
} else {
""
},
if cfg!(feature = "getblocktemplate-rpcs") {
GET_BLOCK_TEMPLATE_CONFIG_PREFIX
} else {
""
},
))
}
@@ -799,7 +813,7 @@ fn stored_configs_works() -> Result<()> {
.file_name()
.into_string()
.expect("all files names should be string convertible")
.starts_with("getblocktemplate-")
.starts_with(GET_BLOCK_TEMPLATE_CONFIG_PREFIX)
{
continue;
}


@@ -0,0 +1,72 @@
# Default configuration for zebrad.
#
# This file can be used as a skeleton for custom configs.
#
# Unspecified fields use default values. Optional fields are Some(field) if the
# field is present and None if it is absent.
#
# This file is generated as an example using zebrad's current defaults.
# You should set only the config options you want to keep, and delete the rest.
# Only a subset of fields are present in the skeleton, since optional values
# whose default is None are omitted.
#
# The config format (including a complete list of sections and fields) is
# documented here:
# https://doc.zebra.zfnd.org/zebrad/config/struct.ZebradConfig.html
#
# zebrad attempts to load configs in the following order:
#
# 1. The -c flag on the command line, e.g., `zebrad -c myconfig.toml start`;
# 2. The file `zebrad.toml` in the user's preference directory (platform-dependent);
# 3. The default config.
[consensus]
checkpoint_sync = true
debug_skip_parameter_preload = false
[mempool]
eviction_memory_time = '1h'
tx_cost_limit = 80000000
[metrics]
[mining]
[network]
crawl_new_peer_interval = '1m 1s'
initial_mainnet_peers = [
'dnsseed.z.cash:8233',
'dnsseed.str4d.xyz:8233',
'mainnet.seeder.zfnd.org:8233',
'mainnet.is.yolo.money:8233',
]
initial_testnet_peers = [
'dnsseed.testnet.z.cash:18233',
'testnet.seeder.zfnd.org:18233',
'testnet.is.yolo.money:18233',
]
listen_addr = '0.0.0.0:8233'
network = 'Mainnet'
peerset_initial_target_size = 25
[rpc]
debug_force_finished_sync = false
parallel_cpu_threads = 1
[state]
cache_dir = 'cache_dir'
delete_old_database = true
ephemeral = false
[sync]
checkpoint_verify_concurrency_limit = 1000
download_concurrency_limit = 50
full_verify_concurrency_limit = 20
parallel_cpu_threads = 0
[tracing]
buffer_limit = 128000
force_use_color = false
use_color = true
use_journald = false


@@ -0,0 +1,70 @@
# Default configuration for zebrad.
#
# This file can be used as a skeleton for custom configs.
#
# Unspecified fields use default values. Optional fields are Some(field) if the
# field is present and None if it is absent.
#
# This file is generated as an example using zebrad's current defaults.
# You should set only the config options you want to keep, and delete the rest.
# Only a subset of fields are present in the skeleton, since optional values
# whose default is None are omitted.
#
# The config format (including a complete list of sections and fields) is
# documented here:
# https://doc.zebra.zfnd.org/zebrad/config/struct.ZebradConfig.html
#
# zebrad attempts to load configs in the following order:
#
# 1. The -c flag on the command line, e.g., `zebrad -c myconfig.toml start`;
# 2. The file `zebrad.toml` in the user's preference directory (platform-dependent);
# 3. The default config.
[consensus]
checkpoint_sync = true
debug_skip_parameter_preload = false
[mempool]
eviction_memory_time = '1h'
tx_cost_limit = 80000000
[metrics]
[network]
crawl_new_peer_interval = '1m 1s'
initial_mainnet_peers = [
'dnsseed.z.cash:8233',
'dnsseed.str4d.xyz:8233',
'mainnet.seeder.zfnd.org:8233',
'mainnet.is.yolo.money:8233',
]
initial_testnet_peers = [
'dnsseed.testnet.z.cash:18233',
'testnet.seeder.zfnd.org:18233',
'testnet.is.yolo.money:18233',
]
listen_addr = '0.0.0.0:8233'
network = 'Mainnet'
peerset_initial_target_size = 25
[rpc]
debug_force_finished_sync = false
parallel_cpu_threads = 1
[state]
cache_dir = 'cache_dir'
delete_old_database = true
ephemeral = false
[sync]
checkpoint_verify_concurrency_limit = 1000
download_concurrency_limit = 50
full_verify_concurrency_limit = 20
parallel_cpu_threads = 0
[tracing]
buffer_limit = 128000
force_use_color = false
use_color = true
use_journald = false


@@ -74,7 +74,7 @@ pub const FINISH_PARTIAL_SYNC_TIMEOUT: Duration = Duration::from_secs(11 * 60 *
/// The maximum time to wait for Zebrad to synchronize up to the chain tip starting from the
/// genesis block.
pub const FINISH_FULL_SYNC_TIMEOUT: Duration = Duration::from_secs(32 * 60 * 60);
pub const FINISH_FULL_SYNC_TIMEOUT: Duration = Duration::from_secs(36 * 60 * 60);
/// The test sync height where we switch to using the default lookahead limit.
///