adds packet/shred count stats to window-service

Adds back the packet/shred count metrics that an earlier commit removed
from the retransmit stage.
This commit is contained in:
behzad nouri 2021-08-12 20:58:23 -04:00
parent bf437b0336
commit 8198a7eae1
1 changed files with 75 additions and 29 deletions

View File

@ -25,10 +25,12 @@ use {
solana_perf::packet::{Packet, Packets},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms},
solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey},
solana_streamer::streamer::PacketSender,
std::collections::HashSet,
std::{
cmp::Reverse,
collections::HashMap,
net::{SocketAddr, UdpSocket},
ops::Deref,
sync::{
@ -71,6 +73,58 @@ impl WindowServiceMetrics {
}
}
/// Counters accumulated across calls to `recv_window` and periodically
/// reported, then reset, by `maybe_submit`.
#[derive(Default)]
struct ReceiveWindowStats {
// Total number of packets pulled off the verified-packets receiver.
num_packets: usize,
// Number of shreds produced from those packets.
num_shreds: usize, // num_discards: num_packets - num_shreds
// Number of entries that carried repair metadata (`Some` repair info).
num_repairs: usize,
// Cumulative processing time spent in `recv_window`.
elapsed: Duration, // excludes waiting time on the receiver channel.
// Per-slot count of shreds received.
slots: HashMap<Slot, /*num shreds:*/ usize>,
// Per-source-address count of packets received.
addrs: HashMap</*source:*/ SocketAddr, /*num packets:*/ usize>,
// Time of the most recent submission; `None` until the first submission.
since: Option<Instant>,
}
impl ReceiveWindowStats {
    /// Reports the accumulated stats and starts a fresh accumulation window.
    ///
    /// Returns without doing anything when less than `SUBMIT_CADENCE` has
    /// passed since the previous submission. When `since` is `None` (no
    /// submission has happened yet) the stats are submitted immediately.
    ///
    /// On submission this emits one aggregate datapoint, one datapoint per
    /// slot, and a log line with the number of distinct source addresses
    /// together with the (at most `MAX_NUM_ADDRS`) sources that sent the most
    /// packets. All counters are then reset and `since` is set to now.
    fn maybe_submit(&mut self) {
        const MAX_NUM_ADDRS: usize = 5;
        const SUBMIT_CADENCE: Duration = Duration::from_secs(2);
        let elapsed = self.since.as_ref().map(Instant::elapsed);
        // `None` maps to `Duration::MAX`, so the first call always submits.
        if elapsed.unwrap_or(Duration::MAX) < SUBMIT_CADENCE {
            return;
        }
        datapoint_info!(
            "receive_window_stats",
            ("num_packets", self.num_packets, i64),
            ("num_shreds", self.num_shreds, i64),
            ("num_repairs", self.num_repairs, i64),
            ("elapsed_micros", self.elapsed.as_micros(), i64),
        );
        for (slot, num_shreds) in &self.slots {
            datapoint_info!(
                "receive_window_num_slot_shreds",
                ("slot", *slot, i64),
                ("num_shreds", *num_shreds, i64)
            );
        }
        let mut addrs: Vec<_> = std::mem::take(&mut self.addrs).into_iter().collect();
        // Record the number of distinct sources before truncating the list.
        // `self.addrs` was emptied by `mem::take` above, so reading its length
        // here would always report zero.
        let num_addrs = addrs.len();
        let reverse_count = |(_addr, count): &_| Reverse(*count);
        if addrs.len() > MAX_NUM_ADDRS {
            // Partition so the MAX_NUM_ADDRS largest counts come first, which
            // avoids sorting the whole list.
            addrs.select_nth_unstable_by_key(MAX_NUM_ADDRS, reverse_count);
            addrs.truncate(MAX_NUM_ADDRS);
        }
        addrs.sort_unstable_by_key(reverse_count);
        info!(
            "num addresses: {}, top packets by source: {:?}",
            num_addrs,
            addrs
        );
        // Reset every counter and restart the cadence timer.
        *self = Self {
            since: Some(Instant::now()),
            ..Self::default()
        };
    }
}
fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
if shred.is_data() {
// Only data shreds have parent information
@ -258,11 +312,11 @@ fn recv_window<F>(
leader_schedule_cache: &LeaderScheduleCache,
bank_forks: &RwLock<BankForks>,
insert_shred_sender: &CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
my_pubkey: &Pubkey,
verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
retransmit: &PacketSender,
shred_filter: F,
thread_pool: &ThreadPool,
stats: &mut ReceiveWindowStats,
) -> Result<()>
where
F: Fn(&Shred, Arc<Bank>, /*last root:*/ Slot) -> bool + Sync,
@ -270,9 +324,7 @@ where
let timer = Duration::from_millis(200);
let mut packets = verified_receiver.recv_timeout(timer)?;
packets.extend(verified_receiver.try_iter().flatten());
let total_packets: usize = packets.iter().map(|p| p.packets.len()).sum();
let now = Instant::now();
inc_new_counter_debug!("streamer-recv_window-recv", total_packets);
let (root_bank, working_bank) = {
let bank_forks = bank_forks.read().unwrap();
@ -320,10 +372,15 @@ where
.flat_map_iter(|packet| packet.packets.iter_mut().filter_map(handle_packet))
.unzip()
});
trace!("{:?} shreds from packets", shreds.len());
trace!("{} num total shreds received: {}", my_pubkey, total_packets);
stats.num_packets += packets.iter().map(|pkt| pkt.packets.len()).sum::<usize>();
stats.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count();
stats.num_shreds += shreds.len();
for shred in &shreds {
*stats.slots.entry(shred.slot()).or_default() += 1;
}
for packet in packets.iter().flat_map(|pkt| pkt.packets.iter()) {
*stats.addrs.entry(packet.meta.addr()).or_default() += 1;
}
for packets in packets.into_iter() {
if !packets.is_empty() {
@ -333,12 +390,7 @@ where
}
insert_shred_sender.send((shreds, repair_infos))?;
trace!(
"Elapsed processing time in recv_window(): {}",
duration_as_ms(&now.elapsed())
);
stats.elapsed += now.elapsed();
Ok(())
}
@ -556,6 +608,7 @@ impl WindowService {
+ std::marker::Send
+ std::marker::Sync,
{
let mut stats = ReceiveWindowStats::default();
Builder::new()
.name("solana-window".to_string())
.spawn(move || {
@ -570,14 +623,13 @@ impl WindowService {
inc_new_counter_error!("solana-window-error", 1, 1);
};
loop {
if exit.load(Ordering::Relaxed) {
break;
}
while !exit.load(Ordering::Relaxed) {
let mut handle_timeout = || {
if now.elapsed() > Duration::from_secs(30) {
warn!("Window does not seem to be receiving data. Ensure port configuration is correct...");
warn!(
"Window does not seem to be receiving data. \
Ensure port configuration is correct..."
);
now = Instant::now();
}
};
@ -586,18 +638,11 @@ impl WindowService {
&leader_schedule_cache,
&bank_forks,
&insert_sender,
&id,
&verified_receiver,
&retransmit,
|shred, bank, last_root| {
shred_filter(
&id,
shred,
Some(bank),
last_root,
)
},
|shred, bank, last_root| shred_filter(&id, shred, Some(bank), last_root),
&thread_pool,
&mut stats,
) {
if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
break;
@ -605,6 +650,7 @@ impl WindowService {
} else {
now = Instant::now();
}
stats.maybe_submit();
}
})
.unwrap()