Rename: AtomicBloom to ConcurrentBloom (#34483)

This commit is contained in:
Andrew Fitzgerald 2023-12-21 06:59:20 -08:00 committed by GitHub
parent fbc3999446
commit f0ff69b9cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 22 additions and 19 deletions

View File

@ -5,7 +5,7 @@ use {
bv::BitVec,
fnv::FnvHasher,
rand::Rng,
solana_bloom::bloom::{AtomicBloom, Bloom, BloomHashIndex},
solana_bloom::bloom::{Bloom, BloomHashIndex, ConcurrentBloom},
solana_sdk::{
hash::{hash, Hash},
signature::Signature,
@ -128,7 +128,7 @@ fn bench_add_hash_atomic(bencher: &mut Bencher) {
.collect();
let mut fail = 0;
bencher.iter(|| {
let bloom: AtomicBloom<_> = Bloom::random(1287, 0.1, 7424).into();
let bloom: ConcurrentBloom<_> = Bloom::random(1287, 0.1, 7424).into();
// Intentionally not using parallelism here, so that this and above
// benchmark only compare the bit-vector ops.
// For benchmarking the parallel code, change bellow for loop to:

View File

@ -141,16 +141,19 @@ impl<T: AsRef<[u8]>> BloomHashIndex for T {
}
}
pub struct AtomicBloom<T> {
/// Bloom filter that can be used concurrently.
/// Concurrent reads/writes are safe, but are not atomic at the struct level,
/// this means that reads may see partial writes.
pub struct ConcurrentBloom<T> {
num_bits: u64,
keys: Vec<u64>,
bits: Vec<AtomicU64>,
_phantom: PhantomData<T>,
}
impl<T: BloomHashIndex> From<Bloom<T>> for AtomicBloom<T> {
impl<T: BloomHashIndex> From<Bloom<T>> for ConcurrentBloom<T> {
fn from(bloom: Bloom<T>) -> Self {
AtomicBloom {
ConcurrentBloom {
num_bits: bloom.bits.len(),
keys: bloom.keys,
bits: bloom
@ -164,7 +167,7 @@ impl<T: BloomHashIndex> From<Bloom<T>> for AtomicBloom<T> {
}
}
impl<T: BloomHashIndex> AtomicBloom<T> {
impl<T: BloomHashIndex> ConcurrentBloom<T> {
fn pos(&self, key: &T, hash_index: u64) -> (usize, u64) {
let pos = key
.hash_at_index(hash_index)
@ -199,15 +202,15 @@ impl<T: BloomHashIndex> AtomicBloom<T> {
})
}
pub fn clear_for_tests(&mut self) {
pub fn clear(&self) {
self.bits.iter().for_each(|bit| {
bit.store(0u64, Ordering::Relaxed);
});
}
}
impl<T: BloomHashIndex> From<AtomicBloom<T>> for Bloom<T> {
fn from(atomic_bloom: AtomicBloom<T>) -> Self {
impl<T: BloomHashIndex> From<ConcurrentBloom<T>> for Bloom<T> {
fn from(atomic_bloom: ConcurrentBloom<T>) -> Self {
let bits: Vec<_> = atomic_bloom
.bits
.into_iter()
@ -325,7 +328,7 @@ mod test {
let hash_values: Vec<_> = std::iter::repeat_with(generate_random_hash)
.take(1200)
.collect();
let bloom: AtomicBloom<_> = Bloom::<Hash>::random(1287, 0.1, 7424).into();
let bloom: ConcurrentBloom<_> = Bloom::<Hash>::random(1287, 0.1, 7424).into();
assert_eq!(bloom.keys.len(), 3);
assert_eq!(bloom.num_bits, 6168);
assert_eq!(bloom.bits.len(), 97);
@ -360,7 +363,7 @@ mod test {
let num_bits_set = bloom.num_bits_set;
assert!(num_bits_set > 2000, "# bits set: {num_bits_set}");
// Round-trip with no inserts.
let bloom: AtomicBloom<_> = bloom.into();
let bloom: ConcurrentBloom<_> = bloom.into();
assert_eq!(bloom.num_bits, 9731);
assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
for hash_value in &hash_values {
@ -372,7 +375,7 @@ mod test {
assert!(bloom.contains(hash_value));
}
// Round trip, re-inserting the same hash values.
let bloom: AtomicBloom<_> = bloom.into();
let bloom: ConcurrentBloom<_> = bloom.into();
hash_values.par_iter().for_each(|v| {
bloom.add(v);
});
@ -389,7 +392,7 @@ mod test {
let more_hash_values: Vec<_> = std::iter::repeat_with(generate_random_hash)
.take(1000)
.collect();
let bloom: AtomicBloom<_> = bloom.into();
let bloom: ConcurrentBloom<_> = bloom.into();
assert_eq!(bloom.num_bits, 9731);
assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
more_hash_values.par_iter().for_each(|v| {

View File

@ -28,7 +28,7 @@ use {
Rng,
},
rayon::{prelude::*, ThreadPool},
solana_bloom::bloom::{AtomicBloom, Bloom},
solana_bloom::bloom::{Bloom, ConcurrentBloom},
solana_sdk::{
hash::{hash, Hash},
native_token::LAMPORTS_PER_SOL,
@ -141,7 +141,7 @@ impl CrdsFilter {
/// A vector of crds filters that together hold a complete set of Hashes.
struct CrdsFilterSet {
filters: Vec<Option<AtomicBloom<Hash>>>,
filters: Vec<Option<ConcurrentBloom<Hash>>>,
mask_bits: u32,
}
@ -159,7 +159,7 @@ impl CrdsFilterSet {
let k = rng.gen_range(0..indices.len());
let k = indices.swap_remove(k);
let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
filters[k] = Some(AtomicBloom::<Hash>::from(filter));
filters[k] = Some(ConcurrentBloom::<Hash>::from(filter));
}
Self { filters, mask_bits }
}

View File

@ -2,7 +2,7 @@ use {
crate::weighted_shuffle::WeightedShuffle,
indexmap::IndexMap,
rand::Rng,
solana_bloom::bloom::{AtomicBloom, Bloom},
solana_bloom::bloom::{Bloom, ConcurrentBloom},
solana_sdk::{native_token::LAMPORTS_PER_SOL, pubkey::Pubkey},
std::collections::HashMap,
};
@ -19,7 +19,7 @@ pub(crate) struct PushActiveSet([PushActiveSetEntry; NUM_PUSH_ACTIVE_SET_ENTRIES
// Keys are gossip nodes to push messages to.
// Values are which origins the node has pruned.
#[derive(Default)]
struct PushActiveSetEntry(IndexMap</*node:*/ Pubkey, /*origins:*/ AtomicBloom<Pubkey>>);
struct PushActiveSetEntry(IndexMap</*node:*/ Pubkey, /*origins:*/ ConcurrentBloom<Pubkey>>);
impl PushActiveSet {
#[cfg(debug_assertions)]
@ -151,7 +151,7 @@ impl PushActiveSetEntry {
if self.0.contains_key(node) {
continue;
}
let bloom = AtomicBloom::from(Bloom::random(
let bloom = ConcurrentBloom::from(Bloom::random(
num_bloom_filter_items,
Self::BLOOM_FALSE_RATE,
Self::BLOOM_MAX_BITS,