reduces number of gossip pull requests/responses (#29974)

This commit is contained in:
behzad nouri 2023-01-30 17:59:56 +00:00 committed by GitHub
parent 76ae62596a
commit 4cc07a176e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 41 additions and 37 deletions

View File

@ -1883,8 +1883,9 @@ impl ClusterInfo {
fn update_data_budget(&self, num_staked: usize) -> usize {
const INTERVAL_MS: u64 = 100;
// allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s
const BYTES_PER_INTERVAL: usize = 5000;
// epoch slots + votes ~= 1.5kB/slot ~= 4kB/s
// Allow 10kB/s per staked validator.
const BYTES_PER_INTERVAL: usize = 1024;
const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default
let num_staked = num_staked.max(2);
self.outbound_budget.update(INTERVAL_MS, |bytes| {

View File

@ -54,6 +54,8 @@ pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
pub const CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS: u64 = 60000;
// Retention period of hashes of received outdated values.
const FAILED_INSERTS_RETENTION_MS: u64 = 20_000;
// Maximum number of pull requests to send out each time around.
const MAX_NUM_PULL_REQUESTS: usize = 1024;
pub const FALSE_RATE: f64 = 0.1f64;
pub const KEYS: f64 = 8f64;
@ -159,10 +161,14 @@ impl CrdsFilterSet {
}
fn add(&self, hash_value: Hash) {
let index = CrdsFilter::hash_as_u64(&hash_value)
.checked_shr(64 - self.mask_bits)
.unwrap_or(0);
self.filters[index as usize].add(&hash_value);
let shift = u64::BITS.checked_sub(self.mask_bits).unwrap();
let index = usize::try_from(
CrdsFilter::hash_as_u64(&hash_value)
.checked_shr(shift)
.unwrap_or_default(),
)
.unwrap();
self.filters[index].add(&hash_value);
}
}
@ -266,8 +272,15 @@ impl CrdsGossipPull {
if nodes.is_empty() {
return Err(CrdsGossipError::NoPeers);
}
let mut filters = self.build_crds_filters(thread_pool, crds, bloom_size);
if filters.len() > MAX_NUM_PULL_REQUESTS {
for i in 0..MAX_NUM_PULL_REQUESTS {
let j = rng.gen_range(i, filters.len());
filters.swap(i, j);
}
filters.truncate(MAX_NUM_PULL_REQUESTS);
}
// Associate each pull-request filter with a randomly selected peer.
let filters = self.build_crds_filters(thread_pool, crds, bloom_size);
let dist = WeightedIndex::new(&weights).unwrap();
let nodes = repeat_with(|| nodes[dist.sample(&mut rng)].clone());
Ok(nodes.zip(filters).into_group_map())

View File

@ -49,8 +49,8 @@ const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
pub const CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS: u64 = 30000;
const CRDS_GOSSIP_PRUNE_MSG_TIMEOUT_MS: u64 = 500;
const CRDS_GOSSIP_PRUNE_STAKE_THRESHOLD_PCT: f64 = 0.15;
const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 3;
const CRDS_GOSSIP_PUSH_ACTIVE_SET_SIZE: usize = CRDS_GOSSIP_PUSH_FANOUT * 3;
const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2;
const CRDS_GOSSIP_PUSH_ACTIVE_SET_SIZE: usize = CRDS_GOSSIP_PUSH_FANOUT * 2;
pub struct CrdsGossipPush {
/// Max bytes per message

View File

@ -6,7 +6,7 @@ pub struct DataBudget {
bytes: AtomicUsize,
// Last time that we upped the bytes count, used
// to detect when to up the bytes budget again
last_timestamp_ms: AtomicU64,
asof: AtomicU64,
}
impl DataBudget {
@ -14,7 +14,7 @@ impl DataBudget {
pub fn restricted() -> Self {
Self {
bytes: AtomicUsize::default(),
last_timestamp_ms: AtomicU64::new(u64::MAX),
asof: AtomicU64::new(u64::MAX),
}
}
@ -22,19 +22,19 @@ impl DataBudget {
// the budget and returns true. Otherwise returns false.
#[must_use]
pub fn take(&self, size: usize) -> bool {
let mut budget = self.bytes.load(Ordering::Acquire);
let mut bytes = self.bytes.load(Ordering::Acquire);
loop {
if budget < size {
return false;
}
match self.bytes.compare_exchange_weak(
budget,
budget.saturating_sub(size),
bytes = match self.bytes.compare_exchange_weak(
bytes,
match bytes.checked_sub(size) {
None => return false,
Some(bytes) => bytes,
},
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => return true,
Err(bytes) => budget = bytes,
Err(bytes) => bytes,
}
}
}
@ -43,21 +43,19 @@ impl DataBudget {
// has passed since last update. Otherwise returns false.
fn can_update(&self, duration_millis: u64) -> bool {
let now = solana_sdk::timing::timestamp();
let mut last_timestamp = self.last_timestamp_ms.load(Ordering::Acquire);
loop {
if now < last_timestamp.saturating_add(duration_millis) {
return false;
}
match self.last_timestamp_ms.compare_exchange_weak(
last_timestamp,
let mut asof = self.asof.load(Ordering::Acquire);
while asof.saturating_add(duration_millis) <= now {
asof = match self.asof.compare_exchange_weak(
asof,
now,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => return true,
Err(ts) => last_timestamp = ts,
Err(asof) => asof,
}
}
false
}
/// Updates the budget if at least the given number of milliseconds have passed since the last
@ -70,27 +68,19 @@ impl DataBudget {
if self.can_update(duration_millis) {
let mut bytes = self.bytes.load(Ordering::Acquire);
loop {
match self.bytes.compare_exchange_weak(
bytes = match self.bytes.compare_exchange_weak(
bytes,
updater(bytes),
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Err(b) => bytes = b,
Err(bytes) => bytes,
}
}
}
self.bytes.load(Ordering::Acquire)
}
// Non-atomic clone only for tests and simulations.
pub fn clone_non_atomic(&self) -> Self {
Self {
bytes: AtomicUsize::new(self.bytes.load(Ordering::Acquire)),
last_timestamp_ms: AtomicU64::new(self.last_timestamp_ms.load(Ordering::Acquire)),
}
}
}
#[cfg(test)]