moves shred stats to a separate file (#24484)
This commit is contained in:
parent
4f8b720262
commit
3bbfaae7b6
|
@ -304,7 +304,7 @@ impl StandardBroadcastRun {
|
|||
process_stats.receive_elapsed = duration_as_us(&receive_elapsed);
|
||||
process_stats.coding_send_elapsed = coding_send_time.as_us();
|
||||
|
||||
self.process_shreds_stats.update(&process_stats);
|
||||
self.process_shreds_stats += process_stats;
|
||||
|
||||
if last_tick_height == bank.max_tick_height() {
|
||||
self.report_and_reset_stats(false);
|
||||
|
@ -391,59 +391,23 @@ impl StandardBroadcastRun {
|
|||
}
|
||||
|
||||
fn report_and_reset_stats(&mut self, was_interrupted: bool) {
|
||||
let stats = &self.process_shreds_stats;
|
||||
let unfinished_slot = self.unfinished_slot.as_ref().unwrap();
|
||||
if was_interrupted {
|
||||
datapoint_info!(
|
||||
self.process_shreds_stats.submit(
|
||||
"broadcast-process-shreds-interrupted-stats",
|
||||
("slot", unfinished_slot.slot as i64, i64),
|
||||
("shredding_time", stats.shredding_elapsed, i64),
|
||||
("receive_time", stats.receive_elapsed, i64),
|
||||
(
|
||||
"num_data_shreds",
|
||||
unfinished_slot.next_shred_index as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"get_leader_schedule_time",
|
||||
stats.get_leader_schedule_elapsed,
|
||||
i64
|
||||
),
|
||||
("serialize_shreds_time", stats.serialize_elapsed, i64),
|
||||
("gen_data_time", stats.gen_data_elapsed, i64),
|
||||
("gen_coding_time", stats.gen_coding_elapsed, i64),
|
||||
("sign_coding_time", stats.sign_coding_elapsed, i64),
|
||||
("coding_send_time", stats.coding_send_elapsed, i64),
|
||||
unfinished_slot.slot,
|
||||
unfinished_slot.next_shred_index, // num_data_shreds,
|
||||
None, // slot_broadcast_time
|
||||
);
|
||||
} else {
|
||||
datapoint_info!(
|
||||
let slot_broadcast_time = self.slot_broadcast_start.unwrap().elapsed();
|
||||
self.process_shreds_stats.submit(
|
||||
"broadcast-process-shreds-stats",
|
||||
("slot", unfinished_slot.slot as i64, i64),
|
||||
("shredding_time", stats.shredding_elapsed, i64),
|
||||
("receive_time", stats.receive_elapsed, i64),
|
||||
(
|
||||
"num_data_shreds",
|
||||
unfinished_slot.next_shred_index as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"slot_broadcast_time",
|
||||
self.slot_broadcast_start.unwrap().elapsed().as_micros() as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"get_leader_schedule_time",
|
||||
stats.get_leader_schedule_elapsed,
|
||||
i64
|
||||
),
|
||||
("serialize_shreds_time", stats.serialize_elapsed, i64),
|
||||
("gen_data_time", stats.gen_data_elapsed, i64),
|
||||
("gen_coding_time", stats.gen_coding_elapsed, i64),
|
||||
("sign_coding_time", stats.sign_coding_elapsed, i64),
|
||||
("coding_send_time", stats.coding_send_elapsed, i64),
|
||||
unfinished_slot.slot,
|
||||
unfinished_slot.next_shred_index, // num_data_shreds,
|
||||
Some(slot_broadcast_time),
|
||||
);
|
||||
}
|
||||
self.process_shreds_stats.reset();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@ use {
|
|||
net::UdpSocket,
|
||||
sync::{atomic::AtomicBool, Arc, RwLock},
|
||||
thread::{self, Builder, JoinHandle},
|
||||
time::Instant,
|
||||
time::{Duration, Instant},
|
||||
},
|
||||
};
|
||||
|
||||
|
@ -72,6 +72,7 @@ impl ShredFetchStage {
|
|||
) where
|
||||
F: Fn(&mut Packet),
|
||||
{
|
||||
const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
|
||||
let mut shreds_received = LruCache::new(DEFAULT_LRU_SIZE);
|
||||
let mut last_updated = Instant::now();
|
||||
|
||||
|
@ -80,7 +81,6 @@ impl ShredFetchStage {
|
|||
let mut last_slot = std::u64::MAX;
|
||||
let mut slots_per_epoch = 0;
|
||||
|
||||
let mut last_stats = Instant::now();
|
||||
let mut stats = ShredFetchStats::default();
|
||||
let mut packet_hasher = PacketHasher::default();
|
||||
|
||||
|
@ -111,20 +111,7 @@ impl ShredFetchStage {
|
|||
&packet_hasher,
|
||||
);
|
||||
});
|
||||
if last_stats.elapsed().as_millis() > 1000 {
|
||||
datapoint_info!(
|
||||
name,
|
||||
("index_overrun", stats.index_overrun, i64),
|
||||
("shred_count", stats.shred_count, i64),
|
||||
("slot_bad_deserialize", stats.slot_bad_deserialize, i64),
|
||||
("index_bad_deserialize", stats.index_bad_deserialize, i64),
|
||||
("index_out_of_bounds", stats.index_out_of_bounds, i64),
|
||||
("slot_out_of_range", stats.slot_out_of_range, i64),
|
||||
("duplicate_shred", stats.duplicate_shred, i64),
|
||||
);
|
||||
stats = ShredFetchStats::default();
|
||||
last_stats = Instant::now();
|
||||
}
|
||||
stats.maybe_submit(name, STATS_SUBMIT_CADENCE);
|
||||
if sendr.send(vec![packet_batch]).is_err() {
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ pub mod leader_schedule_utils;
|
|||
pub mod next_slots_iterator;
|
||||
pub mod rooted_slot_iterator;
|
||||
pub mod shred;
|
||||
pub mod shred_stats;
|
||||
pub mod sigverify_shreds;
|
||||
pub mod slot_stats;
|
||||
pub mod staking_utils;
|
||||
|
|
|
@ -49,6 +49,7 @@
|
|||
//! So, given a) - c), we must restrict data shred's payload length such that the entire coding
|
||||
//! payload can fit into one coding shred / packet.
|
||||
|
||||
pub use crate::shred_stats::{ProcessShredsStats, ShredFetchStats};
|
||||
use {
|
||||
crate::{blockstore::MAX_DATA_SHREDS_PER_SLOT, erasure::Session},
|
||||
bincode::config::Options,
|
||||
|
@ -71,34 +72,6 @@ use {
|
|||
thiserror::Error,
|
||||
};
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
pub struct ProcessShredsStats {
|
||||
// Per-slot elapsed time
|
||||
pub shredding_elapsed: u64,
|
||||
pub receive_elapsed: u64,
|
||||
pub serialize_elapsed: u64,
|
||||
pub gen_data_elapsed: u64,
|
||||
pub gen_coding_elapsed: u64,
|
||||
pub sign_coding_elapsed: u64,
|
||||
pub coding_send_elapsed: u64,
|
||||
pub get_leader_schedule_elapsed: u64,
|
||||
}
|
||||
impl ProcessShredsStats {
|
||||
pub fn update(&mut self, new_stats: &ProcessShredsStats) {
|
||||
self.shredding_elapsed += new_stats.shredding_elapsed;
|
||||
self.receive_elapsed += new_stats.receive_elapsed;
|
||||
self.serialize_elapsed += new_stats.serialize_elapsed;
|
||||
self.gen_data_elapsed += new_stats.gen_data_elapsed;
|
||||
self.gen_coding_elapsed += new_stats.gen_coding_elapsed;
|
||||
self.sign_coding_elapsed += new_stats.sign_coding_elapsed;
|
||||
self.coding_send_elapsed += new_stats.gen_coding_elapsed;
|
||||
self.get_leader_schedule_elapsed += new_stats.get_leader_schedule_elapsed;
|
||||
}
|
||||
pub fn reset(&mut self) {
|
||||
*self = Self::default();
|
||||
}
|
||||
}
|
||||
|
||||
pub type Nonce = u32;
|
||||
|
||||
/// The following constants are computed by hand, and hardcoded.
|
||||
|
@ -1098,18 +1071,6 @@ impl Shredder {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Eq, PartialEq)]
|
||||
pub struct ShredFetchStats {
|
||||
pub index_overrun: usize,
|
||||
pub shred_count: usize,
|
||||
pub index_bad_deserialize: usize,
|
||||
pub index_out_of_bounds: usize,
|
||||
pub slot_bad_deserialize: usize,
|
||||
pub duplicate_shred: usize,
|
||||
pub slot_out_of_range: usize,
|
||||
pub bad_shred_type: usize,
|
||||
}
|
||||
|
||||
// Get slot, index, and type from a packet with partial deserialize
|
||||
pub fn get_shred_slot_index_type(
|
||||
p: &Packet,
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
use {
|
||||
solana_sdk::clock::Slot,
|
||||
std::{
|
||||
ops::AddAssign,
|
||||
time::{Duration, Instant},
|
||||
},
|
||||
};
|
||||
|
||||
#[derive(Default, Clone, Copy)]
|
||||
pub struct ProcessShredsStats {
|
||||
// Per-slot elapsed time
|
||||
pub shredding_elapsed: u64,
|
||||
pub receive_elapsed: u64,
|
||||
pub serialize_elapsed: u64,
|
||||
pub gen_data_elapsed: u64,
|
||||
pub gen_coding_elapsed: u64,
|
||||
pub sign_coding_elapsed: u64,
|
||||
pub coding_send_elapsed: u64,
|
||||
pub get_leader_schedule_elapsed: u64,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Eq, PartialEq)]
|
||||
pub struct ShredFetchStats {
|
||||
pub index_overrun: usize,
|
||||
pub shred_count: usize,
|
||||
pub(crate) index_bad_deserialize: usize,
|
||||
pub(crate) index_out_of_bounds: usize,
|
||||
pub(crate) slot_bad_deserialize: usize,
|
||||
pub duplicate_shred: usize,
|
||||
pub slot_out_of_range: usize,
|
||||
pub(crate) bad_shred_type: usize,
|
||||
since: Option<Instant>,
|
||||
}
|
||||
|
||||
impl ProcessShredsStats {
|
||||
pub fn submit(
|
||||
&mut self,
|
||||
name: &'static str,
|
||||
slot: Slot,
|
||||
num_data_shreds: u32,
|
||||
slot_broadcast_time: Option<Duration>,
|
||||
) {
|
||||
let slot_broadcast_time = slot_broadcast_time
|
||||
.map(|t| t.as_micros() as i64)
|
||||
.unwrap_or(-1);
|
||||
datapoint_info!(
|
||||
name,
|
||||
("slot", slot, i64),
|
||||
("shredding_time", self.shredding_elapsed, i64),
|
||||
("receive_time", self.receive_elapsed, i64),
|
||||
("num_data_shreds", num_data_shreds, i64),
|
||||
("slot_broadcast_time", slot_broadcast_time, i64),
|
||||
(
|
||||
"get_leader_schedule_time",
|
||||
self.get_leader_schedule_elapsed,
|
||||
i64
|
||||
),
|
||||
("serialize_shreds_time", self.serialize_elapsed, i64),
|
||||
("gen_data_time", self.gen_data_elapsed, i64),
|
||||
("gen_coding_time", self.gen_coding_elapsed, i64),
|
||||
("sign_coding_time", self.sign_coding_elapsed, i64),
|
||||
("coding_send_time", self.coding_send_elapsed, i64),
|
||||
);
|
||||
*self = Self::default();
|
||||
}
|
||||
}
|
||||
|
||||
impl ShredFetchStats {
|
||||
pub fn maybe_submit(&mut self, name: &'static str, cadence: Duration) {
|
||||
let elapsed = self.since.as_ref().map(Instant::elapsed);
|
||||
if elapsed.unwrap_or(Duration::MAX) < cadence {
|
||||
return;
|
||||
}
|
||||
datapoint_info!(
|
||||
name,
|
||||
("index_overrun", self.index_overrun, i64),
|
||||
("shred_count", self.shred_count, i64),
|
||||
("slot_bad_deserialize", self.slot_bad_deserialize, i64),
|
||||
("index_bad_deserialize", self.index_bad_deserialize, i64),
|
||||
("index_out_of_bounds", self.index_out_of_bounds, i64),
|
||||
("slot_out_of_range", self.slot_out_of_range, i64),
|
||||
("duplicate_shred", self.duplicate_shred, i64),
|
||||
);
|
||||
*self = Self {
|
||||
since: Some(Instant::now()),
|
||||
..Self::default()
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
|
||||
fn add_assign(&mut self, rhs: Self) {
|
||||
let Self {
|
||||
shredding_elapsed,
|
||||
receive_elapsed,
|
||||
serialize_elapsed,
|
||||
gen_data_elapsed,
|
||||
gen_coding_elapsed,
|
||||
sign_coding_elapsed,
|
||||
coding_send_elapsed,
|
||||
get_leader_schedule_elapsed,
|
||||
} = rhs;
|
||||
self.shredding_elapsed += shredding_elapsed;
|
||||
self.receive_elapsed += receive_elapsed;
|
||||
self.serialize_elapsed += serialize_elapsed;
|
||||
self.gen_data_elapsed += gen_data_elapsed;
|
||||
self.gen_coding_elapsed += gen_coding_elapsed;
|
||||
self.sign_coding_elapsed += sign_coding_elapsed;
|
||||
self.coding_send_elapsed += coding_send_elapsed;
|
||||
self.get_leader_schedule_elapsed += get_leader_schedule_elapsed;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue