(LedgerStore) Rate-limit RocksDB perf sample by a minimum time interval (#25093)

#### Problem
When the number of RocksDB read/write operations spikes, its payload size
might exceed the limit (413 Payload Too Large).

#### Summary of Changes
This PR rate-limit the perf-sampling of RocksDB read/write operations by one second
in addition to the existing sampling that is configurable via the hidden validator
argument --rocksdb-perf-sample-interval.
This commit is contained in:
Yueh-Hsuan Chiang 2022-05-20 10:54:27 -07:00 committed by GitHub
parent 397a14b127
commit de2033f2f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 60 additions and 28 deletions

View File

@ -536,7 +536,7 @@ impl Rocks {
fn write(&self, batch: RWriteBatch) -> Result<()> {
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.write_batch_perf_status.op_count,
&self.write_batch_perf_status,
);
let result = self.db.write(batch);
if is_perf_enabled {
@ -1302,7 +1302,7 @@ where
pub fn get_bytes(&self, key: C::Index) -> Result<Option<Vec<u8>>> {
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.read_perf_status.op_count,
&self.read_perf_status,
);
let result = self.backend.get_cf(self.handle(), &C::key(key));
if is_perf_enabled {
@ -1380,7 +1380,7 @@ where
pub fn put_bytes(&self, key: C::Index, value: &[u8]) -> Result<()> {
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.write_perf_status.op_count,
&self.write_perf_status,
);
let result = self.backend.put_cf(self.handle(), &C::key(key), value);
if is_perf_enabled {
@ -1407,7 +1407,7 @@ where
let mut result = Ok(None);
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.read_perf_status.op_count,
&self.read_perf_status,
);
if let Some(serialized_value) = self.backend.get_cf(self.handle(), &C::key(key))? {
let value = deserialize(&serialized_value)?;
@ -1424,7 +1424,7 @@ where
pub fn put(&self, key: C::Index, value: &C::Type) -> Result<()> {
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.write_perf_status.op_count,
&self.write_perf_status,
);
let serialized_value = serialize(value)?;
@ -1441,7 +1441,7 @@ where
pub fn delete(&self, key: C::Index) -> Result<()> {
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.write_perf_status.op_count,
&self.write_perf_status,
);
let result = self.backend.delete_cf(self.handle(), &C::key(key));
if is_perf_enabled {
@ -1461,7 +1461,7 @@ where
) -> Result<Option<C::Type>> {
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.read_perf_status.op_count,
&self.read_perf_status,
);
let result = self.backend.get_cf(self.handle(), &C::key(key));
if is_perf_enabled {
@ -1482,7 +1482,7 @@ where
pub fn get_protobuf(&self, key: C::Index) -> Result<Option<C::Type>> {
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.read_perf_status.op_count,
&self.read_perf_status,
);
let result = self.backend.get_cf(self.handle(), &C::key(key));
if is_perf_enabled {
@ -1502,7 +1502,7 @@ where
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
&self.write_perf_status.op_count,
&self.write_perf_status,
);
let result = self.backend.put_cf(self.handle(), &C::key(key), &buf);
if is_perf_enabled {

View File

@ -7,13 +7,15 @@ use {
PerfContext,
},
solana_metrics::datapoint_info,
solana_sdk::timing::timestamp,
std::{
cell::RefCell,
fmt::Debug,
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
time::Duration,
},
};
@ -236,6 +238,9 @@ impl BlockstoreRocksDbColumnFamilyMetrics {
// Thread local instance of RocksDB's PerfContext.
thread_local! {static PER_THREAD_ROCKS_PERF_CONTEXT: RefCell<PerfContext> = RefCell::new(PerfContext::default());}
// The minimum time duration between two RocksDB perf samples of the same operation.
const PERF_SAMPLING_MIN_DURATION: Duration = Duration::from_secs(1);
/// The function enables RocksDB PerfContext once for every `sample_interval`.
///
/// PerfContext is a thread-local struct defined in RocksDB for collecting
@ -245,25 +250,16 @@ thread_local! {static PER_THREAD_ROCKS_PERF_CONTEXT: RefCell<PerfContext> = RefC
/// and the PerfContext of the ubsequent RocksDB operation will be collected.
pub(crate) fn maybe_enable_rocksdb_perf(
sample_interval: usize,
perf_samples_counter: &AtomicUsize,
perf_status: &PerfSamplingStatus,
) -> bool {
if sample_interval == 0 {
return false;
if perf_status.should_sample(sample_interval) {
set_perf_stats(PerfStatsLevel::EnableTime);
PER_THREAD_ROCKS_PERF_CONTEXT.with(|perf_context| {
perf_context.borrow_mut().reset();
});
return true;
}
if perf_samples_counter.fetch_add(1, Ordering::Relaxed) < sample_interval {
return false;
}
// Ideally, fetch_sub(*sample_interval) should be used to keep it
// super precise. However, since we do not use Mutex to protect the
// above check and the below operation, we simply reset it to 0.
perf_samples_counter.store(0, Ordering::Relaxed);
set_perf_stats(PerfStatsLevel::EnableTime);
PER_THREAD_ROCKS_PERF_CONTEXT.with(|perf_context| {
perf_context.borrow_mut().reset();
});
true
false
}
/// Reports the collected PerfContext and disables the PerfContext after
@ -516,7 +512,43 @@ pub(crate) fn report_rocksdb_write_perf(metric_header: &'static str) {
/// A struct that holds the current status of RocksDB perf sampling.
pub struct PerfSamplingStatus {
// The number of RocksDB operations since the last perf sample.
pub(crate) op_count: AtomicUsize,
op_count: AtomicUsize,
// The timestamp of the latest operation with perf stats collection.
last_sample_time_ms: AtomicU64,
}
impl PerfSamplingStatus {
fn should_sample(&self, sample_count_interval: usize) -> bool {
if sample_count_interval == 0 {
return false;
}
// Rate-limiting based on the number of samples.
if self.op_count.fetch_add(1, Ordering::Relaxed) < sample_count_interval {
return false;
}
self.op_count.store(0, Ordering::Relaxed);
// Rate-limiting based on the time duration.
let current_time_ms = timestamp();
let old_time_ms = self.last_sample_time_ms.load(Ordering::Relaxed);
if old_time_ms + (PERF_SAMPLING_MIN_DURATION.as_millis() as u64) > current_time_ms {
return false;
}
// If the `last_sample_time_ms` has a different value than `old_time_ms`,
// it means some other thread has performed the sampling and updated
// the last sample time. In this case, the current thread will skip
// the current sample.
self.last_sample_time_ms
.compare_exchange_weak(
old_time_ms,
current_time_ms,
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_ok()
}
}
pub trait ColumnMetrics {