diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 41ee14e72..6e1f427bb 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -44,8 +44,7 @@ impl ShredFetchStage { ) { const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1); let mut rng = rand::thread_rng(); - let mut deduper = - Deduper::<2>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_NUM_BITS); + let mut deduper = Deduper::<2>::new(&mut rng, DEDUPER_NUM_BITS); let mut last_updated = Instant::now(); let mut keypair = repair_context .as_ref() @@ -60,7 +59,7 @@ impl ShredFetchStage { let mut stats = ShredFetchStats::default(); for mut packet_batch in recvr { - deduper.maybe_reset(&mut rng, &DEDUPER_RESET_CYCLE); + deduper.maybe_reset(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_RESET_CYCLE); if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT { last_updated = Instant::now(); { @@ -286,7 +285,7 @@ mod tests { fn test_data_code_same_index() { solana_logger::setup(); let mut rng = rand::thread_rng(); - let deduper = Deduper::<2>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, 640_007); + let deduper = Deduper::<2>::new(&mut rng, /*num_bits:*/ 640_007); let mut packet = Packet::default(); let mut stats = ShredFetchStats::default(); @@ -338,7 +337,7 @@ mod tests { fn test_shred_filter() { solana_logger::setup(); let mut rng = rand::thread_rng(); - let deduper = Deduper::<2>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, 640_007); + let deduper = Deduper::<2>::new(&mut rng, /*num_bits:*/ 640_007); let mut packet = Packet::default(); let mut stats = ShredFetchStats::default(); let last_root = 0; diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index ffe9613a7..756fc604b 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -416,10 +416,9 @@ impl SigVerifyStage { .name("solSigVerifier".to_string()) .spawn(move || { let mut rng = rand::thread_rng(); - let mut deduper = - Deduper::<2>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_NUM_BITS); + let mut deduper = Deduper::<2>::new(&mut rng, DEDUPER_NUM_BITS); loop { - deduper.maybe_reset(&mut rng, &MAX_DEDUPER_AGE); + deduper.maybe_reset(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, MAX_DEDUPER_AGE); if let Err(e) = Self::verifier(&deduper, &packet_receiver, &mut verifier, &mut stats) { diff --git a/perf/benches/dedup.rs b/perf/benches/dedup.rs index b2a601bc7..156ee8788 100644 --- a/perf/benches/dedup.rs +++ b/perf/benches/dedup.rs @@ -25,12 +25,14 @@ fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec { fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec) { // verify packets let mut rng = rand::thread_rng(); - let mut deduper = Deduper::<2>::new( - &mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979, - ); + let mut deduper = Deduper::<2>::new(&mut rng, /*num_bits:*/ 63_999_979); bencher.iter(|| { let _ans = deduper.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()); - deduper.maybe_reset(&mut rng, /*reset_cycle:*/ &Duration::from_secs(2)); + deduper.maybe_reset( + &mut rng, + 0.001, // false_positive_rate + Duration::from_secs(2), // reset_cycle + ); batches .iter_mut() .for_each(|b| b.iter_mut().for_each(|p| p.meta_mut().set_discard(false))); @@ -116,10 +118,12 @@ fn bench_dedup_baseline(bencher: &mut Bencher) { #[ignore] fn bench_dedup_reset(bencher: &mut Bencher) { let mut rng = rand::thread_rng(); - let mut deduper = Deduper::<2>::new( - &mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979, - ); + let mut deduper = Deduper::<2>::new(&mut rng, /*num_bits:*/ 63_999_979); bencher.iter(|| { - deduper.maybe_reset(&mut rng, /*reset_cycle:*/ &Duration::from_millis(0)); + deduper.maybe_reset( + &mut rng, + 0.001, // false_positive_rate + Duration::from_millis(0), // reset_cycle + ); }); } diff --git a/perf/src/deduper.rs b/perf/src/deduper.rs index 4c609cc99..823bf321e 100644 --- a/perf/src/deduper.rs +++ b/perf/src/deduper.rs @@ -17,35 +17,46 @@ pub struct Deduper { bits: Vec, seeds: [(u128, u128); K], clock: Instant, - // Maximum number of one bits before the false positive - // rate exceeds the specified threshold. - capacity: u64, popcount: AtomicU64, // Number of one bits in self.bits. } impl Deduper { - pub fn new(rng: &mut R, false_positive_rate: f64, num_bits: u64) -> Self { - assert!(0.0 < false_positive_rate && false_positive_rate < 1.0); - let size = usize::try_from(num_bits.checked_add(63).unwrap() / 64).unwrap(); - let capacity = num_bits as f64 * false_positive_rate.powf(1f64 / K as f64); + pub fn new(rng: &mut R, num_bits: u64) -> Self { + let size = num_bits.checked_add(63).unwrap() / 64; + let size = usize::try_from(size).unwrap(); Self { num_bits, seeds: std::array::from_fn(|_| rng.gen()), clock: Instant::now(), bits: repeat_with(AtomicU64::default).take(size).collect(), - capacity: capacity as u64, popcount: AtomicU64::default(), } } - pub fn maybe_reset(&mut self, rng: &mut R, reset_cycle: &Duration) { + fn false_positive_rate(&self) -> f64 { let popcount = self.popcount.load(Ordering::Relaxed); - if popcount >= self.capacity || &self.clock.elapsed() >= reset_cycle { + let ones_ratio = popcount.min(self.num_bits) as f64 / self.num_bits as f64; + ones_ratio.powi(K as i32) + } + + /// Resets the Deduper if either it is older than the reset_cycle or it is + /// saturated enough that false positive rate exceeds specified threshold. + /// Returns true if the deduper was saturated. + pub fn maybe_reset( + &mut self, + rng: &mut R, + false_positive_rate: f64, + reset_cycle: Duration, + ) -> bool { + assert!(0.0 < false_positive_rate && false_positive_rate < 1.0); + let saturated = self.false_positive_rate() >= false_positive_rate; + if saturated || self.clock.elapsed() >= reset_cycle { self.seeds = std::array::from_fn(|_| rng.gen()); self.clock = Instant::now(); self.bits.fill_with(AtomicU64::default); self.popcount = AtomicU64::default(); } + saturated } // Returns true if the packet is duplicate. @@ -113,9 +124,7 @@ mod tests { to_packet_batches(&std::iter::repeat(tx).take(1024).collect::>(), 128); let packet_count = sigverify::count_packets_in_batches(&batches); let mut rng = rand::thread_rng(); - let filter = Deduper::<2>::new( - &mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979, - ); + let filter = Deduper::<2>::new(&mut rng, /*num_bits:*/ 63_999_979); let mut num_deduped = 0; let discard = filter.dedup_packets_and_count_discards( &mut batches, @@ -130,46 +139,57 @@ mod tests { #[test] fn test_dedup_diff() { let mut rng = rand::thread_rng(); - let mut filter = Deduper::<2>::new( - &mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979, - ); + let mut filter = Deduper::<2>::new(&mut rng, /*num_bits:*/ 63_999_979); let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::>(), 128); let discard = filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize; // because dedup uses a threadpool, there maybe up to N threads of txs that go through assert_eq!(discard, 0); - filter.maybe_reset(&mut rng, /*reset_cycle:*/ &Duration::from_millis(0)); + assert!(!filter.maybe_reset( + &mut rng, + 0.001, // false_positive_rate + Duration::from_millis(0), // reset_cycle + )); for i in filter.bits { assert_eq!(i.load(Ordering::Relaxed), 0); } } + fn get_capacity(num_bits: u64, false_positive_rate: f64) -> u64 { + (num_bits as f64 * false_positive_rate.powf(1f64 / K as f64)) as u64 + } + #[test] #[ignore] fn test_dedup_saturated() { + const NUM_BITS: u64 = 63_999_979; + const FALSE_POSITIVE_RATE: f64 = 0.001; let mut rng = rand::thread_rng(); - let filter = Deduper::<2>::new( - &mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979, - ); + let mut filter = Deduper::<2>::new(&mut rng, NUM_BITS); + let capacity = get_capacity::<2>(NUM_BITS, FALSE_POSITIVE_RATE); let mut discard = 0; - assert!(filter.popcount.load(Ordering::Relaxed) < filter.capacity); + assert!(filter.popcount.load(Ordering::Relaxed) < capacity); for i in 0..1000 { let mut batches = to_packet_batches(&(0..1000).map(|_| test_tx()).collect::>(), 128); discard += filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize; trace!("{} {}", i, discard); - if filter.popcount.load(Ordering::Relaxed) >= filter.capacity { + if filter.popcount.load(Ordering::Relaxed) > capacity { break; } } - assert!(filter.popcount.load(Ordering::Relaxed) >= filter.capacity); + assert!(filter.popcount.load(Ordering::Relaxed) > capacity); + assert!(filter.false_positive_rate() >= FALSE_POSITIVE_RATE); + assert!(filter.maybe_reset( + &mut rng, + FALSE_POSITIVE_RATE, + Duration::from_millis(0), // reset_cycle + )); } #[test] fn test_dedup_false_positive() { let mut rng = rand::thread_rng(); - let filter = Deduper::<2>::new( - &mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979, - ); + let filter = Deduper::<2>::new(&mut rng, /*num_bits:*/ 63_999_979); let mut discard = 0; for i in 0..10 { let mut batches = @@ -194,8 +214,18 @@ mod tests { #[test_case(637_534_199, 0.0001, 6_375_341)] fn test_dedup_capacity(num_bits: u64, false_positive_rate: f64, capacity: u64) { let mut rng = rand::thread_rng(); - let deduper = Deduper::<2>::new(&mut rng, false_positive_rate, num_bits); - assert_eq!(deduper.capacity, capacity); + assert_eq!(get_capacity::<2>(num_bits, false_positive_rate), capacity); + let mut deduper = Deduper::<2>::new(&mut rng, num_bits); + assert_eq!(deduper.false_positive_rate(), 0.0); + deduper.popcount.store(capacity, Ordering::Relaxed); + assert!(deduper.false_positive_rate() < false_positive_rate); + deduper.popcount.store(capacity + 1, Ordering::Relaxed); + assert!(deduper.false_positive_rate() >= false_positive_rate); + assert!(deduper.maybe_reset( + &mut rng, + false_positive_rate, + Duration::from_millis(0), // reset_cycle + )); } #[test_case([0xf9; 32], 3_199_997, 101_192, 51_414, 70, 101_125)] @@ -212,9 +242,10 @@ mod tests { num_dups: usize, popcount: u64, ) { + const FALSE_POSITIVE_RATE: f64 = 0.001; let mut rng = ChaChaRng::from_seed(seed); - let deduper = Deduper::<2>::new(&mut rng, /*false_positive_rate:*/ 0.001, num_bits); - assert_eq!(deduper.capacity, capacity); + let mut deduper = Deduper::<2>::new(&mut rng, num_bits); + assert_eq!(get_capacity::<2>(num_bits, FALSE_POSITIVE_RATE), capacity); let mut packet = Packet::new([0u8; PACKET_DATA_SIZE], Meta::default()); let mut dup_count = 0usize; for _ in 0..num_packets { @@ -228,5 +259,11 @@ mod tests { } assert_eq!(dup_count, num_dups); assert_eq!(deduper.popcount.load(Ordering::Relaxed), popcount); + assert!(deduper.false_positive_rate() < FALSE_POSITIVE_RATE); + assert!(!deduper.maybe_reset( + &mut rng, + FALSE_POSITIVE_RATE, + Duration::from_millis(0), // reset_cycle + )); } }