diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index a699d086e..756153b12 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -346,110 +346,107 @@ where (new_syncer, sync_status) } + /// Runs the syncer to synchronize the chain and keep it synchronized. #[instrument(skip(self))] pub async fn sync(mut self) -> Result<(), Report> { // We can't download the genesis block using our normal algorithm, // due to protocol limitations self.request_genesis().await?; - // Distinguishes a restart from a start, so we don't sleep when starting - // the sync process, but we can keep restart logic in one place. - let mut started_once = false; - - 'sync: loop { - if started_once { - info!( - timeout = ?SYNC_RESTART_DELAY, - state_tip = ?self.latest_chain_tip.best_tip_height(), - "waiting to restart sync" - ); - self.prospective_tips = HashSet::new(); + loop { + if self.try_to_sync().await.is_err() { self.downloads.cancel_all(); - self.update_metrics(); - sleep(SYNC_RESTART_DELAY).await; - } else { - started_once = true; } + self.update_metrics(); + info!( + timeout = ?SYNC_RESTART_DELAY, state_tip = ?self.latest_chain_tip.best_tip_height(), - "starting sync, obtaining new tips" + "waiting to restart sync" ); - if let Err(e) = self.obtain_tips().await { - warn!(?e, "error obtaining tips"); - continue 'sync; + sleep(SYNC_RESTART_DELAY).await; + } + } + + /// Tries to synchronize the chain as far as it can. + /// + /// Obtains some prospective tips and iteratively tries to extend them and download the missing + /// blocks. + /// + /// Returns `Ok` if it was able to synchronize as much of the chain as it could, and then ran + /// out of prospective tips. This happens when synchronization finishes or if Zebra ended up + /// following a fork. Either way, Zebra should attempt to obtain some more tips. + /// + /// Returns `Err` if there was an unrecoverable error and restarting the synchronization is + /// necessary. 
+ #[instrument(skip(self))] + async fn try_to_sync(&mut self) -> Result<(), ()> { + self.prospective_tips = HashSet::new(); + + info!( + state_tip = ?self.latest_chain_tip.best_tip_height(), + "starting sync, obtaining new tips" + ); + if let Err(e) = self.obtain_tips().await { + warn!(?e, "error obtaining tips"); + return Err(()); + } + self.update_metrics(); + + while !self.prospective_tips.is_empty() { + // Check whether any block tasks are currently ready: + while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) { + self.handle_block_response(rsp).await?; } self.update_metrics(); - while !self.prospective_tips.is_empty() { - // Check whether any block tasks are currently ready: - while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) { - match rsp { - Ok(hash) => { - trace!(?hash, "verified and committed block to state"); - } - Err(e) => { - if Self::should_restart_sync(e) { - continue 'sync; - } - } - } - } - self.update_metrics(); - - // If we have too many pending tasks, wait for some to finish. - // - // Starting to wait is interesting, but logging each wait can be - // very verbose. 
- if self.downloads.in_flight() > self.lookahead_limit { - tracing::info!( - tips.len = self.prospective_tips.len(), - in_flight = self.downloads.in_flight(), - lookahead_limit = self.lookahead_limit, - "waiting for pending blocks", - ); - } - while self.downloads.in_flight() > self.lookahead_limit { - trace!( - tips.len = self.prospective_tips.len(), - in_flight = self.downloads.in_flight(), - lookahead_limit = self.lookahead_limit, - state_tip = ?self.latest_chain_tip.best_tip_height(), - "waiting for pending blocks", - ); - - match self.downloads.next().await.expect("downloads is nonempty") { - Ok(hash) => { - trace!(?hash, "verified and committed block to state"); - } - - Err(e) => { - if Self::should_restart_sync(e) { - continue 'sync; - } - } - } - self.update_metrics(); - } - - // Once we're below the lookahead limit, we can keep extending the tips. - info!( + // If we have too many pending tasks, wait for some to finish. + // + // Starting to wait is interesting, but logging each wait can be + // very verbose. + if self.downloads.in_flight() > self.lookahead_limit { + tracing::info!( + tips.len = self.prospective_tips.len(), + in_flight = self.downloads.in_flight(), + lookahead_limit = self.lookahead_limit, + "waiting for pending blocks", + ); + } + while self.downloads.in_flight() > self.lookahead_limit { + trace!( tips.len = self.prospective_tips.len(), in_flight = self.downloads.in_flight(), lookahead_limit = self.lookahead_limit, state_tip = ?self.latest_chain_tip.best_tip_height(), - "extending tips", + "waiting for pending blocks", ); - if let Err(e) = self.extend_tips().await { - warn!(?e, "error extending tips"); - continue 'sync; - } + let response = self.downloads.next().await.expect("downloads is nonempty"); + + self.handle_block_response(response).await?; self.update_metrics(); } - info!("exhausted prospective tip set"); + // Once we're below the lookahead limit, we can keep extending the tips. 
+ info!( + tips.len = self.prospective_tips.len(), + in_flight = self.downloads.in_flight(), + lookahead_limit = self.lookahead_limit, + state_tip = ?self.latest_chain_tip.best_tip_height(), + "extending tips", + ); + + if let Err(e) = self.extend_tips().await { + warn!(?e, "error extending tips"); + return Err(()); + } + self.update_metrics(); } + + info!("exhausted prospective tip set"); + + Ok(()) } /// Given a block_locator list fan out request for subsequent hashes to @@ -777,6 +774,28 @@ where Ok(()) } + /// Handles a response for a requested block. + /// + /// Returns `Ok` if the block was successfully verified and committed to the state, or if an + /// expected error occurred, so that the synchronization can continue normally. + /// + /// Returns `Err` if an unexpected error occurred, to force the synchronizer to restart. + async fn handle_block_response( + &mut self, + response: Result, + ) -> Result<(), ()> { + match response { + Ok(hash) => trace!(?hash, "verified and committed block to state"), + Err(error) => { + if Self::should_restart_sync(error) { + return Err(()); + } + } + } + + Ok(()) + } + /// Returns `true` if the hash is present in the state, and `false` /// if the hash is not present in the state. ///