diff --git a/Cargo.lock b/Cargo.lock index 5cfd7adf0..76fc3ee1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5075,6 +5075,7 @@ version = "1.11.0" dependencies = [ "assert_matches", "bincode", + "bitflags", "byteorder", "chrono", "chrono-humanize", @@ -5085,6 +5086,7 @@ dependencies = [ "lazy_static", "libc", "log", + "lru", "matches", "num-derive", "num-traits", diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index 51260e47d..8af1604b6 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -11,6 +11,7 @@ edition = "2021" [dependencies] bincode = "1.3.3" +bitflags = "1.3.1" byteorder = "1.4.3" chrono = { version = "0.4.11", features = ["serde"] } chrono-humanize = "0.2.1" @@ -21,6 +22,7 @@ itertools = "0.10.3" lazy_static = "1.4.0" libc = "0.2.120" log = { version = "0.4.14" } +lru = "0.7.3" num-derive = "0.3" num-traits = "0.2" num_cpus = "1.13.1" diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 36fb2706d..d5485ed5c 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -15,6 +15,7 @@ use { max_ticks_per_n_shreds, ErasureSetId, Result as ShredResult, Shred, ShredId, ShredType, Shredder, SHRED_PAYLOAD_SIZE, }, + slot_stats::{ShredSource, SlotsStats}, }, bincode::deserialize, crossbeam_channel::{bounded, Receiver, Sender, TrySendError}, @@ -49,7 +50,7 @@ use { borrow::Cow, cell::RefCell, cmp, - collections::{hash_map::Entry as HashMapEntry, BTreeMap, BTreeSet, HashMap, HashSet}, + collections::{hash_map::Entry as HashMapEntry, BTreeSet, HashMap, HashSet}, convert::TryInto, fs, io::{Error as IoError, ErrorKind}, @@ -59,7 +60,6 @@ use { atomic::{AtomicBool, Ordering}, Arc, Mutex, RwLock, RwLockWriteGuard, }, - time::Instant, }, tempfile::{Builder, TempDir}, thiserror::Error, @@ -178,26 +178,6 @@ pub struct Blockstore { slots_stats: Mutex, } -struct SlotsStats { - last_cleanup_ts: Instant, - stats: BTreeMap, -} - -impl Default for SlotsStats { - fn default() -> Self { - SlotsStats { - last_cleanup_ts: Instant::now(), - stats: BTreeMap::new(), - } - } -} - -#[derive(Default)] -struct SlotStats { - num_repaired: usize, - num_recovered: usize, -} - pub struct IndexMetaWorkingSetEntry { index: Index, // true only if at least one shred for this Index was inserted since the time this @@ -220,13 +200,6 @@ pub struct SlotMetaWorkingSetEntry { did_insert_occur: bool, } -#[derive(PartialEq, Debug, Clone)] -enum ShredSource { - Turbine, - Repaired, - Recovered, -} - #[derive(Default)] pub struct BlockstoreInsertionMetrics { pub num_shreds: usize, @@ -921,13 +894,13 @@ impl Blockstore { let mut newly_completed_data_sets: Vec = vec![]; let mut inserted_indices = Vec::new(); for (i, (shred, is_repaired)) in shreds.into_iter().zip(is_repaired).enumerate() { + let shred_source = if is_repaired { + ShredSource::Repaired + } else { + ShredSource::Turbine + }; match shred.shred_type() { ShredType::Data => { - let shred_source = if is_repaired { - ShredSource::Repaired - } else { - ShredSource::Turbine - }; match self.check_insert_data_shred( shred, &mut erasure_metas, @@ -966,7 +939,7 @@ impl Blockstore { &mut index_meta_time, handle_duplicate, is_trusted, - is_repaired, + shred_source, metrics, ); } @@ -1152,7 +1125,7 @@ impl Blockstore { index_meta_time: &mut u64, handle_duplicate: &F, is_trusted: bool, - is_repaired: bool, + shred_source: ShredSource, metrics: &mut BlockstoreInsertionMetrics, ) -> bool where @@ -1219,13 +1192,10 @@ impl Blockstore { return false; } - - if is_repaired { - let mut slots_stats = self.slots_stats.lock().unwrap(); - let mut e = slots_stats.stats.entry(slot).or_default(); - e.num_repaired += 1; - } - + self.slots_stats + .lock() + .unwrap() + .add_shred(slot, shred_source); // insert coding shred into rocks let result = self .insert_coding_shred(index_meta, &shred, write_batch) @@ -1371,7 +1341,7 @@ impl Blockstore { just_inserted_shreds, &self.last_root, leader_schedule, - shred_source.clone(), + shred_source, ) { return Err(InsertDataShredError::InvalidShred); } @@ -1643,49 +1613,12 @@ impl Blockstore { end_index, }) .collect(); - if shred_source == ShredSource::Repaired || shred_source == ShredSource::Recovered { + { let mut slots_stats = self.slots_stats.lock().unwrap(); - let mut e = slots_stats.stats.entry(slot_meta.slot).or_default(); - if shred_source == ShredSource::Repaired { - e.num_repaired += 1; + slots_stats.add_shred(slot_meta.slot, shred_source); + if slot_meta.is_full() { + slots_stats.set_full(slot_meta); } - if shred_source == ShredSource::Recovered { - e.num_recovered += 1; - } - } - if slot_meta.is_full() { - let (num_repaired, num_recovered) = { - let mut slots_stats = self.slots_stats.lock().unwrap(); - if let Some(e) = slots_stats.stats.remove(&slot_meta.slot) { - if slots_stats.last_cleanup_ts.elapsed().as_secs() > 30 { - let root = self.last_root(); - slots_stats.stats = slots_stats.stats.split_off(&root); - slots_stats.last_cleanup_ts = Instant::now(); - } - (e.num_repaired, e.num_recovered) - } else { - (0, 0) - } - }; - datapoint_info!( - "shred_insert_is_full", - ( - "total_time_ms", - solana_sdk::timing::timestamp() - slot_meta.first_shred_timestamp, - i64 - ), - ("slot", slot_meta.slot, i64), - ( - "last_index", - slot_meta - .last_index - .and_then(|ix| i64::try_from(ix).ok()) - .unwrap_or(-1), - i64 - ), - ("num_repaired", num_repaired, i64), - ("num_recovered", num_recovered, i64), - ); } trace!("inserted shred into slot {:?} and index {:?}", slot, index); Ok(newly_completed_data_sets) @@ -6029,7 +5962,7 @@ pub mod tests { panic!("no dupes"); }, false, - false, + ShredSource::Turbine, &mut BlockstoreInsertionMetrics::default(), )); @@ -6047,7 +5980,7 @@ pub mod tests { counter.fetch_add(1, Ordering::Relaxed); }, false, - false, + ShredSource::Turbine, &mut BlockstoreInsertionMetrics::default(), )); assert_eq!(counter.load(Ordering::Relaxed), 1); diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index 75b51a77f..8931c4678 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -8,6 +8,7 @@ pub mod bigtable_delete; pub mod bigtable_upload; pub mod bigtable_upload_service; pub mod block_error; +mod slot_stats; #[macro_use] pub mod blockstore; pub mod ancestor_iterator; diff --git a/ledger/src/slot_stats.rs b/ledger/src/slot_stats.rs new file mode 100644 index 000000000..13e90fe19 --- /dev/null +++ b/ledger/src/slot_stats.rs @@ -0,0 +1,90 @@ +use { + crate::blockstore_meta::SlotMeta, bitflags::bitflags, lru::LruCache, solana_sdk::clock::Slot, +}; + +const SLOTS_STATS_CACHE_CAPACITY: usize = 300; + +macro_rules! get_mut_entry ( + ($cache:expr, $key:expr) => ( + match $cache.get_mut(&$key) { + Some(entry) => entry, + None => { + $cache.put($key, SlotStats::default()); + $cache.get_mut(&$key).unwrap() + } + } + ); +); + +#[derive(Copy, Clone, Debug)] +pub(crate) enum ShredSource { + Turbine, + Repaired, + Recovered, +} + +bitflags! { + #[derive(Default)] + struct SlotFlags: u8 { + const DEAD = 0b00000001; + const FULL = 0b00000010; + const ROOTED = 0b00000100; + } +} + +#[derive(Default)] +struct SlotStats { + flags: SlotFlags, + num_repaired: usize, + num_recovered: usize, +} + +pub(crate) struct SlotsStats(LruCache); + +impl Default for SlotsStats { + fn default() -> Self { + // LruCache::unbounded because capacity is enforced manually. + Self(LruCache::unbounded()) + } +} + +impl SlotsStats { + pub(crate) fn add_shred(&mut self, slot: Slot, source: ShredSource) { + let entry = get_mut_entry!(self.0, slot); + match source { + ShredSource::Turbine => (), + ShredSource::Repaired => entry.num_repaired += 1, + ShredSource::Recovered => entry.num_recovered += 1, + } + self.maybe_evict_cache(); + } + + pub(crate) fn set_full(&mut self, slot_meta: &SlotMeta) { + let total_time_ms = + solana_sdk::timing::timestamp().saturating_sub(slot_meta.first_shred_timestamp); + let last_index = slot_meta + .last_index + .and_then(|ix| i64::try_from(ix).ok()) + .unwrap_or(-1); + let entry = get_mut_entry!(self.0, slot_meta.slot); + if !entry.flags.contains(SlotFlags::FULL) { + datapoint_info!( + "shred_insert_is_full", + ("total_time_ms", total_time_ms, i64), + ("slot", slot_meta.slot, i64), + ("last_index", last_index, i64), + ("num_repaired", entry.num_repaired, i64), + ("num_recovered", entry.num_recovered, i64), + ); + } + entry.flags |= SlotFlags::FULL; + self.maybe_evict_cache(); + } + + fn maybe_evict_cache(&mut self) { + while self.0.len() > SLOTS_STATS_CACHE_CAPACITY { + let (_slot, _entry) = self.0.pop_lru().unwrap(); + // TODO: submit metrics for (slot, entry). + } + } +}