diff --git a/Cargo.lock b/Cargo.lock index 2ee844ae6..ba1affa1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5260,6 +5260,7 @@ dependencies = [ name = "solana-perf" version = "1.10.0" dependencies = [ + "ahash 0.7.6", "bincode", "bv", "caps", diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 37d511332..f9373b8c1 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -10,10 +10,9 @@ use { core::time::Duration, crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender}, itertools::Itertools, - solana_bloom::bloom::{AtomicBloom, Bloom}, solana_measure::measure::Measure, solana_perf::packet::PacketBatch, - solana_perf::sigverify::dedup_packets, + solana_perf::sigverify::Deduper, solana_sdk::timing, solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError}, std::{ @@ -215,7 +214,7 @@ impl SigVerifyStage { } fn verifier( - bloom: &AtomicBloom<&[u8]>, + deduper: &Deduper, recvr: &PacketBatchReceiver, sendr: &Sender>, verifier: &T, @@ -231,15 +230,15 @@ impl SigVerifyStage { ); let mut dedup_time = Measure::start("sigverify_dedup_time"); - let dedup_fail = dedup_packets(bloom, &mut batches) as usize; + let dedup_fail = deduper.dedup_packets(&mut batches) as usize; dedup_time.stop(); - let valid_packets = num_packets.saturating_sub(dedup_fail); + let num_unique = num_packets.saturating_sub(dedup_fail); let mut discard_time = Measure::start("sigverify_discard_time"); - if valid_packets > MAX_SIGVERIFY_BATCH { + if num_unique > MAX_SIGVERIFY_BATCH { Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH) }; - let excess_fail = valid_packets.saturating_sub(MAX_SIGVERIFY_BATCH); + let excess_fail = num_unique.saturating_sub(MAX_SIGVERIFY_BATCH); discard_time.stop(); let mut verify_batch_time = Measure::start("sigverify_batch_time"); @@ -290,25 +289,16 @@ impl SigVerifyStage { let verifier = verifier.clone(); let mut stats = SigVerifierStats::default(); let mut last_print = Instant::now(); - const MAX_BLOOM_AGE: Duration = Duration::from_millis(2_000); - const MAX_BLOOM_ITEMS: usize = 1_000_000; - const MAX_BLOOM_FAIL: f64 = 0.0001; - const MAX_BLOOM_BITS: usize = 8 << 22; + const MAX_DEDUPER_AGE: Duration = Duration::from_secs(2); + const MAX_DEDUPER_ITEMS: u32 = 1_000_000; Builder::new() .name("solana-verifier".to_string()) .spawn(move || { - let mut bloom = - Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into(); - let mut bloom_age = Instant::now(); + let mut deduper = Deduper::new(MAX_DEDUPER_ITEMS, MAX_DEDUPER_AGE); loop { - let now = Instant::now(); - if now.duration_since(bloom_age) > MAX_BLOOM_AGE { - bloom = - Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into(); - bloom_age = now; - } + deduper.reset(); if let Err(e) = Self::verifier( - &bloom, + &deduper, &packet_receiver, &verified_sender, &verifier, diff --git a/perf/Cargo.toml b/perf/Cargo.toml index 4236b194c..ac76019c3 100644 --- a/perf/Cargo.toml +++ b/perf/Cargo.toml @@ -10,6 +10,7 @@ documentation = "https://docs.rs/solana-perf" edition = "2021" [dependencies] +ahash = "0.7.6" bincode = "1.3.3" curve25519-dalek = { version = "3" } dlopen = "0.1.8" diff --git a/perf/benches/dedup.rs b/perf/benches/dedup.rs index 8542da9b3..c2d72f083 100644 --- a/perf/benches/dedup.rs +++ b/perf/benches/dedup.rs @@ -5,14 +5,16 @@ extern crate test; use { rand::prelude::*, - solana_bloom::bloom::{AtomicBloom, Bloom}, solana_perf::{ packet::{to_packet_batches, PacketBatch}, sigverify, }, + std::time::Duration, test::Bencher, }; +const NUM: usize = 4096; + fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec { // subtract 8 bytes because the length will get serialized as well (0..size.checked_sub(8).unwrap()) @@ -22,20 +24,14 @@ fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec { fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec) { // verify packets - let mut bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into(); + let mut deduper = sigverify::Deduper::new(1_000_000, Duration::from_millis(2_000)); bencher.iter(|| { - // bench - sigverify::dedup_packets(&bloom, &mut batches); - - // reset - bloom.clear_for_tests(); - batches.iter_mut().for_each(|batch| { - batch - .packets - .iter_mut() - .for_each(|p| p.meta.set_discard(false)) - }); - }) + let _ans = deduper.dedup_packets(&mut batches); + deduper.reset(); + batches + .iter_mut() + .for_each(|b| b.packets.iter_mut().for_each(|p| p.meta.set_discard(false))); + }); } #[bench] @@ -46,7 +42,7 @@ fn bench_dedup_same_small_packets(bencher: &mut Bencher) { let batches = to_packet_batches( &std::iter::repeat(small_packet) - .take(4096) + .take(NUM) .collect::>(), 128, ); @@ -61,7 +57,7 @@ fn bench_dedup_same_big_packets(bencher: &mut Bencher) { let big_packet = test_packet_with_size(1024, &mut rng); let batches = to_packet_batches( - &std::iter::repeat(big_packet).take(4096).collect::>(), + &std::iter::repeat(big_packet).take(NUM).collect::>(), 128, ); @@ -74,7 +70,7 @@ fn bench_dedup_diff_small_packets(bencher: &mut Bencher) { let mut rng = rand::thread_rng(); let batches = to_packet_batches( - &(0..4096) + &(0..NUM) .map(|_| test_packet_with_size(128, &mut rng)) .collect::>(), 128, @@ -89,7 +85,7 @@ fn bench_dedup_diff_big_packets(bencher: &mut Bencher) { let mut rng = rand::thread_rng(); let batches = to_packet_batches( - &(0..4096) + &(0..NUM) .map(|_| test_packet_with_size(1024, &mut rng)) .collect::>(), 128, @@ -97,3 +93,27 @@ fn bench_dedup_diff_big_packets(bencher: &mut Bencher) { do_bench_dedup_packets(bencher, batches); } + +#[bench] +#[ignore] +fn bench_dedup_baseline(bencher: &mut Bencher) { + let mut rng = rand::thread_rng(); + + let batches = to_packet_batches( + &(0..0) + .map(|_| test_packet_with_size(128, &mut rng)) + .collect::>(), + 128, + ); + + do_bench_dedup_packets(bencher, batches); +} + +#[bench] +#[ignore] +fn bench_dedup_reset(bencher: &mut Bencher) { + let mut deduper = sigverify::Deduper::new(1_000_000, Duration::from_millis(0)); + bencher.iter(|| { + deduper.reset(); + }); +} diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index fa9f831d2..daf002614 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -4,7 +4,6 @@ //! to the GPU. //! -use solana_bloom::bloom::AtomicBloom; #[cfg(test)] use solana_sdk::transaction::Transaction; use { @@ -14,6 +13,8 @@ use { perf_libs, recycler::Recycler, }, + ahash::AHasher, + rand::{thread_rng, Rng}, rayon::ThreadPool, solana_metrics::inc_new_counter_debug, solana_rayon_threadlimit::get_thread_count, @@ -24,7 +25,9 @@ use { short_vec::decode_shortu16_len, signature::Signature, }, - std::sync::atomic::{AtomicU64, Ordering}, + std::hash::Hasher, + std::sync::atomic::{AtomicBool, AtomicU64, Ordering}, + std::time::{Duration, Instant}, std::{convert::TryFrom, mem::size_of}, }; @@ -420,34 +423,73 @@ pub fn generate_offsets( ) } -fn dedup_packet(count: &AtomicU64, packet: &mut Packet, bloom: &AtomicBloom<&[u8]>) { - // If this packet was already marked as discard, drop it - if packet.meta.discard() { - return; - } - - // If this packet was not newly added, it's a dup and should be discarded - if !bloom.add(&&packet.data.as_slice()[0..packet.meta.size]) { - packet.meta.set_discard(true); - count.fetch_add(1, Ordering::Relaxed); - } +pub struct Deduper { + filter: Vec, + seed: (u128, u128), + age: Instant, + max_age: Duration, + pub saturated: AtomicBool, } -pub fn dedup_packets(bloom: &AtomicBloom<&[u8]>, batches: &mut [PacketBatch]) -> u64 { - use rayon::prelude::*; - let packet_count = count_packets_in_batches(batches); - // machine specific random offset to read the u64 from the packet signature - let count = AtomicU64::new(0); - PAR_THREAD_POOL.install(|| { - batches.into_par_iter().for_each(|batch| { - batch - .packets - .par_iter_mut() - .for_each(|p| dedup_packet(&count, p, bloom)) - }) - }); - inc_new_counter_debug!("dedup_packets_total", packet_count); - count.load(Ordering::Relaxed) +impl Deduper { + pub fn new(size: u32, max_age: Duration) -> Self { + let mut filter: Vec = Vec::with_capacity(size as usize); + filter.resize_with(size as usize, Default::default); + let seed = thread_rng().gen(); + Self { + filter, + seed, + age: Instant::now(), + max_age, + saturated: AtomicBool::new(false), + } + } + + pub fn reset(&mut self) { + let now = Instant::now(); + //this should reset every 500k unique packets per 1m sized deduper + //false positive rate is 1/1000 at that point + let saturated = self.saturated.load(Ordering::Relaxed); + if saturated || now.duration_since(self.age) > self.max_age { + for i in &self.filter { + i.store(0, Ordering::Relaxed); + } + self.seed = thread_rng().gen(); + self.age = now; + self.saturated.store(false, Ordering::Relaxed); + } + } + + fn dedup_packet(&self, packet: &mut Packet) -> u64 { + // If this packet was already marked as discard, drop it + if packet.meta.discard() { + return 0; + } + let mut hasher = AHasher::new_with_keys(self.seed.0, self.seed.1); + hasher.write(&packet.data[0..packet.meta.size]); + let hash = hasher.finish(); + let len = self.filter.len(); + let pos = (usize::try_from(hash).unwrap()).wrapping_rem(len); + // saturate each position with or + let prev = self.filter[pos].fetch_or(hash, Ordering::Relaxed); + if prev == u64::MAX { + self.saturated.store(true, Ordering::Relaxed); + //reset this value + self.filter[pos].store(hash, Ordering::Relaxed); + } + if hash == prev & hash { + packet.meta.set_discard(true); + return 1; + } + 0 + } + + pub fn dedup_packets(&self, batches: &mut [PacketBatch]) -> u64 { + batches + .iter_mut() + .flat_map(|batch| batch.packets.iter_mut().map(|p| self.dedup_packet(p))) + .sum() + } } pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool) { @@ -460,7 +502,7 @@ pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool) { .packets .par_iter_mut() .for_each(|p| verify_packet(p, reject_non_vote)) - }) + }); }); inc_new_counter_debug!("ed25519_verify_cpu", packet_count); } @@ -634,7 +676,6 @@ mod tests { test_tx::{new_test_vote_tx, test_multisig_tx, test_tx}, }, bincode::{deserialize, serialize}, - solana_bloom::bloom::{AtomicBloom, Bloom}, solana_sdk::{ instruction::CompiledInstruction, message::{Message, MessageHeader}, @@ -1299,26 +1340,57 @@ mod tests { fn test_dedup_same() { let tx = test_tx(); - // generate packet vector let mut batches = to_packet_batches(&std::iter::repeat(tx).take(1024).collect::>(), 128); let packet_count = sigverify::count_packets_in_batches(&batches); - let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 20).into(); - let discard = sigverify::dedup_packets(&bloom, &mut batches) as usize; - // because dedup uses a threadpool, there maybe up to N threads of txs that go through - let n = get_thread_count(); - assert!(packet_count < discard + n * 2); + let filter = Deduper::new(1_000_000, Duration::from_millis(0)); + let discard = filter.dedup_packets(&mut batches) as usize; + assert_eq!(packet_count, discard + 1); } #[test] fn test_dedup_diff() { - // generate packet vector + let mut filter = Deduper::new(1_000_000, Duration::from_millis(0)); let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::>(), 128); - let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 20).into(); - let discard = sigverify::dedup_packets(&bloom, &mut batches) as usize; + let discard = filter.dedup_packets(&mut batches) as usize; // because dedup uses a threadpool, there maybe up to N threads of txs that go through - let n = get_thread_count(); - assert!(discard < n * 2); + assert_eq!(discard, 0); + filter.reset(); + for i in filter.filter { + assert_eq!(i.load(Ordering::Relaxed), 0); + } + } + + #[test] + #[ignore] + fn test_dedup_saturated() { + let filter = Deduper::new(1_000_000, Duration::from_millis(0)); + let mut discard = 0; + assert!(!filter.saturated.load(Ordering::Relaxed)); + for i in 0..1000 { + let mut batches = + to_packet_batches(&(0..1000).map(|_| test_tx()).collect::>(), 128); + discard += filter.dedup_packets(&mut batches) as usize; + println!("{} {}", i, discard); + if filter.saturated.load(Ordering::Relaxed) { + break; + } + } + assert!(filter.saturated.load(Ordering::Relaxed)); + } + + #[test] + fn test_dedup_false_positive() { + let filter = Deduper::new(1_000_000, Duration::from_millis(0)); + let mut discard = 0; + for i in 0..10 { + let mut batches = + to_packet_batches(&(0..1024).map(|_| test_tx()).collect::>(), 128); + discard += filter.dedup_packets(&mut batches) as usize; + println!("false positive rate: {}/{}", discard, i * 1024); + } + //allow for 1 false positive even if extremely unlikely + assert!(discard < 2); } } diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index f51aed2af..7abc7d129 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -69,6 +69,17 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "739f4a8db6605981345c5654f3a85b056ce52f37a39d34da03f25bf2151ea16e" +[[package]] +name = "ahash" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +dependencies = [ + "getrandom 0.2.4", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "0.7.18" @@ -1158,9 +1169,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.1" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4060f4657be78b8e766215b02b18a2e862d83745545de804638e2b545e81aee6" +checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c" dependencies = [ "cfg-if 1.0.0", "libc", @@ -1218,7 +1229,7 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" dependencies = [ - "ahash", + "ahash 0.4.7", ] [[package]] @@ -2127,7 +2138,7 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" dependencies = [ - "getrandom 0.2.1", + "getrandom 0.2.4", ] [[package]] @@ -2203,7 +2214,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64" dependencies = [ - "getrandom 0.2.1", + "getrandom 0.2.4", "redox_syscall 0.2.10", ] @@ -3325,6 +3336,7 @@ dependencies = [ name = "solana-perf" version = "1.10.0" dependencies = [ + "ahash 0.7.6", "bincode", "bv", "caps",