limits number of crds values returned when responding to pull requests (#13739)

Crds values buffered when responding to pull-requests can be very large, taking a lot of memory.
Added a limit on the number of buffered crds values, based on the outbound data budget.
This commit is contained in:
behzad nouri 2020-12-18 18:45:12 +00:00 committed by GitHub
parent 6a3797e164
commit 691031fefd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 93 additions and 38 deletions

View File

@ -107,6 +107,8 @@ const GOSSIP_PING_TOKEN_SIZE: usize = 32;
const GOSSIP_PING_CACHE_CAPACITY: usize = 16384;
const GOSSIP_PING_CACHE_TTL: Duration = Duration::from_secs(640);
pub const DEFAULT_CONTACT_DEBUG_INTERVAL: u64 = 10_000;
/// Minimum serialized size of a Protocol::PullResponse packet.
const PULL_RESPONSE_MIN_SERIALIZED_SIZE: usize = 167;
#[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError {
@ -1973,7 +1975,7 @@ impl ClusterInfo {
}
}
fn update_data_budget(&self, num_staked: usize) {
fn update_data_budget(&self, num_staked: usize) -> usize {
const INTERVAL_MS: u64 = 100;
// allow 50kBps per staked validator, epoch slots + votes ~= 1.5kB/slot ~= 4kB/s
const BYTES_PER_INTERVAL: usize = 5000;
@ -1984,7 +1986,7 @@ impl ClusterInfo {
bytes + num_staked * BYTES_PER_INTERVAL,
MAX_BUDGET_MULTIPLE * num_staked * BYTES_PER_INTERVAL,
)
});
})
}
// Returns a predicate checking if the pull request is from a valid
@ -2045,7 +2047,8 @@ impl ClusterInfo {
let callers = crds_value::filter_current(requests.iter().map(|r| &r.caller));
self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests)
.process_pull_requests(callers.cloned(), timestamp());
self.update_data_budget(stakes.len());
let output_size_limit =
self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
let mut rng = rand::thread_rng();
@ -2065,7 +2068,7 @@ impl ClusterInfo {
"generate_pull_responses",
&self.stats.generate_pull_responses,
)
.generate_pull_responses(&caller_and_filters, now);
.generate_pull_responses(&caller_and_filters, output_size_limit, now);
let pull_responses: Vec<_> = pull_responses
.into_iter()
@ -3466,6 +3469,17 @@ mod tests {
);
}
#[test]
fn test_pull_response_min_serialized_size() {
    // Checks, over 100 randomly generated crds values, that a pull response
    // carrying a single value never serializes to fewer bytes than
    // PULL_RESPONSE_MIN_SERIALIZED_SIZE.
    let mut rng = rand::thread_rng();
    (0..100).for_each(|_| {
        let value = CrdsValue::new_rand(&mut rng, None);
        let response = Protocol::PullResponse(Pubkey::new_unique(), vec![value]);
        let num_bytes = serialized_size(&response).unwrap();
        assert!(num_bytes >= PULL_RESPONSE_MIN_SERIALIZED_SIZE as u64);
    });
}
#[test]
fn test_cluster_spy_gossip() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();

View File

@ -184,9 +184,11 @@ impl CrdsGossip {
pub fn generate_pull_responses(
&self,
filters: &[(CrdsValue, CrdsFilter)],
output_size_limit: usize, // Limit number of crds values returned.
now: u64,
) -> Vec<Vec<CrdsValue>> {
self.pull.generate_pull_responses(&self.crds, filters, now)
self.pull
.generate_pull_responses(&self.crds, filters, output_size_limit, now)
}
pub fn filter_pull_responses(

View File

@ -14,6 +14,7 @@ use crate::crds::{Crds, VersionedCrdsValue};
use crate::crds_gossip::{get_stake, get_weight, CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS};
use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_value::{CrdsValue, CrdsValueLabel};
use itertools::Itertools;
use rand::distributions::{Distribution, WeightedIndex};
use rand::Rng;
use rayon::{prelude::*, ThreadPool};
@ -304,9 +305,10 @@ impl CrdsGossipPull {
&self,
crds: &Crds,
requests: &[(CrdsValue, CrdsFilter)],
output_size_limit: usize, // Limit number of crds values returned.
now: u64,
) -> Vec<Vec<CrdsValue>> {
self.filter_crds_values(crds, requests, now)
self.filter_crds_values(crds, requests, output_size_limit, now)
}
// Checks if responses should be inserted and
@ -474,6 +476,7 @@ impl CrdsGossipPull {
&self,
crds: &Crds,
filters: &[(CrdsValue, CrdsFilter)],
mut output_size_limit: usize, // Limit number of crds values returned.
now: u64,
) -> Vec<Vec<CrdsValue>> {
let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
@ -483,16 +486,20 @@ impl CrdsGossipPull {
let past = now.saturating_sub(msg_timeout);
let mut dropped_requests = 0;
let mut total_skipped = 0;
let ret = filters
let ret: Vec<_> = filters
.iter()
.map(|(caller, filter)| {
if output_size_limit == 0 {
return None;
}
let caller_wallclock = caller.wallclock();
if caller_wallclock >= future || caller_wallclock < past {
dropped_requests += 1;
return vec![];
return Some(vec![]);
}
let caller_wallclock = caller_wallclock.checked_add(jitter).unwrap_or(0);
crds.filter_bitmask(filter.mask, filter.mask_bits)
let out: Vec<_> = crds
.filter_bitmask(filter.mask, filter.mask_bits)
.filter_map(|item| {
debug_assert!(filter.test_mask(&item.value_hash));
//skip values that are too new
@ -505,12 +512,16 @@ impl CrdsGossipPull {
Some(item.value.clone())
}
})
.collect()
.take(output_size_limit)
.collect();
output_size_limit -= out.len();
Some(out)
})
.while_some()
.collect();
inc_new_counter_info!(
"gossip_filter_crds_values-dropped_requests",
dropped_requests
dropped_requests + filters.len() - ret.len()
);
inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped);
ret
@ -1029,7 +1040,12 @@ mod test {
let dest = CrdsGossipPull::default();
let (_, filters, caller) = req.unwrap();
let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = dest.generate_pull_responses(&dest_crds, &filters, 0);
let rsp = dest.generate_pull_responses(
&dest_crds,
&filters,
/*output_size_limit=*/ usize::MAX,
0,
);
assert_eq!(rsp[0].len(), 0);
@ -1042,8 +1058,12 @@ mod test {
.unwrap();
//should skip new value since caller is too old
let rsp =
dest.generate_pull_responses(&dest_crds, &filters, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS);
let rsp = dest.generate_pull_responses(
&dest_crds,
&filters,
/*output_size_limit=*/ usize::MAX,
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
);
assert_eq!(rsp[0].len(), 0);
assert_eq!(filters.len(), 1);
@ -1054,8 +1074,12 @@ mod test {
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS + 1,
)));
let rsp =
dest.generate_pull_responses(&dest_crds, &filters, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS);
let rsp = dest.generate_pull_responses(
&dest_crds,
&filters,
/*output_size_limit=*/ usize::MAX,
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
);
assert_eq!(rsp.len(), 2);
assert_eq!(rsp[0].len(), 0);
assert_eq!(rsp[1].len(), 1); // Orders are also preserved.
@ -1092,7 +1116,12 @@ mod test {
let mut dest = CrdsGossipPull::default();
let (_, filters, caller) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = dest.generate_pull_responses(&dest_crds, &filters, 0);
let rsp = dest.generate_pull_responses(
&dest_crds,
&filters,
/*output_size_limit=*/ usize::MAX,
0,
);
dest.process_pull_requests(
&mut dest_crds,
filters.into_iter().map(|(caller, _)| caller),
@ -1170,7 +1199,12 @@ mod test {
);
let (_, filters, caller) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let mut rsp = dest.generate_pull_responses(&dest_crds, &filters, 0);
let mut rsp = dest.generate_pull_responses(
&dest_crds,
&filters,
/*output_size_limit=*/ usize::MAX,
0,
);
dest.process_pull_requests(
&mut dest_crds,
filters.into_iter().map(|(caller, _)| caller),

View File

@ -52,27 +52,28 @@ impl DataBudget {
}
}
// Updates the budget if at least given milliseconds has passed since last
// update. Updater function maps current value of bytes to the new one.
pub fn update<F>(&self, duration_millis: u64, updater: F)
/// Updates the budget if at least given milliseconds has passed since last
/// update. Updater function maps current value of bytes to the new one.
/// Returns current data-budget after the update.
pub fn update<F>(&self, duration_millis: u64, updater: F) -> usize
where
F: Fn(usize) -> usize,
{
if !self.can_update(duration_millis) {
return;
}
let mut bytes = self.bytes.load(Ordering::Acquire);
loop {
match self.bytes.compare_exchange_weak(
bytes,
updater(bytes),
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Err(b) => bytes = b,
if self.can_update(duration_millis) {
let mut bytes = self.bytes.load(Ordering::Acquire);
loop {
match self.bytes.compare_exchange_weak(
bytes,
updater(bytes),
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Err(b) => bytes = b,
}
}
}
self.bytes.load(Ordering::Acquire)
}
// Non-atomic clone only for tests and simulations.
@ -94,16 +95,16 @@ mod tests {
let budget = DataBudget::default();
assert!(!budget.take(1)); // budget = 0.
budget.update(1000, |bytes| bytes + 5); // budget updates to 5.
assert_eq!(budget.update(1000, |bytes| bytes + 5), 5); // budget updates to 5.
assert!(budget.take(1));
assert!(budget.take(2));
assert!(!budget.take(3)); // budget = 2, out of budget.
budget.update(30, |_| 10); // no update, budget = 2.
assert_eq!(budget.update(30, |_| 10), 2); // no update, budget = 2.
assert!(!budget.take(3)); // budget = 2, out of budget.
std::thread::sleep(Duration::from_millis(50));
budget.update(30, |bytes| bytes * 2); // budget updates to 4.
assert_eq!(budget.update(30, |bytes| bytes * 2), 4); // budget updates to 4.
assert!(budget.take(3));
assert!(budget.take(1));

View File

@ -458,7 +458,11 @@ fn network_run_pull(
let rsp = node
.lock()
.unwrap()
.generate_pull_responses(&filters, now)
.generate_pull_responses(
&filters,
/*output_size_limit=*/ usize::MAX,
now,
)
.into_iter()
.flatten()
.collect();