diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 9273242d62..07b58d5c81 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -19,11 +19,12 @@ use solana_measure::measure::Measure;
 use solana_metrics::inc_new_counter_error;
 use solana_perf::packet::Packets;
 use solana_sdk::epoch_schedule::EpochSchedule;
+use solana_sdk::timing::timestamp;
 use solana_streamer::streamer::PacketReceiver;
 use std::{
     cmp,
     net::UdpSocket,
-    sync::atomic::{AtomicBool, Ordering},
+    sync::atomic::{AtomicBool, AtomicU64, Ordering},
     sync::mpsc::channel,
     sync::mpsc::RecvTimeoutError,
     sync::Mutex,
@@ -36,6 +37,91 @@ use std::{
 // it doesn't pull up too much work.
 const MAX_PACKET_BATCH_SIZE: usize = 100;

+#[derive(Default)]
+struct RetransmitStats {
+    total_packets: AtomicU64,
+    total_batches: AtomicU64,
+    total_time: AtomicU64,
+    repair_total: AtomicU64,
+    discard_total: AtomicU64,
+    retransmit_total: AtomicU64,
+    last_ts: AtomicU64,
+    compute_turbine_peers_total: AtomicU64,
+}
+
+fn update_retransmit_stats(
+    stats: &Arc<RetransmitStats>,
+    total_time: u64,
+    total_packets: usize,
+    retransmit_total: u64,
+    discard_total: u64,
+    repair_total: u64,
+    compute_turbine_peers_total: u64,
+    peers_len: usize,
+) {
+    stats.total_time.fetch_add(total_time, Ordering::Relaxed);
+    stats
+        .total_packets
+        .fetch_add(total_packets as u64, Ordering::Relaxed);
+    stats
+        .retransmit_total
+        .fetch_add(retransmit_total, Ordering::Relaxed);
+    stats
+        .repair_total
+        .fetch_add(repair_total, Ordering::Relaxed);
+    stats
+        .discard_total
+        .fetch_add(discard_total, Ordering::Relaxed);
+    stats
+        .compute_turbine_peers_total
+        .fetch_add(compute_turbine_peers_total, Ordering::Relaxed);
+    stats.total_batches.fetch_add(1, Ordering::Relaxed);
+
+    let now = timestamp();
+    let last = stats.last_ts.load(Ordering::Relaxed);
+    if now - last > 2000 && stats.last_ts.compare_and_swap(last, now, Ordering::Relaxed) == last {
+        datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64));
+        datapoint_info!(
+            "retransmit-stage",
+            (
+                "total_time",
+                stats.total_time.swap(0, Ordering::Relaxed) as i64,
+                i64
+            ),
+            (
+                "total_batches",
+                stats.total_batches.swap(0, Ordering::Relaxed) as i64,
+                i64
+            ),
+            (
+                "total_packets",
+                stats.total_packets.swap(0, Ordering::Relaxed) as i64,
+                i64
+            ),
+            (
+                "retransmit_total",
+                stats.retransmit_total.swap(0, Ordering::Relaxed) as i64,
+                i64
+            ),
+            (
+                "compute_turbine",
+                stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64,
+                i64
+            ),
+            (
+                "repair_total",
+                stats.repair_total.swap(0, Ordering::Relaxed) as i64,
+                i64
+            ),
+            (
+                "discard_total",
+                stats.discard_total.swap(0, Ordering::Relaxed) as i64,
+                i64
+            ),
+        );
+    }
+}
+
 fn retransmit(
     bank_forks: &Arc<RwLock<BankForks>>,
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
@@ -43,6 +129,7 @@ fn retransmit(
     r: &Arc<Mutex<PacketReceiver>>,
     sock: &UdpSocket,
     id: u32,
+    stats: &Arc<RetransmitStats>,
 ) -> Result<()> {
     let timer = Duration::new(1, 0);
     let r_lock = r.lock().unwrap();
@@ -104,7 +191,7 @@ fn retransmit(
             let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect();
             let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect();
             compute_turbine_peers.stop();
-            compute_turbine_peers_total += compute_turbine_peers.as_ms();
+            compute_turbine_peers_total += compute_turbine_peers.as_us();

             let leader =
                 leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref()));
@@ -116,7 +203,7 @@ fn retransmit(
                 ClusterInfo::retransmit_to(&children, packet, leader, sock, true)?;
             }
             retransmit_time.stop();
-            retransmit_total += retransmit_time.as_ms();
+            retransmit_total += retransmit_time.as_us();
         }
     }
     timer_start.stop();
@@ -127,16 +214,17 @@ fn retransmit(
         retransmit_total,
         id,
     );
-    datapoint_debug!("cluster_info-num_nodes", ("count", peers_len, i64));
-    datapoint_debug!(
-        "retransmit-stage",
-        ("total_time", timer_start.as_ms() as i64, i64),
-        ("total_packets", total_packets as i64, i64),
-        ("retransmit_total", retransmit_total as i64, i64),
-        ("compute_turbine", compute_turbine_peers_total as i64, i64),
-        ("repair_total", i64::from(repair_total), i64),
-        ("discard_total", i64::from(discard_total), i64),
+    update_retransmit_stats(
+        stats,
+        timer_start.as_us(),
+        total_packets,
+        retransmit_total,
+        discard_total,
+        repair_total,
+        compute_turbine_peers_total,
+        peers_len,
     );
+
     Ok(())
 }

@@ -155,6 +243,7 @@ pub fn retransmitter(
     cluster_info: Arc<ClusterInfo>,
     r: Arc<Mutex<PacketReceiver>>,
 ) -> Vec<JoinHandle<()>> {
+    let stats = Arc::new(RetransmitStats::default());
     (0..sockets.len())
         .map(|s| {
             let sockets = sockets.clone();
@@ -162,6 +251,7 @@ pub fn retransmitter(
             let leader_schedule_cache = leader_schedule_cache.clone();
             let r = r.clone();
             let cluster_info = cluster_info.clone();
+            let stats = stats.clone();

             Builder::new()
                 .name("solana-retransmitter".to_string())
@@ -175,6 +265,7 @@ pub fn retransmitter(
                         &r,
                         &sockets[s],
                         s as u32,
+                        &stats,
                     ) {
                         match e {
                             Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs
index 02d724a6ba..6b7faa85c0 100644
--- a/core/src/shred_fetch_stage.rs
+++ b/core/src/shred_fetch_stage.rs
@@ -22,12 +22,23 @@ use std::time::Instant;

 pub type ShredsReceived = HashMap<Slot, BitVec<u64>>;

+#[derive(Default)]
+struct ShredFetchStats {
+    index_overrun: usize,
+    shred_count: usize,
+    index_bad_deserialize: usize,
+    index_out_of_bounds: usize,
+    slot_bad_deserialize: usize,
+    duplicate_shred: usize,
+    slot_out_of_range: usize,
+}
+
 pub struct ShredFetchStage {
     thread_hdls: Vec<JoinHandle<()>>,
 }

 impl ShredFetchStage {
-    fn get_slot_index(p: &Packet, index_overrun: &mut usize) -> Option<(u64, u32)> {
+    fn get_slot_index(p: &Packet, stats: &mut ShredFetchStats) -> Option<(u64, u32)> {
         let index_start = OFFSET_OF_SHRED_INDEX;
         let index_end = index_start + SIZE_OF_SHRED_INDEX;
         let slot_start = OFFSET_OF_SHRED_SLOT;
@@ -38,11 +49,17 @@ impl ShredFetchStage {
                 if index < MAX_DATA_SHREDS_PER_SLOT as u32 && slot_end <= p.meta.size {
                     if let Ok(slot) = limited_deserialize::<Slot>(&p.data[slot_start..slot_end]) {
                         return Some((slot, index));
+                    } else {
+                        stats.slot_bad_deserialize += 1;
                     }
+                } else {
+                    stats.index_out_of_bounds += 1;
                 }
+            } else {
+                stats.index_bad_deserialize += 1;
             }
         } else {
-            *index_overrun += 1;
+            stats.index_overrun += 1;
         }
         None
     }
@@ -50,7 +67,7 @@ impl ShredFetchStage {
     fn process_packet<F>(
         p: &mut Packet,
         shreds_received: &mut ShredsReceived,
-        index_overrun: &mut usize,
+        stats: &mut ShredFetchStats,
         last_root: Slot,
         last_slot: Slot,
         slots_per_epoch: u64,
@@ -59,7 +76,7 @@ impl ShredFetchStage {
         F: Fn(&mut Packet),
     {
         p.meta.discard = true;
-        if let Some((slot, index)) = Self::get_slot_index(p, index_overrun) {
+        if let Some((slot, index)) = Self::get_slot_index(p, stats) {
             // Seems reasonable to limit shreds to 2 epochs away
             if slot > last_root && slot < (last_slot + 2 * slots_per_epoch) {
                 // Shred filter
@@ -70,7 +87,11 @@ impl ShredFetchStage {
                     p.meta.discard = false;
                     modify(p);
                     slot_received.set(index.into(), true);
+                } else {
+                    stats.duplicate_shred += 1;
                 }
+            } else {
+                stats.slot_out_of_range += 1;
             }
         }
     }
@@ -80,6 +101,7 @@ impl ShredFetchStage {
         recvr: PacketReceiver,
         sendr: PacketSender,
         bank_forks: Option<Arc<RwLock<BankForks>>>,
+        name: &'static str,
         modify: F,
     ) where
         F: Fn(&mut Packet),
@@ -92,6 +114,9 @@ impl ShredFetchStage {
         let mut last_slot = std::u64::MAX;
         let mut slots_per_epoch = 0;

+        let mut last_stats = Instant::now();
+        let mut stats = ShredFetchStats::default();
+
         while let Some(mut p) = recvr.iter().next() {
             if last_cleared.elapsed().as_millis() > 200 {
                 shreds_received.clear();
@@ -105,22 +130,32 @@ impl ShredFetchStage {
                     slots_per_epoch = root_bank.get_slots_in_epoch(root_bank.epoch());
                 }
             }
-            let mut index_overrun = 0;
-            let mut shred_count = 0;
+            stats.shred_count += p.packets.len();
             p.packets.iter_mut().for_each(|mut packet| {
-                shred_count += 1;
                 Self::process_packet(
                     &mut packet,
                     &mut shreds_received,
-                    &mut index_overrun,
+                    &mut stats,
                     last_root,
                     last_slot,
                     slots_per_epoch,
                     &modify,
                 );
             });
-            inc_new_counter_warn!("shred_fetch_stage-shred_index_overrun", index_overrun);
-            inc_new_counter_info!("shred_fetch_stage-shred_count", shred_count);
+            if last_stats.elapsed().as_millis() > 1000 {
+                datapoint_info!(
+                    name,
+                    ("index_overrun", stats.index_overrun, i64),
+                    ("shred_count", stats.shred_count, i64),
+                    ("slot_bad_deserialize", stats.slot_bad_deserialize, i64),
+                    ("index_bad_deserialize", stats.index_bad_deserialize, i64),
+                    ("index_out_of_bounds", stats.index_out_of_bounds, i64),
+                    ("slot_out_of_range", stats.slot_out_of_range, i64),
+                    ("duplicate_shred", stats.duplicate_shred, i64),
+                );
+                stats = ShredFetchStats::default();
+                last_stats = Instant::now();
+            }
             if sendr.send(p).is_err() {
                 break;
             }
@@ -133,6 +168,7 @@ impl ShredFetchStage {
         sender: PacketSender,
         recycler: Recycler<PinnedVec<Packet>>,
         bank_forks: Option<Arc<RwLock<BankForks>>>,
+        name: &'static str,
         modify: F,
     ) -> (Vec<JoinHandle<()>>, JoinHandle<()>)
     where
@@ -154,7 +190,7 @@ impl ShredFetchStage {

         let modifier_hdl = Builder::new()
             .name("solana-tvu-fetch-stage-packet-modifier".to_string())
-            .spawn(move || Self::modify_packets(packet_receiver, sender, bank_forks, modify))
+            .spawn(move || Self::modify_packets(packet_receiver, sender, bank_forks, name, modify))
             .unwrap();
         (streamers, modifier_hdl)
     }
@@ -185,6 +221,7 @@ impl ShredFetchStage {
             sender.clone(),
             recycler.clone(),
             bank_forks.clone(),
+            "shred_fetch_tvu_forwards",
             |p| p.meta.forward = true,
         );

@@ -194,6 +231,7 @@ impl ShredFetchStage {
             sender.clone(),
             recycler.clone(),
             bank_forks,
+            "shred_fetch_repair",
             |p| p.meta.repair = true,
         );

@@ -225,7 +263,7 @@ mod tests {
         solana_logger::setup();
         let mut shreds_received = ShredsReceived::default();
         let mut packet = Packet::default();
-        let mut index_overrun = 0;
+        let mut stats = ShredFetchStats::default();
         let last_root = 0;
         let last_slot = 100;
         let slots_per_epoch = 10;
@@ -233,13 +271,13 @@ mod tests {
         ShredFetchStage::process_packet(
             &mut packet,
             &mut shreds_received,
-            &mut index_overrun,
+            &mut stats,
             last_root,
             last_slot,
             slots_per_epoch,
             &|_p| {},
         );
-        assert_eq!(index_overrun, 1);
+        assert_eq!(stats.index_overrun, 1);
         assert!(packet.meta.discard);
         let shred = Shred::new_from_data(1, 3, 0, None, true, true, 0, 0, 0);
         shred.copy_to_packet(&mut packet);
@@ -248,7 +286,7 @@ mod tests {
         ShredFetchStage::process_packet(
             &mut packet,
             &mut shreds_received,
-            &mut index_overrun,
+            &mut stats,
             3,
             last_slot,
             slots_per_epoch,
@@ -260,7 +298,7 @@ mod tests {
         ShredFetchStage::process_packet(
             &mut packet,
             &mut shreds_received,
-            &mut index_overrun,
+            &mut stats,
             last_root,
             last_slot,
             slots_per_epoch,
@@ -272,7 +310,7 @@ mod tests {
         ShredFetchStage::process_packet(
             &mut packet,
             &mut shreds_received,
-            &mut index_overrun,
+            &mut stats,
             last_root,
             last_slot,
             slots_per_epoch,
@@ -287,7 +325,7 @@ mod tests {
         ShredFetchStage::process_packet(
             &mut packet,
             &mut shreds_received,
-            &mut index_overrun,
+            &mut stats,
             last_root,
             last_slot,
             slots_per_epoch,
@@ -301,7 +339,7 @@ mod tests {
         ShredFetchStage::process_packet(
             &mut packet,
             &mut shreds_received,
-            &mut index_overrun,
+            &mut stats,
             last_root,
             last_slot,
             slots_per_epoch,
@@ -315,10 +353,10 @@ mod tests {
         let shred = Shred::new_from_data(1, 3, 0, None, true, true, 0, 0, 0);
         let mut packet = Packet::default();
         shred.copy_to_packet(&mut packet);
-        let mut index_overrun = 0;
+        let mut stats = ShredFetchStats::default();
         assert_eq!(
             Some((1, 3)),
-            ShredFetchStage::get_slot_index(&packet, &mut index_overrun)
+            ShredFetchStage::get_slot_index(&packet, &mut stats)
         );
     }
 }
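
Note (not part of the patch): below is a minimal, standalone sketch of the throttled stats-flush pattern that RetransmitStats introduces above. Worker threads accumulate counters with relaxed atomics on every batch, and whichever thread first observes that the reporting interval has elapsed wins a CAS on the last-report timestamp and drains the totals. All names in the sketch are illustrative; the patch itself reports through datapoint_info! with timestamps from solana_sdk::timing::timestamp(), and uses the older compare_and_swap API rather than compare_exchange.

// Illustrative sketch only; not taken from the patch. Uses std only.
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

#[derive(Default)]
struct BatchStats {
    total_packets: AtomicU64,
    total_batches: AtomicU64,
    last_report_ms: AtomicU64,
}

fn now_ms() -> u64 {
    // The patch uses solana_sdk::timing::timestamp(); this is the same idea via std.
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

fn record_batch(stats: &BatchStats, packets: u64, report_interval_ms: u64) {
    // Hot path: cheap relaxed increments, no lock.
    stats.total_packets.fetch_add(packets, Ordering::Relaxed);
    stats.total_batches.fetch_add(1, Ordering::Relaxed);

    let now = now_ms();
    let last = stats.last_report_ms.load(Ordering::Relaxed);
    // compare_exchange plays the role of compare_and_swap in the patch: only the one
    // thread that successfully advances last_report_ms drains and reports the counters.
    if now.saturating_sub(last) > report_interval_ms
        && stats
            .last_report_ms
            .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
    {
        println!(
            "batches={} packets={}",
            stats.total_batches.swap(0, Ordering::Relaxed),
            stats.total_packets.swap(0, Ordering::Relaxed),
        );
    }
}

fn main() {
    let stats = BatchStats::default();
    for i in 0..5 {
        record_batch(&stats, 100 + i, 2000);
    }
}

The design choice mirrored here is that reporting is rate-limited off the packet path: counters stay in per-process atomics, and the CAS guarantees at most one flush per interval even with several retransmit threads sharing the same Arc'd stats.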