down samples outgoing gossip pull requests (#33719)

Push message propagation has improved in recent versions of the gossip
code and we don't rely on pull requests as much as before. Handling pull
requests is also inefficient and expensive.
This commit reduces the number of outgoing pull requests by down-sampling.
This commit is contained in:
behzad nouri 2023-10-18 13:41:42 +00:00 committed by GitHub
parent 785959bbc3
commit c699bc9cab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 60 additions and 38 deletions

View File

@ -52,6 +52,6 @@ fn bench_build_crds_filters(bencher: &mut Bencher) {
let crds = RwLock::new(crds); let crds = RwLock::new(crds);
bencher.iter(|| { bencher.iter(|| {
let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE); let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
assert_eq!(filters.len(), 128); assert_eq!(filters.len(), 16);
}); });
} }

View File

@ -53,8 +53,6 @@ use {
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
// Retention period of hashes of received outdated values. // Retention period of hashes of received outdated values.
const FAILED_INSERTS_RETENTION_MS: u64 = 20_000; const FAILED_INSERTS_RETENTION_MS: u64 = 20_000;
// Maximum number of pull requests to send out each time around.
const MAX_NUM_PULL_REQUESTS: usize = 1024;
pub const FALSE_RATE: f64 = 0.1f64; pub const FALSE_RATE: f64 = 0.1f64;
pub const KEYS: f64 = 8f64; pub const KEYS: f64 = 8f64;
@ -143,19 +141,26 @@ impl CrdsFilter {
/// A vector of crds filters that together hold a complete set of Hashes. /// A vector of crds filters that together hold a complete set of Hashes.
struct CrdsFilterSet { struct CrdsFilterSet {
filters: Vec<AtomicBloom<Hash>>, filters: Vec<Option<AtomicBloom<Hash>>>,
mask_bits: u32, mask_bits: u32,
} }
impl CrdsFilterSet { impl CrdsFilterSet {
fn new(num_items: usize, max_bytes: usize) -> Self { fn new<R: Rng>(rng: &mut R, num_items: usize, max_bytes: usize) -> Self {
const SAMPLE_RATE: usize = 8;
const MAX_NUM_FILTERS: usize = 1024;
let max_bits = (max_bytes * 8) as f64; let max_bits = (max_bytes * 8) as f64;
let max_items = CrdsFilter::max_items(max_bits, FALSE_RATE, KEYS); let max_items = CrdsFilter::max_items(max_bits, FALSE_RATE, KEYS);
let mask_bits = CrdsFilter::mask_bits(num_items as f64, max_items); let mask_bits = CrdsFilter::mask_bits(num_items as f64, max_items);
let filters = let mut filters: Vec<_> = repeat_with(|| None).take(1usize << mask_bits).collect();
repeat_with(|| Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize).into()) let mut indices: Vec<_> = (0..filters.len()).collect();
.take(1 << mask_bits) let size = (filters.len() + SAMPLE_RATE - 1) / SAMPLE_RATE;
.collect(); for _ in 0..MAX_NUM_FILTERS.min(size) {
let k = rng.gen_range(0..indices.len());
let k = indices.swap_remove(k);
let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
filters[k] = Some(AtomicBloom::<Hash>::from(filter));
}
Self { filters, mask_bits } Self { filters, mask_bits }
} }
@ -167,7 +172,9 @@ impl CrdsFilterSet {
.unwrap_or_default(), .unwrap_or_default(),
) )
.unwrap(); .unwrap();
self.filters[index].add(&hash_value); if let Some(filter) = &self.filters[index] {
filter.add(&hash_value);
}
} }
} }
@ -177,10 +184,12 @@ impl From<CrdsFilterSet> for Vec<CrdsFilter> {
cfs.filters cfs.filters
.into_iter() .into_iter()
.enumerate() .enumerate()
.map(|(seed, filter)| CrdsFilter { .filter_map(|(seed, filter)| {
filter: filter.into(), Some(CrdsFilter {
mask: CrdsFilter::compute_mask(seed as u64, mask_bits), filter: Bloom::<Hash>::from(filter?),
mask_bits, mask: CrdsFilter::compute_mask(seed as u64, mask_bits),
mask_bits,
})
}) })
.collect() .collect()
} }
@ -269,14 +278,7 @@ impl CrdsGossipPull {
if nodes.is_empty() { if nodes.is_empty() {
return Err(CrdsGossipError::NoPeers); return Err(CrdsGossipError::NoPeers);
} }
let mut filters = self.build_crds_filters(thread_pool, crds, bloom_size); let filters = self.build_crds_filters(thread_pool, crds, bloom_size);
if filters.len() > MAX_NUM_PULL_REQUESTS {
for i in 0..MAX_NUM_PULL_REQUESTS {
let j = rng.gen_range(i..filters.len());
filters.swap(i, j);
}
filters.truncate(MAX_NUM_PULL_REQUESTS);
}
// Associate each pull-request filter with a randomly selected peer. // Associate each pull-request filter with a randomly selected peer.
let dist = WeightedIndex::new(weights).unwrap(); let dist = WeightedIndex::new(weights).unwrap();
let nodes = repeat_with(|| nodes[dist.sample(&mut rng)].clone()); let nodes = repeat_with(|| nodes[dist.sample(&mut rng)].clone());
@ -425,7 +427,7 @@ impl CrdsGossipPull {
let crds = crds.read().unwrap(); let crds = crds.read().unwrap();
let num_items = crds.len() + crds.num_purged() + failed_inserts.len(); let num_items = crds.len() + crds.num_purged() + failed_inserts.len();
let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items); let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items);
let filters = CrdsFilterSet::new(num_items, bloom_size); let filters = CrdsFilterSet::new(&mut rand::thread_rng(), num_items, bloom_size);
thread_pool.install(|| { thread_pool.install(|| {
crds.par_values() crds.par_values()
.with_min_len(PAR_MIN_LENGTH) .with_min_len(PAR_MIN_LENGTH)
@ -669,45 +671,61 @@ pub(crate) mod tests {
#[test] #[test]
fn test_crds_filter_set_add() { fn test_crds_filter_set_add() {
let crds_filter_set = let mut rng = rand::thread_rng();
CrdsFilterSet::new(/*num_items=*/ 9672788, /*max_bytes=*/ 8196); let crds_filter_set = CrdsFilterSet::new(
let hash_values: Vec<_> = repeat_with(Hash::new_unique).take(1024).collect(); &mut rng, /*num_items=*/ 59672788, /*max_bytes=*/ 8196,
);
let hash_values: Vec<_> = repeat_with(|| {
let buf: [u8; 32] = rng.gen();
solana_sdk::hash::hashv(&[&buf])
})
.take(1024)
.collect();
assert_eq!(crds_filter_set.filters.len(), 8192);
for hash_value in &hash_values { for hash_value in &hash_values {
crds_filter_set.add(*hash_value); crds_filter_set.add(*hash_value);
} }
let filters: Vec<CrdsFilter> = crds_filter_set.into(); let filters: Vec<CrdsFilter> = crds_filter_set.into();
let mut num_hits = 0;
assert_eq!(filters.len(), 1024); assert_eq!(filters.len(), 1024);
for hash_value in hash_values { for hash_value in hash_values {
let mut num_hits = 0; let mut hit = false;
let mut false_positives = 0; let mut false_positives = 0;
for filter in &filters { for filter in &filters {
if filter.test_mask(&hash_value) { if filter.test_mask(&hash_value) {
num_hits += 1; num_hits += 1;
assert!(!hit);
hit = true;
assert!(filter.contains(&hash_value)); assert!(filter.contains(&hash_value));
assert!(filter.filter.contains(&hash_value)); assert!(filter.filter.contains(&hash_value));
} else if filter.filter.contains(&hash_value) { } else if filter.filter.contains(&hash_value) {
false_positives += 1; false_positives += 1;
} }
} }
assert_eq!(num_hits, 1);
assert!(false_positives < 5); assert!(false_positives < 5);
} }
assert!(num_hits > 96, "num_hits: {num_hits}");
} }
#[test] #[test]
fn test_crds_filter_set_new() { fn test_crds_filter_set_new() {
// Validates invariances required by CrdsFilterSet::get in the // Validates invariances required by CrdsFilterSet::get in the
// vector of filters generated by CrdsFilterSet::new. // vector of filters generated by CrdsFilterSet::new.
let filters: Vec<CrdsFilter> = let filters = CrdsFilterSet::new(
CrdsFilterSet::new(/*num_items=*/ 55345017, /*max_bytes=*/ 4098).into(); &mut rand::thread_rng(),
assert_eq!(filters.len(), 16384); 55345017, // num_items
4098, // max_bytes
);
assert_eq!(filters.filters.len(), 16384);
let filters = Vec::<CrdsFilter>::from(filters);
assert_eq!(filters.len(), 1024);
let mask_bits = filters[0].mask_bits; let mask_bits = filters[0].mask_bits;
let right_shift = 64 - mask_bits; let right_shift = 64 - mask_bits;
let ones = !0u64 >> mask_bits; let ones = !0u64 >> mask_bits;
for (i, filter) in filters.iter().enumerate() { for filter in &filters {
// Check that all mask_bits are equal. // Check that all mask_bits are equal.
assert_eq!(mask_bits, filter.mask_bits); assert_eq!(mask_bits, filter.mask_bits);
assert_eq!(i as u64, filter.mask >> right_shift); assert!((0..16384).contains(&(filter.mask >> right_shift)));
assert_eq!(ones, ones & filter.mask); assert_eq!(ones, ones & filter.mask);
} }
} }
@ -740,7 +758,7 @@ pub(crate) mod tests {
let crds = RwLock::new(crds); let crds = RwLock::new(crds);
assert!(num_inserts > 30_000, "num inserts: {num_inserts}"); assert!(num_inserts > 30_000, "num inserts: {num_inserts}");
let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE); let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(32)); assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(4));
let crds = crds.read().unwrap(); let crds = crds.read().unwrap();
let purged: Vec<_> = thread_pool.install(|| crds.purged().collect()); let purged: Vec<_> = thread_pool.install(|| crds.purged().collect());
let hash_values: Vec<_> = crds.values().map(|v| v.value_hash).chain(purged).collect(); let hash_values: Vec<_> = crds.values().map(|v| v.value_hash).chain(purged).collect();
@ -751,21 +769,24 @@ pub(crate) mod tests {
"hash_values.len(): {}", "hash_values.len(): {}",
hash_values.len() hash_values.len()
); );
let mut num_hits = 0;
let mut false_positives = 0; let mut false_positives = 0;
for hash_value in hash_values { for hash_value in hash_values {
let mut num_hits = 0; let mut hit = false;
for filter in &filters { for filter in &filters {
if filter.test_mask(&hash_value) { if filter.test_mask(&hash_value) {
num_hits += 1; num_hits += 1;
assert!(!hit);
hit = true;
assert!(filter.contains(&hash_value)); assert!(filter.contains(&hash_value));
assert!(filter.filter.contains(&hash_value)); assert!(filter.filter.contains(&hash_value));
} else if filter.filter.contains(&hash_value) { } else if filter.filter.contains(&hash_value) {
false_positives += 1; false_positives += 1;
} }
} }
assert_eq!(num_hits, 1);
} }
assert!(false_positives < 150_000, "fp: {false_positives}"); assert!(num_hits > 4000, "num_hits: {num_hits}");
assert!(false_positives < 20_000, "fp: {false_positives}");
} }
#[test] #[test]
@ -1308,7 +1329,8 @@ pub(crate) mod tests {
} }
#[test] #[test]
fn test_crds_filter_complete_set_add_mask() { fn test_crds_filter_complete_set_add_mask() {
let mut filters: Vec<CrdsFilter> = CrdsFilterSet::new(1000, 10).into(); let mut filters =
Vec::<CrdsFilter>::from(CrdsFilterSet::new(&mut rand::thread_rng(), 1000, 10));
assert!(filters.iter().all(|f| f.mask_bits > 0)); assert!(filters.iter().all(|f| f.mask_bits > 0));
let mut h: Hash = Hash::default(); let mut h: Hash = Hash::default();
// rev to make the hash::default() miss on the first few test_masks // rev to make the hash::default() miss on the first few test_masks