increases retransmit-stage deduper capacity and reset-cycle (#30758)

For duplicate block detection, for each (slot, shred-index, shred-type)
we need to allow 2 different shreds to be retransmitted.
The commit implements this using two bloom-filter dedupers:
* Shreds are deduplicated using the 1st deduper.
* If a shred is not a duplicate, then we check whether
      (slot, shred-index, shred-type, k)
  is new for either k = 0 or k = 1 using the 2nd deduper,
  and if so the shred is retransmitted.

This achieves a larger capacity than the current LRU cache.
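
To make the scheme concrete, here is a minimal, std-only sketch of the two-filter check; ToyDeduper, its DefaultHasher probes, and the free-standing should_skip_retransmit below are illustrative stand-ins for this write-up, not the solana_perf::deduper::Deduper API that the diff below actually uses:

// Toy Bloom-style deduper: K seeded hash probes into one bit array.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

struct ToyDeduper<const K: usize> {
    bits: Vec<bool>,
    seeds: [u64; K],
}

impl<const K: usize> ToyDeduper<K> {
    fn new(num_bits: usize, seeds: [u64; K]) -> Self {
        Self { bits: vec![false; num_bits], seeds }
    }

    // Marks `data` as seen; returns true only if every probed bit was
    // already set, i.e. `data` is (probabilistically) a duplicate.
    fn dedup<T: Hash + ?Sized>(&mut self, data: &T) -> bool {
        let mut duplicate = true;
        for seed in self.seeds {
            let mut hasher = DefaultHasher::new();
            seed.hash(&mut hasher);
            data.hash(&mut hasher);
            let index = (hasher.finish() % self.bits.len() as u64) as usize;
            duplicate &= std::mem::replace(&mut self.bits[index], true);
        }
        duplicate
    }
}

const MAX_DUPLICATE_COUNT: usize = 2;

// 1st deduper keys on the raw payload; 2nd deduper keys on (shred-id, k),
// letting up to MAX_DUPLICATE_COUNT distinct payloads per shred-id through.
fn should_skip_retransmit(
    payload_filter: &mut ToyDeduper<2>,
    key_filter: &mut ToyDeduper<2>,
    key: (u64, u32, u8), // (slot, shred-index, shred-type)
    payload: &[u8],
) -> bool {
    payload_filter.dedup(payload)
        || (0..MAX_DUPLICATE_COUNT).all(|k| key_filter.dedup(&(key, k)))
}

fn main() {
    let mut payloads = ToyDeduper::new(1 << 20, [1, 2]);
    let mut keys = ToyDeduper::new(1 << 20, [3, 4]);
    let key = (1, 5, 0);
    assert!(!should_skip_retransmit(&mut payloads, &mut keys, key, b"shred-a")); // unique: pass
    assert!(should_skip_retransmit(&mut payloads, &mut keys, key, b"shred-a")); // exact dup: skip
    assert!(!should_skip_retransmit(&mut payloads, &mut keys, key, b"shred-b")); // 1st variant: pass
    assert!(should_skip_retransmit(&mut payloads, &mut keys, key, b"shred-c")); // 2nd variant: skip
}

Note that the (0..MAX_DUPLICATE_COUNT).all(..) chain short-circuits on the first novel (key, k) pair, so each distinct payload claims exactly one k slot and only the third distinct payload for a given key finds both slots taken and is dropped.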
behzad nouri 2023-03-20 20:32:23 +00:00, committed by GitHub
parent ce0e23fbab
commit 5d9aba5548
5 changed files with 62 additions and 169 deletions

@@ -44,7 +44,6 @@ pub mod next_leader;
pub mod optimistic_confirmation_verifier;
pub mod outstanding_requests;
pub mod packet_deserializer;
mod packet_hasher;
pub mod packet_threshold;
pub mod poh_timing_report_service;
pub mod poh_timing_reporter;

@@ -1,39 +0,0 @@
// Get a unique hash value for a packet
// Used in retransmit and shred fetch to prevent dos with same packet data.
use {
ahash::AHasher,
rand::{thread_rng, Rng},
std::hash::Hasher,
};
#[derive(Clone)]
pub(crate) struct PacketHasher {
seed1: u128,
seed2: u128,
}
impl Default for PacketHasher {
fn default() -> Self {
Self {
seed1: thread_rng().gen::<u128>(),
seed2: thread_rng().gen::<u128>(),
}
}
}
impl PacketHasher {
pub(crate) fn hash_shred(&self, shred: &[u8]) -> u64 {
self.hash_data(shred)
}
fn hash_data(&self, data: &[u8]) -> u64 {
let mut hasher = AHasher::new_with_keys(self.seed1, self.seed2);
hasher.write(data);
hasher.finish()
}
pub(crate) fn reset(&mut self) {
*self = Self::default();
}
}

@@ -2,13 +2,11 @@
#![allow(clippy::rc_buffer)]
use {
crate::{
cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
packet_hasher::PacketHasher,
},
crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache, Error, MAX_NUM_TURBINE_HOPS},
crossbeam_channel::{Receiver, RecvTimeoutError},
itertools::{izip, Itertools},
lru::LruCache,
rand::Rng,
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_gossip::{
cluster_info::ClusterInfo, legacy_contact_info::LegacyContactInfo as ContactInfo,
@@ -18,6 +16,7 @@ use {
shred::{self, ShredId},
},
solana_measure::measure::Measure,
solana_perf::deduper::Deduper,
solana_rayon_threadlimit::get_thread_count,
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
solana_rpc_client_api::response::SlotUpdate,
@@ -42,7 +41,9 @@ use {
};
const MAX_DUPLICATE_COUNT: usize = 2;
const DEFAULT_LRU_SIZE: usize = 1 << 20;
const DEDUPER_FALSE_POSITIVE_RATE: f64 = 0.001;
const DEDUPER_NUM_BITS: u64 = 637_534_199; // 76MB
const DEDUPER_RESET_CYCLE: Duration = Duration::from_secs(5 * 60);
// Minimum number of shreds to use rayon parallel iterators.
const PAR_ITER_MIN_NUM_SHREDS: usize = 2;
@@ -131,45 +132,36 @@ impl RetransmitStats {
}
}
// Map of shred (slot, index, type) => list of hash values seen for that key.
type ShredFilter = LruCache<ShredId, Vec<u64>>;
// Returns true if shred is already received and should skip retransmit.
fn should_skip_retransmit(
key: ShredId,
shred: &[u8],
shreds_received: &mut ShredFilter,
packet_hasher: &PacketHasher,
) -> bool {
match shreds_received.get_mut(&key) {
Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => true,
Some(sent) => {
let hash = packet_hasher.hash_shred(shred);
if sent.contains(&hash) {
true
} else {
sent.push(hash);
false
}
}
None => {
let hash = packet_hasher.hash_shred(shred);
shreds_received.put(key, vec![hash]);
false
}
}
struct ShredDeduper<const K: usize> {
deduper: Deduper<K, /*shred:*/ [u8]>,
shred_id_filter: Deduper<K, (ShredId, /*0..MAX_DUPLICATE_COUNT:*/ usize)>,
}
fn maybe_reset_shreds_received_cache(
shreds_received: &mut ShredFilter,
packet_hasher: &mut PacketHasher,
hasher_reset_ts: &mut Instant,
) {
const UPDATE_INTERVAL: Duration = Duration::from_secs(1);
if hasher_reset_ts.elapsed() >= UPDATE_INTERVAL {
*hasher_reset_ts = Instant::now();
shreds_received.clear();
packet_hasher.reset();
impl<const K: usize> ShredDeduper<K> {
fn new<R: Rng>(rng: &mut R, num_bits: u64) -> Self {
Self {
deduper: Deduper::new(rng, num_bits),
shred_id_filter: Deduper::new(rng, num_bits),
}
}
fn maybe_reset<R: Rng>(
&mut self,
rng: &mut R,
false_positive_rate: f64,
reset_cycle: Duration,
) {
self.deduper
.maybe_reset(rng, false_positive_rate, reset_cycle);
self.shred_id_filter
.maybe_reset(rng, false_positive_rate, reset_cycle);
}
fn dedup(&self, key: ShredId, shred: &[u8], max_duplicate_count: usize) -> bool {
// In order to detect duplicate blocks across cluster, we retransmit
// max_duplicate_count different shreds for each ShredId.
self.deduper.dedup(shred)
|| (0..max_duplicate_count).all(|i| self.shred_id_filter.dedup(&(key, i)))
}
}
@@ -183,9 +175,7 @@ fn retransmit(
sockets: &[UdpSocket],
stats: &mut RetransmitStats,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
hasher_reset_ts: &mut Instant,
shreds_received: &mut ShredFilter,
packet_hasher: &mut PacketHasher,
shred_deduper: &mut ShredDeduper<2>,
max_slots: &MaxSlots,
rpc_subscriptions: Option<&RpcSubscriptions>,
) -> Result<(), RecvTimeoutError> {
@@ -205,7 +195,11 @@
stats.epoch_fetch += epoch_fetch.as_us();
let mut epoch_cache_update = Measure::start("retransmit_epoch_cache_update");
maybe_reset_shreds_received_cache(shreds_received, packet_hasher, hasher_reset_ts);
shred_deduper.maybe_reset(
&mut rand::thread_rng(),
DEDUPER_FALSE_POSITIVE_RATE,
DEDUPER_RESET_CYCLE,
);
epoch_cache_update.stop();
stats.epoch_cache_update += epoch_cache_update.as_us();
// Lookup slot leader and cluster nodes for each slot.
@@ -213,7 +207,7 @@
.into_iter()
.filter_map(|shred| {
let key = shred::layout::get_shred_id(&shred)?;
if should_skip_retransmit(key, &shred, shreds_received, packet_hasher) {
if shred_deduper.dedup(key, &shred, MAX_DUPLICATE_COUNT) {
stats.num_shreds_skipped += 1;
None
} else {
@@ -377,10 +371,9 @@ pub fn retransmitter(
CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
CLUSTER_NODES_CACHE_TTL,
);
let mut hasher_reset_ts = Instant::now();
let mut rng = rand::thread_rng();
let mut shred_deduper = ShredDeduper::<2>::new(&mut rng, DEDUPER_NUM_BITS);
let mut stats = RetransmitStats::new(Instant::now());
let mut shreds_received = LruCache::<ShredId, _>::new(DEFAULT_LRU_SIZE);
let mut packet_hasher = PacketHasher::default();
#[allow(clippy::manual_clamp)]
let num_threads = get_thread_count().min(8).max(sockets.len());
let thread_pool = ThreadPoolBuilder::new()
@@ -400,9 +393,7 @@
&sockets,
&mut stats,
&cluster_nodes_cache,
&mut hasher_reset_ts,
&mut shreds_received,
&mut packet_hasher,
&mut shred_deduper,
&max_slots,
rpc_subscriptions.as_deref(),
) {
@@ -594,6 +585,8 @@ impl RetransmitSlotStats {
mod tests {
use {
super::*,
rand::SeedableRng,
rand_chacha::ChaChaRng,
solana_ledger::shred::{Shred, ShredFlags},
};
@@ -612,22 +605,12 @@ mod tests {
version,
0,
);
let mut shreds_received = LruCache::new(100);
let packet_hasher = PacketHasher::default();
let mut rng = ChaChaRng::from_seed([0xa5; 32]);
let shred_deduper = ShredDeduper::<2>::new(&mut rng, /*num_bits:*/ 640_007);
// unique shred for (1, 5) should pass
assert!(!should_skip_retransmit(
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
// duplicate shred for (1, 5) blocked
assert!(should_skip_retransmit(
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
let shred = Shred::new_from_data(
slot,
@@ -640,19 +623,9 @@
0,
);
// first duplicate shred for (1, 5) passed
assert!(!should_skip_retransmit(
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
// then blocked
assert!(should_skip_retransmit(
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
let shred = Shred::new_from_data(
slot,
@@ -665,64 +638,24 @@
0,
);
// 2nd duplicate shred for (1, 5) blocked
assert!(should_skip_retransmit(
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
assert!(should_skip_retransmit(
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
let shred = Shred::new_from_parity_shard(slot, index, &[], 0, 1, 1, 0, version);
// Coding at (1, 5) passes
assert!(!should_skip_retransmit(
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
// then blocked
assert!(should_skip_retransmit(
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
let shred = Shred::new_from_parity_shard(slot, index, &[], 2, 1, 1, 0, version);
// 2nd unique coding at (1, 5) passes
assert!(!should_skip_retransmit(
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
// same again is blocked
assert!(should_skip_retransmit(
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
let shred = Shred::new_from_parity_shard(slot, index, &[], 3, 1, 1, 0, version);
// Another unique coding at (1, 5) always blocked
assert!(should_skip_retransmit(
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
assert!(should_skip_retransmit(
shred.id(),
shred.payload(),
&mut shreds_received,
&packet_hasher
));
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));
}
}
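
For a sense of the capacity the new constants buy, here is a back-of-the-envelope estimate (my arithmetic, not from the diff) that treats each filter as an ideal Bloom filter with m bits and K hash probes, whose false-positive rate after n unique inserts is roughly (1 - e^(-K*n/m))^K:

// Rough capacity estimate for the new deduper constants (idealized model).
fn main() {
    let m = 637_534_199_f64; // DEDUPER_NUM_BITS, ~76MB of filter state
    let k = 2_f64; // hash probes per insert, as in ShredDeduper::<2>
    let p = 0.001_f64; // DEDUPER_FALSE_POSITIVE_RATE
    // Invert p = (1 - e^(-k*n/m))^k for n, the number of unique inserts:
    let n = -(m / k) * (1.0 - p.powf(1.0 / k)).ln();
    println!("~{:.1}M unique shreds until ~0.1% false positives", n / 1e6);
}

This prints roughly 10.2M, an order of magnitude above the removed 1 << 20 (~1M) entry LRU cache, which was also cleared every second rather than every five minutes; and the maybe_reset(rng, DEDUPER_FALSE_POSITIVE_RATE, DEDUPER_RESET_CYCLE) call above suggests the filters are also reset early once saturation pushes the estimated false-positive rate past that threshold.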

@@ -3211,7 +3211,7 @@ impl Blockstore {
// given slot and index as this implies the leader generated two different shreds with
// the same slot and index
pub fn is_shred_duplicate(&self, shred: ShredId, payload: Vec<u8>) -> Option<Vec<u8>> {
let (slot, index, shred_type) = shred.unwrap();
let (slot, index, shred_type) = shred.unpack();
let existing_shred = match shred_type {
ShredType::Data => self.get_data_shred(slot, index as u64),
ShredType::Code => self.get_coding_shred(slot, index as u64),

@@ -258,7 +258,7 @@ impl ShredId {
self.0
}
pub(crate) fn unwrap(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
pub(crate) fn unpack(&self) -> (Slot, /*shred index:*/ u32, ShredType) {
(self.0, self.1, self.2)
}