(Ledger Store) Report RocksDB Column Family Metrics (#22503)

This PR enables the blockstore to periodically report RocksDB column family properties.
The reported properties are published under blockstore_rocksdb_cfs, and each series
carries a cf_name tag, so the properties support group-by operations on cf_name.
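With the usual InfluxDB-style metrics sink (an assumption about the deployment, not something this PR sets up), each reporting pass writes one point per column family, roughly:

blockstore_rocksdb_cfs,cf_name=shred_data total_sst_files_size=123i,size_all_mem_tables=456i,num_snapshots=0i,...

so dashboards can group or filter series by the cf_name tag (the field values above are made up).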
Yueh-Hsuan Chiang 2022-03-05 16:13:03 -08:00 committed by GitHub
parent ba771cdc45
commit b8b7163b66
6 changed files with 326 additions and 8 deletions

View File

@@ -0,0 +1,44 @@
//! The `ledger_metric_report_service` periodically reports ledger store metrics.
use {
solana_ledger::blockstore::Blockstore,
std::{
string::ToString,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
},
};
// Determines how often we report blockstore metrics.
const BLOCKSTORE_METRICS_REPORT_PERIOD_MILLIS: u64 = 10000;
pub struct LedgerMetricReportService {
t_cf_metric: JoinHandle<()>,
}
impl LedgerMetricReportService {
pub fn new(blockstore: Arc<Blockstore>, exit: &Arc<AtomicBool>) -> Self {
let exit_signal = exit.clone();
let t_cf_metric = Builder::new()
.name("metric_report_rocksdb_cf_metrics".to_string())
.spawn(move || loop {
if exit_signal.load(Ordering::Relaxed) {
break;
}
thread::sleep(Duration::from_millis(
BLOCKSTORE_METRICS_REPORT_PERIOD_MILLIS,
));
blockstore.submit_rocksdb_cf_metrics_for_all_cfs();
})
.unwrap();
Self { t_cf_metric }
}
pub fn join(self) -> thread::Result<()> {
self.t_cf_metric.join()
}
}
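A minimal wiring sketch for the service (hypothetical: the ledger_path value, the unwraps, and the surrounding setup are assumptions, not part of this change):

use {
    solana_ledger::blockstore::Blockstore,
    std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};

// Start the reporter. The loop sleeps for
// BLOCKSTORE_METRICS_REPORT_PERIOD_MILLIS between iterations, so the first
// datapoint lands ~10s after startup and shutdown can lag the exit signal
// by up to one period.
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let exit = Arc::new(AtomicBool::new(false));
let service = LedgerMetricReportService::new(blockstore, &exit);

// On shutdown: flip the flag, then join the reporting thread.
exit.store(true, Ordering::Relaxed);
service.join().unwrap();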

View File

@@ -31,6 +31,7 @@ pub mod latest_validator_votes_for_frozen_banks;
pub mod leader_slot_banking_stage_metrics;
pub mod leader_slot_banking_stage_timing_metrics;
pub mod ledger_cleanup_service;
pub mod ledger_metric_report_service;
pub mod optimistic_confirmation_verifier;
pub mod outstanding_requests;
pub mod packet_hasher;

View File

@@ -16,6 +16,7 @@ use {
cost_update_service::CostUpdateService,
drop_bank_service::DropBankService,
ledger_cleanup_service::LedgerCleanupService,
ledger_metric_report_service::LedgerMetricReportService,
replay_stage::{ReplayStage, ReplayStageConfig},
retransmit_stage::RetransmitStage,
rewards_recorder_service::RewardsRecorderSender,
@@ -70,6 +71,7 @@ pub struct Tvu {
retransmit_stage: RetransmitStage,
replay_stage: ReplayStage,
ledger_cleanup_service: Option<LedgerCleanupService>,
ledger_metric_report_service: LedgerMetricReportService,
accounts_background_service: AccountsBackgroundService,
accounts_hash_verifier: AccountsHashVerifier,
cost_update_service: CostUpdateService,
@@ -357,6 +359,8 @@ impl Tvu {
)
});
let ledger_metric_report_service = LedgerMetricReportService::new(blockstore, exit);
let accounts_background_service = AccountsBackgroundService::new(
bank_forks.clone(),
exit,
@@ -373,6 +377,7 @@ impl Tvu {
retransmit_stage,
replay_stage,
ledger_cleanup_service,
ledger_metric_report_service,
accounts_background_service,
accounts_hash_verifier,
cost_update_service,
@@ -389,6 +394,7 @@ impl Tvu {
if self.ledger_cleanup_service.is_some() {
self.ledger_cleanup_service.unwrap().join()?;
}
self.ledger_metric_report_service.join()?;
self.accounts_background_service.join()?;
self.replay_stage.join()?;
self.accounts_hash_verifier.join()?;

View File

@@ -294,7 +294,7 @@ mod tests {
*time_previous = time_now;
*storage_previous = storage_now;
-*data_shred_storage_previous = data_shred_storage_now;
+*data_shred_storage_previous = data_shred_storage_now.try_into().unwrap();
}
/// Helper function of the benchmark `test_ledger_cleanup_compaction` which

View File

@@ -5,8 +5,8 @@ use {
crate::{
ancestor_iterator::AncestorIterator,
blockstore_db::{
-columns as cf, AccessType, BlockstoreOptions, Column, Database, IteratorDirection,
-IteratorMode, LedgerColumn, Result, ShredStorageType, WriteBatch,
+columns as cf, AccessType, BlockstoreOptions, Column, ColumnName, Database,
+IteratorDirection, IteratorMode, LedgerColumn, Result, ShredStorageType, WriteBatch,
},
blockstore_meta::*,
leader_schedule_cache::LeaderScheduleCache,
@@ -74,6 +74,7 @@ pub mod blockstore_purge;
pub const BLOCKSTORE_DIRECTORY_ROCKS_LEVEL: &str = "rocksdb";
pub const BLOCKSTORE_DIRECTORY_ROCKS_FIFO: &str = "rocksdb_fifo";
pub const BLOCKSTORE_METRICS_ERROR: i64 = -1;
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
@@ -255,6 +256,101 @@ pub struct BlockstoreInsertionMetrics {
num_coding_shreds_inserted: usize,
}
#[derive(Default)]
/// A metrics struct that exposes RocksDB's column family properties.
///
/// Here we only expose a subset of the internal properties that are
/// relevant to ledger store performance.
///
/// The complete list of RocksDB internal properties can be found
/// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
pub struct BlockstoreRocksDbColumnFamilyMetrics {
// Size related
// The storage size occupied by the column family.
// RocksDB's internal property key: "rocksdb.total-sst-files-size"
pub total_sst_files_size: i64,
// The memory size occupied by the column family's in-memory buffer.
// RocksDB's internal property key: "rocksdb.size-all-mem-tables"
pub size_all_mem_tables: i64,
// Snapshot related
// Number of snapshots held for the column family.
// RocksDB's internal property key: "rocksdb.num-snapshots"
pub num_snapshots: i64,
// Unix timestamp of the oldest unreleased snapshot.
// RocksDB's internal property key: "rocksdb.oldest-snapshot-time"
pub oldest_snapshot_time: i64,
// Write related
// The current actual delayed write rate. 0 means no delay.
// RocksDB's internal property key: "rocksdb.actual-delayed-write-rate"
pub actual_delayed_write_rate: i64,
// A flag indicating whether writes are stopped on this column family.
// 1 indicates writes have been stopped.
// RocksDB's internal property key: "rocksdb.is-write-stopped"
pub is_write_stopped: i64,
// Memory / block cache related
// The block cache capacity of the column family.
// RocksDB's internal property key: "rocksdb.block-cache-capacity"
pub block_cache_capacity: i64,
// The memory size used by the column family in the block cache.
// RocksDB's internal property key: "rocksdb.block-cache-usage"
pub block_cache_usage: i64,
// The memory size used by the column family in the block cache where
// entries are pinned.
// RocksDB's internal property key: "rocksdb.block-cache-pinned-usage"
pub block_cache_pinned_usage: i64,
// The estimated memory size used for reading SST tables in this column
// family such as filters and index blocks. Note that this number does not
// include the memory used in block cache.
// RocksDB's internal property key: "rocksdb.estimate-table-readers-mem"
pub estimate_table_readers_mem: i64,
// Flush and compaction
// A 1 or 0 flag indicating whether a memtable flush is pending.
// If this number is 1, it means a memtable is waiting to be flushed,
// but there might be too many L0 files preventing it from being flushed.
// RocksDB's internal property key: "rocksdb.mem-table-flush-pending"
pub mem_table_flush_pending: i64,
// A 1 or 0 flag indicating whether a compaction job is pending.
// If this number is 1, it means some part of the column family requires
// compaction in order to maintain the shape of the LSM tree, but the
// compaction is pending because the desired compaction job is either
// waiting for other dependent compactions to finish or waiting for an
// available compaction thread.
// RocksDB's internal property key: "rocksdb.compaction-pending"
pub compaction_pending: i64,
// The number of compactions that are currently running for the column family.
// RocksDB's internal property key: "rocksdb.num-running-compactions"
pub num_running_compactions: i64,
// The number of flushes that are currently running for the column family.
// RocksDB's internal property key: "rocksdb.num-running-flushes"
pub num_running_flushes: i64,
// FIFO Compaction related
// An estimation of the oldest key timestamp in the DB. Only available
// for FIFO compaction with compaction_options_fifo.allow_compaction = false.
// RocksDB's internal property key: "rocksdb.estimate-oldest-key-time"
pub estimate_oldest_key_time: i64,
// Misc
// The accumulated number of RocksDB background errors.
// RocksDB's internal property key: "rocksdb.background-errors"
pub background_errors: i64,
}
impl SlotMetaWorkingSetEntry {
/// Construct a new SlotMetaWorkingSetEntry with the specified `new_slot_meta`
/// and `old_slot_meta`. `did_insert_occur` is set to false.
@@ -350,6 +446,78 @@ impl BlockstoreInsertionMetrics {
}
}
impl BlockstoreRocksDbColumnFamilyMetrics {
/// Report metrics with the specified metric name and column family tag.
/// The metric name and the column family tag are embedded in the parameter
/// `metric_name_and_cf_tag` in the following format.
///
/// For example, "blockstore_rocksdb_cfs,cf_name=shred_data".
pub fn report_metrics(&self, metric_name_and_cf_tag: &'static str) {
datapoint_info!(
metric_name_and_cf_tag,
// Size related
(
"total_sst_files_size",
self.total_sst_files_size as i64,
i64
),
("size_all_mem_tables", self.size_all_mem_tables as i64, i64),
// Snapshot related
("num_snapshots", self.num_snapshots as i64, i64),
(
"oldest_snapshot_time",
self.oldest_snapshot_time as i64,
i64
),
// Write related
(
"actual_delayed_write_rate",
self.actual_delayed_write_rate as i64,
i64
),
("is_write_stopped", self.is_write_stopped as i64, i64),
// Memory / block cache related
(
"block_cache_capacity",
self.block_cache_capacity as i64,
i64
),
("block_cache_usage", self.block_cache_usage as i64, i64),
(
"block_cache_pinned_usage",
self.block_cache_pinned_usage as i64,
i64
),
(
"estimate_table_readers_mem",
self.estimate_table_readers_mem as i64,
i64
),
// Flush and compaction
(
"mem_table_flush_pending",
self.mem_table_flush_pending as i64,
i64
),
("compaction_pending", self.compaction_pending as i64, i64),
(
"num_running_compactions",
self.num_running_compactions as i64,
i64
),
("num_running_flushes", self.num_running_flushes as i64, i64),
// FIFO Compaction related
(
"estimate_oldest_key_time",
self.estimate_oldest_key_time as i64,
i64
),
// Misc
("background_errors", self.background_errors as i64, i64),
);
}
}
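A hedged usage illustration (not part of the diff): reporting a default-initialized metrics struct under the shred_data tag. The string literal below is exactly what rocksdb_cf_metric!("shred_data"), defined near the end of this file, expands to.

// All fields default to 0; report_metrics forwards them to datapoint_info!
// under the combined measurement-plus-tag name.
let metrics = BlockstoreRocksDbColumnFamilyMetrics::default();
metrics.report_metrics("blockstore_rocksdb_cfs,cf_name=shred_data");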
impl Blockstore {
pub fn db(self) -> Arc<Database> {
self.db
@@ -766,6 +934,98 @@ impl Blockstore {
);
}
/// Collects and reports [`BlockstoreRocksDbColumnFamilyMetrics`] for
/// all the column families.
pub fn submit_rocksdb_cf_metrics_for_all_cfs(&self) {
self.submit_rocksdb_cf_metrics::<cf::SlotMeta>(rocksdb_cf_metric!("slot_meta"));
self.submit_rocksdb_cf_metrics::<cf::DeadSlots>(rocksdb_cf_metric!("dead_slots"));
self.submit_rocksdb_cf_metrics::<cf::DuplicateSlots>(rocksdb_cf_metric!("duplicate_slots"));
self.submit_rocksdb_cf_metrics::<cf::ErasureMeta>(rocksdb_cf_metric!("erasure_meta"));
self.submit_rocksdb_cf_metrics::<cf::Orphans>(rocksdb_cf_metric!("orphans"));
self.submit_rocksdb_cf_metrics::<cf::BankHash>(rocksdb_cf_metric!("bank_hash"));
self.submit_rocksdb_cf_metrics::<cf::Root>(rocksdb_cf_metric!("root"));
self.submit_rocksdb_cf_metrics::<cf::Index>(rocksdb_cf_metric!("index"));
self.submit_rocksdb_cf_metrics::<cf::ShredData>(rocksdb_cf_metric!("shred_data"));
self.submit_rocksdb_cf_metrics::<cf::ShredCode>(rocksdb_cf_metric!("shred_code"));
self.submit_rocksdb_cf_metrics::<cf::TransactionStatus>(rocksdb_cf_metric!(
"transaction_status"
));
self.submit_rocksdb_cf_metrics::<cf::AddressSignatures>(rocksdb_cf_metric!(
"address_signature"
));
self.submit_rocksdb_cf_metrics::<cf::TransactionMemos>(rocksdb_cf_metric!(
"transaction_memos"
));
self.submit_rocksdb_cf_metrics::<cf::TransactionStatusIndex>(rocksdb_cf_metric!(
"transaction_status_index"
));
self.submit_rocksdb_cf_metrics::<cf::Rewards>(rocksdb_cf_metric!("rewards"));
self.submit_rocksdb_cf_metrics::<cf::Blocktime>(rocksdb_cf_metric!("blocktime"));
self.submit_rocksdb_cf_metrics::<cf::PerfSamples>(rocksdb_cf_metric!("perf_sample"));
self.submit_rocksdb_cf_metrics::<cf::BlockHeight>(rocksdb_cf_metric!("block_height"));
self.submit_rocksdb_cf_metrics::<cf::ProgramCosts>(rocksdb_cf_metric!("program_costs"));
}
/// Collects and reports [`BlockstoreRocksDbColumnFamilyMetrics`] for the
/// given column family.
fn submit_rocksdb_cf_metrics<C: 'static + Column + ColumnName>(
&self,
metric_name_and_cf_tag: &'static str,
) {
let cf = self.db.column::<C>();
let cf_rocksdb_metrics = BlockstoreRocksDbColumnFamilyMetrics {
total_sst_files_size: cf
.get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
size_all_mem_tables: cf
.get_int_property(RocksProperties::SIZE_ALL_MEM_TABLES)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
num_snapshots: cf
.get_int_property(RocksProperties::NUM_SNAPSHOTS)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
oldest_snapshot_time: cf
.get_int_property(RocksProperties::OLDEST_SNAPSHOT_TIME)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
actual_delayed_write_rate: cf
.get_int_property(RocksProperties::ACTUAL_DELAYED_WRITE_RATE)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
is_write_stopped: cf
.get_int_property(RocksProperties::IS_WRITE_STOPPED)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
block_cache_capacity: cf
.get_int_property(RocksProperties::BLOCK_CACHE_CAPACITY)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
block_cache_usage: cf
.get_int_property(RocksProperties::BLOCK_CACHE_USAGE)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
block_cache_pinned_usage: cf
.get_int_property(RocksProperties::BLOCK_CACHE_PINNED_USAGE)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
estimate_table_readers_mem: cf
.get_int_property(RocksProperties::ESTIMATE_TABLE_READERS_MEM)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
mem_table_flush_pending: cf
.get_int_property(RocksProperties::MEM_TABLE_FLUSH_PENDING)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
compaction_pending: cf
.get_int_property(RocksProperties::COMPACTION_PENDING)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
num_running_compactions: cf
.get_int_property(RocksProperties::NUM_RUNNING_COMPACTIONS)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
num_running_flushes: cf
.get_int_property(RocksProperties::NUM_RUNNING_FLUSHES)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
estimate_oldest_key_time: cf
.get_int_property(RocksProperties::ESTIMATE_OLDEST_KEY_TIME)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
background_errors: cf
.get_int_property(RocksProperties::BACKGROUND_ERRORS)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
};
cf_rocksdb_metrics.report_metrics(metric_name_and_cf_tag);
}
fn try_shred_recovery(
db: &Database,
erasure_metas: &HashMap<ErasureSetId, ErasureMeta>,
@@ -3203,7 +3463,7 @@ impl Blockstore {
///
/// Note that the reported size does not include those recently inserted
/// shreds that are still in memory.
-pub fn total_data_shred_storage_size(&self) -> Result<u64> {
+pub fn total_data_shred_storage_size(&self) -> Result<i64> {
let shred_data_cf = self.db.column::<cf::ShredData>();
shred_data_cf.get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
}
@@ -3212,7 +3472,7 @@ impl Blockstore {
///
/// Note that the reported size does not include those recently inserted
/// shreds that are still in memory.
-pub fn total_coding_shred_storage_size(&self) -> Result<u64> {
+pub fn total_coding_shred_storage_size(&self) -> Result<i64> {
let shred_code_cf = self.db.column::<cf::ShredCode>();
shred_code_cf.get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
}
@@ -3942,6 +4202,13 @@ macro_rules! get_tmp_ledger_path_auto_delete {
};
}
macro_rules! rocksdb_cf_metric {
($tag_name:literal) => {
concat!("blockstore_rocksdb_cfs,cf_name=", $tag_name)
};
}
use rocksdb_cf_metric;
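Since concat! glues the pieces together at compile time, the expansion is easy to sanity-check (illustrative snippet, not in the diff):

// The macro produces a single &'static str combining the measurement name
// with the cf_name tag.
assert_eq!(
    rocksdb_cf_metric!("shred_data"),
    "blockstore_rocksdb_cfs,cf_name=shred_data"
);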
pub fn get_ledger_path_from_name_auto_delete(name: &str) -> TempDir {
let mut path = get_ledger_path_from_name(name);
// path is a directory so .file_name() returns the last component of the path

View File

@@ -528,9 +528,9 @@ impl Rocks {
///
/// The full list of properties that return int values can be found
/// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
-fn get_int_property_cf(&self, cf: &ColumnFamily, name: &str) -> Result<u64> {
+fn get_int_property_cf(&self, cf: &ColumnFamily, name: &str) -> Result<i64> {
match self.0.property_int_value_cf(cf, name) {
-Ok(Some(value)) => Ok(value),
+Ok(Some(value)) => Ok(value.try_into().unwrap()),
Ok(None) => Ok(0),
Err(e) => Err(BlockstoreError::RocksDb(e)),
}
@@ -1233,7 +1233,7 @@
///
/// The full list of properties that return int values can be found
/// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
-pub fn get_int_property(&self, name: &str) -> Result<u64> {
+pub fn get_int_property(&self, name: &str) -> Result<i64> {
self.backend.get_int_property_cf(self.handle(), name)
}
}
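A hedged sketch of reading one property through the new i64-returning API, from inside a Blockstore method (mirroring submit_rocksdb_cf_metrics above; the ? error handling is an assumption):

// Ok(0) means RocksDB reported no value for the property; Err(..) wraps a
// RocksDB error; a value above i64::MAX would panic in try_into().unwrap().
let shred_data_cf = self.db.column::<cf::ShredData>();
let sst_bytes: i64 =
    shred_data_cf.get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)?;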