moves slot updates notifications after shreds retransmit (#26094)

RetransmitSlotStats can already be utilized to track when the first
shred for a slot was received; therefore
    first_shreds_received: &Mutex<BTreeSet<Slot>>

is redundant. Sending update notifications after shreds retransmit will
also bypass the need for a mutex.
This commit is contained in:
behzad nouri 2022-06-21 17:19:40 -04:00 committed by GitHub
parent 61946a49c3
commit 75425521b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 18 additions and 39 deletions

View File

@ -34,7 +34,7 @@ use {
solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp}, solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp},
solana_streamer::sendmmsg::{multi_target_send, SendPktsError}, solana_streamer::sendmmsg::{multi_target_send, SendPktsError},
std::{ std::{
collections::{BTreeSet, HashMap, HashSet}, collections::{HashMap, HashSet},
net::UdpSocket, net::UdpSocket,
ops::AddAssign, ops::AddAssign,
sync::{ sync::{
@ -154,29 +154,6 @@ fn should_skip_retransmit(
} }
} }
// Returns true if this is the first time receiving a shred for `shred_slot`.
fn check_if_first_shred_received(
shred_slot: Slot,
first_shreds_received: &Mutex<BTreeSet<Slot>>,
root_bank: &Bank,
) -> bool {
if shred_slot <= root_bank.slot() {
return false;
}
let mut first_shreds_received_locked = first_shreds_received.lock().unwrap();
if first_shreds_received_locked.insert(shred_slot) {
datapoint_info!("retransmit-first-shred", ("slot", shred_slot, i64));
if first_shreds_received_locked.len() > 100 {
*first_shreds_received_locked =
first_shreds_received_locked.split_off(&(root_bank.slot() + 1));
}
true
} else {
false
}
}
fn maybe_reset_shreds_received_cache( fn maybe_reset_shreds_received_cache(
shreds_received: &Mutex<ShredFilter>, shreds_received: &Mutex<ShredFilter>,
packet_hasher: &mut PacketHasher, packet_hasher: &mut PacketHasher,
@ -204,7 +181,6 @@ fn retransmit(
shreds_received: &Mutex<ShredFilter>, shreds_received: &Mutex<ShredFilter>,
packet_hasher: &mut PacketHasher, packet_hasher: &mut PacketHasher,
max_slots: &MaxSlots, max_slots: &MaxSlots,
first_shreds_received: &Mutex<BTreeSet<Slot>>,
rpc_subscriptions: Option<&RpcSubscriptions>, rpc_subscriptions: Option<&RpcSubscriptions>,
) -> Result<(), RecvTimeoutError> { ) -> Result<(), RecvTimeoutError> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1); const RECV_TIMEOUT: Duration = Duration::from_secs(1);
@ -238,15 +214,6 @@ fn retransmit(
.retransmit .retransmit
.fetch_max(shred_slot, Ordering::Relaxed); .fetch_max(shred_slot, Ordering::Relaxed);
if let Some(rpc_subscriptions) = rpc_subscriptions {
if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) {
rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived {
slot: shred_slot,
timestamp: timestamp(),
});
}
}
let mut compute_turbine_peers = Measure::start("turbine_start"); let mut compute_turbine_peers = Measure::start("turbine_start");
// TODO: consider using root-bank here for leader lookup! // TODO: consider using root-bank here for leader lookup!
// Shreds' signatures should be verified before they reach here, and if // Shreds' signatures should be verified before they reach here, and if
@ -318,7 +285,7 @@ fn retransmit(
) )
.reduce(HashMap::new, RetransmitSlotStats::merge) .reduce(HashMap::new, RetransmitSlotStats::merge)
}); });
stats.upsert_slot_stats(slot_stats); stats.upsert_slot_stats(slot_stats, root_bank.slot(), rpc_subscriptions);
timer_start.stop(); timer_start.stop();
stats.total_time += timer_start.as_us(); stats.total_time += timer_start.as_us();
stats.maybe_submit(&root_bank, &working_bank, cluster_info, cluster_nodes_cache); stats.maybe_submit(&root_bank, &working_bank, cluster_info, cluster_nodes_cache);
@ -350,7 +317,6 @@ pub fn retransmitter(
let mut stats = RetransmitStats::new(Instant::now()); let mut stats = RetransmitStats::new(Instant::now());
let shreds_received = Mutex::new(LruCache::new(DEFAULT_LRU_SIZE)); let shreds_received = Mutex::new(LruCache::new(DEFAULT_LRU_SIZE));
let mut packet_hasher = PacketHasher::default(); let mut packet_hasher = PacketHasher::default();
let first_shreds_received = Mutex::<BTreeSet<Slot>>::default();
let num_threads = get_thread_count().min(8).max(sockets.len()); let num_threads = get_thread_count().min(8).max(sockets.len());
let thread_pool = ThreadPoolBuilder::new() let thread_pool = ThreadPoolBuilder::new()
.num_threads(num_threads) .num_threads(num_threads)
@ -375,7 +341,6 @@ pub fn retransmitter(
&shreds_received, &shreds_received,
&mut packet_hasher, &mut packet_hasher,
&max_slots, &max_slots,
&first_shreds_received,
rpc_subscriptions.as_deref(), rpc_subscriptions.as_deref(),
) { ) {
Ok(()) => (), Ok(()) => (),
@ -538,13 +503,27 @@ impl RetransmitStats {
} }
} }
fn upsert_slot_stats<I>(&mut self, feed: I) fn upsert_slot_stats<I>(
where &mut self,
feed: I,
root: Slot,
rpc_subscriptions: Option<&RpcSubscriptions>,
) where
I: IntoIterator<Item = (Slot, RetransmitSlotStats)>, I: IntoIterator<Item = (Slot, RetransmitSlotStats)>,
{ {
for (slot, slot_stats) in feed { for (slot, slot_stats) in feed {
match self.slot_stats.get_mut(&slot) { match self.slot_stats.get_mut(&slot) {
None => { None => {
if let Some(rpc_subscriptions) = rpc_subscriptions {
if slot > root {
let slot_update = SlotUpdate::FirstShredReceived {
slot,
timestamp: slot_stats.outset,
};
rpc_subscriptions.notify_slot_update(slot_update);
datapoint_info!("retransmit-first-shred", ("slot", slot, i64));
}
}
self.slot_stats.put(slot, slot_stats); self.slot_stats.put(slot, slot_stats);
} }
Some(entry) => { Some(entry) => {