removes false_positive_rate field from Deduper (#30788)

removes the false_positive_rate field from the Deduper

The Deduper.false_positive_rate field is misleading because it is not
enforced until maybe_reset is called. Since enforcement only happens
there, maybe_reset can instead be invoked with an explicit argument.
This commit is contained in:
behzad nouri 2023-03-20 13:16:52 +00:00 committed by GitHub
parent cd7fe76744
commit 46614c0e9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 85 additions and 46 deletions

View File

@ -44,8 +44,7 @@ impl ShredFetchStage {
) { ) {
const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1); const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let mut deduper = let mut deduper = Deduper::<2>::new(&mut rng, DEDUPER_NUM_BITS);
Deduper::<2>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_NUM_BITS);
let mut last_updated = Instant::now(); let mut last_updated = Instant::now();
let mut keypair = repair_context let mut keypair = repair_context
.as_ref() .as_ref()
@ -60,7 +59,7 @@ impl ShredFetchStage {
let mut stats = ShredFetchStats::default(); let mut stats = ShredFetchStats::default();
for mut packet_batch in recvr { for mut packet_batch in recvr {
deduper.maybe_reset(&mut rng, &DEDUPER_RESET_CYCLE); deduper.maybe_reset(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_RESET_CYCLE);
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT { if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
last_updated = Instant::now(); last_updated = Instant::now();
{ {
@ -286,7 +285,7 @@ mod tests {
fn test_data_code_same_index() { fn test_data_code_same_index() {
solana_logger::setup(); solana_logger::setup();
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let deduper = Deduper::<2>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, 640_007); let deduper = Deduper::<2>::new(&mut rng, /*num_bits:*/ 640_007);
let mut packet = Packet::default(); let mut packet = Packet::default();
let mut stats = ShredFetchStats::default(); let mut stats = ShredFetchStats::default();
@ -338,7 +337,7 @@ mod tests {
fn test_shred_filter() { fn test_shred_filter() {
solana_logger::setup(); solana_logger::setup();
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let deduper = Deduper::<2>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, 640_007); let deduper = Deduper::<2>::new(&mut rng, /*num_bits:*/ 640_007);
let mut packet = Packet::default(); let mut packet = Packet::default();
let mut stats = ShredFetchStats::default(); let mut stats = ShredFetchStats::default();
let last_root = 0; let last_root = 0;

View File

@ -416,10 +416,9 @@ impl SigVerifyStage {
.name("solSigVerifier".to_string()) .name("solSigVerifier".to_string())
.spawn(move || { .spawn(move || {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let mut deduper = let mut deduper = Deduper::<2>::new(&mut rng, DEDUPER_NUM_BITS);
Deduper::<2>::new(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_NUM_BITS);
loop { loop {
deduper.maybe_reset(&mut rng, &MAX_DEDUPER_AGE); deduper.maybe_reset(&mut rng, DEDUPER_FALSE_POSITIVE_RATE, MAX_DEDUPER_AGE);
if let Err(e) = if let Err(e) =
Self::verifier(&deduper, &packet_receiver, &mut verifier, &mut stats) Self::verifier(&deduper, &packet_receiver, &mut verifier, &mut stats)
{ {

View File

@ -25,12 +25,14 @@ fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {
fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) { fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) {
// verify packets // verify packets
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let mut deduper = Deduper::<2>::new( let mut deduper = Deduper::<2>::new(&mut rng, /*num_bits:*/ 63_999_979);
&mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979,
);
bencher.iter(|| { bencher.iter(|| {
let _ans = deduper.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()); let _ans = deduper.dedup_packets_and_count_discards(&mut batches, |_, _, _| ());
deduper.maybe_reset(&mut rng, /*reset_cycle:*/ &Duration::from_secs(2)); deduper.maybe_reset(
&mut rng,
0.001, // false_positive_rate
Duration::from_secs(2), // reset_cycle
);
batches batches
.iter_mut() .iter_mut()
.for_each(|b| b.iter_mut().for_each(|p| p.meta_mut().set_discard(false))); .for_each(|b| b.iter_mut().for_each(|p| p.meta_mut().set_discard(false)));
@ -116,10 +118,12 @@ fn bench_dedup_baseline(bencher: &mut Bencher) {
#[ignore] #[ignore]
fn bench_dedup_reset(bencher: &mut Bencher) { fn bench_dedup_reset(bencher: &mut Bencher) {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let mut deduper = Deduper::<2>::new( let mut deduper = Deduper::<2>::new(&mut rng, /*num_bits:*/ 63_999_979);
&mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979,
);
bencher.iter(|| { bencher.iter(|| {
deduper.maybe_reset(&mut rng, /*reset_cycle:*/ &Duration::from_millis(0)); deduper.maybe_reset(
&mut rng,
0.001, // false_positive_rate
Duration::from_millis(0), // reset_cycle
);
}); });
} }

View File

@ -17,35 +17,46 @@ pub struct Deduper<const K: usize> {
bits: Vec<AtomicU64>, bits: Vec<AtomicU64>,
seeds: [(u128, u128); K], seeds: [(u128, u128); K],
clock: Instant, clock: Instant,
// Maximum number of one bits before the false positive
// rate exceeds the specified threshold.
capacity: u64,
popcount: AtomicU64, // Number of one bits in self.bits. popcount: AtomicU64, // Number of one bits in self.bits.
} }
impl<const K: usize> Deduper<K> { impl<const K: usize> Deduper<K> {
pub fn new<R: Rng>(rng: &mut R, false_positive_rate: f64, num_bits: u64) -> Self { pub fn new<R: Rng>(rng: &mut R, num_bits: u64) -> Self {
assert!(0.0 < false_positive_rate && false_positive_rate < 1.0); let size = num_bits.checked_add(63).unwrap() / 64;
let size = usize::try_from(num_bits.checked_add(63).unwrap() / 64).unwrap(); let size = usize::try_from(size).unwrap();
let capacity = num_bits as f64 * false_positive_rate.powf(1f64 / K as f64);
Self { Self {
num_bits, num_bits,
seeds: std::array::from_fn(|_| rng.gen()), seeds: std::array::from_fn(|_| rng.gen()),
clock: Instant::now(), clock: Instant::now(),
bits: repeat_with(AtomicU64::default).take(size).collect(), bits: repeat_with(AtomicU64::default).take(size).collect(),
capacity: capacity as u64,
popcount: AtomicU64::default(), popcount: AtomicU64::default(),
} }
} }
pub fn maybe_reset<R: Rng>(&mut self, rng: &mut R, reset_cycle: &Duration) { fn false_positive_rate(&self) -> f64 {
let popcount = self.popcount.load(Ordering::Relaxed); let popcount = self.popcount.load(Ordering::Relaxed);
if popcount >= self.capacity || &self.clock.elapsed() >= reset_cycle { let ones_ratio = popcount.min(self.num_bits) as f64 / self.num_bits as f64;
ones_ratio.powi(K as i32)
}
/// Resets the Deduper if either it is older than the reset_cycle or it is
/// saturated enough that false positive rate exceeds specified threshold.
/// Returns true if the deduper was saturated.
pub fn maybe_reset<R: Rng>(
&mut self,
rng: &mut R,
false_positive_rate: f64,
reset_cycle: Duration,
) -> bool {
assert!(0.0 < false_positive_rate && false_positive_rate < 1.0);
let saturated = self.false_positive_rate() >= false_positive_rate;
if saturated || self.clock.elapsed() >= reset_cycle {
self.seeds = std::array::from_fn(|_| rng.gen()); self.seeds = std::array::from_fn(|_| rng.gen());
self.clock = Instant::now(); self.clock = Instant::now();
self.bits.fill_with(AtomicU64::default); self.bits.fill_with(AtomicU64::default);
self.popcount = AtomicU64::default(); self.popcount = AtomicU64::default();
} }
saturated
} }
// Returns true if the packet is duplicate. // Returns true if the packet is duplicate.
@ -113,9 +124,7 @@ mod tests {
to_packet_batches(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 128); to_packet_batches(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 128);
let packet_count = sigverify::count_packets_in_batches(&batches); let packet_count = sigverify::count_packets_in_batches(&batches);
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let filter = Deduper::<2>::new( let filter = Deduper::<2>::new(&mut rng, /*num_bits:*/ 63_999_979);
&mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979,
);
let mut num_deduped = 0; let mut num_deduped = 0;
let discard = filter.dedup_packets_and_count_discards( let discard = filter.dedup_packets_and_count_discards(
&mut batches, &mut batches,
@ -130,46 +139,57 @@ mod tests {
#[test] #[test]
fn test_dedup_diff() { fn test_dedup_diff() {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let mut filter = Deduper::<2>::new( let mut filter = Deduper::<2>::new(&mut rng, /*num_bits:*/ 63_999_979);
&mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979,
);
let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128); let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
let discard = filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize; let discard = filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize;
// because dedup uses a threadpool, there may be up to N threads of txs that go through // because dedup uses a threadpool, there may be up to N threads of txs that go through
assert_eq!(discard, 0); assert_eq!(discard, 0);
filter.maybe_reset(&mut rng, /*reset_cycle:*/ &Duration::from_millis(0)); assert!(!filter.maybe_reset(
&mut rng,
0.001, // false_positive_rate
Duration::from_millis(0), // reset_cycle
));
for i in filter.bits { for i in filter.bits {
assert_eq!(i.load(Ordering::Relaxed), 0); assert_eq!(i.load(Ordering::Relaxed), 0);
} }
} }
/// Computes `num_bits * false_positive_rate^(1 / K)`, truncated toward zero
/// by the final cast to `u64`.
///
/// `K` is supplied as a const generic and is used as the root taken of
/// `false_positive_rate` before scaling by `num_bits`.
fn get_capacity<const K: usize>(num_bits: u64, false_positive_rate: f64) -> u64 {
    // K-th root of the rate: the ones-ratio which, raised to the K-th power,
    // gives `false_positive_rate`.
    let exponent = 1f64 / K as f64;
    let ones_ratio = false_positive_rate.powf(exponent);
    let scaled = num_bits as f64 * ones_ratio;
    scaled as u64
}
#[test] #[test]
#[ignore] #[ignore]
fn test_dedup_saturated() { fn test_dedup_saturated() {
const NUM_BITS: u64 = 63_999_979;
const FALSE_POSITIVE_RATE: f64 = 0.001;
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let filter = Deduper::<2>::new( let mut filter = Deduper::<2>::new(&mut rng, NUM_BITS);
&mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979, let capacity = get_capacity::<2>(NUM_BITS, FALSE_POSITIVE_RATE);
);
let mut discard = 0; let mut discard = 0;
assert!(filter.popcount.load(Ordering::Relaxed) < filter.capacity); assert!(filter.popcount.load(Ordering::Relaxed) < capacity);
for i in 0..1000 { for i in 0..1000 {
let mut batches = let mut batches =
to_packet_batches(&(0..1000).map(|_| test_tx()).collect::<Vec<_>>(), 128); to_packet_batches(&(0..1000).map(|_| test_tx()).collect::<Vec<_>>(), 128);
discard += filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize; discard += filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize;
trace!("{} {}", i, discard); trace!("{} {}", i, discard);
if filter.popcount.load(Ordering::Relaxed) >= filter.capacity { if filter.popcount.load(Ordering::Relaxed) > capacity {
break; break;
} }
} }
assert!(filter.popcount.load(Ordering::Relaxed) >= filter.capacity); assert!(filter.popcount.load(Ordering::Relaxed) > capacity);
assert!(filter.false_positive_rate() >= FALSE_POSITIVE_RATE);
assert!(filter.maybe_reset(
&mut rng,
FALSE_POSITIVE_RATE,
Duration::from_millis(0), // reset_cycle
));
} }
#[test] #[test]
fn test_dedup_false_positive() { fn test_dedup_false_positive() {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let filter = Deduper::<2>::new( let filter = Deduper::<2>::new(&mut rng, /*num_bits:*/ 63_999_979);
&mut rng, /*false_positive_rate:*/ 0.001, /*num_bits:*/ 63_999_979,
);
let mut discard = 0; let mut discard = 0;
for i in 0..10 { for i in 0..10 {
let mut batches = let mut batches =
@ -194,8 +214,18 @@ mod tests {
#[test_case(637_534_199, 0.0001, 6_375_341)] #[test_case(637_534_199, 0.0001, 6_375_341)]
fn test_dedup_capacity(num_bits: u64, false_positive_rate: f64, capacity: u64) { fn test_dedup_capacity(num_bits: u64, false_positive_rate: f64, capacity: u64) {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let deduper = Deduper::<2>::new(&mut rng, false_positive_rate, num_bits); assert_eq!(get_capacity::<2>(num_bits, false_positive_rate), capacity);
assert_eq!(deduper.capacity, capacity); let mut deduper = Deduper::<2>::new(&mut rng, num_bits);
assert_eq!(deduper.false_positive_rate(), 0.0);
deduper.popcount.store(capacity, Ordering::Relaxed);
assert!(deduper.false_positive_rate() < false_positive_rate);
deduper.popcount.store(capacity + 1, Ordering::Relaxed);
assert!(deduper.false_positive_rate() >= false_positive_rate);
assert!(deduper.maybe_reset(
&mut rng,
false_positive_rate,
Duration::from_millis(0), // reset_cycle
));
} }
#[test_case([0xf9; 32], 3_199_997, 101_192, 51_414, 70, 101_125)] #[test_case([0xf9; 32], 3_199_997, 101_192, 51_414, 70, 101_125)]
@ -212,9 +242,10 @@ mod tests {
num_dups: usize, num_dups: usize,
popcount: u64, popcount: u64,
) { ) {
const FALSE_POSITIVE_RATE: f64 = 0.001;
let mut rng = ChaChaRng::from_seed(seed); let mut rng = ChaChaRng::from_seed(seed);
let deduper = Deduper::<2>::new(&mut rng, /*false_positive_rate:*/ 0.001, num_bits); let mut deduper = Deduper::<2>::new(&mut rng, num_bits);
assert_eq!(deduper.capacity, capacity); assert_eq!(get_capacity::<2>(num_bits, FALSE_POSITIVE_RATE), capacity);
let mut packet = Packet::new([0u8; PACKET_DATA_SIZE], Meta::default()); let mut packet = Packet::new([0u8; PACKET_DATA_SIZE], Meta::default());
let mut dup_count = 0usize; let mut dup_count = 0usize;
for _ in 0..num_packets { for _ in 0..num_packets {
@ -228,5 +259,11 @@ mod tests {
} }
assert_eq!(dup_count, num_dups); assert_eq!(dup_count, num_dups);
assert_eq!(deduper.popcount.load(Ordering::Relaxed), popcount); assert_eq!(deduper.popcount.load(Ordering::Relaxed), popcount);
assert!(deduper.false_positive_rate() < FALSE_POSITIVE_RATE);
assert!(!deduper.maybe_reset(
&mut rng,
FALSE_POSITIVE_RATE,
Duration::from_millis(0), // reset_cycle
));
} }
} }