Gossip PullRequests tend to return a lot of duplicates. (#10326)
* filter messages that are likely to be pushed from the response * tests * tests * wait to start filtering responses, and push stats to influx * wait to start filtering responses, and push stats to influx * reduce the timers to match the publish self timeout * fmt * fmt
This commit is contained in:
parent
3cea73cf14
commit
31e20eff82
|
@ -251,6 +251,7 @@ pub struct ClusterInfo {
|
||||||
my_contact_info: RwLock<ContactInfo>,
|
my_contact_info: RwLock<ContactInfo>,
|
||||||
id: Pubkey,
|
id: Pubkey,
|
||||||
stats: GossipStats,
|
stats: GossipStats,
|
||||||
|
start_time: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
#[derive(Default, Clone)]
|
||||||
|
@ -391,6 +392,7 @@ impl ClusterInfo {
|
||||||
let id = contact_info.id;
|
let id = contact_info.id;
|
||||||
let me = Self {
|
let me = Self {
|
||||||
gossip: RwLock::new(CrdsGossip::default()),
|
gossip: RwLock::new(CrdsGossip::default()),
|
||||||
|
start_time: timestamp(),
|
||||||
keypair,
|
keypair,
|
||||||
entrypoint: RwLock::new(None),
|
entrypoint: RwLock::new(None),
|
||||||
outbound_budget: RwLock::new(DataBudget {
|
outbound_budget: RwLock::new(DataBudget {
|
||||||
|
@ -425,6 +427,7 @@ impl ClusterInfo {
|
||||||
my_contact_info: RwLock::new(my_contact_info),
|
my_contact_info: RwLock::new(my_contact_info),
|
||||||
id: *new_id,
|
id: *new_id,
|
||||||
stats: GossipStats::default(),
|
stats: GossipStats::default(),
|
||||||
|
start_time: self.start_time,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1679,9 +1682,18 @@ impl ClusterInfo {
|
||||||
let now = timestamp();
|
let now = timestamp();
|
||||||
let self_id = me.id();
|
let self_id = me.id();
|
||||||
|
|
||||||
|
//skip messages that are likely to be pushed
|
||||||
|
let min_filter_time = me.start_time + 10 * CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
|
||||||
|
let push_timer = if min_filter_time < now {
|
||||||
|
// reason for / 3 is to allow push_self which has a /2 timeout to propagate
|
||||||
|
// first through push before responding with those values.
|
||||||
|
Some(now - CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 3)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
let pull_responses = me
|
let pull_responses = me
|
||||||
.time_gossip_read_lock("generate_pull_responses", &me.stats.generate_pull_responses)
|
.time_gossip_read_lock("generate_pull_responses", &me.stats.generate_pull_responses)
|
||||||
.generate_pull_responses(&caller_and_filters);
|
.generate_pull_responses(&caller_and_filters, push_timer);
|
||||||
|
|
||||||
me.time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests)
|
me.time_gossip_write_lock("process_pull_reqs", &me.stats.process_pull_requests)
|
||||||
.process_pull_requests(caller_and_filters, now);
|
.process_pull_requests(caller_and_filters, now);
|
||||||
|
@ -2060,6 +2072,16 @@ impl ClusterInfo {
|
||||||
self.stats.generate_pull_responses.clear(),
|
self.stats.generate_pull_responses.clear(),
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"process_pull_response_fail",
|
||||||
|
self.stats.process_pull_response_fail.clear(),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"process_pull_response_success",
|
||||||
|
self.stats.process_pull_response_success.clear(),
|
||||||
|
i64
|
||||||
|
),
|
||||||
("process_prune", self.stats.process_prune.clear(), i64),
|
("process_prune", self.stats.process_prune.clear(), i64),
|
||||||
(
|
(
|
||||||
"process_push_message",
|
"process_push_message",
|
||||||
|
|
|
@ -166,8 +166,9 @@ impl CrdsGossip {
|
||||||
pub fn generate_pull_responses(
|
pub fn generate_pull_responses(
|
||||||
&self,
|
&self,
|
||||||
filters: &[(CrdsValue, CrdsFilter)],
|
filters: &[(CrdsValue, CrdsFilter)],
|
||||||
|
now: Option<u64>,
|
||||||
) -> Vec<Vec<CrdsValue>> {
|
) -> Vec<Vec<CrdsValue>> {
|
||||||
self.pull.generate_pull_responses(&self.crds, filters)
|
self.pull.generate_pull_responses(&self.crds, filters, now)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// process a pull response
|
/// process a pull response
|
||||||
|
|
|
@ -100,6 +100,9 @@ impl CrdsFilter {
|
||||||
accum
|
accum
|
||||||
}
|
}
|
||||||
pub fn test_mask(&self, item: &Hash) -> bool {
|
pub fn test_mask(&self, item: &Hash) -> bool {
|
||||||
|
if self.mask_bits == 0 {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
// only consider the highest mask_bits bits from the hash and set the rest to 1.
|
// only consider the highest mask_bits bits from the hash and set the rest to 1.
|
||||||
let ones = (!0u64).checked_shr(self.mask_bits).unwrap_or(!0u64);
|
let ones = (!0u64).checked_shr(self.mask_bits).unwrap_or(!0u64);
|
||||||
let bits = Self::hash_as_u64(item) | ones;
|
let bits = Self::hash_as_u64(item) | ones;
|
||||||
|
@ -227,8 +230,9 @@ impl CrdsGossipPull {
|
||||||
&self,
|
&self,
|
||||||
crds: &Crds,
|
crds: &Crds,
|
||||||
requests: &[(CrdsValue, CrdsFilter)],
|
requests: &[(CrdsValue, CrdsFilter)],
|
||||||
|
now: Option<u64>,
|
||||||
) -> Vec<Vec<CrdsValue>> {
|
) -> Vec<Vec<CrdsValue>> {
|
||||||
self.filter_crds_values(crds, requests)
|
Self::filter_crds_values(crds, requests, now)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// process a pull response
|
/// process a pull response
|
||||||
|
@ -319,12 +323,17 @@ impl CrdsGossipPull {
|
||||||
}
|
}
|
||||||
/// filter values that fail the bloom filter up to max_bytes
|
/// filter values that fail the bloom filter up to max_bytes
|
||||||
fn filter_crds_values(
|
fn filter_crds_values(
|
||||||
&self,
|
|
||||||
crds: &Crds,
|
crds: &Crds,
|
||||||
filters: &[(CrdsValue, CrdsFilter)],
|
filters: &[(CrdsValue, CrdsFilter)],
|
||||||
|
now: Option<u64>,
|
||||||
) -> Vec<Vec<CrdsValue>> {
|
) -> Vec<Vec<CrdsValue>> {
|
||||||
let mut ret = vec![vec![]; filters.len()];
|
let mut ret = vec![vec![]; filters.len()];
|
||||||
|
let now = now.unwrap_or(u64::MAX);
|
||||||
for v in crds.table.values() {
|
for v in crds.table.values() {
|
||||||
|
//skip messages that are newer than now
|
||||||
|
if v.insert_timestamp > now {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
filters.iter().enumerate().for_each(|(i, (_, filter))| {
|
filters.iter().enumerate().for_each(|(i, (_, filter))| {
|
||||||
if !filter.contains(&v.value_hash) {
|
if !filter.contains(&v.value_hash) {
|
||||||
ret[i].push(v.value.clone());
|
ret[i].push(v.value.clone());
|
||||||
|
@ -587,7 +596,7 @@ mod test {
|
||||||
let mut dest = CrdsGossipPull::default();
|
let mut dest = CrdsGossipPull::default();
|
||||||
let (_, filters, caller) = req.unwrap();
|
let (_, filters, caller) = req.unwrap();
|
||||||
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
||||||
let rsp = dest.generate_pull_responses(&dest_crds, &filters);
|
let rsp = dest.generate_pull_responses(&dest_crds, &filters, None);
|
||||||
dest.process_pull_requests(&mut dest_crds, filters, 1);
|
dest.process_pull_requests(&mut dest_crds, filters, 1);
|
||||||
assert!(rsp.iter().all(|rsp| rsp.is_empty()));
|
assert!(rsp.iter().all(|rsp| rsp.is_empty()));
|
||||||
assert!(dest_crds.lookup(&caller.label()).is_some());
|
assert!(dest_crds.lookup(&caller.label()).is_some());
|
||||||
|
@ -658,7 +667,7 @@ mod test {
|
||||||
);
|
);
|
||||||
let (_, filters, caller) = req.unwrap();
|
let (_, filters, caller) = req.unwrap();
|
||||||
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
let filters: Vec<_> = filters.into_iter().map(|f| (caller.clone(), f)).collect();
|
||||||
let mut rsp = dest.generate_pull_responses(&dest_crds, &filters);
|
let mut rsp = dest.generate_pull_responses(&dest_crds, &filters, None);
|
||||||
dest.process_pull_requests(&mut dest_crds, filters, 0);
|
dest.process_pull_requests(&mut dest_crds, filters, 0);
|
||||||
// if there is a false positive this is empty
|
// if there is a false positive this is empty
|
||||||
// prob should be around 0.1 per iteration
|
// prob should be around 0.1 per iteration
|
||||||
|
@ -700,6 +709,37 @@ mod test {
|
||||||
}
|
}
|
||||||
assert!(done);
|
assert!(done);
|
||||||
}
|
}
|
||||||
|
#[test]
|
||||||
|
fn test_filter_crds_values() {
|
||||||
|
let mut node_crds = Crds::default();
|
||||||
|
let entry = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||||
|
&Pubkey::new_rand(),
|
||||||
|
0,
|
||||||
|
)));
|
||||||
|
|
||||||
|
let caller = CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
|
||||||
|
&Pubkey::new_rand(),
|
||||||
|
0,
|
||||||
|
)));
|
||||||
|
|
||||||
|
let node_label = entry.label();
|
||||||
|
let node_pubkey = node_label.pubkey();
|
||||||
|
node_crds.insert(entry, 1).unwrap();
|
||||||
|
let mut filter = CrdsFilter::new_rand(1, 128);
|
||||||
|
filter.mask_bits = 0;
|
||||||
|
let requests = [(caller, filter)];
|
||||||
|
|
||||||
|
let rsp = CrdsGossipPull::filter_crds_values(&node_crds, &requests, None);
|
||||||
|
assert_eq!(rsp[0][0].pubkey(), node_pubkey);
|
||||||
|
|
||||||
|
//skip 1 since its newer than 0
|
||||||
|
let rsp = CrdsGossipPull::filter_crds_values(&node_crds, &requests, Some(0));
|
||||||
|
assert!(rsp[0].is_empty());
|
||||||
|
|
||||||
|
let rsp = CrdsGossipPull::filter_crds_values(&node_crds, &requests, Some(1));
|
||||||
|
assert_eq!(rsp[0][0].pubkey(), node_pubkey);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_gossip_purge() {
|
fn test_gossip_purge() {
|
||||||
let mut node_crds = Crds::default();
|
let mut node_crds = Crds::default();
|
||||||
|
@ -763,6 +803,15 @@ mod test {
|
||||||
let h: Hash = hash(h.as_ref());
|
let h: Hash = hash(h.as_ref());
|
||||||
assert!(!filter.contains(&h));
|
assert!(!filter.contains(&h));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_crds_filter_test_mask_default() {
|
||||||
|
let filter = CrdsFilter::default();
|
||||||
|
assert_eq!(filter.mask_bits, 0);
|
||||||
|
let h: Hash = Hash::default();
|
||||||
|
assert_eq!(filter.test_mask(&h), true);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_crds_filter_add_mask() {
|
fn test_crds_filter_add_mask() {
|
||||||
let mut filter = CrdsFilter::new_rand(1000, 10);
|
let mut filter = CrdsFilter::new_rand(1000, 10);
|
||||||
|
|
|
@ -436,7 +436,7 @@ fn network_run_pull(
|
||||||
let rsp = node
|
let rsp = node
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.generate_pull_responses(&filters)
|
.generate_pull_responses(&filters, None)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.flatten()
|
.flatten()
|
||||||
.collect();
|
.collect();
|
||||||
|
|
Loading…
Reference in New Issue