From de2033f2f2b2440a707f498b633bf2c708620643 Mon Sep 17 00:00:00 2001
From: Yueh-Hsuan Chiang <93241502+yhchiang-sol@users.noreply.github.com>
Date: Fri, 20 May 2022 10:54:27 -0700
Subject: [PATCH] (LedgerStore) Rate-limit RocksDB perf sample by a minimum
time interval (#25093)
#### Problem
When the number of RocksDB read/write operations spikes, the payload size of the
reported perf-sample metrics might exceed the limit (413 Payload Too Large).
#### Summary of Changes
This PR rate-limits the perf sampling of RocksDB read/write operations to at most one
sample per second, in addition to the existing count-based sampling that is configurable
via the hidden validator argument --rocksdb-perf-sample-interval.
---
ledger/src/blockstore_db.rs | 18 ++++----
ledger/src/blockstore_metrics.rs | 70 +++++++++++++++++++++++---------
2 files changed, 60 insertions(+), 28 deletions(-)
diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs
index 9f1534f969..2851e3acb9 100644
--- a/ledger/src/blockstore_db.rs
+++ b/ledger/src/blockstore_db.rs
@@ -536,7 +536,7 @@ impl Rocks {
fn write(&self, batch: RWriteBatch) -> Result<()> {
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
- &self.write_batch_perf_status.op_count,
+ &self.write_batch_perf_status,
);
let result = self.db.write(batch);
if is_perf_enabled {
@@ -1302,7 +1302,7 @@ where
pub fn get_bytes(&self, key: C::Index) -> Result>> {
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
- &self.read_perf_status.op_count,
+ &self.read_perf_status,
);
let result = self.backend.get_cf(self.handle(), &C::key(key));
if is_perf_enabled {
@@ -1380,7 +1380,7 @@ where
pub fn put_bytes(&self, key: C::Index, value: &[u8]) -> Result<()> {
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
- &self.write_perf_status.op_count,
+ &self.write_perf_status,
);
let result = self.backend.put_cf(self.handle(), &C::key(key), value);
if is_perf_enabled {
@@ -1407,7 +1407,7 @@ where
let mut result = Ok(None);
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
- &self.read_perf_status.op_count,
+ &self.read_perf_status,
);
if let Some(serialized_value) = self.backend.get_cf(self.handle(), &C::key(key))? {
let value = deserialize(&serialized_value)?;
@@ -1424,7 +1424,7 @@ where
pub fn put(&self, key: C::Index, value: &C::Type) -> Result<()> {
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
- &self.write_perf_status.op_count,
+ &self.write_perf_status,
);
let serialized_value = serialize(value)?;
@@ -1441,7 +1441,7 @@ where
pub fn delete(&self, key: C::Index) -> Result<()> {
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
- &self.write_perf_status.op_count,
+ &self.write_perf_status,
);
let result = self.backend.delete_cf(self.handle(), &C::key(key));
if is_perf_enabled {
@@ -1461,7 +1461,7 @@ where
) -> Result > {
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
- &self.read_perf_status.op_count,
+ &self.read_perf_status,
);
let result = self.backend.get_cf(self.handle(), &C::key(key));
if is_perf_enabled {
@@ -1482,7 +1482,7 @@ where
pub fn get_protobuf(&self, key: C::Index) -> Result > {
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
- &self.read_perf_status.op_count,
+ &self.read_perf_status,
);
let result = self.backend.get_cf(self.handle(), &C::key(key));
if is_perf_enabled {
@@ -1502,7 +1502,7 @@ where
let is_perf_enabled = maybe_enable_rocksdb_perf(
self.column_options.rocks_perf_sample_interval,
- &self.write_perf_status.op_count,
+ &self.write_perf_status,
);
let result = self.backend.put_cf(self.handle(), &C::key(key), &buf);
if is_perf_enabled {
diff --git a/ledger/src/blockstore_metrics.rs b/ledger/src/blockstore_metrics.rs
index 2a918af05a..1eba08ba0a 100644
--- a/ledger/src/blockstore_metrics.rs
+++ b/ledger/src/blockstore_metrics.rs
@@ -7,13 +7,15 @@ use {
PerfContext,
},
solana_metrics::datapoint_info,
+ solana_sdk::timing::timestamp,
std::{
cell::RefCell,
fmt::Debug,
sync::{
- atomic::{AtomicUsize, Ordering},
+ atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
+ time::Duration,
},
};
@@ -236,6 +238,9 @@ impl BlockstoreRocksDbColumnFamilyMetrics {
// Thread local instance of RocksDB's PerfContext.
thread_local! {static PER_THREAD_ROCKS_PERF_CONTEXT: RefCell = RefCell::new(PerfContext::default());}
+// The minimum time duration between two RocksDB perf samples of the same operation.
+const PERF_SAMPLING_MIN_DURATION: Duration = Duration::from_secs(1);
+
/// The function enables RocksDB PerfContext once for every `sample_interval`.
///
/// PerfContext is a thread-local struct defined in RocksDB for collecting
@@ -245,25 +250,16 @@ thread_local! {static PER_THREAD_ROCKS_PERF_CONTEXT: RefCell = RefC
/// and the PerfContext of the ubsequent RocksDB operation will be collected.
pub(crate) fn maybe_enable_rocksdb_perf(
sample_interval: usize,
- perf_samples_counter: &AtomicUsize,
+ perf_status: &PerfSamplingStatus,
) -> bool {
- if sample_interval == 0 {
- return false;
+ if perf_status.should_sample(sample_interval) {
+ set_perf_stats(PerfStatsLevel::EnableTime);
+ PER_THREAD_ROCKS_PERF_CONTEXT.with(|perf_context| {
+ perf_context.borrow_mut().reset();
+ });
+ return true;
}
-
- if perf_samples_counter.fetch_add(1, Ordering::Relaxed) < sample_interval {
- return false;
- }
- // Ideally, fetch_sub(*sample_interval) should be used to keep it
- // super precise. However, since we do not use Mutex to protect the
- // above check and the below operation, we simply reset it to 0.
- perf_samples_counter.store(0, Ordering::Relaxed);
-
- set_perf_stats(PerfStatsLevel::EnableTime);
- PER_THREAD_ROCKS_PERF_CONTEXT.with(|perf_context| {
- perf_context.borrow_mut().reset();
- });
- true
+ false
}
/// Reports the collected PerfContext and disables the PerfContext after
@@ -516,7 +512,43 @@ pub(crate) fn report_rocksdb_write_perf(metric_header: &'static str) {
/// A struct that holds the current status of RocksDB perf sampling.
pub struct PerfSamplingStatus {
// The number of RocksDB operations since the last perf sample.
- pub(crate) op_count: AtomicUsize,
+ op_count: AtomicUsize,
+ // The timestamp of the latest operation with perf stats collection.
+ last_sample_time_ms: AtomicU64,
+}
+
+impl PerfSamplingStatus {
+ fn should_sample(&self, sample_count_interval: usize) -> bool {
+ if sample_count_interval == 0 {
+ return false;
+ }
+
+ // Rate-limiting based on the number of samples.
+ if self.op_count.fetch_add(1, Ordering::Relaxed) < sample_count_interval {
+ return false;
+ }
+ self.op_count.store(0, Ordering::Relaxed);
+
+ // Rate-limiting based on the time duration.
+ let current_time_ms = timestamp();
+ let old_time_ms = self.last_sample_time_ms.load(Ordering::Relaxed);
+ if old_time_ms + (PERF_SAMPLING_MIN_DURATION.as_millis() as u64) > current_time_ms {
+ return false;
+ }
+
+ // If the `last_sample_time_ms` has a different value than `old_time_ms`,
+ // it means some other thread has performed the sampling and updated
+ // the last sample time. In this case, the current thread will skip
+ // the current sample.
+ self.last_sample_time_ms
+ .compare_exchange_weak(
+ old_time_ms,
+ current_time_ms,
+ Ordering::Relaxed,
+ Ordering::Relaxed,
+ )
+ .is_ok()
+ }
}
pub trait ColumnMetrics {