implements DataBudget using atomics (#12856)

This commit is contained in:
behzad nouri 2020-10-15 11:33:58 +00:00 committed by GitHub
parent a44e4d386f
commit 05cf15a382
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 133 additions and 38 deletions

View File

@ -21,6 +21,7 @@ use crate::{
self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlotsIndex, LowestSlot, SnapshotHash,
Version, Vote, MAX_WALLCLOCK,
},
data_budget::DataBudget,
epoch_slots::EpochSlots,
result::{Error, Result},
weighted_shuffle::weighted_shuffle,
@ -103,12 +104,6 @@ pub enum ClusterInfoError {
BadContactInfo,
BadGossipAddress,
}
#[derive(Clone)]
pub struct DataBudget {
bytes: usize, // amount of bytes we have in the budget to send
last_timestamp_ms: u64, // Last time that we upped the bytes count,
// used to detect when to up the bytes budget again
}
struct GossipWriteLock<'a> {
gossip: RwLockWriteGuard<'a, CrdsGossip>,
@ -252,7 +247,7 @@ pub struct ClusterInfo {
pub(crate) keypair: Arc<Keypair>,
/// The network entrypoint
entrypoint: RwLock<Option<ContactInfo>>,
outbound_budget: RwLock<DataBudget>,
outbound_budget: DataBudget,
my_contact_info: RwLock<ContactInfo>,
id: Pubkey,
stats: GossipStats,
@ -407,10 +402,7 @@ impl ClusterInfo {
gossip: RwLock::new(CrdsGossip::default()),
keypair,
entrypoint: RwLock::new(None),
outbound_budget: RwLock::new(DataBudget {
bytes: 0,
last_timestamp_ms: 0,
}),
outbound_budget: DataBudget::default(),
my_contact_info: RwLock::new(contact_info),
id,
stats: GossipStats::default(),
@ -437,7 +429,7 @@ impl ClusterInfo {
gossip: RwLock::new(gossip),
keypair: self.keypair.clone(),
entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()),
outbound_budget: RwLock::new(self.outbound_budget.read().unwrap().clone()),
outbound_budget: self.outbound_budget.clone_non_atomic(),
my_contact_info: RwLock::new(my_contact_info),
id: *new_id,
stats: GossipStats::default(),
@ -1796,24 +1788,18 @@ impl ClusterInfo {
}
}
fn update_data_budget(&self, stakes: &HashMap<Pubkey, u64>) {
let mut w_outbound_budget = self.outbound_budget.write().unwrap();
let now = timestamp();
fn update_data_budget(&self, num_staked: usize) {
const INTERVAL_MS: u64 = 100;
// allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s
const BYTES_PER_INTERVAL: usize = 5000;
const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default
if now - w_outbound_budget.last_timestamp_ms > INTERVAL_MS {
let len = std::cmp::max(stakes.len(), 2);
w_outbound_budget.bytes += len * BYTES_PER_INTERVAL;
w_outbound_budget.bytes = std::cmp::min(
w_outbound_budget.bytes,
MAX_BUDGET_MULTIPLE * len * BYTES_PER_INTERVAL,
);
w_outbound_budget.last_timestamp_ms = now;
}
let num_staked = num_staked.max(2);
self.outbound_budget.update(INTERVAL_MS, |bytes| {
std::cmp::min(
bytes + num_staked * BYTES_PER_INTERVAL,
MAX_BUDGET_MULTIPLE * num_staked * BYTES_PER_INTERVAL,
)
});
}
// Pull requests take an incoming bloom filter of contained entries from a node
@ -1828,7 +1814,7 @@ impl ClusterInfo {
let mut caller_and_filters = vec![];
let mut addrs = vec![];
let mut time = Measure::start("handle_pull_requests");
self.update_data_budget(stakes);
self.update_data_budget(stakes.len());
for pull_data in requests {
caller_and_filters.push((pull_data.caller, pull_data.filter));
addrs.push(pull_data.from_addr);
@ -1908,17 +1894,13 @@ impl ClusterInfo {
let response = pull_responses[stat.to].0[stat.responses_index].clone();
let protocol = Protocol::PullResponse(self_id, vec![response]);
let new_packet = Packet::from_data(&from_addr, protocol);
{
let mut w_outbound_budget = self.outbound_budget.write().unwrap();
if w_outbound_budget.bytes > new_packet.meta.size {
sent.insert(index);
w_outbound_budget.bytes -= new_packet.meta.size;
total_bytes += new_packet.meta.size;
packets.packets.push(new_packet)
} else {
inc_new_counter_info!("gossip_pull_request-no_budget", 1);
break;
}
if self.outbound_budget.take(new_packet.meta.size) {
sent.insert(index);
total_bytes += new_packet.meta.size;
packets.packets.push(new_packet)
} else {
inc_new_counter_info!("gossip_pull_request-no_budget", 1);
break;
}
}
time.stop();

112
core/src/data_budget.rs Normal file
View File

@ -0,0 +1,112 @@
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
/// Byte budget for outbound data whose state lives in atomics, so it can be
/// read and modified through a shared reference.
#[derive(Default)]
pub struct DataBudget {
    /// Number of bytes currently left in the budget to send.
    bytes: AtomicUsize,
    /// Timestamp (in milliseconds) of the last time the bytes count was
    /// raised; consulted to decide when the budget may be raised again.
    last_timestamp_ms: AtomicU64,
}
impl DataBudget {
    /// Tries to consume `size` bytes from the budget.
    ///
    /// Returns `true` after deducting `size` when the budget holds at least
    /// `size` bytes. Otherwise the budget is left unchanged and `false` is
    /// returned.
    #[must_use]
    pub fn take(&self, size: usize) -> bool {
        let mut budget = self.bytes.load(Ordering::Acquire);
        loop {
            // `checked_sub` is `None` exactly when `budget < size`, i.e. when
            // there is not enough budget left.
            let remaining = match budget.checked_sub(size) {
                Some(remaining) => remaining,
                None => return false,
            };
            match self.bytes.compare_exchange_weak(
                budget,
                remaining,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return true,
                // The stored value differed (or the weak exchange failed
                // spuriously); retry against the value actually observed.
                Err(observed) => budget = observed,
            }
        }
    }

    /// Records the current timestamp and returns `true` if at least
    /// `duration_millis` has passed since the last recorded update.
    /// Otherwise returns `false` and leaves the stored timestamp unchanged.
    ///
    /// The current time is taken from `solana_sdk::timing::timestamp()`,
    /// which is compared directly against the stored millisecond value
    /// (presumably wall-clock milliseconds; confirm against `solana_sdk`).
    fn can_update(&self, duration_millis: u64) -> bool {
        let now = solana_sdk::timing::timestamp();
        let mut last_timestamp = self.last_timestamp_ms.load(Ordering::Acquire);
        loop {
            // Saturate instead of overflowing: a very large interval must read
            // as "not yet elapsed" rather than panic (debug builds) or wrap
            // around and allow the update early (release builds).
            if now < last_timestamp.saturating_add(duration_millis) {
                return false;
            }
            match self.last_timestamp_ms.compare_exchange_weak(
                last_timestamp,
                now,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return true,
                // Re-evaluate the elapsed-time check against the newly
                // observed timestamp before trying again.
                Err(observed) => last_timestamp = observed,
            }
        }
    }

    /// Updates the budget if at least `duration_millis` has passed since the
    /// last update; otherwise does nothing.
    ///
    /// `updater` maps the current value of bytes to the new one. It is
    /// re-invoked on every retry of the compare-exchange loop, so it may be
    /// called more than once for a single update.
    pub fn update<F>(&self, duration_millis: u64, updater: F)
    where
        F: Fn(usize) -> usize,
    {
        if !self.can_update(duration_millis) {
            return;
        }
        let mut bytes = self.bytes.load(Ordering::Acquire);
        loop {
            match self.bytes.compare_exchange_weak(
                bytes,
                updater(bytes),
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => break,
                Err(observed) => bytes = observed,
            }
        }
    }

    /// Non-atomic clone only for tests and simulations.
    ///
    /// The two fields are loaded one after the other, so the copy is not a
    /// single consistent snapshot if the budget is modified in between.
    pub fn clone_non_atomic(&self) -> Self {
        Self {
            bytes: AtomicUsize::new(self.bytes.load(Ordering::Acquire)),
            last_timestamp_ms: AtomicU64::new(self.last_timestamp_ms.load(Ordering::Acquire)),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::{thread, time::Duration};

    /// Drives one budget through refill, drain, a rejected early refill and
    /// a later accepted refill, checking `take` after every step.
    #[test]
    fn test_data_budget() {
        let data_budget = DataBudget::default();
        // Starts empty: budget = 0.
        assert!(!data_budget.take(1));

        // First update goes through and raises the budget to 5.
        data_budget.update(1000, |current| current + 5);
        assert!(data_budget.take(1));
        assert!(data_budget.take(2));
        // budget = 2, so a request for 3 is refused.
        assert!(!data_budget.take(3));

        // Too soon after the previous update: no change, budget = 2.
        data_budget.update(30, |_| 10);
        assert!(!data_budget.take(3));

        // Once the 30ms interval has elapsed the update applies: budget = 4.
        thread::sleep(Duration::from_millis(50));
        data_budget.update(30, |current| current * 2);
        assert!(data_budget.take(3));
        assert!(data_budget.take(1));
        // Fully drained again: budget = 0.
        assert!(!data_budget.take(1));
    }
}

View File

@ -31,6 +31,7 @@ pub mod crds_gossip_pull;
pub mod crds_gossip_push;
pub mod crds_shards;
pub mod crds_value;
pub mod data_budget;
pub mod epoch_slots;
pub mod fetch_stage;
pub mod fork_choice;