fix(sync): prevent synchronizer loop when very close to tip (#3854)

* Refactor to split `ChainSync::sync` method in two

Replace the use of loop labels and `continue` for control flow, and use
early return from a separate method instead. This also allows removing
the `started_once` flag.

* Refactor to create `handle_block_response` helper

Reduce duplicate code and make the main synchronization methods a little
more concise to improve readability.

* Only cancel downloads in case of error

Leave active downloads running if the tips have been exhausted, because
it could have reached the chain tip.
This commit is contained in:
Janito Vaqueiro Ferreira Filho 2022-03-17 21:31:12 -03:00 committed by GitHub
parent 39dfca8e64
commit 78080d88d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 98 additions and 79 deletions

View File

@ -346,110 +346,107 @@ where
(new_syncer, sync_status)
}
/// Runs the syncer to synchronize the chain and keep it synchronized.
#[instrument(skip(self))]
pub async fn sync(mut self) -> Result<(), Report> {
// We can't download the genesis block using our normal algorithm,
// due to protocol limitations
self.request_genesis().await?;
// Distinguishes a restart from a start, so we don't sleep when starting
// the sync process, but we can keep restart logic in one place.
let mut started_once = false;
'sync: loop {
if started_once {
info!(
timeout = ?SYNC_RESTART_DELAY,
state_tip = ?self.latest_chain_tip.best_tip_height(),
"waiting to restart sync"
);
self.prospective_tips = HashSet::new();
loop {
if self.try_to_sync().await.is_err() {
self.downloads.cancel_all();
self.update_metrics();
sleep(SYNC_RESTART_DELAY).await;
} else {
started_once = true;
}
self.update_metrics();
info!(
timeout = ?SYNC_RESTART_DELAY,
state_tip = ?self.latest_chain_tip.best_tip_height(),
"starting sync, obtaining new tips"
"waiting to restart sync"
);
if let Err(e) = self.obtain_tips().await {
warn!(?e, "error obtaining tips");
continue 'sync;
sleep(SYNC_RESTART_DELAY).await;
}
}
/// Tries to synchronize the chain as far as it can.
///
/// Obtains some prospective tips and iteratively tries to extend them and download the missing
/// blocks.
///
/// Returns `Ok` if it was able to synchronize as much of the chain as it could, and then ran
/// out of prospective tips. This happens when synchronization finishes or if Zebra ended up
/// following a fork. Either way, Zebra should attempt to obtain some more tips.
///
/// Returns `Err` if there was an unrecoverable error and restarting the synchronization is
/// necessary.
#[instrument(skip(self))]
async fn try_to_sync(&mut self) -> Result<(), ()> {
self.prospective_tips = HashSet::new();
info!(
state_tip = ?self.latest_chain_tip.best_tip_height(),
"starting sync, obtaining new tips"
);
if let Err(e) = self.obtain_tips().await {
warn!(?e, "error obtaining tips");
return Err(());
}
self.update_metrics();
while !self.prospective_tips.is_empty() {
// Check whether any block tasks are currently ready:
while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
self.handle_block_response(rsp).await?;
}
self.update_metrics();
while !self.prospective_tips.is_empty() {
// Check whether any block tasks are currently ready:
while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
match rsp {
Ok(hash) => {
trace!(?hash, "verified and committed block to state");
}
Err(e) => {
if Self::should_restart_sync(e) {
continue 'sync;
}
}
}
}
self.update_metrics();
// If we have too many pending tasks, wait for some to finish.
//
// Starting to wait is interesting, but logging each wait can be
// very verbose.
if self.downloads.in_flight() > self.lookahead_limit {
tracing::info!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
lookahead_limit = self.lookahead_limit,
"waiting for pending blocks",
);
}
while self.downloads.in_flight() > self.lookahead_limit {
trace!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
lookahead_limit = self.lookahead_limit,
state_tip = ?self.latest_chain_tip.best_tip_height(),
"waiting for pending blocks",
);
match self.downloads.next().await.expect("downloads is nonempty") {
Ok(hash) => {
trace!(?hash, "verified and committed block to state");
}
Err(e) => {
if Self::should_restart_sync(e) {
continue 'sync;
}
}
}
self.update_metrics();
}
// Once we're below the lookahead limit, we can keep extending the tips.
info!(
// If we have too many pending tasks, wait for some to finish.
//
// Starting to wait is interesting, but logging each wait can be
// very verbose.
if self.downloads.in_flight() > self.lookahead_limit {
tracing::info!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
lookahead_limit = self.lookahead_limit,
"waiting for pending blocks",
);
}
while self.downloads.in_flight() > self.lookahead_limit {
trace!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
lookahead_limit = self.lookahead_limit,
state_tip = ?self.latest_chain_tip.best_tip_height(),
"extending tips",
"waiting for pending blocks",
);
if let Err(e) = self.extend_tips().await {
warn!(?e, "error extending tips");
continue 'sync;
}
let response = self.downloads.next().await.expect("downloads is nonempty");
self.handle_block_response(response).await?;
self.update_metrics();
}
info!("exhausted prospective tip set");
// Once we're below the lookahead limit, we can keep extending the tips.
info!(
tips.len = self.prospective_tips.len(),
in_flight = self.downloads.in_flight(),
lookahead_limit = self.lookahead_limit,
state_tip = ?self.latest_chain_tip.best_tip_height(),
"extending tips",
);
if let Err(e) = self.extend_tips().await {
warn!(?e, "error extending tips");
return Err(());
}
self.update_metrics();
}
info!("exhausted prospective tip set");
Ok(())
}
/// Given a block_locator list fan out request for subsequent hashes to
@ -777,6 +774,28 @@ where
Ok(())
}
/// Handles a response for a requested block.
///
/// Returns `Ok` if the block was successfully verified and commited to the state, or if an
/// expected error occurred, so that the synchronization can continue normally.
///
/// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart.
async fn handle_block_response(
&mut self,
response: Result<block::Hash, BlockDownloadVerifyError>,
) -> Result<(), ()> {
match response {
Ok(hash) => trace!(?hash, "verified and committed block to state"),
Err(error) => {
if Self::should_restart_sync(error) {
return Err(());
}
}
}
Ok(())
}
/// Returns `true` if the hash is present in the state, and `false`
/// if the hash is not present in the state.
///