From bb183938d990ae1433875d9042744f6857bf57eb Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 24 Sep 2020 18:37:19 +0000 Subject: [PATCH] adds an atomic variant of the bloom filter (#12422) For crds_gossip_pull, we want to parallelize build_crds_filters, which requires concurrent writes to bloom filters. This commit implements a variant of the bloom filter which uses atomics for its bits vector and so is thread-safe. --- runtime/benches/bloom.rs | 48 ++++++++++++- runtime/src/bloom.rs | 151 +++++++++++++++++++++++++++++++++++++++ sdk/src/hash.rs | 11 +++ 3 files changed, 209 insertions(+), 1 deletion(-) diff --git a/runtime/benches/bloom.rs b/runtime/benches/bloom.rs index b961fb8ee1..f39e0795fb 100644 --- a/runtime/benches/bloom.rs +++ b/runtime/benches/bloom.rs @@ -3,7 +3,8 @@ extern crate test; use bv::BitVec; use fnv::FnvHasher; -use solana_runtime::bloom::{Bloom, BloomHashIndex}; +use rand::Rng; +use solana_runtime::bloom::{AtomicBloom, Bloom, BloomHashIndex}; use solana_sdk::{ hash::{hash, Hash}, signature::Signature, @@ -97,3 +98,48 @@ fn bench_sigs_hashmap(bencher: &mut Bencher) { }); assert_eq!(falses, 0); } + +#[bench] +fn bench_add_hash(bencher: &mut Bencher) { + let mut rng = rand::thread_rng(); + let hash_values: Vec<_> = std::iter::repeat_with(|| Hash::new_rand(&mut rng)) + .take(1200) + .collect(); + let mut fail = 0; + bencher.iter(|| { + let mut bloom = Bloom::random(1287, 0.1, 7424); + for hash_value in &hash_values { + bloom.add(hash_value); + } + let index = rng.gen_range(0, hash_values.len()); + if !bloom.contains(&hash_values[index]) { + fail += 1; + } + }); + assert_eq!(fail, 0); +} + +#[bench] +fn bench_add_hash_atomic(bencher: &mut Bencher) { + let mut rng = rand::thread_rng(); + let hash_values: Vec<_> = std::iter::repeat_with(|| Hash::new_rand(&mut rng)) + .take(1200) + .collect(); + let mut fail = 0; + bencher.iter(|| { + let bloom: AtomicBloom<_> = Bloom::random(1287, 0.1, 7424).into(); + // Intentionally not using parallelism here, so that this and above + // benchmark only compare the bit-vector ops. + // For benchmarking the parallel code, change bellow for loop to: + // hash_values.par_iter().for_each(|v| bloom.add(v)); + for hash_value in &hash_values { + bloom.add(hash_value); + } + let bloom: Bloom<_> = bloom.into(); + let index = rng.gen_range(0, hash_values.len()); + if !bloom.contains(&hash_values[index]) { + fail += 1; + } + }); + assert_eq!(fail, 0); +} diff --git a/runtime/src/bloom.rs b/runtime/src/bloom.rs index 8b3f40aee5..680555f016 100644 --- a/runtime/src/bloom.rs +++ b/runtime/src/bloom.rs @@ -4,6 +4,7 @@ use fnv::FnvHasher; use rand::{self, Rng}; use serde::{Deserialize, Serialize}; use std::fmt; +use std::sync::atomic::{AtomicU64, Ordering}; use std::{cmp, hash::Hasher, marker::PhantomData}; /// Generate a stable hash of `self` for each `hash_index` @@ -121,9 +122,63 @@ impl> BloomHashIndex for T { } } +pub struct AtomicBloom { + num_bits: u64, + keys: Vec, + bits: Vec, + _phantom: PhantomData, +} + +impl From> for AtomicBloom { + fn from(bloom: Bloom) -> Self { + AtomicBloom { + num_bits: bloom.bits.len(), + keys: bloom.keys, + bits: bloom + .bits + .into_boxed_slice() + .iter() + .map(|&x| AtomicU64::new(x)) + .collect(), + _phantom: PhantomData::default(), + } + } +} + +impl AtomicBloom { + pub fn add(&self, key: &T) { + for k in &self.keys { + let pos = key.hash_at_index(*k) % self.num_bits; + // Divide by 64 to figure out which of the + // AtomicU64 bit chunks we need to modify. + let index = pos >> 6; + // (pos & 63) is equivalent to mod 64 so that we can find + // the index of the bit within the AtomicU64 to modify. + let bit = 1u64 << (pos & 63); + self.bits[index as usize].fetch_or(bit, Ordering::Relaxed); + } + } +} + +impl Into> for AtomicBloom { + fn into(self) -> Bloom { + let bits: Vec<_> = self.bits.into_iter().map(AtomicU64::into_inner).collect(); + let num_bits_set = bits.iter().map(|x| x.count_ones() as u64).sum(); + let mut bits: BitVec = bits.into(); + bits.truncate(self.num_bits); + Bloom { + keys: self.keys, + bits, + num_bits_set, + _phantom: PhantomData::default(), + } + } +} + #[cfg(test)] mod test { use super::*; + use rayon::prelude::*; use solana_sdk::hash::{hash, Hash}; #[test] @@ -205,4 +260,100 @@ mod test { "Bloom { keys.len: 1 bits.len: 1000 num_set: 2 bits: 0000000000.. }" ); } + + #[test] + fn test_atomic_bloom() { + let mut rng = rand::thread_rng(); + let hash_values: Vec<_> = std::iter::repeat_with(|| Hash::new_rand(&mut rng)) + .take(1200) + .collect(); + let bloom: AtomicBloom<_> = Bloom::::random(1287, 0.1, 7424).into(); + assert_eq!(bloom.keys.len(), 3); + assert_eq!(bloom.num_bits, 6168); + assert_eq!(bloom.bits.len(), 97); + hash_values.par_iter().for_each(|v| bloom.add(v)); + let bloom: Bloom = bloom.into(); + assert_eq!(bloom.keys.len(), 3); + assert_eq!(bloom.bits.len(), 6168); + assert!(bloom.num_bits_set > 2000); + for hash_value in hash_values { + assert!(bloom.contains(&hash_value)); + } + let false_positive = std::iter::repeat_with(|| Hash::new_rand(&mut rng)) + .take(10_000) + .filter(|hash_value| bloom.contains(hash_value)) + .count(); + assert!(false_positive < 2_000, "false_positive: {}", false_positive); + } + + #[test] + fn test_atomic_bloom_round_trip() { + let mut rng = rand::thread_rng(); + let keys: Vec<_> = std::iter::repeat_with(|| rng.gen()).take(5).collect(); + let mut bloom = Bloom::::new(9731, keys.clone()); + let hash_values: Vec<_> = std::iter::repeat_with(|| Hash::new_rand(&mut rng)) + .take(1000) + .collect(); + for hash_value in &hash_values { + bloom.add(hash_value); + } + let num_bits_set = bloom.num_bits_set; + assert!(num_bits_set > 2000, "# bits set: {}", num_bits_set); + // Round-trip with no inserts. + let bloom: AtomicBloom<_> = bloom.into(); + assert_eq!(bloom.num_bits, 9731); + assert_eq!(bloom.bits.len(), (9731 + 63) / 64); + let bloom: Bloom<_> = bloom.into(); + assert_eq!(bloom.num_bits_set, num_bits_set); + for hash_value in &hash_values { + assert!(bloom.contains(hash_value)); + } + // Round trip, re-inserting the same hash values. + let bloom: AtomicBloom<_> = bloom.into(); + hash_values.par_iter().for_each(|v| bloom.add(v)); + let bloom: Bloom<_> = bloom.into(); + assert_eq!(bloom.num_bits_set, num_bits_set); + assert_eq!(bloom.bits.len(), 9731); + for hash_value in &hash_values { + assert!(bloom.contains(hash_value)); + } + // Round trip, inserting new hash values. + let more_hash_values: Vec<_> = std::iter::repeat_with(|| Hash::new_rand(&mut rng)) + .take(1000) + .collect(); + let bloom: AtomicBloom<_> = bloom.into(); + assert_eq!(bloom.num_bits, 9731); + assert_eq!(bloom.bits.len(), (9731 + 63) / 64); + more_hash_values.par_iter().for_each(|v| bloom.add(v)); + let bloom: Bloom<_> = bloom.into(); + assert_eq!(bloom.bits.len(), 9731); + assert!(bloom.num_bits_set > num_bits_set); + assert!( + bloom.num_bits_set > 4000, + "# bits set: {}", + bloom.num_bits_set + ); + for hash_value in &hash_values { + assert!(bloom.contains(hash_value)); + } + for hash_value in &more_hash_values { + assert!(bloom.contains(hash_value)); + } + let false_positive = std::iter::repeat_with(|| Hash::new_rand(&mut rng)) + .take(10_000) + .filter(|hash_value| bloom.contains(hash_value)) + .count(); + assert!(false_positive < 2000, "false_positive: {}", false_positive); + // Assert that the bits vector precisely match if no atomic ops were + // used. + let bits = bloom.bits; + let mut bloom = Bloom::::new(9731, keys); + for hash_value in &hash_values { + bloom.add(hash_value); + } + for hash_value in &more_hash_values { + bloom.add(hash_value); + } + assert_eq!(bits, bloom.bits); + } } diff --git a/sdk/src/hash.rs b/sdk/src/hash.rs index 0f5497d87a..c9dcbcb4a5 100644 --- a/sdk/src/hash.rs +++ b/sdk/src/hash.rs @@ -88,6 +88,17 @@ impl Hash { pub fn to_bytes(self) -> [u8; HASH_BYTES] { self.0 } + + /// New random hash value for tests and benchmarks. + #[cfg(not(feature = "program"))] + pub fn new_rand(rng: &mut R) -> Self + where + R: rand::Rng, + { + let mut buf = [0u8; HASH_BYTES]; + rng.fill(&mut buf); + Hash::new(&buf) + } } /// Return a Sha256 hash for the given data.