filter out old gossip pull requests (#11448)

* init

* builds

* stats

* revert

* tests

* clippy

* add some jitter

* shorter jitter timer

* update

* fixup! update

* use saturating_sub

* fix filters
This commit is contained in:
anatoly yakovenko 2020-08-11 06:26:42 -07:00 committed by GitHub
parent f12fc66a69
commit 713851b68d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 102 additions and 14 deletions

View File

@ -1746,7 +1746,7 @@ impl ClusterInfo {
"generate_pull_responses",
&self.stats.generate_pull_responses,
)
.generate_pull_responses(&caller_and_filters);
.generate_pull_responses(&caller_and_filters, now);
self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests)
.process_pull_requests(caller_and_filters, now);

View File

@ -159,8 +159,9 @@ impl CrdsGossip {
pub fn generate_pull_responses(
&self,
filters: &[(CrdsValue, CrdsFilter)],
now: u64,
) -> Vec<Vec<CrdsValue>> {
self.pull.generate_pull_responses(&self.crds, filters)
self.pull.generate_pull_responses(&self.crds, filters, now)
}
pub fn filter_pull_responses(

View File

@ -237,8 +237,9 @@ impl CrdsGossipPull {
&self,
crds: &Crds,
requests: &[(CrdsValue, CrdsFilter)],
now: u64,
) -> Vec<Vec<CrdsValue>> {
self.filter_crds_values(crds, requests)
self.filter_crds_values(crds, requests, now)
}
// Checks if responses should be inserted and
@ -371,15 +372,45 @@ impl CrdsGossipPull {
&self,
crds: &Crds,
filters: &[(CrdsValue, CrdsFilter)],
now: u64,
) -> Vec<Vec<CrdsValue>> {
let mut ret = vec![vec![]; filters.len()];
let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
let jitter = rand::thread_rng().gen_range(0, msg_timeout / 4);
let start = filters.len();
//skip filters from callers that are too old
let future = now.saturating_add(msg_timeout);
let past = now.saturating_sub(msg_timeout);
let recent: Vec<_> = filters
.iter()
.filter(|(caller, _)| caller.wallclock() < future && caller.wallclock() >= past)
.collect();
inc_new_counter_info!(
"gossip_filter_crds_values-dropped_requests",
start - recent.len()
);
if recent.is_empty() {
return ret;
}
let mut total_skipped = 0;
for v in crds.table.values() {
filters.iter().enumerate().for_each(|(i, (_, filter))| {
recent
.iter()
.enumerate()
.for_each(|(i, (caller, filter))| {
//skip values that are too new
if v.value.wallclock()
> caller.wallclock().checked_add(jitter).unwrap_or_else(|| 0)
{
total_skipped += 1;
return;
}
if !filter.contains(&v.value_hash) {
ret[i].push(v.value.clone());
}
});
}
inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped);
ret
}
pub fn make_timeouts_def(
@ -636,6 +667,62 @@ mod test {
}
}
#[test]
fn test_generate_pull_responses() {
let mut node_crds = Crds::default();
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
let node_pubkey = entry.label().pubkey();
let node = CrdsGossipPull::default();
node_crds.insert(entry, 0).unwrap();
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
)));
node_crds.insert(new, 0).unwrap();
let req = node.new_pull_request(
&node_crds,
&node_pubkey,
0,
0,
&HashMap::new(),
PACKET_DATA_SIZE,
);
let mut dest_crds = Crds::default();
let dest = CrdsGossipPull::default();
let (_, filters, caller) = req.unwrap();
let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = dest.generate_pull_responses(&dest_crds, &filters, 0);
assert_eq!(rsp[0].len(), 0);
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
)));
dest_crds
.insert(new, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS)
.unwrap();
//should skip new value since caller is to old
let rsp =
dest.generate_pull_responses(&dest_crds, &filters, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS);
assert_eq!(rsp[0].len(), 0);
//should return new value since caller is new
filters[0].0 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS + 1,
)));
let rsp =
dest.generate_pull_responses(&dest_crds, &filters, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS);
assert_eq!(rsp[0].len(), 1);
}
#[test]
fn test_process_pull_request() {
let mut node_crds = Crds::default();
@ -664,7 +751,7 @@ mod test {
let mut dest = CrdsGossipPull::default();
let (_, filters, caller) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let rsp = dest.generate_pull_responses(&dest_crds, &filters);
let rsp = dest.generate_pull_responses(&dest_crds, &filters, 0);
dest.process_pull_requests(&mut dest_crds, filters, 1);
assert!(rsp.iter().all(|rsp| rsp.is_empty()));
assert!(dest_crds.lookup(&caller.label()).is_some());
@ -688,7 +775,7 @@ mod test {
let mut node_crds = Crds::default();
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
1,
)));
let node_pubkey = entry.label().pubkey();
let mut node = CrdsGossipPull::default();
@ -696,7 +783,7 @@ mod test {
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::new_rand(),
0,
1,
)));
node_crds.insert(new, 0).unwrap();
@ -735,7 +822,7 @@ mod test {
);
let (_, filters, caller) = req.unwrap();
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
let mut rsp = dest.generate_pull_responses(&dest_crds, &filters);
let mut rsp = dest.generate_pull_responses(&dest_crds, &filters, 0);
dest.process_pull_requests(&mut dest_crds, filters, 0);
// if there is a false positive this is empty
// prob should be around 0.1 per iteration

View File

@ -436,7 +436,7 @@ fn network_run_pull(
let rsp = node
.lock()
.unwrap()
.generate_pull_responses(&filters)
.generate_pull_responses(&filters, now)
.into_iter()
.flatten()
.collect();