fix(sync): fix testnet syncer loop on large Orchard blocks (#4286)

* Return BlockDownloadVerifyError from download_and_verify

* Check block requests and genesis for temporary errors

* Treat DuplicateBlockQueuedForDownload as a temporary error, and ignore it

* Propagate error info to the syncer main loop

* Sleep after temporary genesis download and verify errors
This commit is contained in:
teor 2022-05-05 08:04:34 +10:00 committed by GitHub
parent e9d37c6275
commit 56f766f9b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 87 additions and 37 deletions

View File

@ -381,7 +381,7 @@ where
/// Returns `Err` if there was an unrecoverable error and restarting the synchronization is /// Returns `Err` if there was an unrecoverable error and restarting the synchronization is
/// necessary. /// necessary.
#[instrument(skip(self))] #[instrument(skip(self))]
async fn try_to_sync(&mut self) -> Result<(), ()> { async fn try_to_sync(&mut self) -> Result<(), Report> {
self.prospective_tips = HashSet::new(); self.prospective_tips = HashSet::new();
info!( info!(
@ -390,14 +390,14 @@ where
); );
if let Err(e) = self.obtain_tips().await { if let Err(e) = self.obtain_tips().await {
info!("temporary error obtaining tips: {:#}", e); info!("temporary error obtaining tips: {:#}", e);
return Err(()); return Err(e);
} }
self.update_metrics(); self.update_metrics();
while !self.prospective_tips.is_empty() { while !self.prospective_tips.is_empty() {
// Check whether any block tasks are currently ready: // Check whether any block tasks are currently ready:
while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) { while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
self.handle_block_response(rsp).await?; Self::handle_block_response(rsp)?;
} }
self.update_metrics(); self.update_metrics();
@ -424,7 +424,7 @@ where
let response = self.downloads.next().await.expect("downloads is nonempty"); let response = self.downloads.next().await.expect("downloads is nonempty");
self.handle_block_response(response).await?; Self::handle_block_response(response)?;
self.update_metrics(); self.update_metrics();
} }
@ -439,7 +439,7 @@ where
if let Err(e) = self.extend_tips().await { if let Err(e) = self.extend_tips().await {
info!("temporary error extending tips: {:#}", e); info!("temporary error extending tips: {:#}", e);
return Err(()); return Err(e);
} }
self.update_metrics(); self.update_metrics();
} }
@ -594,7 +594,8 @@ where
// so the last peer to respond can't toggle our mempool // so the last peer to respond can't toggle our mempool
self.recent_syncs.push_obtain_tips_length(new_downloads); self.recent_syncs.push_obtain_tips_length(new_downloads);
self.request_blocks(download_set).await?; let response = self.request_blocks(download_set).await;
Self::handle_response(response)?;
Ok(()) Ok(())
} }
@ -731,7 +732,8 @@ where
// so the last peer to respond can't toggle our mempool // so the last peer to respond can't toggle our mempool
self.recent_syncs.push_extend_tips_length(new_downloads); self.recent_syncs.push_extend_tips_length(new_downloads);
self.request_blocks(download_set).await?; let response = self.request_blocks(download_set).await;
Self::handle_response(response)?;
Ok(()) Ok(())
} }
@ -748,14 +750,28 @@ where
// So we just download and verify the genesis block here. // So we just download and verify the genesis block here.
while !self.state_contains(self.genesis_hash).await? { while !self.state_contains(self.genesis_hash).await? {
info!("starting genesis block download and verify"); info!("starting genesis block download and verify");
self.downloads
.download_and_verify(self.genesis_hash) let response = self.downloads.download_and_verify(self.genesis_hash).await;
.await Self::handle_response(response).map_err(|e| eyre!(e))?;
.map_err(|e| eyre!(e))?;
match self.downloads.next().await.expect("downloads is nonempty") { let response = self.downloads.next().await.expect("downloads is nonempty");
match response {
Ok(hash) => trace!(?hash, "verified and committed block to state"), Ok(hash) => trace!(?hash, "verified and committed block to state"),
Err(e) => { Err(error) => {
warn!(?e, "could not download or verify genesis block, retrying"); // TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
if Self::should_restart_sync(&error) {
warn!(
?error,
"could not download or verify genesis block, retrying"
);
} else {
info!(
?error,
"temporary error downloading or verifying genesis block, retrying"
);
}
tokio::time::sleep(GENESIS_TIMEOUT_RETRY).await; tokio::time::sleep(GENESIS_TIMEOUT_RETRY).await;
} }
} }
@ -765,7 +781,10 @@ where
} }
/// Queue download and verify tasks for each block that isn't currently known to our node /// Queue download and verify tasks for each block that isn't currently known to our node
async fn request_blocks(&mut self, hashes: IndexSet<block::Hash>) -> Result<(), Report> { async fn request_blocks(
&mut self,
hashes: IndexSet<block::Hash>,
) -> Result<(), BlockDownloadVerifyError> {
debug!(hashes.len = hashes.len(), "requesting blocks"); debug!(hashes.len = hashes.len(), "requesting blocks");
for hash in hashes.into_iter() { for hash in hashes.into_iter() {
self.downloads.download_and_verify(hash).await?; self.downloads.download_and_verify(hash).await?;
@ -780,16 +799,30 @@ where
/// expected error occurred, so that the synchronization can continue normally. /// expected error occurred, so that the synchronization can continue normally.
/// ///
/// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart. /// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart.
async fn handle_block_response( fn handle_block_response(
&mut self,
response: Result<block::Hash, BlockDownloadVerifyError>, response: Result<block::Hash, BlockDownloadVerifyError>,
) -> Result<(), ()> { ) -> Result<(), BlockDownloadVerifyError> {
match response { match response {
Ok(hash) => trace!(?hash, "verified and committed block to state"), Ok(hash) => trace!(?hash, "verified and committed block to state"),
Err(error) => { Err(_) => return Self::handle_response(response.map(|_| ())),
if Self::should_restart_sync(error) { }
return Err(());
} Ok(())
}
/// Handles a response to a syncer request.
///
/// Returns `Ok` if the request was successful, or if an expected error occurred,
/// so that the synchronization can continue normally.
///
/// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart.
fn handle_response(
response: Result<(), BlockDownloadVerifyError>,
) -> Result<(), BlockDownloadVerifyError> {
if let Err(error) = response {
// TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
if Self::should_restart_sync(&error) {
return Err(error);
} }
} }
@ -830,9 +863,9 @@ where
/// Return if the sync should be restarted based on the given error /// Return if the sync should be restarted based on the given error
/// from the block downloader and verifier stream. /// from the block downloader and verifier stream.
fn should_restart_sync(e: BlockDownloadVerifyError) -> bool { fn should_restart_sync(e: &BlockDownloadVerifyError) -> bool {
match e { match e {
// Structural matches // Structural matches: downcasts
BlockDownloadVerifyError::Invalid(VerifyChainError::Checkpoint( BlockDownloadVerifyError::Invalid(VerifyChainError::Checkpoint(
VerifyCheckpointError::AlreadyVerified { .. }, VerifyCheckpointError::AlreadyVerified { .. },
)) => { )) => {
@ -847,6 +880,8 @@ where
debug!(error = ?e, "block is already in chain, possibly from a previous sync run, continuing"); debug!(error = ?e, "block is already in chain, possibly from a previous sync run, continuing");
false false
} }
// Structural matches: direct
BlockDownloadVerifyError::CancelledDuringDownload BlockDownloadVerifyError::CancelledDuringDownload
| BlockDownloadVerifyError::CancelledDuringVerification => { | BlockDownloadVerifyError::CancelledDuringVerification => {
debug!(error = ?e, "block verification was cancelled, continuing"); debug!(error = ?e, "block verification was cancelled, continuing");
@ -860,6 +895,14 @@ where
); );
false false
} }
BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { .. } => {
debug!(
error = ?e,
"queued duplicate block hash for download, \
assuming the syncer will eventually resolve duplicates, continuing"
);
false
}
// String matches // String matches
BlockDownloadVerifyError::Invalid(VerifyChainError::Block( BlockDownloadVerifyError::Invalid(VerifyChainError::Block(
@ -872,7 +915,9 @@ where
BlockDownloadVerifyError::DownloadFailed(ref source) BlockDownloadVerifyError::DownloadFailed(ref source)
if format!("{:?}", source).contains("NotFound") => if format!("{:?}", source).contains("NotFound") =>
{ {
// Covers both NotFoundResponse and NotFoundRegistry errors. // Covers these errors:
// - NotFoundResponse
// - NotFoundRegistry
// //
// TODO: improve this by checking the type (#2908) // TODO: improve this by checking the type (#2908)
// restart after a certain number of NotFound errors? // restart after a certain number of NotFound errors?
@ -887,13 +932,12 @@ where
// become incorrect e.g. after some refactoring, and it is difficult // become incorrect e.g. after some refactoring, and it is difficult
// to write a test to check it. The test below is a best-effort // to write a test to check it. The test below is a best-effort
// attempt to catch if that happens and log it. // attempt to catch if that happens and log it.
//
// TODO: add a proper test and remove this // TODO: add a proper test and remove this
// https://github.com/ZcashFoundation/zebra/issues/2909 // https://github.com/ZcashFoundation/zebra/issues/2909
let err_str = format!("{:?}", e); let err_str = format!("{:?}", e);
if err_str.contains("AlreadyVerified") if err_str.contains("AlreadyVerified")
|| err_str.contains("AlreadyInChain") || err_str.contains("AlreadyInChain")
|| err_str.contains("Cancelled")
|| err_str.contains("BehindTipHeight")
|| err_str.contains("block is already committed to the state") || err_str.contains("block is already committed to the state")
|| err_str.contains("NotFound") || err_str.contains("NotFound")
{ {

View File

@ -8,7 +8,6 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use color_eyre::eyre::{eyre, Report};
use futures::{ use futures::{
future::TryFutureExt, future::TryFutureExt,
ready, ready,
@ -63,14 +62,17 @@ impl<Request: Clone> hedge::Policy<Request> for AlwaysHedge {
#[derive(Error, Debug)] #[derive(Error, Debug)]
#[allow(dead_code)] #[allow(dead_code)]
pub enum BlockDownloadVerifyError { pub enum BlockDownloadVerifyError {
#[error("error downloading block")] #[error("error from the network service")]
DownloadFailed(#[source] BoxError), NetworkError(#[source] BoxError),
#[error("error from the verifier service")] #[error("error from the verifier service")]
VerifierError(#[source] BoxError), VerifierError(#[source] BoxError),
#[error("block did not pass consensus validation")] #[error("duplicate block hash queued for download: {hash:?}")]
Invalid(#[from] zebra_consensus::chain::VerifyChainError), DuplicateBlockQueuedForDownload { hash: block::Hash },
#[error("error downloading block")]
DownloadFailed(#[source] BoxError),
#[error("downloaded block was too far ahead of the chain tip")] #[error("downloaded block was too far ahead of the chain tip")]
AboveLookaheadHeightLimit, AboveLookaheadHeightLimit,
@ -81,6 +83,9 @@ pub enum BlockDownloadVerifyError {
#[error("downloaded block had an invalid height")] #[error("downloaded block had an invalid height")]
InvalidHeight, InvalidHeight,
#[error("block did not pass consensus validation")]
Invalid(#[from] zebra_consensus::chain::VerifyChainError),
#[error("block download / verification was cancelled during download")] #[error("block download / verification was cancelled during download")]
CancelledDuringDownload, CancelledDuringDownload,
@ -211,10 +216,13 @@ where
/// only if the network service fails. It returns immediately after queuing /// only if the network service fails. It returns immediately after queuing
/// the request. /// the request.
#[instrument(level = "debug", skip(self), fields(%hash))] #[instrument(level = "debug", skip(self), fields(%hash))]
pub async fn download_and_verify(&mut self, hash: block::Hash) -> Result<(), Report> { pub async fn download_and_verify(
&mut self,
hash: block::Hash,
) -> Result<(), BlockDownloadVerifyError> {
if self.cancel_handles.contains_key(&hash) { if self.cancel_handles.contains_key(&hash) {
metrics::counter!("sync.already.queued.dropped.block.hash.count", 1); metrics::counter!("sync.already.queued.dropped.block.hash.count", 1);
return Err(eyre!("duplicate hash queued for download: {:?}", hash)); return Err(BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { hash });
} }
// We construct the block requests sequentially, waiting for the peer // We construct the block requests sequentially, waiting for the peer
@ -224,14 +232,12 @@ where
// if we waited for readiness and did the service call in the spawned // if we waited for readiness and did the service call in the spawned
// tasks, all of the spawned tasks would race each other waiting for the // tasks, all of the spawned tasks would race each other waiting for the
// network to become ready. // network to become ready.
debug!("waiting to request block");
let block_req = self let block_req = self
.network .network
.ready() .ready()
.await .await
.map_err(|e| eyre!(e))? .map_err(BlockDownloadVerifyError::NetworkError)?
.call(zn::Request::BlocksByHash(std::iter::once(hash).collect())); .call(zn::Request::BlocksByHash(std::iter::once(hash).collect()));
debug!("requested block");
// This oneshot is used to signal cancellation to the download task. // This oneshot is used to signal cancellation to the download task.
let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>(); let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();