diff --git a/zebrad/src/components/inbound/downloads.rs b/zebrad/src/components/inbound/downloads.rs index d138a1e99..97192d28b 100644 --- a/zebrad/src/components/inbound/downloads.rs +++ b/zebrad/src/components/inbound/downloads.rs @@ -21,6 +21,30 @@ use zebra_state as zs; type BoxError = Box; +/// The maximum number of concurrent inbound download and verify tasks. +/// +/// Set to one and a half checkpoint intervals, so that the inbound queue can +/// hold a complete checkpoint interval, if needed. We expect the syncer to +/// download and verify checkpoints, so this bound might never be reached. +const MAX_INBOUND_CONCURRENCY: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 3 / 2; + +/// The action taken in response to a peer's gossipped block hash. +pub enum DownloadAction { + /// The block hash was successfully queued for download and verification. + AddedToQueue, + + /// The block hash is already queued, so this request was ignored. + /// + /// Another peer has already gossipped the same hash to us. + AlreadyQueued, + + /// The queue is at capacity, so this request was ignored. + /// + /// The sync service should discover this block later, when we are closer + /// to the tip. The queue's capacity is [`MAX_INBOUND_CONCURRENCY`]. + FullQueue, +} + /// Manages download and verification of blocks gossiped to this peer. #[pin_project] #[derive(Debug)] @@ -116,12 +140,27 @@ where /// Queue a block for download and verification. /// - /// Returns true if the block was newly queued, and false if it was already queued. + /// Returns the action taken in response to the queue request. #[instrument(skip(self, hash), fields(hash = %hash))] - pub fn download_and_verify(&mut self, hash: block::Hash) -> bool { + pub fn download_and_verify(&mut self, hash: block::Hash) -> DownloadAction { if self.cancel_handles.contains_key(&hash) { - tracing::debug!("hash already queued for download"); - return false; + tracing::debug!( + ?hash, + queue_len = self.pending.len(), + ?MAX_INBOUND_CONCURRENCY, + "block hash already queued for inbound download: ignored block" + ); + return DownloadAction::AlreadyQueued; + } + + if self.pending.len() >= MAX_INBOUND_CONCURRENCY { + tracing::info!( + ?hash, + queue_len = self.pending.len(), + ?MAX_INBOUND_CONCURRENCY, + "too many blocks queued for inbound download: ignored block" + ); + return DownloadAction::FullQueue; } // This oneshot is used to signal cancellation to the download task. @@ -182,7 +221,14 @@ where "blocks are only queued once" ); - tracing::debug!("queued hash for download"); - true + tracing::debug!( + ?hash, + queue_len = self.pending.len(), + ?MAX_INBOUND_CONCURRENCY, + "queued hash for download" + ); + metrics::gauge!("gossip.queued.block.count", self.pending.len() as _); + + DownloadAction::AddedToQueue } }