diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 8de3a1d9a2..9ea6f030a1 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -25,10 +25,12 @@ use { solana_perf::packet::{Packet, Packets}, solana_rayon_threadlimit::get_thread_count, solana_runtime::{bank::Bank, bank_forks::BankForks}, - solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey, timing::duration_as_ms}, + solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey}, solana_streamer::streamer::PacketSender, std::collections::HashSet, std::{ + cmp::Reverse, + collections::HashMap, net::{SocketAddr, UdpSocket}, ops::Deref, sync::{ @@ -71,6 +73,58 @@ impl WindowServiceMetrics { } } +#[derive(Default)] +struct ReceiveWindowStats { + num_packets: usize, + num_shreds: usize, // num_discards: num_packets - num_shreds + num_repairs: usize, + elapsed: Duration, // excludes waiting time on the receiver channel. + slots: HashMap, + addrs: HashMap, + since: Option, +} + +impl ReceiveWindowStats { + fn maybe_submit(&mut self) { + const MAX_NUM_ADDRS: usize = 5; + const SUBMIT_CADENCE: Duration = Duration::from_secs(2); + let elapsed = self.since.as_ref().map(Instant::elapsed); + if elapsed.unwrap_or(Duration::MAX) < SUBMIT_CADENCE { + return; + } + datapoint_info!( + "receive_window_stats", + ("num_packets", self.num_packets, i64), + ("num_shreds", self.num_shreds, i64), + ("num_repairs", self.num_repairs, i64), + ("elapsed_micros", self.elapsed.as_micros(), i64), + ); + for (slot, num_shreds) in &self.slots { + datapoint_info!( + "receive_window_num_slot_shreds", + ("slot", *slot, i64), + ("num_shreds", *num_shreds, i64) + ); + } + let mut addrs: Vec<_> = std::mem::take(&mut self.addrs).into_iter().collect(); + let reverse_count = |(_addr, count): &_| Reverse(*count); + if addrs.len() > MAX_NUM_ADDRS { + addrs.select_nth_unstable_by_key(MAX_NUM_ADDRS, reverse_count); + addrs.truncate(MAX_NUM_ADDRS); + } + addrs.sort_unstable_by_key(reverse_count); + info!( + "num addresses: {}, top packets by source: {:?}", + self.addrs.len(), + addrs + ); + *self = Self { + since: Some(Instant::now()), + ..Self::default() + }; + } +} + fn verify_shred_slot(shred: &Shred, root: u64) -> bool { if shred.is_data() { // Only data shreds have parent information @@ -258,11 +312,11 @@ fn recv_window( leader_schedule_cache: &LeaderScheduleCache, bank_forks: &RwLock, insert_shred_sender: &CrossbeamSender<(Vec, Vec>)>, - my_pubkey: &Pubkey, verified_receiver: &CrossbeamReceiver>, retransmit: &PacketSender, shred_filter: F, thread_pool: &ThreadPool, + stats: &mut ReceiveWindowStats, ) -> Result<()> where F: Fn(&Shred, Arc, /*last root:*/ Slot) -> bool + Sync, @@ -270,9 +324,7 @@ where let timer = Duration::from_millis(200); let mut packets = verified_receiver.recv_timeout(timer)?; packets.extend(verified_receiver.try_iter().flatten()); - let total_packets: usize = packets.iter().map(|p| p.packets.len()).sum(); let now = Instant::now(); - inc_new_counter_debug!("streamer-recv_window-recv", total_packets); let (root_bank, working_bank) = { let bank_forks = bank_forks.read().unwrap(); @@ -320,10 +372,15 @@ where .flat_map_iter(|packet| packet.packets.iter_mut().filter_map(handle_packet)) .unzip() }); - - trace!("{:?} shreds from packets", shreds.len()); - - trace!("{} num total shreds received: {}", my_pubkey, total_packets); + stats.num_packets += packets.iter().map(|pkt| pkt.packets.len()).sum::(); + stats.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count(); + stats.num_shreds += shreds.len(); + for shred in &shreds { + *stats.slots.entry(shred.slot()).or_default() += 1; + } + for packet in packets.iter().flat_map(|pkt| pkt.packets.iter()) { + *stats.addrs.entry(packet.meta.addr()).or_default() += 1; + } for packets in packets.into_iter() { if !packets.is_empty() { @@ -333,12 +390,7 @@ where } insert_shred_sender.send((shreds, repair_infos))?; - - trace!( - "Elapsed processing time in recv_window(): {}", - duration_as_ms(&now.elapsed()) - ); - + stats.elapsed += now.elapsed(); Ok(()) } @@ -556,6 +608,7 @@ impl WindowService { + std::marker::Send + std::marker::Sync, { + let mut stats = ReceiveWindowStats::default(); Builder::new() .name("solana-window".to_string()) .spawn(move || { @@ -570,14 +623,13 @@ impl WindowService { inc_new_counter_error!("solana-window-error", 1, 1); }; - loop { - if exit.load(Ordering::Relaxed) { - break; - } - + while !exit.load(Ordering::Relaxed) { let mut handle_timeout = || { if now.elapsed() > Duration::from_secs(30) { - warn!("Window does not seem to be receiving data. Ensure port configuration is correct..."); + warn!( + "Window does not seem to be receiving data. \ + Ensure port configuration is correct..." + ); now = Instant::now(); } }; @@ -586,18 +638,11 @@ impl WindowService { &leader_schedule_cache, &bank_forks, &insert_sender, - &id, &verified_receiver, &retransmit, - |shred, bank, last_root| { - shred_filter( - &id, - shred, - Some(bank), - last_root, - ) - }, + |shred, bank, last_root| shred_filter(&id, shred, Some(bank), last_root), &thread_pool, + &mut stats, ) { if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { break; @@ -605,6 +650,7 @@ impl WindowService { } else { now = Instant::now(); } + stats.maybe_submit(); } }) .unwrap()