diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 017c28a13..c62b1e1dd 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -304,7 +304,7 @@ impl StandardBroadcastRun { process_stats.receive_elapsed = duration_as_us(&receive_elapsed); process_stats.coding_send_elapsed = coding_send_time.as_us(); - self.process_shreds_stats.update(&process_stats); + self.process_shreds_stats += process_stats; if last_tick_height == bank.max_tick_height() { self.report_and_reset_stats(false); @@ -391,59 +391,23 @@ impl StandardBroadcastRun { } fn report_and_reset_stats(&mut self, was_interrupted: bool) { - let stats = &self.process_shreds_stats; let unfinished_slot = self.unfinished_slot.as_ref().unwrap(); if was_interrupted { - datapoint_info!( + self.process_shreds_stats.submit( "broadcast-process-shreds-interrupted-stats", - ("slot", unfinished_slot.slot as i64, i64), - ("shredding_time", stats.shredding_elapsed, i64), - ("receive_time", stats.receive_elapsed, i64), - ( - "num_data_shreds", - unfinished_slot.next_shred_index as i64, - i64 - ), - ( - "get_leader_schedule_time", - stats.get_leader_schedule_elapsed, - i64 - ), - ("serialize_shreds_time", stats.serialize_elapsed, i64), - ("gen_data_time", stats.gen_data_elapsed, i64), - ("gen_coding_time", stats.gen_coding_elapsed, i64), - ("sign_coding_time", stats.sign_coding_elapsed, i64), - ("coding_send_time", stats.coding_send_elapsed, i64), + unfinished_slot.slot, + unfinished_slot.next_shred_index, // num_data_shreds, + None, // slot_broadcast_time ); } else { - datapoint_info!( + let slot_broadcast_time = self.slot_broadcast_start.unwrap().elapsed(); + self.process_shreds_stats.submit( "broadcast-process-shreds-stats", - ("slot", unfinished_slot.slot as i64, i64), - ("shredding_time", stats.shredding_elapsed, i64), - ("receive_time", stats.receive_elapsed, i64), - ( - 
"num_data_shreds", - unfinished_slot.next_shred_index as i64, - i64 - ), - ( - "slot_broadcast_time", - self.slot_broadcast_start.unwrap().elapsed().as_micros() as i64, - i64 - ), - ( - "get_leader_schedule_time", - stats.get_leader_schedule_elapsed, - i64 - ), - ("serialize_shreds_time", stats.serialize_elapsed, i64), - ("gen_data_time", stats.gen_data_elapsed, i64), - ("gen_coding_time", stats.gen_coding_elapsed, i64), - ("sign_coding_time", stats.sign_coding_elapsed, i64), - ("coding_send_time", stats.coding_send_elapsed, i64), + unfinished_slot.slot, + unfinished_slot.next_shred_index, // num_data_shreds, + Some(slot_broadcast_time), ); } - self.process_shreds_stats.reset(); } } diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index b3e4c2d6b..3ec3a3d77 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -17,7 +17,7 @@ use { net::UdpSocket, sync::{atomic::AtomicBool, Arc, RwLock}, thread::{self, Builder, JoinHandle}, - time::Instant, + time::{Duration, Instant}, }, }; @@ -72,6 +72,7 @@ impl ShredFetchStage { ) where F: Fn(&mut Packet), { + const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1); let mut shreds_received = LruCache::new(DEFAULT_LRU_SIZE); let mut last_updated = Instant::now(); @@ -80,7 +81,6 @@ impl ShredFetchStage { let mut last_slot = std::u64::MAX; let mut slots_per_epoch = 0; - let mut last_stats = Instant::now(); let mut stats = ShredFetchStats::default(); let mut packet_hasher = PacketHasher::default(); @@ -111,20 +111,7 @@ impl ShredFetchStage { &packet_hasher, ); }); - if last_stats.elapsed().as_millis() > 1000 { - datapoint_info!( - name, - ("index_overrun", stats.index_overrun, i64), - ("shred_count", stats.shred_count, i64), - ("slot_bad_deserialize", stats.slot_bad_deserialize, i64), - ("index_bad_deserialize", stats.index_bad_deserialize, i64), - ("index_out_of_bounds", stats.index_out_of_bounds, i64), - ("slot_out_of_range", stats.slot_out_of_range, i64), - 
("duplicate_shred", stats.duplicate_shred, i64), - ); - stats = ShredFetchStats::default(); - last_stats = Instant::now(); - } + stats.maybe_submit(name, STATS_SUBMIT_CADENCE); if sendr.send(vec![packet_batch]).is_err() { break; } diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index 1497d2243..4f2a110c0 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -23,6 +23,7 @@ pub mod leader_schedule_utils; pub mod next_slots_iterator; pub mod rooted_slot_iterator; pub mod shred; +pub mod shred_stats; pub mod sigverify_shreds; pub mod slot_stats; pub mod staking_utils; diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index a3c849974..981d8161a 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -49,6 +49,7 @@ //! So, given a) - c), we must restrict data shred's payload length such that the entire coding //! payload can fit into one coding shred / packet. +pub use crate::shred_stats::{ProcessShredsStats, ShredFetchStats}; use { crate::{blockstore::MAX_DATA_SHREDS_PER_SLOT, erasure::Session}, bincode::config::Options, @@ -71,34 +72,6 @@ use { thiserror::Error, }; -#[derive(Default, Clone)] -pub struct ProcessShredsStats { - // Per-slot elapsed time - pub shredding_elapsed: u64, - pub receive_elapsed: u64, - pub serialize_elapsed: u64, - pub gen_data_elapsed: u64, - pub gen_coding_elapsed: u64, - pub sign_coding_elapsed: u64, - pub coding_send_elapsed: u64, - pub get_leader_schedule_elapsed: u64, -} -impl ProcessShredsStats { - pub fn update(&mut self, new_stats: &ProcessShredsStats) { - self.shredding_elapsed += new_stats.shredding_elapsed; - self.receive_elapsed += new_stats.receive_elapsed; - self.serialize_elapsed += new_stats.serialize_elapsed; - self.gen_data_elapsed += new_stats.gen_data_elapsed; - self.gen_coding_elapsed += new_stats.gen_coding_elapsed; - self.sign_coding_elapsed += new_stats.sign_coding_elapsed; - self.coding_send_elapsed += new_stats.gen_coding_elapsed; - self.get_leader_schedule_elapsed += 
new_stats.get_leader_schedule_elapsed;
-    }
-    pub fn reset(&mut self) {
-        *self = Self::default();
-    }
-}
-
 pub type Nonce = u32;
 
 /// The following constants are computed by hand, and hardcoded.
@@ -1098,18 +1071,6 @@ impl Shredder {
     }
 }
 
-#[derive(Default, Debug, Eq, PartialEq)]
-pub struct ShredFetchStats {
-    pub index_overrun: usize,
-    pub shred_count: usize,
-    pub index_bad_deserialize: usize,
-    pub index_out_of_bounds: usize,
-    pub slot_bad_deserialize: usize,
-    pub duplicate_shred: usize,
-    pub slot_out_of_range: usize,
-    pub bad_shred_type: usize,
-}
-
 // Get slot, index, and type from a packet with partial deserialize
 pub fn get_shred_slot_index_type(
     p: &Packet,
diff --git a/ledger/src/shred_stats.rs b/ledger/src/shred_stats.rs
new file mode 100644
index 000000000..dd5b962cc
--- /dev/null
+++ b/ledger/src/shred_stats.rs
@@ -0,0 +1,112 @@
+use {
+    solana_sdk::clock::Slot,
+    std::{
+        ops::AddAssign,
+        time::{Duration, Instant},
+    },
+};
+
+#[derive(Default, Clone, Copy)]
+pub struct ProcessShredsStats {
+    // Per-slot elapsed time
+    pub shredding_elapsed: u64,
+    pub receive_elapsed: u64,
+    pub serialize_elapsed: u64,
+    pub gen_data_elapsed: u64,
+    pub gen_coding_elapsed: u64,
+    pub sign_coding_elapsed: u64,
+    pub coding_send_elapsed: u64,
+    pub get_leader_schedule_elapsed: u64,
+}
+
+#[derive(Default, Debug, Eq, PartialEq)]
+pub struct ShredFetchStats {
+    pub index_overrun: usize,
+    pub shred_count: usize,
+    pub(crate) index_bad_deserialize: usize,
+    pub(crate) index_out_of_bounds: usize,
+    pub(crate) slot_bad_deserialize: usize,
+    pub duplicate_shred: usize,
+    pub slot_out_of_range: usize,
+    pub(crate) bad_shred_type: usize,
+    since: Option<Instant>,
+}
+
+impl ProcessShredsStats {
+    pub fn submit(
+        &mut self,
+        name: &'static str,
+        slot: Slot,
+        num_data_shreds: u32,
+        slot_broadcast_time: Option<Duration>,
+    ) {
+        let slot_broadcast_time = slot_broadcast_time
+            .map(|t| t.as_micros() as i64)
+            .unwrap_or(-1);
+        datapoint_info!(
+            name,
+            ("slot", slot, i64),
("shredding_time", self.shredding_elapsed, i64), + ("receive_time", self.receive_elapsed, i64), + ("num_data_shreds", num_data_shreds, i64), + ("slot_broadcast_time", slot_broadcast_time, i64), + ( + "get_leader_schedule_time", + self.get_leader_schedule_elapsed, + i64 + ), + ("serialize_shreds_time", self.serialize_elapsed, i64), + ("gen_data_time", self.gen_data_elapsed, i64), + ("gen_coding_time", self.gen_coding_elapsed, i64), + ("sign_coding_time", self.sign_coding_elapsed, i64), + ("coding_send_time", self.coding_send_elapsed, i64), + ); + *self = Self::default(); + } +} + +impl ShredFetchStats { + pub fn maybe_submit(&mut self, name: &'static str, cadence: Duration) { + let elapsed = self.since.as_ref().map(Instant::elapsed); + if elapsed.unwrap_or(Duration::MAX) < cadence { + return; + } + datapoint_info!( + name, + ("index_overrun", self.index_overrun, i64), + ("shred_count", self.shred_count, i64), + ("slot_bad_deserialize", self.slot_bad_deserialize, i64), + ("index_bad_deserialize", self.index_bad_deserialize, i64), + ("index_out_of_bounds", self.index_out_of_bounds, i64), + ("slot_out_of_range", self.slot_out_of_range, i64), + ("duplicate_shred", self.duplicate_shred, i64), + ); + *self = Self { + since: Some(Instant::now()), + ..Self::default() + }; + } +} + +impl AddAssign for ProcessShredsStats { + fn add_assign(&mut self, rhs: Self) { + let Self { + shredding_elapsed, + receive_elapsed, + serialize_elapsed, + gen_data_elapsed, + gen_coding_elapsed, + sign_coding_elapsed, + coding_send_elapsed, + get_leader_schedule_elapsed, + } = rhs; + self.shredding_elapsed += shredding_elapsed; + self.receive_elapsed += receive_elapsed; + self.serialize_elapsed += serialize_elapsed; + self.gen_data_elapsed += gen_data_elapsed; + self.gen_coding_elapsed += gen_coding_elapsed; + self.sign_coding_elapsed += sign_coding_elapsed; + self.coding_send_elapsed += coding_send_elapsed; + self.get_leader_schedule_elapsed += get_leader_schedule_elapsed; + } +}