From 713851b68dd3a57fcf2ed2d2a6116379cecdb9cc Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Tue, 11 Aug 2020 06:26:42 -0700 Subject: [PATCH] filter out old gossip pull requests (#11448) * init * builds * stats * revert * tests * clippy * add some jitter * shorter jitter timer * update * fixup! update * use saturating_sub * fix filters --- core/src/cluster_info.rs | 2 +- core/src/crds_gossip.rs | 3 +- core/src/crds_gossip_pull.rs | 109 +++++++++++++++++++++++++++++++---- core/tests/crds_gossip.rs | 2 +- 4 files changed, 102 insertions(+), 14 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 4e7d08ccf2..bbbc47d2af 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1746,7 +1746,7 @@ impl ClusterInfo { "generate_pull_responses", &self.stats.generate_pull_responses, ) - .generate_pull_responses(&caller_and_filters); + .generate_pull_responses(&caller_and_filters, now); self.time_gossip_write_lock("process_pull_reqs", &self.stats.process_pull_requests) .process_pull_requests(caller_and_filters, now); diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 817701b61a..53520930f2 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -159,8 +159,9 @@ impl CrdsGossip { pub fn generate_pull_responses( &self, filters: &[(CrdsValue, CrdsFilter)], + now: u64, ) -> Vec> { - self.pull.generate_pull_responses(&self.crds, filters) + self.pull.generate_pull_responses(&self.crds, filters, now) } pub fn filter_pull_responses( diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 11a2ef8afe..e3fc75f6e1 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -237,8 +237,9 @@ impl CrdsGossipPull { &self, crds: &Crds, requests: &[(CrdsValue, CrdsFilter)], + now: u64, ) -> Vec> { - self.filter_crds_values(crds, requests) + self.filter_crds_values(crds, requests, now) } // Checks if responses should be inserted and @@ -371,15 +372,45 @@ impl CrdsGossipPull { &self, crds: &Crds, filters: &[(CrdsValue, CrdsFilter)], + now: u64, ) -> Vec> { let mut ret = vec![vec![]; filters.len()]; - for v in crds.table.values() { - filters.iter().enumerate().for_each(|(i, (_, filter))| { - if !filter.contains(&v.value_hash) { - ret[i].push(v.value.clone()); - } - }); + let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; + let jitter = rand::thread_rng().gen_range(0, msg_timeout / 4); + let start = filters.len(); + //skip filters from callers that are too old + let future = now.saturating_add(msg_timeout); + let past = now.saturating_sub(msg_timeout); + let recent: Vec<_> = filters + .iter() + .filter(|(caller, _)| caller.wallclock() < future && caller.wallclock() >= past) + .collect(); + inc_new_counter_info!( + "gossip_filter_crds_values-dropped_requests", + start - recent.len() + ); + if recent.is_empty() { + return ret; } + let mut total_skipped = 0; + for v in crds.table.values() { + recent + .iter() + .enumerate() + .for_each(|(i, (caller, filter))| { + //skip values that are too new + if v.value.wallclock() + > caller.wallclock().checked_add(jitter).unwrap_or_else(|| 0) + { + total_skipped += 1; + return; + } + if !filter.contains(&v.value_hash) { + ret[i].push(v.value.clone()); + } + }); + } + inc_new_counter_info!("gossip_filter_crds_values-dropped_values", total_skipped); ret } pub fn make_timeouts_def( @@ -636,6 +667,62 @@ mod test { } } + #[test] + fn test_generate_pull_responses() { + let mut node_crds = Crds::default(); + let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( + &Pubkey::new_rand(), + 0, + ))); + let node_pubkey = entry.label().pubkey(); + let node = CrdsGossipPull::default(); + node_crds.insert(entry, 0).unwrap(); + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( + &Pubkey::new_rand(), + 0, + ))); + node_crds.insert(new, 0).unwrap(); + let req = node.new_pull_request( + &node_crds, + &node_pubkey, + 0, + 0, + &HashMap::new(), + PACKET_DATA_SIZE, + ); + + let mut dest_crds = Crds::default(); + let dest = CrdsGossipPull::default(); + let (_, filters, caller) = req.unwrap(); + let mut filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); + let rsp = dest.generate_pull_responses(&dest_crds, &filters, 0); + + assert_eq!(rsp[0].len(), 0); + + let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( + &Pubkey::new_rand(), + CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, + ))); + dest_crds + .insert(new, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS) + .unwrap(); + + //should skip new value since caller is to old + let rsp = + dest.generate_pull_responses(&dest_crds, &filters, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS); + assert_eq!(rsp[0].len(), 0); + + //should return new value since caller is new + filters[0].0 = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( + &Pubkey::new_rand(), + CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS + 1, + ))); + + let rsp = + dest.generate_pull_responses(&dest_crds, &filters, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS); + assert_eq!(rsp[0].len(), 1); + } + #[test] fn test_process_pull_request() { let mut node_crds = Crds::default(); @@ -664,7 +751,7 @@ mod test { let mut dest = CrdsGossipPull::default(); let (_, filters, caller) = req.unwrap(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); - let rsp = dest.generate_pull_responses(&dest_crds, &filters); + let rsp = dest.generate_pull_responses(&dest_crds, &filters, 0); dest.process_pull_requests(&mut dest_crds, filters, 1); assert!(rsp.iter().all(|rsp| rsp.is_empty())); assert!(dest_crds.lookup(&caller.label()).is_some()); @@ -688,7 +775,7 @@ mod test { let mut node_crds = Crds::default(); let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), - 0, + 1, ))); let node_pubkey = entry.label().pubkey(); let mut node = CrdsGossipPull::default(); @@ -696,7 +783,7 @@ mod test { let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost( &Pubkey::new_rand(), - 0, + 1, ))); node_crds.insert(new, 0).unwrap(); @@ -735,7 +822,7 @@ mod test { ); let (_, filters, caller) = req.unwrap(); let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect(); - let mut rsp = dest.generate_pull_responses(&dest_crds, &filters); + let mut rsp = dest.generate_pull_responses(&dest_crds, &filters, 0); dest.process_pull_requests(&mut dest_crds, filters, 0); // if there is a false positive this is empty // prob should be around 0.1 per iteration diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 15e133812f..6384b5cf48 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -436,7 +436,7 @@ fn network_run_pull( let rsp = node .lock() .unwrap() - .generate_pull_responses(&filters) + .generate_pull_responses(&filters, now) .into_iter() .flatten() .collect();