From ee6bb0d5d38218efdc9a1ee0442088b044e49def Mon Sep 17 00:00:00 2001 From: Jeff Biseda Date: Mon, 4 Apr 2022 14:44:21 -0700 Subject: [PATCH] track fec set turbine stats (#23989) --- core/src/replay_stage.rs | 7 ++ ledger/src/blockstore.rs | 24 +++--- ledger/src/lib.rs | 2 +- ledger/src/slot_stats.rs | 169 ++++++++++++++++++++++++++++----------- 4 files changed, 142 insertions(+), 60 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 5a0b86c50..1796ae2b6 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1682,6 +1682,9 @@ impl ReplayStage { blockstore .set_dead_slot(slot) .expect("Failed to mark slot as dead in blockstore"); + + blockstore.slots_stats.mark_dead(slot); + rpc_subscriptions.notify_slot_update(SlotUpdate::Dead { slot, err: format!("error: {:?}", err), @@ -1788,6 +1791,9 @@ impl ReplayStage { epoch_slots_frozen_slots, drop_bank_sender, ); + + blockstore.slots_stats.mark_rooted(new_root); + rpc_subscriptions.notify_roots(rooted_slots); if let Some(sender) = bank_notification_sender { sender @@ -2931,6 +2937,7 @@ impl ReplayStage { accounts_background_request_sender, highest_confirmed_root, ); + drop_bank_sender .send(removed_banks) .unwrap_or_else(|err| warn!("bank drop failed: {:?}", err)); diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 19a521e66..779de10cc 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -179,7 +179,7 @@ pub struct Blockstore { pub shred_timing_point_sender: Option, pub lowest_cleanup_slot: RwLock, no_compaction: bool, - slots_stats: Mutex, + pub slots_stats: SlotsStats, } pub struct IndexMetaWorkingSetEntry { @@ -451,7 +451,7 @@ impl Blockstore { last_root, lowest_cleanup_slot: RwLock::::default(), no_compaction: false, - slots_stats: Mutex::::default(), + slots_stats: SlotsStats::default(), }; if initialize_transaction_status_index { blockstore.initialize_transaction_status_index()?; @@ -1217,10 +1217,10 @@ impl Blockstore { return false; } + self.slots_stats - .lock() - .unwrap() - .add_shred(slot, shred_source); + .record_shred(shred.slot(), shred.fec_set_index(), shred_source, None); + // insert coding shred into rocks let result = self .insert_coding_shred(index_meta, &shred, write_batch) @@ -1652,13 +1652,13 @@ impl Blockstore { end_index, }) .collect(); - { - let mut slots_stats = self.slots_stats.lock().unwrap(); - slots_stats.add_shred(slot_meta.slot, shred_source); - if slot_meta.is_full() { - slots_stats.set_full(slot_meta); - } - } + + self.slots_stats.record_shred( + shred.slot(), + shred.fec_set_index(), + shred_source, + Some(slot_meta), + ); // slot is full, send slot full timing to poh_timing_report service. if slot_meta.is_full() { diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index 8931c4678..1497d2243 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -8,7 +8,6 @@ pub mod bigtable_delete; pub mod bigtable_upload; pub mod bigtable_upload_service; pub mod block_error; -mod slot_stats; #[macro_use] pub mod blockstore; pub mod ancestor_iterator; @@ -25,6 +24,7 @@ pub mod next_slots_iterator; pub mod rooted_slot_iterator; pub mod shred; pub mod sigverify_shreds; +pub mod slot_stats; pub mod staking_utils; #[macro_use] diff --git a/ledger/src/slot_stats.rs b/ledger/src/slot_stats.rs index 13e90fe19..0f3a72def 100644 --- a/ledger/src/slot_stats.rs +++ b/ledger/src/slot_stats.rs @@ -1,21 +1,16 @@ use { - crate::blockstore_meta::SlotMeta, bitflags::bitflags, lru::LruCache, solana_sdk::clock::Slot, + crate::blockstore_meta::SlotMeta, + bitflags::bitflags, + lru::LruCache, + solana_sdk::clock::Slot, + std::{ + collections::HashMap, + sync::{Mutex, MutexGuard}, + }, }; const SLOTS_STATS_CACHE_CAPACITY: usize = 300; -macro_rules! get_mut_entry ( - ($cache:expr, $key:expr) => ( - match $cache.get_mut(&$key) { - Some(entry) => entry, - None => { - $cache.put($key, SlotStats::default()); - $cache.get_mut(&$key).unwrap() - } - } - ); -); - #[derive(Copy, Clone, Debug)] pub(crate) enum ShredSource { Turbine, @@ -32,59 +27,139 @@ bitflags! { } } -#[derive(Default)] -struct SlotStats { - flags: SlotFlags, +#[derive(Clone, Default)] +pub struct SlotStats { + turbine_fec_set_index_counts: HashMap, num_repaired: usize, num_recovered: usize, + last_index: u64, + flags: SlotFlags, } -pub(crate) struct SlotsStats(LruCache); +impl SlotStats { + pub fn get_min_index_count(&self) -> usize { + self.turbine_fec_set_index_counts + .iter() + .map(|(_, cnt)| *cnt) + .min() + .unwrap_or(0) + } + + fn report(&self, slot: Slot) { + let min_fec_set_count = self.get_min_index_count(); + datapoint_info!( + "slot_stats_tracking_complete", + ("slot", slot, i64), + ("last_index", self.last_index, i64), + ("num_repaired", self.num_repaired, i64), + ("num_recovered", self.num_recovered, i64), + ("min_turbine_fec_set_count", min_fec_set_count, i64), + ("is_full", self.flags.contains(SlotFlags::FULL), bool), + ("is_rooted", self.flags.contains(SlotFlags::ROOTED), bool), + ("is_dead", self.flags.contains(SlotFlags::DEAD), bool), + ); + } +} + +pub struct SlotsStats { + pub stats: Mutex>, +} impl Default for SlotsStats { fn default() -> Self { - // LruCache::unbounded because capacity is enforced manually. - Self(LruCache::unbounded()) + Self { + stats: Mutex::new(LruCache::new(SLOTS_STATS_CACHE_CAPACITY)), + } } } impl SlotsStats { - pub(crate) fn add_shred(&mut self, slot: Slot, source: ShredSource) { - let entry = get_mut_entry!(self.0, slot); - match source { - ShredSource::Turbine => (), - ShredSource::Repaired => entry.num_repaired += 1, - ShredSource::Recovered => entry.num_recovered += 1, - } - self.maybe_evict_cache(); + fn get_or_default_with_eviction_check<'a>( + stats: &'a mut MutexGuard>, + slot: Slot, + ) -> (&'a mut SlotStats, Option<(Slot, SlotStats)>) { + let evicted = if stats.len() == stats.cap() { + match stats.peek_lru() { + Some((s, _)) if *s == slot => None, + _ => stats.pop_lru(), + } + } else { + None + }; + stats.get_or_insert(slot, SlotStats::default); + (stats.get_mut(&slot).unwrap(), evicted) } - pub(crate) fn set_full(&mut self, slot_meta: &SlotMeta) { - let total_time_ms = - solana_sdk::timing::timestamp().saturating_sub(slot_meta.first_shred_timestamp); - let last_index = slot_meta - .last_index - .and_then(|ix| i64::try_from(ix).ok()) - .unwrap_or(-1); - let entry = get_mut_entry!(self.0, slot_meta.slot); - if !entry.flags.contains(SlotFlags::FULL) { + pub(crate) fn record_shred( + &self, + slot: Slot, + fec_set_index: u32, + source: ShredSource, + slot_meta: Option<&SlotMeta>, + ) { + let mut slot_full_reporting_info = None; + let mut stats = self.stats.lock().unwrap(); + let (mut slot_stats, evicted) = Self::get_or_default_with_eviction_check(&mut stats, slot); + match source { + ShredSource::Recovered => slot_stats.num_recovered += 1, + ShredSource::Repaired => slot_stats.num_repaired += 1, + ShredSource::Turbine => { + *slot_stats + .turbine_fec_set_index_counts + .entry(fec_set_index) + .or_default() += 1 + } + } + if let Some(meta) = slot_meta { + if meta.is_full() { + slot_stats.last_index = meta.last_index.unwrap_or_default(); + if !slot_stats.flags.contains(SlotFlags::FULL) { + slot_stats.flags |= SlotFlags::FULL; + slot_full_reporting_info = + Some((slot_stats.num_repaired, slot_stats.num_recovered)); + } + } + } + drop(stats); + if let Some((num_repaired, num_recovered)) = slot_full_reporting_info { + let slot_meta = slot_meta.unwrap(); + let total_time_ms = + solana_sdk::timing::timestamp().saturating_sub(slot_meta.first_shred_timestamp); + let last_index = slot_meta + .last_index + .and_then(|ix| i64::try_from(ix).ok()) + .unwrap_or(-1); datapoint_info!( "shred_insert_is_full", ("total_time_ms", total_time_ms, i64), - ("slot", slot_meta.slot, i64), + ("slot", slot, i64), ("last_index", last_index, i64), - ("num_repaired", entry.num_repaired, i64), - ("num_recovered", entry.num_recovered, i64), + ("num_repaired", num_repaired, i64), + ("num_recovered", num_recovered, i64), ); } - entry.flags |= SlotFlags::FULL; - self.maybe_evict_cache(); - } - - fn maybe_evict_cache(&mut self) { - while self.0.len() > SLOTS_STATS_CACHE_CAPACITY { - let (_slot, _entry) = self.0.pop_lru().unwrap(); - // TODO: submit metrics for (slot, entry). + if let Some((evicted_slot, evicted_stats)) = evicted { + evicted_stats.report(evicted_slot); } } + + fn add_flag(&self, slot: Slot, flag: SlotFlags) { + let evicted = { + let mut stats = self.stats.lock().unwrap(); + let (slot_stats, evicted) = Self::get_or_default_with_eviction_check(&mut stats, slot); + slot_stats.flags |= flag; + evicted + }; + if let Some((evicted_slot, evicted_stats)) = evicted { + evicted_stats.report(evicted_slot); + } + } + + pub fn mark_dead(&self, slot: Slot) { + self.add_flag(slot, SlotFlags::DEAD); + } + + pub fn mark_rooted(&self, slot: Slot) { + self.add_flag(slot, SlotFlags::ROOTED); + } }