Track FEC set turbine stats (#23989)

This commit is contained in:
Jeff Biseda 2022-04-04 14:44:21 -07:00 committed by GitHub
parent 6a7f6585ce
commit ee6bb0d5d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 142 additions and 60 deletions

View File

@ -1682,6 +1682,9 @@ impl ReplayStage {
blockstore
.set_dead_slot(slot)
.expect("Failed to mark slot as dead in blockstore");
blockstore.slots_stats.mark_dead(slot);
rpc_subscriptions.notify_slot_update(SlotUpdate::Dead {
slot,
err: format!("error: {:?}", err),
@ -1788,6 +1791,9 @@ impl ReplayStage {
epoch_slots_frozen_slots,
drop_bank_sender,
);
blockstore.slots_stats.mark_rooted(new_root);
rpc_subscriptions.notify_roots(rooted_slots);
if let Some(sender) = bank_notification_sender {
sender
@ -2931,6 +2937,7 @@ impl ReplayStage {
accounts_background_request_sender,
highest_confirmed_root,
);
drop_bank_sender
.send(removed_banks)
.unwrap_or_else(|err| warn!("bank drop failed: {:?}", err));

View File

@ -179,7 +179,7 @@ pub struct Blockstore {
pub shred_timing_point_sender: Option<PohTimingSender>,
pub lowest_cleanup_slot: RwLock<Slot>,
no_compaction: bool,
slots_stats: Mutex<SlotsStats>,
pub slots_stats: SlotsStats,
}
pub struct IndexMetaWorkingSetEntry {
@ -451,7 +451,7 @@ impl Blockstore {
last_root,
lowest_cleanup_slot: RwLock::<Slot>::default(),
no_compaction: false,
slots_stats: Mutex::<SlotsStats>::default(),
slots_stats: SlotsStats::default(),
};
if initialize_transaction_status_index {
blockstore.initialize_transaction_status_index()?;
@ -1217,10 +1217,10 @@ impl Blockstore {
return false;
}
self.slots_stats
.lock()
.unwrap()
.add_shred(slot, shred_source);
.record_shred(shred.slot(), shred.fec_set_index(), shred_source, None);
// insert coding shred into rocks
let result = self
.insert_coding_shred(index_meta, &shred, write_batch)
@ -1652,13 +1652,13 @@ impl Blockstore {
end_index,
})
.collect();
{
let mut slots_stats = self.slots_stats.lock().unwrap();
slots_stats.add_shred(slot_meta.slot, shred_source);
if slot_meta.is_full() {
slots_stats.set_full(slot_meta);
}
}
self.slots_stats.record_shred(
shred.slot(),
shred.fec_set_index(),
shred_source,
Some(slot_meta),
);
// slot is full, send slot full timing to poh_timing_report service.
if slot_meta.is_full() {

View File

@ -8,7 +8,6 @@ pub mod bigtable_delete;
pub mod bigtable_upload;
pub mod bigtable_upload_service;
pub mod block_error;
mod slot_stats;
#[macro_use]
pub mod blockstore;
pub mod ancestor_iterator;
@ -25,6 +24,7 @@ pub mod next_slots_iterator;
pub mod rooted_slot_iterator;
pub mod shred;
pub mod sigverify_shreds;
pub mod slot_stats;
pub mod staking_utils;
#[macro_use]

View File

@ -1,21 +1,16 @@
use {
crate::blockstore_meta::SlotMeta, bitflags::bitflags, lru::LruCache, solana_sdk::clock::Slot,
crate::blockstore_meta::SlotMeta,
bitflags::bitflags,
lru::LruCache,
solana_sdk::clock::Slot,
std::{
collections::HashMap,
sync::{Mutex, MutexGuard},
},
};
const SLOTS_STATS_CACHE_CAPACITY: usize = 300;
macro_rules! get_mut_entry (
($cache:expr, $key:expr) => (
match $cache.get_mut(&$key) {
Some(entry) => entry,
None => {
$cache.put($key, SlotStats::default());
$cache.get_mut(&$key).unwrap()
}
}
);
);
#[derive(Copy, Clone, Debug)]
pub(crate) enum ShredSource {
Turbine,
@ -32,59 +27,139 @@ bitflags! {
}
}
#[derive(Default)]
struct SlotStats {
flags: SlotFlags,
#[derive(Clone, Default)]
pub struct SlotStats {
turbine_fec_set_index_counts: HashMap</*fec_set_index*/ u32, /*count*/ usize>,
num_repaired: usize,
num_recovered: usize,
last_index: u64,
flags: SlotFlags,
}
pub(crate) struct SlotsStats(LruCache<Slot, SlotStats>);
impl SlotStats {
pub fn get_min_index_count(&self) -> usize {
self.turbine_fec_set_index_counts
.iter()
.map(|(_, cnt)| *cnt)
.min()
.unwrap_or(0)
}
fn report(&self, slot: Slot) {
let min_fec_set_count = self.get_min_index_count();
datapoint_info!(
"slot_stats_tracking_complete",
("slot", slot, i64),
("last_index", self.last_index, i64),
("num_repaired", self.num_repaired, i64),
("num_recovered", self.num_recovered, i64),
("min_turbine_fec_set_count", min_fec_set_count, i64),
("is_full", self.flags.contains(SlotFlags::FULL), bool),
("is_rooted", self.flags.contains(SlotFlags::ROOTED), bool),
("is_dead", self.flags.contains(SlotFlags::DEAD), bool),
);
}
}
/// Slot stats for recently inserted slots, bounded by an LRU cache of
/// SLOTS_STATS_CACHE_CAPACITY entries; the Mutex allows recording stats
/// through a shared `&self` reference.
pub struct SlotsStats {
    pub stats: Mutex<LruCache<Slot, SlotStats>>,
}
impl Default for SlotsStats {
    fn default() -> Self {
        Self {
            // Capacity bounds the number of slots tracked at once; the
            // oldest slot's stats are reported and dropped on eviction.
            stats: Mutex::new(LruCache::new(SLOTS_STATS_CACHE_CAPACITY)),
        }
    }
}
impl SlotsStats {
pub(crate) fn add_shred(&mut self, slot: Slot, source: ShredSource) {
let entry = get_mut_entry!(self.0, slot);
match source {
ShredSource::Turbine => (),
ShredSource::Repaired => entry.num_repaired += 1,
ShredSource::Recovered => entry.num_recovered += 1,
}
self.maybe_evict_cache();
/// Returns a mutable reference to `slot`'s stats entry, creating a
/// default entry if absent. If creating the entry would push the cache
/// past its capacity, the LRU entry is popped first and returned so the
/// caller can report it after releasing the lock.
fn get_or_default_with_eviction_check<'a>(
    stats: &'a mut MutexGuard<LruCache<Slot, SlotStats>>,
    slot: Slot,
) -> (&'a mut SlotStats, Option<(Slot, SlotStats)>) {
    let evicted = if stats.len() == stats.cap() {
        match stats.peek_lru() {
            // The LRU entry is `slot` itself: the upcoming insert will not
            // grow the cache, so nothing needs to be evicted.
            Some((s, _)) if *s == slot => None,
            _ => stats.pop_lru(),
        }
    } else {
        None
    };
    stats.get_or_insert(slot, SlotStats::default);
    (stats.get_mut(&slot).unwrap(), evicted)
}
pub(crate) fn set_full(&mut self, slot_meta: &SlotMeta) {
let total_time_ms =
solana_sdk::timing::timestamp().saturating_sub(slot_meta.first_shred_timestamp);
let last_index = slot_meta
.last_index
.and_then(|ix| i64::try_from(ix).ok())
.unwrap_or(-1);
let entry = get_mut_entry!(self.0, slot_meta.slot);
if !entry.flags.contains(SlotFlags::FULL) {
pub(crate) fn record_shred(
&self,
slot: Slot,
fec_set_index: u32,
source: ShredSource,
slot_meta: Option<&SlotMeta>,
) {
let mut slot_full_reporting_info = None;
let mut stats = self.stats.lock().unwrap();
let (mut slot_stats, evicted) = Self::get_or_default_with_eviction_check(&mut stats, slot);
match source {
ShredSource::Recovered => slot_stats.num_recovered += 1,
ShredSource::Repaired => slot_stats.num_repaired += 1,
ShredSource::Turbine => {
*slot_stats
.turbine_fec_set_index_counts
.entry(fec_set_index)
.or_default() += 1
}
}
if let Some(meta) = slot_meta {
if meta.is_full() {
slot_stats.last_index = meta.last_index.unwrap_or_default();
if !slot_stats.flags.contains(SlotFlags::FULL) {
slot_stats.flags |= SlotFlags::FULL;
slot_full_reporting_info =
Some((slot_stats.num_repaired, slot_stats.num_recovered));
}
}
}
drop(stats);
if let Some((num_repaired, num_recovered)) = slot_full_reporting_info {
let slot_meta = slot_meta.unwrap();
let total_time_ms =
solana_sdk::timing::timestamp().saturating_sub(slot_meta.first_shred_timestamp);
let last_index = slot_meta
.last_index
.and_then(|ix| i64::try_from(ix).ok())
.unwrap_or(-1);
datapoint_info!(
"shred_insert_is_full",
("total_time_ms", total_time_ms, i64),
("slot", slot_meta.slot, i64),
("slot", slot, i64),
("last_index", last_index, i64),
("num_repaired", entry.num_repaired, i64),
("num_recovered", entry.num_recovered, i64),
("num_repaired", num_repaired, i64),
("num_recovered", num_recovered, i64),
);
}
entry.flags |= SlotFlags::FULL;
self.maybe_evict_cache();
}
fn maybe_evict_cache(&mut self) {
while self.0.len() > SLOTS_STATS_CACHE_CAPACITY {
let (_slot, _entry) = self.0.pop_lru().unwrap();
// TODO: submit metrics for (slot, entry).
if let Some((evicted_slot, evicted_stats)) = evicted {
evicted_stats.report(evicted_slot);
}
}
/// Sets `flag` on `slot`'s stats entry, creating the entry if needed.
/// Any entry evicted to make room is reported after the lock is released.
fn add_flag(&self, slot: Slot, flag: SlotFlags) {
    let mut guard = self.stats.lock().unwrap();
    let (entry, dropped) = Self::get_or_default_with_eviction_check(&mut guard, slot);
    entry.flags |= flag;
    drop(guard);
    if let Some((dropped_slot, dropped_stats)) = dropped {
        dropped_stats.report(dropped_slot);
    }
}

/// Marks `slot` as dead in its stats entry.
pub fn mark_dead(&self, slot: Slot) {
    self.add_flag(slot, SlotFlags::DEAD);
}

/// Marks `slot` as rooted in its stats entry.
pub fn mark_rooted(&self, slot: Slot) {
    self.add_flag(slot, SlotFlags::ROOTED);
}
}