expands lifetime of SlotStats (#23872)

Currently, slot stats are removed when the slot becomes full, or every
30 seconds if the slot is before the root:
https://github.com/solana-labs/solana/blob/493a8e234/ledger/src/blockstore.rs#L2017-L2027

In order to track whether a slot is ultimately marked as dead or rooted,
and to emit more metrics, this commit expands the lifetime of SlotStats
while bounding the total size of the cache using an LRU eviction policy.
This commit is contained in:
behzad nouri 2022-03-25 19:32:22 +00:00 committed by GitHub
parent c6dda3b324
commit 1f9c89c1e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 115 additions and 87 deletions

2
Cargo.lock generated
View File

@ -5075,6 +5075,7 @@ version = "1.11.0"
dependencies = [
"assert_matches",
"bincode",
"bitflags",
"byteorder",
"chrono",
"chrono-humanize",
@ -5085,6 +5086,7 @@ dependencies = [
"lazy_static",
"libc",
"log",
"lru",
"matches",
"num-derive",
"num-traits",

View File

@ -11,6 +11,7 @@ edition = "2021"
[dependencies]
bincode = "1.3.3"
bitflags = "1.3.1"
byteorder = "1.4.3"
chrono = { version = "0.4.11", features = ["serde"] }
chrono-humanize = "0.2.1"
@ -21,6 +22,7 @@ itertools = "0.10.3"
lazy_static = "1.4.0"
libc = "0.2.120"
log = { version = "0.4.14" }
lru = "0.7.3"
num-derive = "0.3"
num-traits = "0.2"
num_cpus = "1.13.1"

View File

@ -15,6 +15,7 @@ use {
max_ticks_per_n_shreds, ErasureSetId, Result as ShredResult, Shred, ShredId, ShredType,
Shredder, SHRED_PAYLOAD_SIZE,
},
slot_stats::{ShredSource, SlotsStats},
},
bincode::deserialize,
crossbeam_channel::{bounded, Receiver, Sender, TrySendError},
@ -49,7 +50,7 @@ use {
borrow::Cow,
cell::RefCell,
cmp,
collections::{hash_map::Entry as HashMapEntry, BTreeMap, BTreeSet, HashMap, HashSet},
collections::{hash_map::Entry as HashMapEntry, BTreeSet, HashMap, HashSet},
convert::TryInto,
fs,
io::{Error as IoError, ErrorKind},
@ -59,7 +60,6 @@ use {
atomic::{AtomicBool, Ordering},
Arc, Mutex, RwLock, RwLockWriteGuard,
},
time::Instant,
},
tempfile::{Builder, TempDir},
thiserror::Error,
@ -178,26 +178,6 @@ pub struct Blockstore {
slots_stats: Mutex<SlotsStats>,
}
struct SlotsStats {
last_cleanup_ts: Instant,
stats: BTreeMap<Slot, SlotStats>,
}
impl Default for SlotsStats {
fn default() -> Self {
SlotsStats {
last_cleanup_ts: Instant::now(),
stats: BTreeMap::new(),
}
}
}
#[derive(Default)]
struct SlotStats {
num_repaired: usize,
num_recovered: usize,
}
pub struct IndexMetaWorkingSetEntry {
index: Index,
// true only if at least one shred for this Index was inserted since the time this
@ -220,13 +200,6 @@ pub struct SlotMetaWorkingSetEntry {
did_insert_occur: bool,
}
#[derive(PartialEq, Debug, Clone)]
enum ShredSource {
Turbine,
Repaired,
Recovered,
}
#[derive(Default)]
pub struct BlockstoreInsertionMetrics {
pub num_shreds: usize,
@ -921,13 +894,13 @@ impl Blockstore {
let mut newly_completed_data_sets: Vec<CompletedDataSetInfo> = vec![];
let mut inserted_indices = Vec::new();
for (i, (shred, is_repaired)) in shreds.into_iter().zip(is_repaired).enumerate() {
let shred_source = if is_repaired {
ShredSource::Repaired
} else {
ShredSource::Turbine
};
match shred.shred_type() {
ShredType::Data => {
let shred_source = if is_repaired {
ShredSource::Repaired
} else {
ShredSource::Turbine
};
match self.check_insert_data_shred(
shred,
&mut erasure_metas,
@ -966,7 +939,7 @@ impl Blockstore {
&mut index_meta_time,
handle_duplicate,
is_trusted,
is_repaired,
shred_source,
metrics,
);
}
@ -1152,7 +1125,7 @@ impl Blockstore {
index_meta_time: &mut u64,
handle_duplicate: &F,
is_trusted: bool,
is_repaired: bool,
shred_source: ShredSource,
metrics: &mut BlockstoreInsertionMetrics,
) -> bool
where
@ -1219,13 +1192,10 @@ impl Blockstore {
return false;
}
if is_repaired {
let mut slots_stats = self.slots_stats.lock().unwrap();
let mut e = slots_stats.stats.entry(slot).or_default();
e.num_repaired += 1;
}
self.slots_stats
.lock()
.unwrap()
.add_shred(slot, shred_source);
// insert coding shred into rocks
let result = self
.insert_coding_shred(index_meta, &shred, write_batch)
@ -1371,7 +1341,7 @@ impl Blockstore {
just_inserted_shreds,
&self.last_root,
leader_schedule,
shred_source.clone(),
shred_source,
) {
return Err(InsertDataShredError::InvalidShred);
}
@ -1643,49 +1613,12 @@ impl Blockstore {
end_index,
})
.collect();
if shred_source == ShredSource::Repaired || shred_source == ShredSource::Recovered {
{
let mut slots_stats = self.slots_stats.lock().unwrap();
let mut e = slots_stats.stats.entry(slot_meta.slot).or_default();
if shred_source == ShredSource::Repaired {
e.num_repaired += 1;
slots_stats.add_shred(slot_meta.slot, shred_source);
if slot_meta.is_full() {
slots_stats.set_full(slot_meta);
}
if shred_source == ShredSource::Recovered {
e.num_recovered += 1;
}
}
if slot_meta.is_full() {
let (num_repaired, num_recovered) = {
let mut slots_stats = self.slots_stats.lock().unwrap();
if let Some(e) = slots_stats.stats.remove(&slot_meta.slot) {
if slots_stats.last_cleanup_ts.elapsed().as_secs() > 30 {
let root = self.last_root();
slots_stats.stats = slots_stats.stats.split_off(&root);
slots_stats.last_cleanup_ts = Instant::now();
}
(e.num_repaired, e.num_recovered)
} else {
(0, 0)
}
};
datapoint_info!(
"shred_insert_is_full",
(
"total_time_ms",
solana_sdk::timing::timestamp() - slot_meta.first_shred_timestamp,
i64
),
("slot", slot_meta.slot, i64),
(
"last_index",
slot_meta
.last_index
.and_then(|ix| i64::try_from(ix).ok())
.unwrap_or(-1),
i64
),
("num_repaired", num_repaired, i64),
("num_recovered", num_recovered, i64),
);
}
trace!("inserted shred into slot {:?} and index {:?}", slot, index);
Ok(newly_completed_data_sets)
@ -6029,7 +5962,7 @@ pub mod tests {
panic!("no dupes");
},
false,
false,
ShredSource::Turbine,
&mut BlockstoreInsertionMetrics::default(),
));
@ -6047,7 +5980,7 @@ pub mod tests {
counter.fetch_add(1, Ordering::Relaxed);
},
false,
false,
ShredSource::Turbine,
&mut BlockstoreInsertionMetrics::default(),
));
assert_eq!(counter.load(Ordering::Relaxed), 1);

View File

@ -8,6 +8,7 @@ pub mod bigtable_delete;
pub mod bigtable_upload;
pub mod bigtable_upload_service;
pub mod block_error;
mod slot_stats;
#[macro_use]
pub mod blockstore;
pub mod ancestor_iterator;

90
ledger/src/slot_stats.rs Normal file
View File

@ -0,0 +1,90 @@
use {
crate::blockstore_meta::SlotMeta, bitflags::bitflags, lru::LruCache, solana_sdk::clock::Slot,
};
/// Maximum number of slots kept in the `SlotsStats` cache. Once the cache
/// holds more entries than this, `SlotsStats::maybe_evict_cache` pops the
/// least recently used entries until the size is back at the limit.
const SLOTS_STATS_CACHE_CAPACITY: usize = 300;
/// Expands to an expression yielding a mutable reference to the `SlotStats`
/// stored under `$key` in `$cache`, inserting `SlotStats::default()` first
/// when the key is not present yet.
///
/// This is a macro rather than a function; NOTE(review): presumably so the
/// "look up, else insert and look up again" pattern is borrow-checked at the
/// call site instead of across a function return -- confirm.
///
/// `$key` is expanded more than once (up to three times on a miss), so it
/// should be a cheap expression without side effects.
macro_rules! get_mut_entry {
    ($cache:expr, $key:expr) => {
        if let Some(stats) = $cache.get_mut(&$key) {
            stats
        } else {
            // Miss: insert a fresh entry, then look it up again to obtain
            // the mutable reference. The lookup cannot fail right after put.
            $cache.put($key, SlotStats::default());
            $cache.get_mut(&$key).unwrap()
        }
    };
}
/// Origin of a shred handed to the blockstore, used to decide which per-slot
/// counter (if any) is bumped in `SlotsStats::add_shred`.
#[derive(Clone, Copy, Debug)]
pub(crate) enum ShredSource {
    /// Not counted by `add_shred`.
    /// NOTE(review): presumably a shred received through turbine, i.e. not
    /// via repair or recovery -- confirm against the insertion call sites.
    Turbine,
    /// Counted in `SlotStats::num_repaired`.
    Repaired,
    /// Counted in `SlotStats::num_recovered`.
    Recovered,
}
bitflags! {
    /// Bit set recording the state of a slot tracked in the stats cache.
    /// The default value has no bits set.
    #[derive(Default)]
    struct SlotFlags: u8 {
        /// NOTE(review): presumably marks a slot that was declared dead;
        /// nothing in this file sets it yet -- confirm intended use.
        const DEAD = 1 << 0;
        /// Set by `SlotsStats::set_full`.
        const FULL = 1 << 1;
        /// NOTE(review): presumably marks a slot that was rooted; nothing
        /// in this file sets it yet -- confirm intended use.
        const ROOTED = 1 << 2;
    }
}
/// Statistics kept for a single slot inside the `SlotsStats` cache.
///
/// `Default` yields empty flags and zeroed counters.
#[derive(Default)]
struct SlotStats {
    /// State bits of the slot; `SlotsStats::set_full` sets `SlotFlags::FULL`.
    flags: SlotFlags,
    /// Number of shreds added for this slot with `ShredSource::Repaired`.
    num_repaired: usize,
    /// Number of shreds added for this slot with `ShredSource::Recovered`.
    num_recovered: usize,
}
/// Cache of per-slot statistics keyed by slot, evicted in least recently
/// used order once it grows past `SLOTS_STATS_CACHE_CAPACITY` entries.
pub(crate) struct SlotsStats(LruCache<Slot, SlotStats>);

impl Default for SlotsStats {
    /// Creates an empty cache.
    fn default() -> Self {
        // The underlying LruCache is deliberately unbounded: the size limit
        // is enforced by hand in `maybe_evict_cache`, not by the cache.
        let cache = LruCache::unbounded();
        Self(cache)
    }
}
impl SlotsStats {
    /// Records that a shred from `source` was added for `slot`.
    ///
    /// An entry for `slot` is created if there is none yet (this also marks
    /// the slot as most recently used). Repaired and recovered shreds bump
    /// their respective counters; turbine shreds change no counter. The
    /// cache is trimmed afterwards.
    pub(crate) fn add_shred(&mut self, slot: Slot, source: ShredSource) {
        let stats = get_mut_entry!(self.0, slot);
        match source {
            ShredSource::Repaired => stats.num_repaired += 1,
            ShredSource::Recovered => stats.num_recovered += 1,
            ShredSource::Turbine => {}
        }
        self.maybe_evict_cache();
    }

    /// Marks the slot described by `slot_meta` as full.
    ///
    /// The first time a given cache entry is marked full, a
    /// `shred_insert_is_full` datapoint is submitted carrying the time
    /// elapsed since `slot_meta.first_shred_timestamp`, the slot, its last
    /// index (`-1` when unknown or not representable as `i64`) and the
    /// repaired/recovered counters. Later calls for an entry that already
    /// carries the `FULL` flag emit nothing. The cache is trimmed afterwards.
    pub(crate) fn set_full(&mut self, slot_meta: &SlotMeta) {
        // Saturating, so a first-shred timestamp ahead of "now" yields 0.
        let total_time_ms =
            solana_sdk::timing::timestamp().saturating_sub(slot_meta.first_shred_timestamp);
        let last_index = slot_meta
            .last_index
            .map_or(-1, |ix| i64::try_from(ix).unwrap_or(-1));
        let slot = slot_meta.slot;
        let stats = get_mut_entry!(self.0, slot);
        let already_full = stats.flags.contains(SlotFlags::FULL);
        if !already_full {
            datapoint_info!(
                "shred_insert_is_full",
                ("total_time_ms", total_time_ms, i64),
                ("slot", slot, i64),
                ("last_index", last_index, i64),
                ("num_repaired", stats.num_repaired, i64),
                ("num_recovered", stats.num_recovered, i64),
            );
        }
        stats.flags.insert(SlotFlags::FULL);
        self.maybe_evict_cache();
    }

    /// Pops least recently used entries until the cache holds at most
    /// `SLOTS_STATS_CACHE_CAPACITY` slots.
    fn maybe_evict_cache(&mut self) {
        let excess = self.0.len().saturating_sub(SLOTS_STATS_CACHE_CAPACITY);
        for _ in 0..excess {
            // `excess` entries exist beyond the limit, so the cache is
            // non-empty here and `pop_lru` always returns an entry.
            let (_slot, _stats) = self.0.pop_lru().unwrap();
            // TODO: submit metrics for (slot, entry).
        }
    }
}