diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 894b2cece0..a440e9d71b 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -30,7 +30,7 @@ use { marker::PhantomData, path::Path, sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, }, }, @@ -748,11 +748,13 @@ impl Rocks { } fn write(&self, batch: RWriteBatch) -> Result<()> { - let is_perf_context_enabled = - maybe_collect_perf_context(self.column_options.rocks_perf_sample_interval); + let is_perf_enabled = maybe_enable_perf( + self.column_options.rocks_perf_sample_interval, + &self.column_options.perf_write_counter, + ); let result = self.db.write(batch); - if is_perf_context_enabled { - report_write_perf_context(rocksdb_metric_header!( + if is_perf_enabled { + report_write_perf(rocksdb_metric_header!( "blockstore_rocksdb_write_perf,op=write_batch", "write_batch", self.column_options @@ -2007,6 +2009,12 @@ pub struct LedgerColumnOptions { // If the value is greater than 0, then RocksDB read/write perf sample // will be collected once for every `rocks_perf_sample_interval` ops. pub rocks_perf_sample_interval: usize, + + // A counter to determine whether to sample the current RocksDB read operation. + pub perf_read_counter: Arc, + + // A counter to determine whether to sample the current RocksDB write operation. 
+ pub perf_write_counter: Arc, } impl Default for LedgerColumnOptions { @@ -2015,6 +2023,23 @@ impl Default for LedgerColumnOptions { shred_storage_type: ShredStorageType::RocksLevel, compression_type: BlockstoreCompressionType::default(), rocks_perf_sample_interval: 0, + perf_read_counter: Arc::::default(), + perf_write_counter: Arc::::default(), + } + } +} + +impl LedgerColumnOptions { + pub fn new( + shred_storage_type: ShredStorageType, + compression_type: BlockstoreCompressionType, + rocks_perf_sample_interval: usize, + ) -> Self { + Self { + shred_storage_type, + compression_type, + rocks_perf_sample_interval, + ..Self::default() } } } @@ -2184,11 +2209,13 @@ where C: Column + ColumnName + ColumnMetrics, { pub fn get_bytes(&self, key: C::Index) -> Result>> { - let is_perf_context_enabled = - maybe_collect_perf_context(self.column_options.rocks_perf_sample_interval); + let is_perf_enabled = maybe_enable_perf( + self.column_options.rocks_perf_sample_interval, + &self.column_options.perf_read_counter, + ); let result = self.backend.get_cf(self.handle(), &C::key(key)); - if is_perf_context_enabled { - report_read_perf_context(C::rocksdb_get_perf_metric_header(&self.column_options)); + if is_perf_enabled { + report_read_perf(C::rocksdb_get_perf_metric_header(&self.column_options)); } result } @@ -2260,11 +2287,13 @@ where } pub fn put_bytes(&self, key: C::Index, value: &[u8]) -> Result<()> { - let is_perf_context_enabled = - maybe_collect_perf_context(self.column_options.rocks_perf_sample_interval); + let is_perf_enabled = maybe_enable_perf( + self.column_options.rocks_perf_sample_interval, + &self.column_options.perf_write_counter, + ); let result = self.backend.put_cf(self.handle(), &C::key(key), value); - if is_perf_context_enabled { - report_write_perf_context(C::rocksdb_put_perf_metric_header(&self.column_options)); + if is_perf_enabled { + report_write_perf(C::rocksdb_put_perf_metric_header(&self.column_options)); } result } @@ -2281,25 +2310,45 @@ where mod 
rocks_metrics_utils { use { - rand::{thread_rng, Rng}, rocksdb::{ perf::{set_perf_stats, PerfMetric, PerfStatsLevel}, PerfContext, }, - std::cell::RefCell, + std::{ + cell::RefCell, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + }, }; // Thread local instance of RocksDB's PerfContext. thread_local! {static PER_THREAD_ROCKS_PERF_CONTEXT: RefCell = RefCell::new(PerfContext::default());} - /// Returns true if the PerfContext is enabled. - pub(crate) fn maybe_collect_perf_context(sample_interval: usize) -> bool { + /// The function enables RocksDB PerfContext once for every `sample_interval`. + /// + /// PerfContext is a thread-local struct defined in RocksDB for collecting + /// per-thread read / write performance metrics. + /// + /// When this function enables PerfContext, the function will return true, + /// and the PerfContext of the subsequent RocksDB operation will be collected. + pub fn maybe_enable_perf( + sample_interval: usize, + perf_samples_counter: &Arc, + ) -> bool { if sample_interval == 0 { return false; } - if thread_rng().gen_range(0, sample_interval) > 0 { + + if perf_samples_counter.fetch_add(1, Ordering::Relaxed) < sample_interval { return false; } + // Ideally, fetch_sub(*sample_interval) should be used to keep it + // super precise. However, since we do not use Mutex to protect the + // above check and the below operation, we simply reset it to 0. + perf_samples_counter.store(0, Ordering::Relaxed); + set_perf_stats(PerfStatsLevel::EnableTime); PER_THREAD_ROCKS_PERF_CONTEXT.with(|perf_context| { perf_context.borrow_mut().reset(); @@ -2309,7 +2358,7 @@ mod rocks_metrics_utils { /// Reports the collected PerfContext and disables the PerfContext after /// reporting.
- pub(crate) fn report_read_perf_context(metric_header: &'static str) { + pub(crate) fn report_read_perf(metric_header: &'static str) { PER_THREAD_ROCKS_PERF_CONTEXT.with(|perf_context_cell| { set_perf_stats(PerfStatsLevel::Disable); let perf_context = perf_context_cell.borrow(); @@ -2489,7 +2538,7 @@ mod rocks_metrics_utils { } /// Reports the collected PerfContext and disables the PerfContext after /// reporting. - pub fn report_write_perf_context(metric_header: &'static str) { + pub fn report_write_perf(metric_header: &'static str) { PER_THREAD_ROCKS_PERF_CONTEXT.with(|perf_context_cell| { set_perf_stats(PerfStatsLevel::Disable); let perf_context = perf_context_cell.borrow(); @@ -2554,7 +2603,7 @@ mod rocks_metrics_utils { } } use crate::blockstore_db::rocks_metrics_utils::{ - maybe_collect_perf_context, report_read_perf_context, report_write_perf_context, + maybe_enable_perf, report_read_perf, report_write_perf, }; impl LedgerColumn @@ -2563,41 +2612,47 @@ where { pub fn get(&self, key: C::Index) -> Result> { let mut result = Ok(None); - let is_perf_context_enabled = - maybe_collect_perf_context(self.column_options.rocks_perf_sample_interval); + let is_perf_enabled = maybe_enable_perf( + self.column_options.rocks_perf_sample_interval, + &self.column_options.perf_read_counter, + ); if let Some(serialized_value) = self.backend.get_cf(self.handle(), &C::key(key))? 
{ let value = deserialize(&serialized_value)?; result = Ok(Some(value)) } - if is_perf_context_enabled { - report_read_perf_context(C::rocksdb_get_perf_metric_header(&self.column_options)); + if is_perf_enabled { + report_read_perf(C::rocksdb_get_perf_metric_header(&self.column_options)); } result } pub fn put(&self, key: C::Index, value: &C::Type) -> Result<()> { - let is_perf_context_enabled = - maybe_collect_perf_context(self.column_options.rocks_perf_sample_interval); + let is_perf_enabled = maybe_enable_perf( + self.column_options.rocks_perf_sample_interval, + &self.column_options.perf_write_counter, + ); let serialized_value = serialize(value)?; let result = self .backend .put_cf(self.handle(), &C::key(key), &serialized_value); - if is_perf_context_enabled { - report_write_perf_context(C::rocksdb_put_perf_metric_header(&self.column_options)); + if is_perf_enabled { + report_write_perf(C::rocksdb_put_perf_metric_header(&self.column_options)); } result } pub fn delete(&self, key: C::Index) -> Result<()> { - let is_perf_context_enabled = - maybe_collect_perf_context(self.column_options.rocks_perf_sample_interval); + let is_perf_enabled = maybe_enable_perf( + self.column_options.rocks_perf_sample_interval, + &self.column_options.perf_write_counter, + ); let result = self.backend.delete_cf(self.handle(), &C::key(key)); - if is_perf_context_enabled { - report_write_perf_context(C::rocksdb_delete_perf_metric_header(&self.column_options)); + if is_perf_enabled { + report_write_perf(C::rocksdb_delete_perf_metric_header(&self.column_options)); } result } @@ -2611,11 +2666,13 @@ where &self, key: C::Index, ) -> Result> { - let is_perf_context_enabled = - maybe_collect_perf_context(self.column_options.rocks_perf_sample_interval); + let is_perf_enabled = maybe_enable_perf( + self.column_options.rocks_perf_sample_interval, + &self.column_options.perf_read_counter, + ); let result = self.backend.get_cf(self.handle(), &C::key(key)); - if is_perf_context_enabled { - 
report_read_perf_context(C::rocksdb_get_perf_metric_header(&self.column_options)); + if is_perf_enabled { + report_read_perf(C::rocksdb_get_perf_metric_header(&self.column_options)); } if let Some(serialized_value) = result? { @@ -2630,11 +2687,13 @@ where } pub fn get_protobuf(&self, key: C::Index) -> Result> { - let is_perf_context_enabled = - maybe_collect_perf_context(self.column_options.rocks_perf_sample_interval); + let is_perf_enabled = maybe_enable_perf( + self.column_options.rocks_perf_sample_interval, + &self.column_options.perf_read_counter, + ); let result = self.backend.get_cf(self.handle(), &C::key(key)); - if is_perf_context_enabled { - report_read_perf_context(C::rocksdb_get_perf_metric_header(&self.column_options)); + if is_perf_enabled { + report_read_perf(C::rocksdb_get_perf_metric_header(&self.column_options)); } if let Some(serialized_value) = result? { @@ -2648,11 +2707,13 @@ where let mut buf = Vec::with_capacity(value.encoded_len()); value.encode(&mut buf)?; - let is_perf_context_enabled = - maybe_collect_perf_context(self.column_options.rocks_perf_sample_interval); + let is_perf_enabled = maybe_enable_perf( + self.column_options.rocks_perf_sample_interval, + &self.column_options.perf_write_counter, + ); let result = self.backend.put_cf(self.handle(), &C::key(key), &buf); - if is_perf_context_enabled { - report_write_perf_context(C::rocksdb_put_perf_metric_header(&self.column_options)); + if is_perf_enabled { + report_write_perf(C::rocksdb_put_perf_metric_header(&self.column_options)); } result diff --git a/validator/src/main.rs b/validator/src/main.rs index e04e440821..3ffd0fbdd9 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -2743,6 +2743,7 @@ pub fn main() { "rocksdb_perf_sample_interval", usize ), + ..LedgerColumnOptions::default() }; if matches.is_present("halt_on_known_validators_accounts_hash_mismatch") {