diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs
index 226608581..f52ef565d 100644
--- a/zebra-network/src/peer/connection.rs
+++ b/zebra-network/src/peer/connection.rs
@@ -1289,11 +1289,15 @@ where
         //
         //
         // The inbound service must be called immediately after a buffer slot is reserved.
+        //
+        // The inbound service never times out in readiness, because the load shed layer is always
+        // ready, and returns an error in response to the request instead.
         if self.svc.ready().await.is_err() {
             self.fail_with(PeerError::ServiceShutdown).await;
             return;
         }
 
+        // Inbound service request timeouts are handled by the timeout layer in `start::start()`.
         let rsp = match self.svc.call(req.clone()).await {
             Err(e) => {
                 if e.is::<tower::load_shed::error::Overloaded>() {
diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml
index 871d45366..e560a7fcb 100644
--- a/zebrad/Cargo.toml
+++ b/zebrad/Cargo.toml
@@ -168,6 +168,7 @@ toml = "0.8.3"
 futures = "0.3.29"
 rayon = "1.7.0"
 tokio = { version = "1.33.0", features = ["time", "rt-multi-thread", "macros", "tracing", "signal"] }
+tokio-stream = { version = "0.1.14", features = ["time"] }
 tower = { version = "0.4.13", features = ["hedge", "limit"] }
 pin-project = "1.1.3"
 
diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs
index e93aa8517..f25b461f8 100644
--- a/zebrad/src/components/inbound.rs
+++ b/zebrad/src/components/inbound.rs
@@ -9,7 +9,7 @@ use std::{
     collections::HashSet,
     future::Future,
     pin::Pin,
-    sync::Arc,
+    sync::{Arc, TryLockError},
     task::{Context, Poll},
     time::Duration,
 };
@@ -278,7 +278,11 @@ impl Service<zn::Request> for Inbound {
                 }
             }
             Err(TryRecvError::Empty) => {
-                // There's no setup data yet, so keep waiting for it
+                // There's no setup data yet, so keep waiting for it.
+                //
+                // We could use Future::poll() to get a waker and return Poll::Pending here.
+                // But we want to drop excess requests during startup instead. Otherwise,
+                // the inbound service gets overloaded, and starts disconnecting peers.
                 result = Ok(());
                 Setup::Pending {
                     full_verify_concurrency_limit,
@@ -307,6 +311,11 @@ impl Service<zn::Request> for Inbound {
                 mempool,
                 state,
             } => {
+                // # Correctness
+                //
+                // Clear the stream but ignore the final Pending return value.
+                // If we returned Pending here, and there were no waiting block downloads,
+                // then inbound requests would wait for the next block download, and hang forever.
                 while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {}
 
                 result = Ok(());
@@ -366,20 +375,35 @@ impl Service<zn::Request> for Inbound {
                 //
                 // # Correctness
                 //
-                // Briefly hold the address book threaded mutex while
-                // cloning the address book. Then sanitize in the future,
-                // after releasing the lock.
-                let peers = address_book.lock().unwrap().clone();
+                // If the address book is busy, try again inside the future. If it can't be locked
+                // twice, ignore the request.
+                let address_book = address_book.clone();
+
+                let get_peers = move || match address_book.try_lock() {
+                    Ok(address_book) => Some(address_book.clone()),
+                    Err(TryLockError::WouldBlock) => None,
+                    Err(TryLockError::Poisoned(_)) => panic!("previous thread panicked while holding the address book lock"),
+                };
+
+                let peers = get_peers();
+
                 async move {
-                    // Correctness: get the current time after acquiring the address book lock.
+                    // Correctness: get the current time inside the future.
                     //
-                    // This time is used to filter outdated peers, so it doesn't really matter
+                    // This time is used to filter outdated peers, so it doesn't matter much
                     // if we get it when the future is created, or when it starts running.
                     let now = Utc::now();
 
+                    // If we didn't get the peers when the future was created, wait for other tasks
+                    // to run, then try again when the future first runs.
+                    if peers.is_none() {
+                        tokio::task::yield_now().await;
+                    }
+                    let peers = peers.or_else(get_peers);
+                    let is_busy = peers.is_none();
+
                     // Send a sanitized response
-                    let mut peers = peers.sanitized(now);
+                    let mut peers = peers.map_or_else(Vec::new, |peers| peers.sanitized(now));
 
                     // Truncate the list
                     let address_limit = div_ceil(peers.len(), ADDR_RESPONSE_LIMIT_DENOMINATOR);
@@ -387,8 +411,20 @@ impl Service<zn::Request> for Inbound {
                     peers.truncate(address_limit);
 
                     if peers.is_empty() {
-                        // We don't know if the peer response will be empty until we've sanitized them.
-                        debug!("ignoring `Peers` request from remote peer because our address book is empty");
+                        // Sometimes we don't know if the peer response will be empty until we've
+                        // sanitized them.
+                        if is_busy {
+                            info!(
+                                "ignoring `Peers` request from remote peer because our address \
+                                 book is busy"
+                            );
+                        } else {
+                            debug!(
+                                "ignoring `Peers` request from remote peer because our address \
+                                 book has no available peers"
+                            );
+                        }
+
                         Ok(zn::Response::Nil)
                     } else {
                         Ok(zn::Response::Peers(peers))
diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs
index aef623e45..0e9aa2d38 100644
--- a/zebrad/src/components/mempool.rs
+++ b/zebrad/src/components/mempool.rs
@@ -22,12 +22,13 @@ use std::{
     collections::HashSet,
     future::Future,
     iter,
-    pin::Pin,
+    pin::{pin, Pin},
     task::{Context, Poll},
 };
 
 use futures::{future::FutureExt, stream::Stream};
 use tokio::sync::broadcast;
+use tokio_stream::StreamExt;
 use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};
 
 use zebra_chain::{
@@ -42,7 +43,7 @@ use zebra_node_services::mempool::{Gossip, Request, Response};
 use zebra_state as zs;
 use zebra_state::{ChainTipChange, TipAction};
 
-use crate::components::sync::SyncStatus;
+use crate::components::{mempool::crawler::RATE_LIMIT_DELAY, sync::SyncStatus};
 
 pub mod config;
 mod crawler;
@@ -580,9 +581,11 @@ impl Service<Request> for Mempool {
             let best_tip_height = self.latest_chain_tip.best_tip_height();
 
             // Clean up completed download tasks and add to mempool if successful.
-            while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) {
+            while let Poll::Ready(Some(r)) =
+                pin!(tx_downloads.timeout(RATE_LIMIT_DELAY)).poll_next(cx)
+            {
                 match r {
-                    Ok((tx, expected_tip_height)) => {
+                    Ok(Ok((tx, expected_tip_height))) => {
                         // # Correctness:
                         //
                         // It's okay to use tip height here instead of the tip hash since
@@ -609,12 +612,20 @@ impl Service<Request> for Mempool {
                             tx_downloads.download_if_needed_and_verify(tx.transaction.into());
                         }
                     }
-                    Err((txid, error)) => {
+                    Ok(Err((txid, error))) => {
                         tracing::debug!(?txid, ?error, "mempool transaction failed to verify");
                         metrics::counter!("mempool.failed.verify.tasks.total", 1, "reason" => error.to_string());
                         storage.reject_if_needed(txid, error);
                     }
+                    Err(_elapsed) => {
+                        // A timeout happens when the stream hangs waiting for another service,
+                        // so there is no specific transaction ID.
+
+                        tracing::info!("mempool transaction failed to verify due to timeout");
+
+                        metrics::counter!("mempool.failed.verify.tasks.total", 1, "reason" => "timeout");
+                    }
                 };
             }
diff --git a/zebrad/src/components/mempool/crawler.rs b/zebrad/src/components/mempool/crawler.rs
index 610691f79..6d1129e2b 100644
--- a/zebrad/src/components/mempool/crawler.rs
+++ b/zebrad/src/components/mempool/crawler.rs
@@ -50,7 +50,11 @@ use std::{collections::HashSet, time::Duration};
 
 use futures::{future, pin_mut, stream::FuturesUnordered, StreamExt};
-use tokio::{sync::watch, task::JoinHandle, time::sleep};
+use tokio::{
+    sync::watch,
+    task::JoinHandle,
+    time::{sleep, timeout},
+};
 use tower::{timeout::Timeout, BoxError, Service, ServiceExt};
 use tracing_futures::Instrument;
 
@@ -77,7 +81,7 @@ const FANOUT: usize = 3;
 ///
 /// Using a prime number makes sure that mempool crawler fanouts
 /// don't synchronise with other crawls.
-const RATE_LIMIT_DELAY: Duration = Duration::from_secs(73);
+pub const RATE_LIMIT_DELAY: Duration = Duration::from_secs(73);
 
 /// The time to wait for a peer response.
 ///
@@ -191,7 +195,14 @@ where
         loop {
             self.wait_until_enabled().await?;
-            self.crawl_transactions().await?;
+            // Avoid hangs when the peer service is not ready, or due to bugs in async code.
+            timeout(RATE_LIMIT_DELAY, self.crawl_transactions())
+                .await
+                .unwrap_or_else(|timeout| {
+                    // Temporary errors just get logged and ignored.
+                    info!("mempool crawl timed out: {timeout:?}");
+                    Ok(())
+                })?;
             sleep(RATE_LIMIT_DELAY).await;
         }
     }
 
diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs
index 9dc6da285..db98a1de8 100644
--- a/zebrad/src/components/sync.rs
+++ b/zebrad/src/components/sync.rs
@@ -2,13 +2,17 @@
 //!
 //! It is used when Zebra is a long way behind the current chain tip.
 
-use std::{cmp::max, collections::HashSet, pin::Pin, task::Poll, time::Duration};
+use std::{cmp::max, collections::HashSet, convert, pin::Pin, task::Poll, time::Duration};
 
 use color_eyre::eyre::{eyre, Report};
 use futures::stream::{FuturesUnordered, StreamExt};
 use indexmap::IndexSet;
 use serde::{Deserialize, Serialize};
-use tokio::{sync::watch, task::JoinError, time::sleep};
+use tokio::{
+    sync::watch,
+    task::JoinError,
+    time::{sleep, timeout},
+};
 use tower::{
     builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry,
     timeout::Timeout, Service, ServiceExt,
@@ -210,7 +214,10 @@ const SYNC_RESTART_DELAY: Duration = Duration::from_secs(67);
 /// If this timeout is removed (or set too low), Zebra will immediately retry
 /// to download and verify the genesis block from its peers. This can cause
 /// a denial of service on those peers.
-const GENESIS_TIMEOUT_RETRY: Duration = Duration::from_secs(5);
+///
+/// If this timeout is too short, old or buggy nodes will keep making useless
+/// network requests. If there are a lot of them, it could overwhelm the network.
+const GENESIS_TIMEOUT_RETRY: Duration = Duration::from_secs(10);
 
 /// Sync configuration section.
 #[derive(Clone, Debug, Deserialize, Serialize)]
@@ -541,7 +548,8 @@ where
     /// following a fork. Either way, Zebra should attempt to obtain some more tips.
     ///
     /// Returns `Err` if there was an unrecoverable error and restarting the synchronization is
-    /// necessary.
+    /// necessary. This includes outer timeouts, where an entire syncing step takes an extremely
+    /// long time. (These usually indicate hangs.)
     #[instrument(skip(self))]
     async fn try_to_sync(&mut self) -> Result<(), Report> {
         self.prospective_tips = HashSet::new();
@@ -550,71 +558,24 @@ where
             state_tip = ?self.latest_chain_tip.best_tip_height(),
             "starting sync, obtaining new tips"
         );
-        let mut extra_hashes = self.obtain_tips().await.map_err(|e| {
-            info!("temporary error obtaining tips: {:#}", e);
-            e
-        })?;
+        let mut extra_hashes = timeout(SYNC_RESTART_DELAY, self.obtain_tips())
+            .await
+            .map_err(Into::into)
+            // TODO: replace with flatten() when it stabilises (#70142)
+            .and_then(convert::identity)
+            .map_err(|e| {
+                info!("temporary error obtaining tips: {:#}", e);
+                e
+            })?;
         self.update_metrics();
 
         while !self.prospective_tips.is_empty() || !extra_hashes.is_empty() {
-            // Check whether any block tasks are currently ready:
-            while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
-                self.handle_block_response(rsp)?;
-            }
-            self.update_metrics();
-
-            // Pause new downloads while the syncer or downloader are past their lookahead limits.
-            //
-            // To avoid a deadlock or long waits for blocks to expire, we ignore the download
-            // lookahead limit when there are only a small number of blocks waiting.
-            while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len())
-                || (self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) / 2
-                    && self.past_lookahead_limit_receiver.cloned_watch_data())
-            {
-                trace!(
-                    tips.len = self.prospective_tips.len(),
-                    in_flight = self.downloads.in_flight(),
-                    extra_hashes = extra_hashes.len(),
-                    lookahead_limit = self.lookahead_limit(extra_hashes.len()),
-                    state_tip = ?self.latest_chain_tip.best_tip_height(),
-                    "waiting for pending blocks",
-                );
-
-                let response = self.downloads.next().await.expect("downloads is nonempty");
-
-                self.handle_block_response(response)?;
-                self.update_metrics();
-            }
-
-            // Once we're below the lookahead limit, we can request more blocks or hashes.
-            if !extra_hashes.is_empty() {
-                debug!(
-                    tips.len = self.prospective_tips.len(),
-                    in_flight = self.downloads.in_flight(),
-                    extra_hashes = extra_hashes.len(),
-                    lookahead_limit = self.lookahead_limit(extra_hashes.len()),
-                    state_tip = ?self.latest_chain_tip.best_tip_height(),
-                    "requesting more blocks",
-                );
-
-                let response = self.request_blocks(extra_hashes).await;
-                extra_hashes = Self::handle_hash_response(response)?;
-            } else {
-                info!(
-                    tips.len = self.prospective_tips.len(),
-                    in_flight = self.downloads.in_flight(),
-                    extra_hashes = extra_hashes.len(),
-                    lookahead_limit = self.lookahead_limit(extra_hashes.len()),
-                    state_tip = ?self.latest_chain_tip.best_tip_height(),
-                    "extending tips",
-                );
-
-                extra_hashes = self.extend_tips().await.map_err(|e| {
-                    info!("temporary error extending tips: {:#}", e);
-                    e
-                })?;
-            }
-            self.update_metrics();
+            // Avoid hangs due to service readiness or other internal operations
+            extra_hashes = timeout(BLOCK_VERIFY_TIMEOUT, self.try_to_sync_once(extra_hashes))
+                .await
+                .map_err(Into::into)
+                // TODO: replace with flatten() when it stabilises (#70142)
+                .and_then(convert::identity)?;
         }
 
         info!("exhausted prospective tip set");
@@ -622,6 +583,83 @@ where
         Ok(())
     }
 
+    /// Tries to synchronize the chain once, using the existing `extra_hashes`.
+    ///
+    /// Tries to extend the existing tips and download the missing blocks.
+    ///
+    /// Returns `Ok(extra_hashes)` if it was able to extend once and synchronize some of the chain.
+    /// Returns `Err` if there was an unrecoverable error and restarting the synchronization is
+    /// necessary.
+    #[instrument(skip(self))]
+    async fn try_to_sync_once(
+        &mut self,
+        mut extra_hashes: IndexSet<block::Hash>,
+    ) -> Result<IndexSet<block::Hash>, Report> {
+        // Check whether any block tasks are currently ready.
+        while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
+            // Some temporary errors are ignored, and syncing continues with other blocks.
+            // If it turns out they were actually important, syncing will run out of blocks, and
+            // the syncer will reset itself.
+            self.handle_block_response(rsp)?;
+        }
+        self.update_metrics();
+
+        // Pause new downloads while the syncer or downloader are past their lookahead limits.
+        //
+        // To avoid a deadlock or long waits for blocks to expire, we ignore the download
+        // lookahead limit when there are only a small number of blocks waiting.
+        while self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len())
+            || (self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) / 2
+                && self.past_lookahead_limit_receiver.cloned_watch_data())
+        {
+            trace!(
+                tips.len = self.prospective_tips.len(),
+                in_flight = self.downloads.in_flight(),
+                extra_hashes = extra_hashes.len(),
+                lookahead_limit = self.lookahead_limit(extra_hashes.len()),
+                state_tip = ?self.latest_chain_tip.best_tip_height(),
+                "waiting for pending blocks",
+            );
+
+            let response = self.downloads.next().await.expect("downloads is nonempty");
+
+            self.handle_block_response(response)?;
+            self.update_metrics();
+        }
+
+        // Once we're below the lookahead limit, we can request more blocks or hashes.
+        if !extra_hashes.is_empty() {
+            debug!(
+                tips.len = self.prospective_tips.len(),
+                in_flight = self.downloads.in_flight(),
+                extra_hashes = extra_hashes.len(),
+                lookahead_limit = self.lookahead_limit(extra_hashes.len()),
+                state_tip = ?self.latest_chain_tip.best_tip_height(),
+                "requesting more blocks",
+            );
+
+            let response = self.request_blocks(extra_hashes).await;
+            extra_hashes = Self::handle_hash_response(response)?;
+        } else {
+            info!(
+                tips.len = self.prospective_tips.len(),
+                in_flight = self.downloads.in_flight(),
+                extra_hashes = extra_hashes.len(),
+                lookahead_limit = self.lookahead_limit(extra_hashes.len()),
+                state_tip = ?self.latest_chain_tip.best_tip_height(),
+                "extending tips",
+            );
+
+            extra_hashes = self.extend_tips().await.map_err(|e| {
+                info!("temporary error extending tips: {:#}", e);
+                e
+            })?;
+        }
+        self.update_metrics();
+
+        Ok(extra_hashes)
+    }
+
     /// Given a block_locator list fan out request for subsequent hashes to
     /// multiple peers
     #[instrument(skip(self))]
@@ -932,16 +970,19 @@ where
         while !self.state_contains(self.genesis_hash).await? {
             info!("starting genesis block download and verify");
 
-            let response = self.downloads.download_and_verify(self.genesis_hash).await;
-            Self::handle_response(response).map_err(|e| eyre!(e))?;
-
-            let response = self.downloads.next().await.expect("downloads is nonempty");
+            let response = timeout(SYNC_RESTART_DELAY, self.request_genesis_once())
+                .await
+                .map_err(Into::into);
 
+            // 3 layers of results is not ideal, but we need the timeout on the outside.
             match response {
-                Ok(response) => self
+                Ok(Ok(Ok(response))) => self
                     .handle_block_response(Ok(response))
                     .expect("never returns Err for Ok"),
-                Err(error) => {
+                // Handle fatal errors
+                Ok(Err(fatal_error)) => Err(fatal_error)?,
+                // Handle timeouts and block errors
+                Err(error) | Ok(Ok(Err(error))) => {
                     // TODO: exit syncer on permanent service errors (NetworkError, VerifierError)
                     if Self::should_restart_sync(&error) {
                         warn!(
@@ -963,6 +1004,20 @@ where
         Ok(())
     }
 
+    /// Try to download and verify the genesis block once.
+    ///
+    /// Fatal errors are returned in the outer result, temporary errors in the inner one.
+    async fn request_genesis_once(
+        &mut self,
+    ) -> Result<Result<(Height, block::Hash), BlockDownloadVerifyError>, Report> {
+        let response = self.downloads.download_and_verify(self.genesis_hash).await;
+        Self::handle_response(response).map_err(|e| eyre!(e))?;
+
+        let response = self.downloads.next().await.expect("downloads is nonempty");
+
+        Ok(response)
+    }
+
     /// Queue download and verify tasks for each block that isn't currently known to our node.
     ///
     /// TODO: turn obtain and extend tips into a separate task, which sends hashes via a channel?
diff --git a/zebrad/src/components/sync/downloads.rs b/zebrad/src/components/sync/downloads.rs
index f43d80052..c3322d089 100644
--- a/zebrad/src/components/sync/downloads.rs
+++ b/zebrad/src/components/sync/downloads.rs
@@ -4,7 +4,7 @@ use std::{
     collections::HashMap,
     convert::{self, TryFrom},
     pin::Pin,
-    sync::{Arc, TryLockError},
+    sync::Arc,
     task::{Context, Poll},
 };
 
@@ -154,6 +154,17 @@ pub enum BlockDownloadVerifyError {
         height: block::Height,
         hash: block::Hash,
     },
+
+    #[error(
+        "timeout during service readiness, download, verification, or internal downloader operation"
+    )]
+    Timeout,
+}
+
+impl From<tokio::time::error::Elapsed> for BlockDownloadVerifyError {
+    fn from(_value: tokio::time::error::Elapsed) -> Self {
+        BlockDownloadVerifyError::Timeout
+    }
 }
 
 /// Represents a [`Stream`] of download and verification tasks during chain sync.
@@ -471,17 +482,12 @@ where
                     metrics::counter!("sync.max.height.limit.paused.count", 1);
                 } else if block_height <= lookahead_reset_height && past_lookahead_limit_receiver.cloned_watch_data() {
-                    // Try to reset the watched value to false, since we're well under the limit.
-                    match past_lookahead_limit_sender.try_lock() {
-                        Ok(watch_sender_guard) => {
-                            // If Zebra is shutting down, ignore the send error.
-                            let _ = watch_sender_guard.send(true);
-                            metrics::counter!("sync.max.height.limit.reset.count", 1);
-                        },
-                        Err(TryLockError::Poisoned(_)) => panic!("thread panicked while holding the past_lookahead_limit_sender mutex guard"),
-                        // We'll try allowing new downloads when we get the next block
-                        Err(TryLockError::WouldBlock) => {}
-                    }
+                    // Reset the watched value to false, since we're well under the limit.
+                    // We need to block here, because if we don't the syncer can hang.
+
+                    // But if Zebra is shutting down, ignore the send error.
+                    let _ = past_lookahead_limit_sender.lock().expect("thread panicked while holding the past_lookahead_limit_sender mutex guard").send(false);
+                    metrics::counter!("sync.max.height.limit.reset.count", 1);
 
                     metrics::counter!("sync.max.height.limit.reset.attempt.count", 1);
                 }
@@ -571,14 +577,26 @@ where
     pub fn cancel_all(&mut self) {
         // Replace the pending task list with an empty one and drop it.
         let _ = std::mem::take(&mut self.pending);
+
         // Signal cancellation to all running tasks.
         // Since we already dropped the JoinHandles above, they should
         // fail silently.
         for (_hash, cancel) in self.cancel_handles.drain() {
             let _ = cancel.send(());
         }
+
         assert!(self.pending.is_empty());
         assert!(self.cancel_handles.is_empty());
+
+        // Set the lookahead limit to false, since we're empty (so we're under the limit).
+        //
+        // It is ok to block here, because we're doing a reset and sleep anyway.
+        // But if Zebra is shutting down, ignore the send error.
+        let _ = self
+            .past_lookahead_limit_sender
+            .lock()
+            .expect("thread panicked while holding the past_lookahead_limit_sender mutex guard")
+            .send(false);
     }
 
     /// Get the number of currently in-flight download and verify tasks.
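For reference, the new `connection.rs` comments assume a particular layering of the inbound service: a `load_shed` layer is always ready, so `ready()` cannot hang, and overload or slow requests surface as errors from `call()` through the load shed and timeout layers. The sketch below shows that kind of `tower` stack with a trivial placeholder service; the layer order, buffer size, and timeout are illustrative assumptions, not Zebra's actual `start::start()` configuration.

```rust
use std::time::Duration;

use tower::{BoxError, Service, ServiceBuilder, ServiceExt};

#[tokio::main]
async fn main() -> Result<(), BoxError> {
    // Placeholder for the real inbound service.
    let inbound = tower::service_fn(|req: String| async move {
        Ok::<_, BoxError>(format!("handled {req}"))
    });

    // Outer layers run first: load_shed is always ready and sheds requests when the
    // inner layers are busy, and the timeout bounds how long a single request may take.
    let mut service = ServiceBuilder::new()
        .load_shed()
        .buffer(10)
        .timeout(Duration::from_secs(5))
        .service(inbound);

    // `ready()` returns quickly because load_shed never waits for the inner service;
    // overload and slow requests show up as errors from `call()` instead.
    let response = service.ready().await?.call("ping".to_string()).await?;
    println!("{response}");

    Ok(())
}
```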
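The `Peers` handler change in `inbound.rs` depends on `std::sync::Mutex::try_lock`, which never blocks: it returns the guard if the lock is free, `WouldBlock` if another thread holds it, and `Poisoned` if a thread panicked while holding it. A minimal sketch of that pattern, with a hypothetical `AddressBook` type standing in for Zebra's real address book:

```rust
use std::sync::{Arc, Mutex, TryLockError};

// Hypothetical stand-in for the shared address book.
#[derive(Clone, Default)]
struct AddressBook {
    peers: Vec<String>,
}

fn clone_if_free(address_book: &Arc<Mutex<AddressBook>>) -> Option<AddressBook> {
    match address_book.try_lock() {
        // The lock was free: clone the data and release the guard immediately.
        Ok(guard) => Some(guard.clone()),
        // Another thread holds the lock: skip this request instead of blocking.
        Err(TryLockError::WouldBlock) => None,
        // A thread panicked while holding the lock: treat that as a fatal bug.
        Err(TryLockError::Poisoned(_)) => {
            panic!("previous thread panicked while holding the address book lock")
        }
    }
}

fn main() {
    let address_book = Arc::new(Mutex::new(AddressBook::default()));

    match clone_if_free(&address_book) {
        Some(snapshot) => println!("address book has {} peers", snapshot.peers.len()),
        None => println!("address book is busy, ignoring the request"),
    }
}
```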
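The `mempool.rs` change uses `tokio_stream::StreamExt::timeout` (enabled by the new `tokio-stream` dependency with its `time` feature), which wraps a stream so that each poll yields `Ok(item)`, or `Err(Elapsed)` when no item arrives within the window; the wrapper is not `Unpin`, which is why the diff pins it before polling. A self-contained sketch of that behaviour, with an interval stream and durations chosen only for illustration:

```rust
use std::time::Duration;

use tokio_stream::{wrappers::IntervalStream, StreamExt};

#[tokio::main]
async fn main() {
    // A stream that produces one item every 2 seconds (the first item is immediate).
    let slow_items = IntervalStream::new(tokio::time::interval(Duration::from_secs(2)));

    // Wrap it so that waiting more than 1 second for an item yields `Err(Elapsed)`.
    // The timeout wrapper is not `Unpin`, so pin it before calling `next()`.
    let timed_items = slow_items.timeout(Duration::from_secs(1));
    tokio::pin!(timed_items);

    for _ in 0..3 {
        match timed_items.next().await {
            Some(Ok(_instant)) => println!("got an item in time"),
            Some(Err(_elapsed)) => println!("timed out waiting for the next item"),
            None => break,
        }
    }
}
```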
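The syncer changes repeatedly use the same shape: `tokio::time::timeout` wraps a fallible future, producing `Result<Result<_, E>, Elapsed>`, and the nesting is collapsed with `map_err(Into::into)` followed by `and_then(convert::identity)` until `Result::flatten` stabilises. A minimal sketch of that pattern, where `fetch_tips` and `SyncError` are hypothetical stand-ins for the real syncer types:

```rust
use std::{convert, time::Duration};

use tokio::time::{error::Elapsed, timeout};

// Hypothetical error type standing in for the syncer's errors.
#[allow(dead_code)] // the sketch never constructs `Other`
#[derive(Debug)]
enum SyncError {
    TimedOut,
    Other(String),
}

// Lets the outer timeout error be unified with the inner error type.
impl From<Elapsed> for SyncError {
    fn from(_: Elapsed) -> Self {
        SyncError::TimedOut
    }
}

// Stand-in for an operation like `obtain_tips()` that can hang or fail.
async fn fetch_tips() -> Result<Vec<u32>, SyncError> {
    Ok(vec![1, 2, 3])
}

#[tokio::main]
async fn main() -> Result<(), SyncError> {
    // `timeout` yields `Result<Result<_, SyncError>, Elapsed>`.
    // Convert the outer `Elapsed` into `SyncError`, then flatten the nesting.
    let tips = timeout(Duration::from_secs(5), fetch_tips())
        .await
        .map_err(SyncError::from)
        // TODO: replace with Result::flatten() when it stabilises
        .and_then(convert::identity)?;

    println!("got {} tips", tips.len());
    Ok(())
}
```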