From 1f9c89c1e8506346f7583a20c9203ab7360f1fd4 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Fri, 25 Mar 2022 19:32:22 +0000 Subject: [PATCH] expands lifetime of SlotStats (#23872) Currently, a slot's stats are removed as soon as the slot becomes full, and stats for slots before the root are purged at most once every 30 seconds: https://github.com/solana-labs/solana/blob/493a8e234/ledger/src/blockstore.rs#L2017-L2027 In order to track whether the slot is ultimately marked as dead or rooted, and to emit more metrics, this commit expands the lifetime of SlotStats while bounding the total size of the cache using an LRU eviction policy. --- Cargo.lock | 2 + ledger/Cargo.toml | 2 + ledger/src/blockstore.rs | 107 ++++++++------------------------------- ledger/src/lib.rs | 1 + ledger/src/slot_stats.rs | 90 ++++++++++++++++++++++++++++++++ 5 files changed, 115 insertions(+), 87 deletions(-) create mode 100644 ledger/src/slot_stats.rs diff --git a/Cargo.lock b/Cargo.lock index 5cfd7adf0..76fc3ee1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5075,6 +5075,7 @@ version = "1.11.0" dependencies = [ "assert_matches", "bincode", + "bitflags", "byteorder", "chrono", "chrono-humanize", @@ -5085,6 +5086,7 @@ dependencies = [ "lazy_static", "libc", "log", + "lru", "matches", "num-derive", "num-traits", diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index 51260e47d..8af1604b6 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -11,6 +11,7 @@ edition = "2021" [dependencies] bincode = "1.3.3" +bitflags = "1.3.1" byteorder = "1.4.3" chrono = { version = "0.4.11", features = ["serde"] } chrono-humanize = "0.2.1" @@ -21,6 +22,7 @@ itertools = "0.10.3" lazy_static = "1.4.0" libc = "0.2.120" log = { version = "0.4.14" } +lru = "0.7.3" num-derive = "0.3" num-traits = "0.2" num_cpus = "1.13.1" diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 36fb2706d..d5485ed5c 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -15,6 +15,7 @@ use { max_ticks_per_n_shreds, ErasureSetId, Result as ShredResult, Shred, 
ShredId, ShredType, Shredder, SHRED_PAYLOAD_SIZE, }, + slot_stats::{ShredSource, SlotsStats}, }, bincode::deserialize, crossbeam_channel::{bounded, Receiver, Sender, TrySendError}, @@ -49,7 +50,7 @@ use { borrow::Cow, cell::RefCell, cmp, - collections::{hash_map::Entry as HashMapEntry, BTreeMap, BTreeSet, HashMap, HashSet}, + collections::{hash_map::Entry as HashMapEntry, BTreeSet, HashMap, HashSet}, convert::TryInto, fs, io::{Error as IoError, ErrorKind}, @@ -59,7 +60,6 @@ use { atomic::{AtomicBool, Ordering}, Arc, Mutex, RwLock, RwLockWriteGuard, }, - time::Instant, }, tempfile::{Builder, TempDir}, thiserror::Error, @@ -178,26 +178,6 @@ pub struct Blockstore { slots_stats: Mutex, } -struct SlotsStats { - last_cleanup_ts: Instant, - stats: BTreeMap, -} - -impl Default for SlotsStats { - fn default() -> Self { - SlotsStats { - last_cleanup_ts: Instant::now(), - stats: BTreeMap::new(), - } - } -} - -#[derive(Default)] -struct SlotStats { - num_repaired: usize, - num_recovered: usize, -} - pub struct IndexMetaWorkingSetEntry { index: Index, // true only if at least one shred for this Index was inserted since the time this @@ -220,13 +200,6 @@ pub struct SlotMetaWorkingSetEntry { did_insert_occur: bool, } -#[derive(PartialEq, Debug, Clone)] -enum ShredSource { - Turbine, - Repaired, - Recovered, -} - #[derive(Default)] pub struct BlockstoreInsertionMetrics { pub num_shreds: usize, @@ -921,13 +894,13 @@ impl Blockstore { let mut newly_completed_data_sets: Vec = vec![]; let mut inserted_indices = Vec::new(); for (i, (shred, is_repaired)) in shreds.into_iter().zip(is_repaired).enumerate() { + let shred_source = if is_repaired { + ShredSource::Repaired + } else { + ShredSource::Turbine + }; match shred.shred_type() { ShredType::Data => { - let shred_source = if is_repaired { - ShredSource::Repaired - } else { - ShredSource::Turbine - }; match self.check_insert_data_shred( shred, &mut erasure_metas, @@ -966,7 +939,7 @@ impl Blockstore { &mut index_meta_time, 
handle_duplicate, is_trusted, - is_repaired, + shred_source, metrics, ); } @@ -1152,7 +1125,7 @@ impl Blockstore { index_meta_time: &mut u64, handle_duplicate: &F, is_trusted: bool, - is_repaired: bool, + shred_source: ShredSource, metrics: &mut BlockstoreInsertionMetrics, ) -> bool where @@ -1219,13 +1192,10 @@ impl Blockstore { return false; } - - if is_repaired { - let mut slots_stats = self.slots_stats.lock().unwrap(); - let mut e = slots_stats.stats.entry(slot).or_default(); - e.num_repaired += 1; - } - + self.slots_stats + .lock() + .unwrap() + .add_shred(slot, shred_source); // insert coding shred into rocks let result = self .insert_coding_shred(index_meta, &shred, write_batch) @@ -1371,7 +1341,7 @@ impl Blockstore { just_inserted_shreds, &self.last_root, leader_schedule, - shred_source.clone(), + shred_source, ) { return Err(InsertDataShredError::InvalidShred); } @@ -1643,49 +1613,12 @@ impl Blockstore { end_index, }) .collect(); - if shred_source == ShredSource::Repaired || shred_source == ShredSource::Recovered { + { let mut slots_stats = self.slots_stats.lock().unwrap(); - let mut e = slots_stats.stats.entry(slot_meta.slot).or_default(); - if shred_source == ShredSource::Repaired { - e.num_repaired += 1; + slots_stats.add_shred(slot_meta.slot, shred_source); + if slot_meta.is_full() { + slots_stats.set_full(slot_meta); } - if shred_source == ShredSource::Recovered { - e.num_recovered += 1; - } - } - if slot_meta.is_full() { - let (num_repaired, num_recovered) = { - let mut slots_stats = self.slots_stats.lock().unwrap(); - if let Some(e) = slots_stats.stats.remove(&slot_meta.slot) { - if slots_stats.last_cleanup_ts.elapsed().as_secs() > 30 { - let root = self.last_root(); - slots_stats.stats = slots_stats.stats.split_off(&root); - slots_stats.last_cleanup_ts = Instant::now(); - } - (e.num_repaired, e.num_recovered) - } else { - (0, 0) - } - }; - datapoint_info!( - "shred_insert_is_full", - ( - "total_time_ms", - solana_sdk::timing::timestamp() - 
slot_meta.first_shred_timestamp, - i64 - ), - ("slot", slot_meta.slot, i64), - ( - "last_index", - slot_meta - .last_index - .and_then(|ix| i64::try_from(ix).ok()) - .unwrap_or(-1), - i64 - ), - ("num_repaired", num_repaired, i64), - ("num_recovered", num_recovered, i64), - ); } trace!("inserted shred into slot {:?} and index {:?}", slot, index); Ok(newly_completed_data_sets) @@ -6029,7 +5962,7 @@ pub mod tests { panic!("no dupes"); }, false, - false, + ShredSource::Turbine, &mut BlockstoreInsertionMetrics::default(), )); @@ -6047,7 +5980,7 @@ pub mod tests { counter.fetch_add(1, Ordering::Relaxed); }, false, - false, + ShredSource::Turbine, &mut BlockstoreInsertionMetrics::default(), )); assert_eq!(counter.load(Ordering::Relaxed), 1); diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index 75b51a77f..8931c4678 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -8,6 +8,7 @@ pub mod bigtable_delete; pub mod bigtable_upload; pub mod bigtable_upload_service; pub mod block_error; +mod slot_stats; #[macro_use] pub mod blockstore; pub mod ancestor_iterator; diff --git a/ledger/src/slot_stats.rs b/ledger/src/slot_stats.rs new file mode 100644 index 000000000..13e90fe19 --- /dev/null +++ b/ledger/src/slot_stats.rs @@ -0,0 +1,90 @@ +use { + crate::blockstore_meta::SlotMeta, bitflags::bitflags, lru::LruCache, solana_sdk::clock::Slot, +}; + +const SLOTS_STATS_CACHE_CAPACITY: usize = 300; + +macro_rules! get_mut_entry ( + ($cache:expr, $key:expr) => ( + match $cache.get_mut(&$key) { + Some(entry) => entry, + None => { + $cache.put($key, SlotStats::default()); + $cache.get_mut(&$key).unwrap() + } + } + ); +); + +#[derive(Copy, Clone, Debug)] +pub(crate) enum ShredSource { + Turbine, + Repaired, + Recovered, +} + +bitflags! 
{ + #[derive(Default)] + struct SlotFlags: u8 { + const DEAD = 0b00000001; + const FULL = 0b00000010; + const ROOTED = 0b00000100; + } +} + +#[derive(Default)] +struct SlotStats { + flags: SlotFlags, + num_repaired: usize, + num_recovered: usize, +} + +pub(crate) struct SlotsStats(LruCache); + +impl Default for SlotsStats { + fn default() -> Self { + // LruCache::unbounded because capacity is enforced manually. + Self(LruCache::unbounded()) + } +} + +impl SlotsStats { + pub(crate) fn add_shred(&mut self, slot: Slot, source: ShredSource) { + let entry = get_mut_entry!(self.0, slot); + match source { + ShredSource::Turbine => (), + ShredSource::Repaired => entry.num_repaired += 1, + ShredSource::Recovered => entry.num_recovered += 1, + } + self.maybe_evict_cache(); + } + + pub(crate) fn set_full(&mut self, slot_meta: &SlotMeta) { + let total_time_ms = + solana_sdk::timing::timestamp().saturating_sub(slot_meta.first_shred_timestamp); + let last_index = slot_meta + .last_index + .and_then(|ix| i64::try_from(ix).ok()) + .unwrap_or(-1); + let entry = get_mut_entry!(self.0, slot_meta.slot); + if !entry.flags.contains(SlotFlags::FULL) { + datapoint_info!( + "shred_insert_is_full", + ("total_time_ms", total_time_ms, i64), + ("slot", slot_meta.slot, i64), + ("last_index", last_index, i64), + ("num_repaired", entry.num_repaired, i64), + ("num_recovered", entry.num_recovered, i64), + ); + } + entry.flags |= SlotFlags::FULL; + self.maybe_evict_cache(); + } + + fn maybe_evict_cache(&mut self) { + while self.0.len() > SLOTS_STATS_CACHE_CAPACITY { + let (_slot, _entry) = self.0.pop_lru().unwrap(); + // TODO: submit metrics for (slot, entry). + } + } +}