From b8b7163b66f338c92a6aa3def489d592b2a5dc94 Mon Sep 17 00:00:00 2001
From: Yueh-Hsuan Chiang <93241502+yhchiang-sol@users.noreply.github.com>
Date: Sat, 5 Mar 2022 16:13:03 -0800
Subject: [PATCH] (Ledger Store) Report RocksDB Column Family Metrics (#22503)

This PR enables the blockstore to periodically report RocksDB column family
properties. The reported properties are published under blockstore_rocksdb_cfs
and support group-by operations on cf_name.
---
 core/src/ledger_metric_report_service.rs |  44 ++++
 core/src/lib.rs                          |   1 +
 core/src/tvu.rs                          |   6 +
 core/tests/ledger_cleanup.rs             |   2 +-
 ledger/src/blockstore.rs                 | 275 ++++++++++++++++++++++-
 ledger/src/blockstore_db.rs              |   6 +-
 6 files changed, 326 insertions(+), 8 deletions(-)
 create mode 100644 core/src/ledger_metric_report_service.rs

diff --git a/core/src/ledger_metric_report_service.rs b/core/src/ledger_metric_report_service.rs
new file mode 100644
index 0000000000..2493d0e1dd
--- /dev/null
+++ b/core/src/ledger_metric_report_service.rs
@@ -0,0 +1,44 @@
+//! The `ledger_metric_report_service` periodically reports ledger store metrics.
+
+use {
+    solana_ledger::blockstore::Blockstore,
+    std::{
+        string::ToString,
+        sync::{
+            atomic::{AtomicBool, Ordering},
+            Arc,
+        },
+        thread::{self, Builder, JoinHandle},
+        time::Duration,
+    },
+};
+
+// Determines how often we report blockstore metrics.
+const BLOCKSTORE_METRICS_REPORT_PERIOD_MILLIS: u64 = 10000;
+
+pub struct LedgerMetricReportService {
+    t_cf_metric: JoinHandle<()>,
+}
+
+impl LedgerMetricReportService {
+    pub fn new(blockstore: Arc<Blockstore>, exit: &Arc<AtomicBool>) -> Self {
+        let exit_signal = exit.clone();
+        let t_cf_metric = Builder::new()
+            .name("metric_report_rocksdb_cf_metrics".to_string())
+            .spawn(move || loop {
+                if exit_signal.load(Ordering::Relaxed) {
+                    break;
+                }
+                thread::sleep(Duration::from_millis(
+                    BLOCKSTORE_METRICS_REPORT_PERIOD_MILLIS,
+                ));
+                blockstore.submit_rocksdb_cf_metrics_for_all_cfs();
+            })
+            .unwrap();
+        Self { t_cf_metric }
+    }
+
+    pub fn join(self) -> thread::Result<()> {
+        self.t_cf_metric.join()
+    }
+}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 54453bd6c4..8cc6f046cd 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -31,6 +31,7 @@ pub mod latest_validator_votes_for_frozen_banks;
 pub mod leader_slot_banking_stage_metrics;
 pub mod leader_slot_banking_stage_timing_metrics;
 pub mod ledger_cleanup_service;
+pub mod ledger_metric_report_service;
 pub mod optimistic_confirmation_verifier;
 pub mod outstanding_requests;
 pub mod packet_hasher;
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 0b7942f3d1..43d681edcb 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -16,6 +16,7 @@ use {
         cost_update_service::CostUpdateService,
         drop_bank_service::DropBankService,
         ledger_cleanup_service::LedgerCleanupService,
+        ledger_metric_report_service::LedgerMetricReportService,
         replay_stage::{ReplayStage, ReplayStageConfig},
         retransmit_stage::RetransmitStage,
         rewards_recorder_service::RewardsRecorderSender,
@@ -70,6 +71,7 @@ pub struct Tvu {
     retransmit_stage: RetransmitStage,
     replay_stage: ReplayStage,
     ledger_cleanup_service: Option<LedgerCleanupService>,
+    ledger_metric_report_service: LedgerMetricReportService,
     accounts_background_service: AccountsBackgroundService,
     accounts_hash_verifier: AccountsHashVerifier,
     cost_update_service: CostUpdateService,
@@ -357,6 +359,8 @@ impl Tvu {
             )
         });
 
+        let ledger_metric_report_service = LedgerMetricReportService::new(blockstore, exit);
+
         let accounts_background_service = AccountsBackgroundService::new(
             bank_forks.clone(),
             exit,
@@ -373,6 +377,7 @@ impl Tvu {
             retransmit_stage,
             replay_stage,
             ledger_cleanup_service,
+            ledger_metric_report_service,
             accounts_background_service,
             accounts_hash_verifier,
             cost_update_service,
@@ -389,6 +394,7 @@ impl Tvu {
         if self.ledger_cleanup_service.is_some() {
             self.ledger_cleanup_service.unwrap().join()?;
         }
+        self.ledger_metric_report_service.join()?;
         self.accounts_background_service.join()?;
         self.replay_stage.join()?;
         self.accounts_hash_verifier.join()?;
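
Note (illustrative, not part of the patch): outside of Tvu, the service can be exercised standalone: construct it with a shared Blockstore and an exit flag, flip the flag, then join. The minimal sketch below assumes only the APIs added in this patch plus the existing Blockstore::open; the ledger path and the sleep standing in for validator work are placeholders.

    use {
        solana_core::ledger_metric_report_service::LedgerMetricReportService,
        solana_ledger::blockstore::Blockstore,
        std::{
            path::Path,
            sync::{
                atomic::{AtomicBool, Ordering},
                Arc,
            },
            thread,
            time::Duration,
        },
    };

    fn main() {
        // Placeholder ledger path; any directory Blockstore can open works.
        let blockstore = Arc::new(Blockstore::open(Path::new("/tmp/ledger")).unwrap());
        let exit = Arc::new(AtomicBool::new(false));

        // Spawns the reporting thread, which submits blockstore_rocksdb_cfs
        // datapoints every BLOCKSTORE_METRICS_REPORT_PERIOD_MILLIS (10 s).
        let service = LedgerMetricReportService::new(blockstore, &exit);

        // Stand-in for real validator work.
        thread::sleep(Duration::from_secs(30));

        // Request shutdown, then wait for the reporting thread to finish.
        exit.store(true, Ordering::Relaxed);
        service.join().unwrap();
    }

Because the loop sleeps before reporting, the first datapoint appears roughly one period (10 seconds) after startup, and a shutdown request can likewise take up to one period to be observed.
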
diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs
index b28fa271b9..f9fbef8b79 100644
--- a/core/tests/ledger_cleanup.rs
+++ b/core/tests/ledger_cleanup.rs
@@ -294,7 +294,7 @@ mod tests {
         *time_previous = time_now;
         *storage_previous = storage_now;
-        *data_shred_storage_previous = data_shred_storage_now;
+        *data_shred_storage_previous = data_shred_storage_now.try_into().unwrap();
     }
 
     /// Helper function of the benchmark `test_ledger_cleanup_compaction` which
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index e8df781d1f..ca1be6d196 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -5,8 +5,8 @@ use {
     crate::{
         ancestor_iterator::AncestorIterator,
         blockstore_db::{
-            columns as cf, AccessType, BlockstoreOptions, Column, Database, IteratorDirection,
-            IteratorMode, LedgerColumn, Result, ShredStorageType, WriteBatch,
+            columns as cf, AccessType, BlockstoreOptions, Column, ColumnName, Database,
+            IteratorDirection, IteratorMode, LedgerColumn, Result, ShredStorageType, WriteBatch,
         },
         blockstore_meta::*,
         leader_schedule_cache::LeaderScheduleCache,
@@ -74,6 +74,7 @@ pub mod blockstore_purge;
 
 pub const BLOCKSTORE_DIRECTORY_ROCKS_LEVEL: &str = "rocksdb";
 pub const BLOCKSTORE_DIRECTORY_ROCKS_FIFO: &str = "rocksdb_fifo";
+pub const BLOCKSTORE_METRICS_ERROR: i64 = -1;
 
 thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
                    .num_threads(get_thread_count())
@@ -255,6 +256,101 @@ pub struct BlockstoreInsertionMetrics {
     num_coding_shreds_inserted: usize,
 }
 
+#[derive(Default)]
+/// A metrics struct that exposes RocksDB's column family properties.
+///
+/// Here we only expose a subset of all the internal properties which are
+/// relevant to the ledger store performance.
+///
+/// The complete list of RocksDB internal properties can be found
+/// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
+pub struct BlockstoreRocksDbColumnFamilyMetrics {
+    // Size related
+
+    // The storage size occupied by the column family.
+    // RocksDB's internal property key: "rocksdb.total-sst-files-size"
+    pub total_sst_files_size: i64,
+    // The memory size occupied by the column family's in-memory buffer.
+    // RocksDB's internal property key: "rocksdb.size-all-mem-tables"
+    pub size_all_mem_tables: i64,
+
+    // Snapshot related
+
+    // Number of snapshots held for the column family.
+    // RocksDB's internal property key: "rocksdb.num-snapshots"
+    pub num_snapshots: i64,
+    // Unix timestamp of the oldest unreleased snapshot.
+    // RocksDB's internal property key: "rocksdb.oldest-snapshot-time"
+    pub oldest_snapshot_time: i64,
+
+    // Write related
+
+    // The current actual delayed write rate. 0 means no delay.
+    // RocksDB's internal property key: "rocksdb.actual-delayed-write-rate"
+    pub actual_delayed_write_rate: i64,
+    // A flag indicating whether writes are stopped on this column family.
+    // 1 indicates writes have been stopped.
+    // RocksDB's internal property key: "rocksdb.is-write-stopped"
+    pub is_write_stopped: i64,
+
+    // Memory / block cache related
+
+    // The block cache capacity of the column family.
+    // RocksDB's internal property key: "rocksdb.block-cache-capacity"
+    pub block_cache_capacity: i64,
+    // The memory size used by the column family in the block cache.
+    // RocksDB's internal property key: "rocksdb.block-cache-usage"
+    pub block_cache_usage: i64,
+    // The memory size used by the column family in the block cache where
+    // entries are pinned.
+    // RocksDB's internal property key: "rocksdb.block-cache-pinned-usage"
+    pub block_cache_pinned_usage: i64,
+
+    // The estimated memory size used for reading SST tables in this column
+    // family, such as filters and index blocks. Note that this number does
+    // not include the memory used in the block cache.
+    // RocksDB's internal property key: "rocksdb.estimate-table-readers-mem"
+    pub estimate_table_readers_mem: i64,
+
+    // Flush and compaction
+
+    // A 1 or 0 flag indicating whether a memtable flush is pending.
+    // If this number is 1, it means a memtable is waiting to be flushed,
+    // but there might be too many L0 files preventing it from being flushed.
+    // RocksDB's internal property key: "rocksdb.mem-table-flush-pending"
+    pub mem_table_flush_pending: i64,
+
+    // A 1 or 0 flag indicating whether a compaction job is pending.
+    // If this number is 1, it means some part of the column family requires
+    // compaction in order to maintain the shape of the LSM tree, but the
+    // compaction is pending because the desired compaction job is either
+    // waiting for other dependent compactions to finish or waiting for an
+    // available compaction thread.
+    // RocksDB's internal property key: "rocksdb.compaction-pending"
+    pub compaction_pending: i64,
+
+    // The number of compactions that are currently running for the column family.
+    // RocksDB's internal property key: "rocksdb.num-running-compactions"
+    pub num_running_compactions: i64,
+
+    // The number of flushes that are currently running for the column family.
+    // RocksDB's internal property key: "rocksdb.num-running-flushes"
+    pub num_running_flushes: i64,
+
+    // FIFO Compaction related
+
+    // An estimation of the oldest key timestamp in the DB. Only available
+    // for FIFO compaction with compaction_options_fifo.allow_compaction = false.
+    // RocksDB's internal property key: "rocksdb.estimate-oldest-key-time"
+    pub estimate_oldest_key_time: i64,
+
+    // Misc
+
+    // The accumulated number of RocksDB background errors.
+    // RocksDB's internal property key: "rocksdb.background-errors"
+    pub background_errors: i64,
+}
+
 impl SlotMetaWorkingSetEntry {
     /// Construct a new SlotMetaWorkingSetEntry with the specified `new_slot_meta`
     /// and `old_slot_meta`. `did_insert_occur` is set to false.
@@ -350,6 +446,78 @@ impl BlockstoreInsertionMetrics {
     }
 }
 
+impl BlockstoreRocksDbColumnFamilyMetrics {
+    /// Report metrics with the specified metric name and column family tag.
+    /// The metric name and the column family tag are embedded in the
+    /// parameter `metric_name_and_cf_tag` in the following format.
+    ///
+    /// For example, "blockstore_rocksdb_cfs,cf_name=shred_data".
+    pub fn report_metrics(&self, metric_name_and_cf_tag: &'static str) {
+        datapoint_info!(
+            metric_name_and_cf_tag,
+            // Size related
+            (
+                "total_sst_files_size",
+                self.total_sst_files_size as i64,
+                i64
+            ),
+            ("size_all_mem_tables", self.size_all_mem_tables as i64, i64),
+            // Snapshot related
+            ("num_snapshots", self.num_snapshots as i64, i64),
+            (
+                "oldest_snapshot_time",
+                self.oldest_snapshot_time as i64,
+                i64
+            ),
+            // Write related
+            (
+                "actual_delayed_write_rate",
+                self.actual_delayed_write_rate as i64,
+                i64
+            ),
+            ("is_write_stopped", self.is_write_stopped as i64, i64),
+            // Memory / block cache related
+            (
+                "block_cache_capacity",
+                self.block_cache_capacity as i64,
+                i64
+            ),
+            ("block_cache_usage", self.block_cache_usage as i64, i64),
+            (
+                "block_cache_pinned_usage",
+                self.block_cache_pinned_usage as i64,
+                i64
+            ),
+            (
+                "estimate_table_readers_mem",
+                self.estimate_table_readers_mem as i64,
+                i64
+            ),
+            // Flush and compaction
+            (
+                "mem_table_flush_pending",
+                self.mem_table_flush_pending as i64,
+                i64
+            ),
+            ("compaction_pending", self.compaction_pending as i64, i64),
+            (
+                "num_running_compactions",
+                self.num_running_compactions as i64,
+                i64
+            ),
+            ("num_running_flushes", self.num_running_flushes as i64, i64),
+            // FIFO Compaction related
+            (
+                "estimate_oldest_key_time",
+                self.estimate_oldest_key_time as i64,
+                i64
+            ),
+            // Misc
+            ("background_errors", self.background_errors as i64, i64),
+        );
+    }
+}
+
 impl Blockstore {
     pub fn db(self) -> Arc<Database> {
         self.db
@@ -766,6 +934,98 @@ impl Blockstore {
         );
     }
 
+    /// Collects and reports [`BlockstoreRocksDbColumnFamilyMetrics`] for
+    /// all the column families.
+    pub fn submit_rocksdb_cf_metrics_for_all_cfs(&self) {
+        self.submit_rocksdb_cf_metrics::<cf::SlotMeta>(rocksdb_cf_metric!("slot_meta"));
+        self.submit_rocksdb_cf_metrics::<cf::DeadSlots>(rocksdb_cf_metric!("dead_slots"));
+        self.submit_rocksdb_cf_metrics::<cf::DuplicateSlots>(rocksdb_cf_metric!("duplicate_slots"));
+        self.submit_rocksdb_cf_metrics::<cf::ErasureMeta>(rocksdb_cf_metric!("erasure_meta"));
+        self.submit_rocksdb_cf_metrics::<cf::Orphans>(rocksdb_cf_metric!("orphans"));
+        self.submit_rocksdb_cf_metrics::<cf::BankHash>(rocksdb_cf_metric!("bank_hash"));
+        self.submit_rocksdb_cf_metrics::<cf::Root>(rocksdb_cf_metric!("root"));
+        self.submit_rocksdb_cf_metrics::<cf::Index>(rocksdb_cf_metric!("index"));
+        self.submit_rocksdb_cf_metrics::<cf::ShredData>(rocksdb_cf_metric!("shred_data"));
+        self.submit_rocksdb_cf_metrics::<cf::ShredCode>(rocksdb_cf_metric!("shred_code"));
+        self.submit_rocksdb_cf_metrics::<cf::TransactionStatus>(rocksdb_cf_metric!(
+            "transaction_status"
+        ));
+        self.submit_rocksdb_cf_metrics::<cf::AddressSignatures>(rocksdb_cf_metric!(
+            "address_signature"
+        ));
+        self.submit_rocksdb_cf_metrics::<cf::TransactionMemos>(rocksdb_cf_metric!(
+            "transaction_memos"
+        ));
+        self.submit_rocksdb_cf_metrics::<cf::TransactionStatusIndex>(rocksdb_cf_metric!(
+            "transaction_status_index"
+        ));
+        self.submit_rocksdb_cf_metrics::<cf::Rewards>(rocksdb_cf_metric!("rewards"));
+        self.submit_rocksdb_cf_metrics::<cf::Blocktime>(rocksdb_cf_metric!("blocktime"));
+        self.submit_rocksdb_cf_metrics::<cf::PerfSamples>(rocksdb_cf_metric!("perf_sample"));
+        self.submit_rocksdb_cf_metrics::<cf::BlockHeight>(rocksdb_cf_metric!("block_height"));
+        self.submit_rocksdb_cf_metrics::<cf::ProgramCosts>(rocksdb_cf_metric!("program_costs"));
+    }
+
+    /// Collects and reports [`BlockstoreRocksDbColumnFamilyMetrics`] for the
+    /// given column family.
+    fn submit_rocksdb_cf_metrics<C: 'static + Column + ColumnName>(
+        &self,
+        metric_name_and_cf_tag: &'static str,
+    ) {
+        let cf = self.db.column::<C>();
+        let cf_rocksdb_metrics = BlockstoreRocksDbColumnFamilyMetrics {
+            total_sst_files_size: cf
+                .get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+            size_all_mem_tables: cf
+                .get_int_property(RocksProperties::SIZE_ALL_MEM_TABLES)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+            num_snapshots: cf
+                .get_int_property(RocksProperties::NUM_SNAPSHOTS)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+            oldest_snapshot_time: cf
+                .get_int_property(RocksProperties::OLDEST_SNAPSHOT_TIME)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+            actual_delayed_write_rate: cf
+                .get_int_property(RocksProperties::ACTUAL_DELAYED_WRITE_RATE)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+            is_write_stopped: cf
+                .get_int_property(RocksProperties::IS_WRITE_STOPPED)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+            block_cache_capacity: cf
+                .get_int_property(RocksProperties::BLOCK_CACHE_CAPACITY)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+            block_cache_usage: cf
+                .get_int_property(RocksProperties::BLOCK_CACHE_USAGE)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+            block_cache_pinned_usage: cf
+                .get_int_property(RocksProperties::BLOCK_CACHE_PINNED_USAGE)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+            estimate_table_readers_mem: cf
+                .get_int_property(RocksProperties::ESTIMATE_TABLE_READERS_MEM)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+            mem_table_flush_pending: cf
+                .get_int_property(RocksProperties::MEM_TABLE_FLUSH_PENDING)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+            compaction_pending: cf
+                .get_int_property(RocksProperties::COMPACTION_PENDING)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+            num_running_compactions: cf
+                .get_int_property(RocksProperties::NUM_RUNNING_COMPACTIONS)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+            num_running_flushes: cf
+                .get_int_property(RocksProperties::NUM_RUNNING_FLUSHES)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+            estimate_oldest_key_time: cf
+                .get_int_property(RocksProperties::ESTIMATE_OLDEST_KEY_TIME)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+            background_errors: cf
+                .get_int_property(RocksProperties::BACKGROUND_ERRORS)
+                .unwrap_or(BLOCKSTORE_METRICS_ERROR),
+        };
+        cf_rocksdb_metrics.report_metrics(metric_name_and_cf_tag);
+    }
+
     fn try_shred_recovery(
         db: &Database,
         erasure_metas: &HashMap<ErasureSetId, ErasureMeta>,
@@ -3203,7 +3463,7 @@ impl Blockstore {
     ///
     /// Note that the reported size does not include those recently inserted
     /// shreds that are still in memory.
-    pub fn total_data_shred_storage_size(&self) -> Result<u64> {
+    pub fn total_data_shred_storage_size(&self) -> Result<i64> {
         let shred_data_cf = self.db.column::<cf::ShredData>();
         shred_data_cf.get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
     }
@@ -3212,7 +3472,7 @@ impl Blockstore {
     ///
     /// Note that the reported size does not include those recently inserted
     /// shreds that are still in memory.
-    pub fn total_coding_shred_storage_size(&self) -> Result<u64> {
+    pub fn total_coding_shred_storage_size(&self) -> Result<i64> {
         let shred_code_cf = self.db.column::<cf::ShredCode>();
         shred_code_cf.get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
     }
@@ -3942,6 +4202,13 @@ macro_rules! get_tmp_ledger_path_auto_delete {
     };
 }
 
+macro_rules! rocksdb_cf_metric {
+    ($tag_name:literal) => {
+        concat!("blockstore_rocksdb_cfs,cf_name=", $tag_name)
+    };
+}
+use rocksdb_cf_metric;
+
 pub fn get_ledger_path_from_name_auto_delete(name: &str) -> TempDir {
     let mut path = get_ledger_path_from_name(name);
     // path is a directory so .file_name() returns the last component of the path
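
Note (illustrative, not part of the patch): rocksdb_cf_metric! is plain compile-time string concatenation, so every call site bakes the measurement name and its cf_name tag into a single &'static str that report_metrics can hand to datapoint_info!. A minimal sketch of the expansion; the constant name is made up:

    // rocksdb_cf_metric!("shred_data") expands to the concat!() below.
    // The embedded cf_name tag is what lets queries on the single
    // blockstore_rocksdb_cfs measurement group results by column family.
    const SHRED_DATA_METRIC: &str = concat!("blockstore_rocksdb_cfs,cf_name=", "shred_data");

    fn main() {
        assert_eq!(SHRED_DATA_METRIC, "blockstore_rocksdb_cfs,cf_name=shred_data");
    }

Keeping the tag inside the static metric name, rather than formatting it at runtime, avoids any per-report string allocation on the metrics path.
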
rocksdb_cf_metric { + ($tag_name:literal) => { + concat!("blockstore_rocksdb_cfs,cf_name=", $tag_name) + }; +} +use rocksdb_cf_metric; + pub fn get_ledger_path_from_name_auto_delete(name: &str) -> TempDir { let mut path = get_ledger_path_from_name(name); // path is a directory so .file_name() returns the last component of the path diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index c59a2e4345..92a39b8324 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -528,9 +528,9 @@ impl Rocks { /// /// Full list of properties that return int values could be found /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689). - fn get_int_property_cf(&self, cf: &ColumnFamily, name: &str) -> Result { + fn get_int_property_cf(&self, cf: &ColumnFamily, name: &str) -> Result { match self.0.property_int_value_cf(cf, name) { - Ok(Some(value)) => Ok(value), + Ok(Some(value)) => Ok(value.try_into().unwrap()), Ok(None) => Ok(0), Err(e) => Err(BlockstoreError::RocksDb(e)), } @@ -1233,7 +1233,7 @@ where /// /// Full list of properties that return int values could be found /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689). - pub fn get_int_property(&self, name: &str) -> Result { + pub fn get_int_property(&self, name: &str) -> Result { self.backend.get_int_property_cf(self.handle(), name) } }