adds an atomic variant of the bloom filter (#12422)

For crds_gossip_pull, we want to parallelize build_crds_filters, which
requires concurrent writes to bloom filters.

This commit implements a variant of the bloom filter which uses atomics
for its bits vector and so is thread-safe.
This commit is contained in:
behzad nouri 2020-09-24 18:37:19 +00:00 committed by GitHub
parent 42f1ef8acb
commit bb183938d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 209 additions and 1 deletions

View File

@ -3,7 +3,8 @@
extern crate test;
use bv::BitVec;
use fnv::FnvHasher;
use solana_runtime::bloom::{Bloom, BloomHashIndex};
use rand::Rng;
use solana_runtime::bloom::{AtomicBloom, Bloom, BloomHashIndex};
use solana_sdk::{
hash::{hash, Hash},
signature::Signature,
@ -97,3 +98,48 @@ fn bench_sigs_hashmap(bencher: &mut Bencher) {
});
assert_eq!(falses, 0);
}
#[bench]
fn bench_add_hash(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();
let hash_values: Vec<_> = std::iter::repeat_with(|| Hash::new_rand(&mut rng))
.take(1200)
.collect();
let mut fail = 0;
bencher.iter(|| {
let mut bloom = Bloom::random(1287, 0.1, 7424);
for hash_value in &hash_values {
bloom.add(hash_value);
}
let index = rng.gen_range(0, hash_values.len());
if !bloom.contains(&hash_values[index]) {
fail += 1;
}
});
assert_eq!(fail, 0);
}
#[bench]
fn bench_add_hash_atomic(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();
let hash_values: Vec<_> = std::iter::repeat_with(|| Hash::new_rand(&mut rng))
.take(1200)
.collect();
let mut fail = 0;
bencher.iter(|| {
let bloom: AtomicBloom<_> = Bloom::random(1287, 0.1, 7424).into();
// Intentionally not using parallelism here, so that this and above
// benchmark only compare the bit-vector ops.
// For benchmarking the parallel code, change bellow for loop to:
// hash_values.par_iter().for_each(|v| bloom.add(v));
for hash_value in &hash_values {
bloom.add(hash_value);
}
let bloom: Bloom<_> = bloom.into();
let index = rng.gen_range(0, hash_values.len());
if !bloom.contains(&hash_values[index]) {
fail += 1;
}
});
assert_eq!(fail, 0);
}

View File

@ -4,6 +4,7 @@ use fnv::FnvHasher;
use rand::{self, Rng};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::{cmp, hash::Hasher, marker::PhantomData};
/// Generate a stable hash of `self` for each `hash_index`
@ -121,9 +122,63 @@ impl<T: AsRef<[u8]>> BloomHashIndex for T {
}
}
pub struct AtomicBloom<T> {
num_bits: u64,
keys: Vec<u64>,
bits: Vec<AtomicU64>,
_phantom: PhantomData<T>,
}
impl<T: BloomHashIndex> From<Bloom<T>> for AtomicBloom<T> {
fn from(bloom: Bloom<T>) -> Self {
AtomicBloom {
num_bits: bloom.bits.len(),
keys: bloom.keys,
bits: bloom
.bits
.into_boxed_slice()
.iter()
.map(|&x| AtomicU64::new(x))
.collect(),
_phantom: PhantomData::default(),
}
}
}
impl<T: BloomHashIndex> AtomicBloom<T> {
pub fn add(&self, key: &T) {
for k in &self.keys {
let pos = key.hash_at_index(*k) % self.num_bits;
// Divide by 64 to figure out which of the
// AtomicU64 bit chunks we need to modify.
let index = pos >> 6;
// (pos & 63) is equivalent to mod 64 so that we can find
// the index of the bit within the AtomicU64 to modify.
let bit = 1u64 << (pos & 63);
self.bits[index as usize].fetch_or(bit, Ordering::Relaxed);
}
}
}
impl<T: BloomHashIndex> Into<Bloom<T>> for AtomicBloom<T> {
fn into(self) -> Bloom<T> {
let bits: Vec<_> = self.bits.into_iter().map(AtomicU64::into_inner).collect();
let num_bits_set = bits.iter().map(|x| x.count_ones() as u64).sum();
let mut bits: BitVec<u64> = bits.into();
bits.truncate(self.num_bits);
Bloom {
keys: self.keys,
bits,
num_bits_set,
_phantom: PhantomData::default(),
}
}
}
#[cfg(test)]
mod test {
use super::*;
use rayon::prelude::*;
use solana_sdk::hash::{hash, Hash};
#[test]
@ -205,4 +260,100 @@ mod test {
"Bloom { keys.len: 1 bits.len: 1000 num_set: 2 bits: 0000000000.. }"
);
}
#[test]
fn test_atomic_bloom() {
let mut rng = rand::thread_rng();
let hash_values: Vec<_> = std::iter::repeat_with(|| Hash::new_rand(&mut rng))
.take(1200)
.collect();
let bloom: AtomicBloom<_> = Bloom::<Hash>::random(1287, 0.1, 7424).into();
assert_eq!(bloom.keys.len(), 3);
assert_eq!(bloom.num_bits, 6168);
assert_eq!(bloom.bits.len(), 97);
hash_values.par_iter().for_each(|v| bloom.add(v));
let bloom: Bloom<Hash> = bloom.into();
assert_eq!(bloom.keys.len(), 3);
assert_eq!(bloom.bits.len(), 6168);
assert!(bloom.num_bits_set > 2000);
for hash_value in hash_values {
assert!(bloom.contains(&hash_value));
}
let false_positive = std::iter::repeat_with(|| Hash::new_rand(&mut rng))
.take(10_000)
.filter(|hash_value| bloom.contains(hash_value))
.count();
assert!(false_positive < 2_000, "false_positive: {}", false_positive);
}
#[test]
fn test_atomic_bloom_round_trip() {
let mut rng = rand::thread_rng();
let keys: Vec<_> = std::iter::repeat_with(|| rng.gen()).take(5).collect();
let mut bloom = Bloom::<Hash>::new(9731, keys.clone());
let hash_values: Vec<_> = std::iter::repeat_with(|| Hash::new_rand(&mut rng))
.take(1000)
.collect();
for hash_value in &hash_values {
bloom.add(hash_value);
}
let num_bits_set = bloom.num_bits_set;
assert!(num_bits_set > 2000, "# bits set: {}", num_bits_set);
// Round-trip with no inserts.
let bloom: AtomicBloom<_> = bloom.into();
assert_eq!(bloom.num_bits, 9731);
assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
let bloom: Bloom<_> = bloom.into();
assert_eq!(bloom.num_bits_set, num_bits_set);
for hash_value in &hash_values {
assert!(bloom.contains(hash_value));
}
// Round trip, re-inserting the same hash values.
let bloom: AtomicBloom<_> = bloom.into();
hash_values.par_iter().for_each(|v| bloom.add(v));
let bloom: Bloom<_> = bloom.into();
assert_eq!(bloom.num_bits_set, num_bits_set);
assert_eq!(bloom.bits.len(), 9731);
for hash_value in &hash_values {
assert!(bloom.contains(hash_value));
}
// Round trip, inserting new hash values.
let more_hash_values: Vec<_> = std::iter::repeat_with(|| Hash::new_rand(&mut rng))
.take(1000)
.collect();
let bloom: AtomicBloom<_> = bloom.into();
assert_eq!(bloom.num_bits, 9731);
assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
more_hash_values.par_iter().for_each(|v| bloom.add(v));
let bloom: Bloom<_> = bloom.into();
assert_eq!(bloom.bits.len(), 9731);
assert!(bloom.num_bits_set > num_bits_set);
assert!(
bloom.num_bits_set > 4000,
"# bits set: {}",
bloom.num_bits_set
);
for hash_value in &hash_values {
assert!(bloom.contains(hash_value));
}
for hash_value in &more_hash_values {
assert!(bloom.contains(hash_value));
}
let false_positive = std::iter::repeat_with(|| Hash::new_rand(&mut rng))
.take(10_000)
.filter(|hash_value| bloom.contains(hash_value))
.count();
assert!(false_positive < 2000, "false_positive: {}", false_positive);
// Assert that the bits vector precisely match if no atomic ops were
// used.
let bits = bloom.bits;
let mut bloom = Bloom::<Hash>::new(9731, keys);
for hash_value in &hash_values {
bloom.add(hash_value);
}
for hash_value in &more_hash_values {
bloom.add(hash_value);
}
assert_eq!(bits, bloom.bits);
}
}

View File

@ -88,6 +88,17 @@ impl Hash {
pub fn to_bytes(self) -> [u8; HASH_BYTES] {
self.0
}
/// New random hash value for tests and benchmarks.
#[cfg(not(feature = "program"))]
pub fn new_rand<R: ?Sized>(rng: &mut R) -> Self
where
R: rand::Rng,
{
let mut buf = [0u8; HASH_BYTES];
rng.fill(&mut buf);
Hash::new(&buf)
}
}
/// Return a Sha256 hash for the given data.