From 5d9aba5548b17d1763683efac9cb0f5a67d0b76d Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Mon, 20 Mar 2023 20:32:23 +0000
Subject: [PATCH] increases retransmit-stage deduper capacity and reset-cycle
 (#30758)

For duplicate block detection, for each (slot, shred-index, shred-type)
we need to allow 2 different shreds to be retransmitted.
The commit implements this using two bloom-filter dedupers:
* Shreds are deduplicated using the 1st deduper.
* If a shred is not a duplicate, then we check that
  (slot, shred-index, shred-type, k) is not a duplicate for either k = 0
  or k = 1 using the 2nd deduper, and if so the shred is retransmitted.
This achieves a larger capacity than the current LRU-cache.
---
 core/src/lib.rs              |   1 -
 core/src/packet_hasher.rs    |  39 --------
 core/src/retransmit_stage.rs | 187 +++++++++++------------------------
 ledger/src/blockstore.rs     |   2 +-
 ledger/src/shred.rs          |   2 +-
 5 files changed, 62 insertions(+), 169 deletions(-)
 delete mode 100644 core/src/packet_hasher.rs
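The admission rule described above is easiest to see with exact sets standing
in for the two bloom filters. A minimal, self-contained sketch follows:
HashSet replaces the probabilistic Deduper from solana-perf, and
TwoShredDeduper with its tuple key is illustrative only, not the patch's types.

    use std::collections::HashSet;

    // Exact-set stand-in for the two bloom-filter dedupers; no false
    // positives here, so the admission rule can be verified directly.
    struct TwoShredDeduper {
        payloads: HashSet<Vec<u8>>,                // 1st filter: raw shred payloads
        shred_ids: HashSet<(u64, u32, u8, usize)>, // 2nd filter: (slot, index, type, k)
    }

    impl TwoShredDeduper {
        // Returns true if the shred should be skipped as a duplicate.
        fn dedup(&mut self, key: (u64, u32, u8), shred: &[u8], max_dup: usize) -> bool {
            if !self.payloads.insert(shred.to_vec()) {
                return true; // exact payload seen before
            }
            // Claim the first free k in 0..max_dup; if none is free, this
            // key has already admitted max_dup distinct payloads.
            !(0..max_dup).any(|k| self.shred_ids.insert((key.0, key.1, key.2, k)))
        }
    }

    fn main() {
        let mut d = TwoShredDeduper {
            payloads: HashSet::new(),
            shred_ids: HashSet::new(),
        };
        let key: (u64, u32, u8) = (1, 5, 0);
        assert!(!d.dedup(key, b"shred-a", 2)); // 1st distinct payload passes
        assert!(d.dedup(key, b"shred-a", 2));  // exact resend blocked
        assert!(!d.dedup(key, b"shred-b", 2)); // 2nd distinct payload passes
        assert!(d.dedup(key, b"shred-c", 2));  // 3rd distinct payload blocked
    }

As in the patch's dedup() below, the payload filter is consulted first, and
each distinct payload then claims one (key, k) slot; once k = 0 and k = 1 are
both taken, every further payload for that key is dropped.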
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 4891bce930..f2247befad 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -44,7 +44,6 @@ pub mod next_leader;
 pub mod optimistic_confirmation_verifier;
 pub mod outstanding_requests;
 pub mod packet_deserializer;
-mod packet_hasher;
 pub mod packet_threshold;
 pub mod poh_timing_report_service;
 pub mod poh_timing_reporter;
diff --git a/core/src/packet_hasher.rs b/core/src/packet_hasher.rs
deleted file mode 100644
index 2734b025ce..0000000000
--- a/core/src/packet_hasher.rs
+++ /dev/null
@@ -1,39 +0,0 @@
-// Get a unique hash value for a packet
-// Used in retransmit and shred fetch to prevent dos with same packet data.
-
-use {
-    ahash::AHasher,
-    rand::{thread_rng, Rng},
-    std::hash::Hasher,
-};
-
-#[derive(Clone)]
-pub(crate) struct PacketHasher {
-    seed1: u128,
-    seed2: u128,
-}
-
-impl Default for PacketHasher {
-    fn default() -> Self {
-        Self {
-            seed1: thread_rng().gen::<u128>(),
-            seed2: thread_rng().gen::<u128>(),
-        }
-    }
-}
-
-impl PacketHasher {
-    pub(crate) fn hash_shred(&self, shred: &[u8]) -> u64 {
-        self.hash_data(shred)
-    }
-
-    fn hash_data(&self, data: &[u8]) -> u64 {
-        let mut hasher = AHasher::new_with_keys(self.seed1, self.seed2);
-        hasher.write(data);
-        hasher.finish()
-    }
-
-    pub(crate) fn reset(&mut self) {
-        *self = Self::default();
-    }
-}
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 8ecb13e723..96eeffff8c 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -2,13 +2,11 @@
 #![allow(clippy::rc_buffer)]
 
 use {
-    crate::{
-        cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
-        packet_hasher::PacketHasher,
-    },
+    crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
     crossbeam_channel::{Receiver, RecvTimeoutError},
     itertools::{izip, Itertools},
     lru::LruCache,
+    rand::Rng,
     rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
     solana_gossip::{
         cluster_info::ClusterInfo, legacy_contact_info::LegacyContactInfo as ContactInfo,
@@ -18,6 +16,7 @@ use {
         shred::{self, ShredId},
     },
     solana_measure::measure::Measure,
+    solana_perf::deduper::Deduper,
     solana_rayon_threadlimit::get_thread_count,
     solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
     solana_rpc_client_api::response::SlotUpdate,
@@ -42,7 +41,9 @@ use {
 };
 
 const MAX_DUPLICATE_COUNT: usize = 2;
-const DEFAULT_LRU_SIZE: usize = 1 << 20;
+const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
+const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
+const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);
 
 // Minimum number of shreds to use rayon parallel iterators.
 const PAR_ITER_MIN_NUM_SHREDS: usize = 2;
@@ -131,45 +132,36 @@ impl RetransmitStats {
     }
 }
 
-// Map of shred (slot, index, type) => list of hash values seen for that key.
-type ShredFilter = LruCache<ShredId, Vec<u64>>;
-
-// Returns true if shred is already received and should skip retransmit.
-fn should_skip_retransmit(
-    key: ShredId,
-    shred: &[u8],
-    shreds_received: &mut ShredFilter,
-    packet_hasher: &PacketHasher,
-) -> bool {
-    match shreds_received.get_mut(&key) {
-        Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => true,
-        Some(sent) => {
-            let hash = packet_hasher.hash_shred(shred);
-            if sent.contains(&hash) {
-                true
-            } else {
-                sent.push(hash);
-                false
-            }
-        }
-        None => {
-            let hash = packet_hasher.hash_shred(shred);
-            shreds_received.put(key, vec![hash]);
-            false
-        }
-    }
+struct ShredDeduper<const K: usize> {
+    deduper: Deduper<K, /*shred:*/ [u8]>,
+    shred_id_filter: Deduper<K, (ShredId, /*0..MAX_DUPLICATE_COUNT:*/ usize)>,
 }
 
-fn maybe_reset_shreds_received_cache(
-    shreds_received: &mut ShredFilter,
-    packet_hasher: &mut PacketHasher,
-    hasher_reset_ts: &mut Instant,
-) {
-    const UPDATE_INTERVAL: Duration = Duration::from_secs(1);
-    if hasher_reset_ts.elapsed() >= UPDATE_INTERVAL {
-        *hasher_reset_ts = Instant::now();
-        shreds_received.clear();
-        packet_hasher.reset();
+impl<const K: usize> ShredDeduper<K> {
+    fn new<R: Rng>(rng: &mut R, num_bits: u64) -> Self {
+        Self {
+            deduper: Deduper::new(rng, num_bits),
+            shred_id_filter: Deduper::new(rng, num_bits),
+        }
+    }
+
+    fn maybe_reset<R: Rng>(
+        &mut self,
+        rng: &mut R,
+        false_positive_rate: f64,
+        reset_cycle: Duration,
+    ) {
+        self.deduper
+            .maybe_reset(rng, false_positive_rate, reset_cycle);
+        self.shred_id_filter
+            .maybe_reset(rng, false_positive_rate, reset_cycle);
+    }
+
+    fn dedup(&self, key: ShredId, shred: &[u8], max_duplicate_count: usize) -> bool {
+        // In order to detect duplicate blocks across the cluster, we
+        // retransmit max_duplicate_count different shreds for each ShredId.
+        self.deduper.dedup(shred)
+            || (0..max_duplicate_count).all(|i| self.shred_id_filter.dedup(&(key, i)))
     }
 }
 
@@ -183,9 +175,7 @@ fn retransmit(
     sockets: &[UdpSocket],
     stats: &mut RetransmitStats,
     cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
-    hasher_reset_ts: &mut Instant,
-    shreds_received: &mut ShredFilter,
-    packet_hasher: &mut PacketHasher,
+    shred_deduper: &mut ShredDeduper<2>,
     max_slots: &MaxSlots,
     rpc_subscriptions: Option<&RpcSubscriptions>,
 ) -> Result<(), RecvTimeoutError> {
@@ -205,7 +195,11 @@ fn retransmit(
     stats.epoch_fetch += epoch_fetch.as_us();
 
     let mut epoch_cache_update = Measure::start("retransmit_epoch_cache_update");
-    maybe_reset_shreds_received_cache(shreds_received, packet_hasher, hasher_reset_ts);
+    shred_deduper.maybe_reset(
+        &mut rand::thread_rng(),
+        DEDUPER_FALSE_POSITIVE_RATE,
+        DEDUPER_RESET_CYCLE,
+    );
     epoch_cache_update.stop();
     stats.epoch_cache_update += epoch_cache_update.as_us();
     // Lookup slot leader and cluster nodes for each slot.
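For scale, a rough capacity estimate for the constants above, assuming the
standard bloom-filter model and that maybe_reset() fires once either the
reset cycle elapses or the measured false-positive rate crosses the target,
as its parameters suggest. The helper below is illustrative arithmetic, not
part of the patch:

    // Estimate the insertions n an m-bit filter with K bit-probes per item
    // tolerates before its false-positive rate exceeds target_fp.
    // FP ~= p^K, where p = 1 - (1 - 1/m)^(K*n) is the fraction of set bits.
    fn capacity(num_bits: f64, k: f64, target_fp: f64) -> f64 {
        let ones_ratio = target_fp.powf(1.0 / k); // tolerated fraction of set bits
        (1.0 - ones_ratio).ln() / (k * (1.0 - 1.0 / num_bits).ln())
    }

    fn main() {
        // 637_534_199 bits is ~76 MiB per filter; K = 2 for ShredDeduper::<2>.
        let n = capacity(637_534_199.0, 2.0, 0.001);
        println!("~{:.1}M shreds per 5-minute cycle", n / 1e6); // prints ~10.2M
    }

Roughly ten million entries per filter per cycle is well beyond the replaced
LRU cache's fixed 1 << 20 (~1M) keys, which is the capacity gain the commit
message refers to.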
@@ -213,7 +207,7 @@ fn retransmit(
         .into_iter()
         .filter_map(|shred| {
             let key = shred::layout::get_shred_id(&shred)?;
-            if should_skip_retransmit(key, &shred, shreds_received, packet_hasher) {
+            if shred_deduper.dedup(key, &shred, MAX_DUPLICATE_COUNT) {
                 stats.num_shreds_skipped += 1;
                 None
             } else {
@@ -377,10 +371,9 @@ pub fn retransmitter(
         CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
         CLUSTER_NODES_CACHE_TTL,
     );
-    let mut hasher_reset_ts = Instant::now();
+    let mut rng = rand::thread_rng();
+    let mut shred_deduper = ShredDeduper::<2>::new(&mut rng, DEDUPER_NUM_BITS);
     let mut stats = RetransmitStats::new(Instant::now());
-    let mut shreds_received = LruCache::<ShredId, Vec<u64>>::new(DEFAULT_LRU_SIZE);
-    let mut packet_hasher = PacketHasher::default();
     #[allow(clippy::manual_clamp)]
     let num_threads = get_thread_count().min(8).max(sockets.len());
     let thread_pool = ThreadPoolBuilder::new()
@@ -400,9 +393,7 @@ pub fn retransmitter(
                 &sockets,
                 &mut stats,
                 &cluster_nodes_cache,
-                &mut hasher_reset_ts,
-                &mut shreds_received,
-                &mut packet_hasher,
+                &mut shred_deduper,
                 &max_slots,
                 rpc_subscriptions.as_deref(),
             ) {
@@ -594,6 +585,8 @@ impl RetransmitSlotStats {
 mod tests {
     use {
         super::*,
+        rand::SeedableRng,
+        rand_chacha::ChaChaRng,
         solana_ledger::shred::{Shred, ShredFlags},
     };
 
@@ -612,22 +605,12 @@ mod tests {
             version,
             0,
         );
-        let mut shreds_received = LruCache::new(100);
-        let packet_hasher = PacketHasher::default();
+        let mut rng = ChaChaRng::from_seed([0xa5; 32]);
+        let shred_deduper = ShredDeduper::<2>::new(&mut rng, /*num_bits:*/ 640_007);
         // unique shred for (1, 5) should pass
-        assert!(!should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
         // duplicate shred for (1, 5) blocked
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
 
         let shred = Shred::new_from_data(
             slot,
             index,
             Some(index),
             true,
             true,
             0,
             version,
             0,
         );
         // first duplicate shred for (1, 5) passed
-        assert!(!should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
         // then blocked
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
 
         let shred = Shred::new_from_data(
             slot,
             index,
             Some(index),
             true,
             false,
             0,
             version,
             0,
         );
         // 2nd duplicate shred for (1, 5) blocked
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
 
         let shred = Shred::new_from_parity_shard(slot, index, &[], 0, 1, 1, 0, version);
         // Coding at (1, 5) passes
-        assert!(!should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
         // then blocked
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
 
         let shred = Shred::new_from_parity_shard(slot, index, &[], 2, 1, 1, 0, version);
         // 2nd unique coding at (1, 5) passes
-        assert!(!should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
         // same again is blocked
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
 
         let shred = Shred::new_from_parity_shard(slot, index, &[], 3, 1, 1, 0, version);
         // Another unique coding at (1, 5) always blocked
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
-        assert!(should_skip_retransmit(
-            shred.id(),
-            shred.payload(),
-            &mut shreds_received,
-            &packet_hasher
-        ));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
+        assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
     }
 }
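In the test above, ChaChaRng::from_seed([0xa5; 32]) pins the filters' hash
keys so the assertions stay deterministic even though the data structure is
probabilistic, and the small num_bits of 640_007 keeps test memory negligible.
The same pattern in isolation (test_rng is a hypothetical helper name):

    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;

    // A fixed seed gives reproducible filter keys, so a probabilistic
    // deduper can be exercised with exact pass/block assertions.
    fn test_rng() -> ChaChaRng {
        ChaChaRng::from_seed([0xa5; 32])
    }

Production code instead seeds from rand::thread_rng(), as in retransmitter()
above, so each validator derives different filter keys.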
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 7be984fdc5..dada1dd187 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -3211,7 +3211,7 @@ impl Blockstore {
     // given slot and index as this implies the leader generated two different shreds with
     // the same slot and index
     pub fn is_shred_duplicate(&self, shred: ShredId, payload: Vec<u8>) -> Option<Vec<u8>> {
-        let (slot, index, shred_type) = shred.unwrap();
+        let (slot, index, shred_type) = shred.unpack();
         let existing_shred = match shred_type {
             ShredType::Data => self.get_data_shred(slot, index as u64),
             ShredType::Code => self.get_coding_shred(slot, index as u64),
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index 9beb856087..ca6bd55837 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -258,7 +258,7 @@ impl ShredId {
         self.0
     }
 
-    pub(crate) fn unwrap(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
+    pub(crate) fn unpack(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
         (self.0, self.1, self.2)
     }
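The unwrap -> unpack rename keeps the same (Slot, shred index, ShredType)
tuple while no longer reading like Option::unwrap at call sites. A sketch of
such a call site, assuming only the APIs visible in this patch; check_duplicate
is a hypothetical helper, and Shred::payload() is assumed to yield the raw
bytes as Vec<u8>:

    use solana_ledger::{blockstore::Blockstore, shred::Shred};

    // True if the blockstore already holds a different payload for this
    // shred's (slot, index, shred-type) key, i.e. evidence that the leader
    // produced a duplicate block.
    fn check_duplicate(blockstore: &Blockstore, shred: &Shred) -> bool {
        blockstore
            .is_shred_duplicate(shred.id(), shred.payload().clone())
            .is_some()
    }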