moves packet-hasher out of the mutex (#26091)

Packet-hasher is not mutated across threads and does not need to be
wrapped in a mutex.
This commit is contained in:
behzad nouri 2022-06-21 16:29:27 +00:00 committed by GitHub
parent e344c8476f
commit d2afa6b418
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 86 additions and 34 deletions

View File

@ -35,7 +35,7 @@ pub mod ledger_cleanup_service;
pub mod ledger_metric_report_service;
pub mod optimistic_confirmation_verifier;
pub mod outstanding_requests;
pub mod packet_hasher;
mod packet_hasher;
pub mod packet_threshold;
pub mod poh_timing_report_service;
pub mod poh_timing_reporter;

View File

@ -10,7 +10,7 @@ use {
};
#[derive(Clone)]
pub struct PacketHasher {
pub(crate) struct PacketHasher {
seed1: u128,
seed2: u128,
}
@ -39,7 +39,7 @@ impl PacketHasher {
hasher.finish()
}
pub fn reset(&mut self) {
pub(crate) fn reset(&mut self) {
*self = Self::default();
}
}

View File

@ -36,7 +36,7 @@ use {
std::{
collections::{BTreeSet, HashMap, HashSet},
net::UdpSocket,
ops::{AddAssign, DerefMut},
ops::AddAssign,
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex, RwLock,
@ -127,17 +127,18 @@ impl RetransmitStats {
// Map of shred (slot, index, type) => list of hash values seen for that key.
type ShredFilter = LruCache<ShredId, Vec<u64>>;
type ShredFilterAndHasher = (ShredFilter, PacketHasher);
// Returns true if shred is already received and should skip retransmit.
fn should_skip_retransmit(shred: &Shred, shreds_received: &Mutex<ShredFilterAndHasher>) -> bool {
fn should_skip_retransmit(
shred: &Shred,
shreds_received: &Mutex<ShredFilter>,
packet_hasher: &PacketHasher,
) -> bool {
let key = shred.id();
let mut shreds_received = shreds_received.lock().unwrap();
let (cache, hasher) = shreds_received.deref_mut();
match cache.get_mut(&key) {
match shreds_received.get_mut(&key) {
Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => true,
Some(sent) => {
let hash = hasher.hash_shred(shred);
let hash = packet_hasher.hash_shred(shred);
if sent.contains(&hash) {
true
} else {
@ -146,8 +147,8 @@ fn should_skip_retransmit(shred: &Shred, shreds_received: &Mutex<ShredFilterAndH
}
}
None => {
let hash = hasher.hash_shred(shred);
cache.put(key, vec![hash]);
let hash = packet_hasher.hash_shred(shred);
shreds_received.put(key, vec![hash]);
false
}
}
@ -177,16 +178,15 @@ fn check_if_first_shred_received(
}
fn maybe_reset_shreds_received_cache(
shreds_received: &Mutex<ShredFilterAndHasher>,
shreds_received: &Mutex<ShredFilter>,
packet_hasher: &mut PacketHasher,
hasher_reset_ts: &mut Instant,
) {
const UPDATE_INTERVAL: Duration = Duration::from_secs(1);
if hasher_reset_ts.elapsed() >= UPDATE_INTERVAL {
*hasher_reset_ts = Instant::now();
let mut shreds_received = shreds_received.lock().unwrap();
let (cache, hasher) = shreds_received.deref_mut();
cache.clear();
hasher.reset();
shreds_received.lock().unwrap().clear();
packet_hasher.reset();
}
}
@ -201,7 +201,8 @@ fn retransmit(
stats: &mut RetransmitStats,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
hasher_reset_ts: &mut Instant,
shreds_received: &Mutex<ShredFilterAndHasher>,
shreds_received: &Mutex<ShredFilter>,
packet_hasher: &mut PacketHasher,
max_slots: &MaxSlots,
first_shreds_received: &Mutex<BTreeSet<Slot>>,
rpc_subscriptions: Option<&RpcSubscriptions>,
@ -222,13 +223,13 @@ fn retransmit(
stats.epoch_fetch += epoch_fetch.as_us();
let mut epoch_cache_update = Measure::start("retransmit_epoch_cache_update");
maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts);
maybe_reset_shreds_received_cache(shreds_received, packet_hasher, hasher_reset_ts);
epoch_cache_update.stop();
stats.epoch_cache_update += epoch_cache_update.as_us();
let socket_addr_space = cluster_info.socket_addr_space();
let retransmit_shred = |shred: &Shred, socket: &UdpSocket| {
if should_skip_retransmit(shred, shreds_received) {
if should_skip_retransmit(shred, shreds_received, packet_hasher) {
stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
return None;
}
@ -347,7 +348,8 @@ pub fn retransmitter(
);
let mut hasher_reset_ts = Instant::now();
let mut stats = RetransmitStats::new(Instant::now());
let shreds_received = Mutex::new((LruCache::new(DEFAULT_LRU_SIZE), PacketHasher::default()));
let shreds_received = Mutex::new(LruCache::new(DEFAULT_LRU_SIZE));
let mut packet_hasher = PacketHasher::default();
let first_shreds_received = Mutex::<BTreeSet<Slot>>::default();
let num_threads = get_thread_count().min(8).max(sockets.len());
let thread_pool = ThreadPoolBuilder::new()
@ -371,6 +373,7 @@ pub fn retransmitter(
&cluster_nodes_cache,
&mut hasher_reset_ts,
&shreds_received,
&mut packet_hasher,
&max_slots,
&first_shreds_received,
rpc_subscriptions.as_deref(),
@ -631,11 +634,20 @@ mod tests {
version,
0,
);
let shreds_received = Arc::new(Mutex::new((LruCache::new(100), PacketHasher::default())));
let shreds_received = Mutex::new(LruCache::new(100));
let packet_hasher = PacketHasher::default();
// unique shred for (1, 5) should pass
assert!(!should_skip_retransmit(&shred, &shreds_received));
assert!(!should_skip_retransmit(
&shred,
&shreds_received,
&packet_hasher
));
// duplicate shred for (1, 5) blocked
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&packet_hasher
));
let shred = Shred::new_from_data(
slot,
@ -648,9 +660,17 @@ mod tests {
0,
);
// first duplicate shred for (1, 5) passed
assert!(!should_skip_retransmit(&shred, &shreds_received));
assert!(!should_skip_retransmit(
&shred,
&shreds_received,
&packet_hasher
));
// then blocked
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&packet_hasher
));
let shred = Shred::new_from_data(
slot,
@ -663,24 +683,56 @@ mod tests {
0,
);
// 2nd duplicate shred for (1, 5) blocked
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&packet_hasher
));
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&packet_hasher
));
let shred = Shred::new_from_parity_shard(slot, index, &[], 0, 1, 1, 0, version);
// Coding at (1, 5) passes
assert!(!should_skip_retransmit(&shred, &shreds_received));
assert!(!should_skip_retransmit(
&shred,
&shreds_received,
&packet_hasher
));
// then blocked
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&packet_hasher
));
let shred = Shred::new_from_parity_shard(slot, index, &[], 2, 1, 1, 0, version);
// 2nd unique coding at (1, 5) passes
assert!(!should_skip_retransmit(&shred, &shreds_received));
assert!(!should_skip_retransmit(
&shred,
&shreds_received,
&packet_hasher
));
// same again is blocked
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&packet_hasher
));
let shred = Shred::new_from_parity_shard(slot, index, &[], 3, 1, 1, 0, version);
// Another unique coding at (1, 5) always blocked
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(should_skip_retransmit(&shred, &shreds_received));
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&packet_hasher
));
assert!(should_skip_retransmit(
&shred,
&shreds_received,
&packet_hasher
));
}
}