diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 9f1534f96..2851e3acb 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -536,7 +536,7 @@ impl Rocks { fn write(&self, batch: RWriteBatch) -> Result<()> { let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, - &self.write_batch_perf_status.op_count, + &self.write_batch_perf_status, ); let result = self.db.write(batch); if is_perf_enabled { @@ -1302,7 +1302,7 @@ where pub fn get_bytes(&self, key: C::Index) -> Result>> { let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, - &self.read_perf_status.op_count, + &self.read_perf_status, ); let result = self.backend.get_cf(self.handle(), &C::key(key)); if is_perf_enabled { @@ -1380,7 +1380,7 @@ where pub fn put_bytes(&self, key: C::Index, value: &[u8]) -> Result<()> { let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, - &self.write_perf_status.op_count, + &self.write_perf_status, ); let result = self.backend.put_cf(self.handle(), &C::key(key), value); if is_perf_enabled { @@ -1407,7 +1407,7 @@ where let mut result = Ok(None); let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, - &self.read_perf_status.op_count, + &self.read_perf_status, ); if let Some(serialized_value) = self.backend.get_cf(self.handle(), &C::key(key))? { let value = deserialize(&serialized_value)?; @@ -1424,7 +1424,7 @@ where pub fn put(&self, key: C::Index, value: &C::Type) -> Result<()> { let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, - &self.write_perf_status.op_count, + &self.write_perf_status, ); let serialized_value = serialize(value)?; @@ -1441,7 +1441,7 @@ where pub fn delete(&self, key: C::Index) -> Result<()> { let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, - &self.write_perf_status.op_count, + &self.write_perf_status, ); let result = self.backend.delete_cf(self.handle(), &C::key(key)); if is_perf_enabled { @@ -1461,7 +1461,7 @@ where ) -> Result> { let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, - &self.read_perf_status.op_count, + &self.read_perf_status, ); let result = self.backend.get_cf(self.handle(), &C::key(key)); if is_perf_enabled { @@ -1482,7 +1482,7 @@ where pub fn get_protobuf(&self, key: C::Index) -> Result> { let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, - &self.read_perf_status.op_count, + &self.read_perf_status, ); let result = self.backend.get_cf(self.handle(), &C::key(key)); if is_perf_enabled { @@ -1502,7 +1502,7 @@ where let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, - &self.write_perf_status.op_count, + &self.write_perf_status, ); let result = self.backend.put_cf(self.handle(), &C::key(key), &buf); if is_perf_enabled { diff --git a/ledger/src/blockstore_metrics.rs b/ledger/src/blockstore_metrics.rs index 2a918af05..1eba08ba0 100644 --- a/ledger/src/blockstore_metrics.rs +++ b/ledger/src/blockstore_metrics.rs @@ -7,13 +7,15 @@ use { PerfContext, }, solana_metrics::datapoint_info, + solana_sdk::timing::timestamp, std::{ cell::RefCell, fmt::Debug, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, }, + time::Duration, }, }; @@ -236,6 +238,9 @@ impl BlockstoreRocksDbColumnFamilyMetrics { // Thread local instance of RocksDB's PerfContext. thread_local! {static PER_THREAD_ROCKS_PERF_CONTEXT: RefCell = RefCell::new(PerfContext::default());} +// The minimum time duration between two RocksDB perf samples of the same operation. +const PERF_SAMPLING_MIN_DURATION: Duration = Duration::from_secs(1); + /// The function enables RocksDB PerfContext once for every `sample_interval`. /// /// PerfContext is a thread-local struct defined in RocksDB for collecting @@ -245,25 +250,16 @@ thread_local! {static PER_THREAD_ROCKS_PERF_CONTEXT: RefCell = RefC /// and the PerfContext of the ubsequent RocksDB operation will be collected. pub(crate) fn maybe_enable_rocksdb_perf( sample_interval: usize, - perf_samples_counter: &AtomicUsize, + perf_status: &PerfSamplingStatus, ) -> bool { - if sample_interval == 0 { - return false; + if perf_status.should_sample(sample_interval) { + set_perf_stats(PerfStatsLevel::EnableTime); + PER_THREAD_ROCKS_PERF_CONTEXT.with(|perf_context| { + perf_context.borrow_mut().reset(); + }); + return true; } - - if perf_samples_counter.fetch_add(1, Ordering::Relaxed) < sample_interval { - return false; - } - // Ideally, fetch_sub(*sample_interval) should be used to keep it - // super precise. However, since we do not use Mutex to protect the - // above check and the below operation, we simply reset it to 0. - perf_samples_counter.store(0, Ordering::Relaxed); - - set_perf_stats(PerfStatsLevel::EnableTime); - PER_THREAD_ROCKS_PERF_CONTEXT.with(|perf_context| { - perf_context.borrow_mut().reset(); - }); - true + false } /// Reports the collected PerfContext and disables the PerfContext after @@ -516,7 +512,43 @@ pub(crate) fn report_rocksdb_write_perf(metric_header: &'static str) { /// A struct that holds the current status of RocksDB perf sampling. pub struct PerfSamplingStatus { // The number of RocksDB operations since the last perf sample. - pub(crate) op_count: AtomicUsize, + op_count: AtomicUsize, + // The timestamp of the latest operation with perf stats collection. + last_sample_time_ms: AtomicU64, +} + +impl PerfSamplingStatus { + fn should_sample(&self, sample_count_interval: usize) -> bool { + if sample_count_interval == 0 { + return false; + } + + // Rate-limiting based on the number of samples. + if self.op_count.fetch_add(1, Ordering::Relaxed) < sample_count_interval { + return false; + } + self.op_count.store(0, Ordering::Relaxed); + + // Rate-limiting based on the time duration. + let current_time_ms = timestamp(); + let old_time_ms = self.last_sample_time_ms.load(Ordering::Relaxed); + if old_time_ms + (PERF_SAMPLING_MIN_DURATION.as_millis() as u64) > current_time_ms { + return false; + } + + // If the `last_sample_time_ms` has a different value than `old_time_ms`, + // it means some other thread has performed the sampling and updated + // the last sample time. In this case, the current thread will skip + // the current sample. + self.last_sample_time_ms + .compare_exchange_weak( + old_time_ms, + current_time_ms, + Ordering::Relaxed, + Ordering::Relaxed, + ) + .is_ok() + } } pub trait ColumnMetrics {