Limit concurrent inbound gossipped block requests

Uses the "load shed directly" design pattern from #1618.
This commit is contained in:
teor 2021-01-22 20:43:00 +10:00
parent 3d9888f736
commit 21b0360114
1 changed files with 52 additions and 6 deletions

View File

@ -21,6 +21,30 @@ use zebra_state as zs;
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// The maximum number of concurrent inbound download and verify tasks.
///
/// Set to one and a half checkpoint intervals, so that the inbound queue can
/// hold a complete checkpoint interval, if needed. We expect the syncer to
/// download and verify checkpoints, so this bound might never be reached.
const MAX_INBOUND_CONCURRENCY: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP * 3 / 2;
/// The action taken in response to a peer's gossipped block hash.
pub enum DownloadAction {
/// The block hash was successfully queued for download and verification.
AddedToQueue,
/// The block hash is already queued, so this request was ignored.
///
/// Another peer has already gossipped the same hash to us.
AlreadyQueued,
/// The queue is at capacity, so this request was ignored.
///
/// The sync service should discover this block later, when we are closer
/// to the tip. The queue's capacity is [`MAX_INBOUND_CONCURRENCY`].
FullQueue,
}
/// Manages download and verification of blocks gossiped to this peer.
#[pin_project]
#[derive(Debug)]
@ -116,12 +140,27 @@ where
/// Queue a block for download and verification.
///
/// Returns true if the block was newly queued, and false if it was already queued.
/// Returns the action taken in response to the queue request.
#[instrument(skip(self, hash), fields(hash = %hash))]
pub fn download_and_verify(&mut self, hash: block::Hash) -> bool {
pub fn download_and_verify(&mut self, hash: block::Hash) -> DownloadAction {
if self.cancel_handles.contains_key(&hash) {
tracing::debug!("hash already queued for download");
return false;
tracing::debug!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"block hash already queued for inbound download: ignored block"
);
return DownloadAction::AlreadyQueued;
}
if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
tracing::info!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"too many blocks queued for inbound download: ignored block"
);
return DownloadAction::FullQueue;
}
// This oneshot is used to signal cancellation to the download task.
@ -182,7 +221,14 @@ where
"blocks are only queued once"
);
tracing::debug!("queued hash for download");
true
tracing::debug!(
?hash,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"queued hash for download"
);
metrics::gauge!("gossip.queued.block.count", self.pending.len() as _);
DownloadAction::AddedToQueue
}
}