From eca0eb9585e138e1026306641d1ddabfef822aec Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang <93241502+yhchiang-sol@users.noreply.github.com> Date: Mon, 2 May 2022 20:53:25 -0700 Subject: [PATCH] (LedgerStore) Move metric-related functions to blockstore_metric.rs (#24854) #### Problem blockstore_db.rs becomes bigger. #### Summary of Changes This PR creates blockstore_metric.rs and moves metric-related functions out from blockstore_db.rs. --- ledger/src/blockstore_db.rs | 341 +++---------------------------- ledger/src/blockstore_metrics.rs | 292 ++++++++++++++++++++++++++ ledger/src/lib.rs | 1 + 3 files changed, 317 insertions(+), 317 deletions(-) create mode 100644 ledger/src/blockstore_metrics.rs diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 608fdd67e4..6d06a650a1 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -1,6 +1,11 @@ pub use rocksdb::Direction as IteratorDirection; use { - crate::blockstore_meta, + crate::{ + blockstore_meta, + blockstore_metrics::{ + maybe_enable_rocksdb_perf, report_rocksdb_read_perf, report_rocksdb_write_perf, + }, + }, bincode::{deserialize, serialize}, byteorder::{BigEndian, ByteOrder}, log::*, @@ -741,13 +746,13 @@ impl Rocks { } fn write(&self, batch: RWriteBatch) -> Result<()> { - let is_perf_enabled = maybe_enable_perf( + let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, &self.column_options.perf_write_counter, ); let result = self.db.write(batch); if is_perf_enabled { - report_write_perf(rocksdb_metric_header!( + report_rocksdb_write_perf(rocksdb_metric_header!( "blockstore_rocksdb_write_perf,op=write_batch", "write_batch", self.column_options @@ -2203,13 +2208,13 @@ where C: Column + ColumnName + ColumnMetrics, { pub fn get_bytes(&self, key: C::Index) -> Result>> { - let is_perf_enabled = maybe_enable_perf( + let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, &self.column_options.perf_read_counter, ); let result = self.backend.get_cf(self.handle(), &C::key(key)); if is_perf_enabled { - report_read_perf(C::rocksdb_get_perf_metric_header(&self.column_options)); + report_rocksdb_read_perf(C::rocksdb_get_perf_metric_header(&self.column_options)); } result } @@ -2281,13 +2286,13 @@ where } pub fn put_bytes(&self, key: C::Index, value: &[u8]) -> Result<()> { - let is_perf_enabled = maybe_enable_perf( + let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, &self.column_options.perf_write_counter, ); let result = self.backend.put_cf(self.handle(), &C::key(key), value); if is_perf_enabled { - report_write_perf(C::rocksdb_put_perf_metric_header(&self.column_options)); + report_rocksdb_write_perf(C::rocksdb_put_perf_metric_header(&self.column_options)); } result } @@ -2302,311 +2307,13 @@ where } } -mod rocks_metrics_utils { - use { - rocksdb::{ - perf::{set_perf_stats, PerfMetric, PerfStatsLevel}, - PerfContext, - }, - std::{ - cell::RefCell, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - }, - }; - - // Thread local instance of RocksDB's PerfContext. - thread_local! {static PER_THREAD_ROCKS_PERF_CONTEXT: RefCell = RefCell::new(PerfContext::default());} - - /// The function enables RocksDB PerfContext once for every `sample_interval`. - /// - /// PerfContext is a thread-local struct defined in RocksDB for collecting - /// per-thread read / write performance metrics. - /// - /// When this function enables PerfContext, the function will return true, - /// and the PerfContext of the ubsequent RocksDB operation will be collected. - pub fn maybe_enable_perf( - sample_interval: usize, - perf_samples_counter: &Arc, - ) -> bool { - if sample_interval == 0 { - return false; - } - - if perf_samples_counter.fetch_add(1, Ordering::Relaxed) < sample_interval { - return false; - } - // Ideally, fetch_sub(*sample_interval) should be used to keep it - // super precise. However, since we do not use Mutex to protect the - // above check and the below operation, we simply reset it to 0. - perf_samples_counter.store(0, Ordering::Relaxed); - - set_perf_stats(PerfStatsLevel::EnableTime); - PER_THREAD_ROCKS_PERF_CONTEXT.with(|perf_context| { - perf_context.borrow_mut().reset(); - }); - true - } - - /// Reports the collected PerfContext and disables the PerfContext after - /// reporting. - pub(crate) fn report_read_perf(metric_header: &'static str) { - PER_THREAD_ROCKS_PERF_CONTEXT.with(|perf_context_cell| { - set_perf_stats(PerfStatsLevel::Disable); - let perf_context = perf_context_cell.borrow(); - datapoint_info!( - metric_header, - ( - "user_key_comparison_count", - perf_context.metric(PerfMetric::UserKeyComparisonCount) as i64, - i64 - ), - ( - "block_cache_hit_count", - perf_context.metric(PerfMetric::BlockCacheHitCount) as i64, - i64 - ), - ( - "block_read_count", - perf_context.metric(PerfMetric::BlockReadCount) as i64, - i64 - ), - ( - "block_read_byte", - perf_context.metric(PerfMetric::BlockReadByte) as i64, - i64 - ), - ( - "block_read_nanos", - perf_context.metric(PerfMetric::BlockReadTime) as i64, - i64 - ), - ( - "block_checksum_nanos", - perf_context.metric(PerfMetric::BlockChecksumTime) as i64, - i64 - ), - ( - "block_decompress_nanos", - perf_context.metric(PerfMetric::BlockDecompressTime) as i64, - i64 - ), - ( - "get_read_bytes", - perf_context.metric(PerfMetric::GetReadBytes) as i64, - i64 - ), - ( - "multiget_read_bytes", - perf_context.metric(PerfMetric::MultigetReadBytes) as i64, - i64 - ), - ( - "get_snapshot_nanos", - perf_context.metric(PerfMetric::GetSnapshotTime) as i64, - i64 - ), - ( - "get_from_memtable_nanos", - perf_context.metric(PerfMetric::GetFromMemtableTime) as i64, - i64 - ), - ( - "get_from_memtable_count", - perf_context.metric(PerfMetric::GetFromMemtableCount) as i64, - i64 - ), - ( - // total nanos spent after Get() finds a key - "get_post_process_nanos", - perf_context.metric(PerfMetric::GetPostProcessTime) as i64, - i64 - ), - ( - // total nanos reading from output files - "get_from_output_files_nanos", - perf_context.metric(PerfMetric::GetFromOutputFilesTime) as i64, - i64 - ), - ( - // time spent on acquiring DB mutex - "db_mutex_lock_nanos", - perf_context.metric(PerfMetric::DbMutexLockNanos) as i64, - i64 - ), - ( - // time spent on waiting with a condition variable created with DB mutex. - "db_condition_wait_nanos", - perf_context.metric(PerfMetric::DbConditionWaitNanos) as i64, - i64 - ), - ( - "merge_operator_nanos", - perf_context.metric(PerfMetric::MergeOperatorTimeNanos) as i64, - i64 - ), - ( - "read_index_block_nanos", - perf_context.metric(PerfMetric::ReadIndexBlockNanos) as i64, - i64 - ), - ( - "read_filter_block_nanos", - perf_context.metric(PerfMetric::ReadFilterBlockNanos) as i64, - i64 - ), - ( - "new_table_block_iter_nanos", - perf_context.metric(PerfMetric::NewTableBlockIterNanos) as i64, - i64 - ), - ( - "block_seek_nanos", - perf_context.metric(PerfMetric::BlockSeekNanos) as i64, - i64 - ), - ( - "find_table_nanos", - perf_context.metric(PerfMetric::FindTableNanos) as i64, - i64 - ), - ( - "bloom_memtable_hit_count", - perf_context.metric(PerfMetric::BloomMemtableHitCount) as i64, - i64 - ), - ( - "bloom_memtable_miss_count", - perf_context.metric(PerfMetric::BloomMemtableMissCount) as i64, - i64 - ), - ( - "bloom_sst_hit_count", - perf_context.metric(PerfMetric::BloomSstHitCount) as i64, - i64 - ), - ( - "bloom_sst_miss_count", - perf_context.metric(PerfMetric::BloomSstMissCount) as i64, - i64 - ), - ( - "key_lock_wait_time", - perf_context.metric(PerfMetric::KeyLockWaitTime) as i64, - i64 - ), - ( - "key_lock_wait_count", - perf_context.metric(PerfMetric::KeyLockWaitCount) as i64, - i64 - ), - ( - "env_file_exists_nanos", - perf_context.metric(PerfMetric::EnvFileExistsNanos) as i64, - i64 - ), - ( - "env_get_children_nanos", - perf_context.metric(PerfMetric::EnvGetChildrenNanos) as i64, - i64 - ), - ( - "env_lock_file_nanos", - perf_context.metric(PerfMetric::EnvLockFileNanos) as i64, - i64 - ), - ( - "env_unlock_file_nanos", - perf_context.metric(PerfMetric::EnvUnlockFileNanos) as i64, - i64 - ), - ( - "total_metric_count", - perf_context.metric(PerfMetric::TotalMetricCount) as i64, - i64 - ), - ); - }); - } - /// Reports the collected PerfContext and disables the PerfContext after - /// reporting. - pub fn report_write_perf(metric_header: &'static str) { - PER_THREAD_ROCKS_PERF_CONTEXT.with(|perf_context_cell| { - set_perf_stats(PerfStatsLevel::Disable); - let perf_context = perf_context_cell.borrow(); - datapoint_info!( - metric_header, - // total nanos spent on writing to WAL - ( - "write_wal_nanos", - perf_context.metric(PerfMetric::WriteWalTime) as i64, - i64 - ), - // total nanos spent on writing to mem tables - ( - "write_memtable_nanos", - perf_context.metric(PerfMetric::WriteMemtableTime) as i64, - i64 - ), - // total nanos spent on delaying or throttling write - ( - "write_delay_nanos", - perf_context.metric(PerfMetric::WriteDelayTime) as i64, - i64 - ), - // total nanos spent on writing a record, excluding the above four things - ( - "write_pre_and_post_process_nanos", - perf_context.metric(PerfMetric::WritePreAndPostProcessTime) as i64, - i64 - ), - // time spent on acquiring DB mutex. - ( - "db_mutex_lock_nanos", - perf_context.metric(PerfMetric::DbMutexLockNanos) as i64, - i64 - ), - // Time spent on waiting with a condition variable created with DB mutex. - ( - "db_condition_wait_nanos", - perf_context.metric(PerfMetric::DbConditionWaitNanos) as i64, - i64 - ), - // Time spent on merge operator. - ( - "merge_operator_nanos_nanos", - perf_context.metric(PerfMetric::MergeOperatorTimeNanos) as i64, - i64 - ), - // Time spent waiting on key locks in transaction lock manager. - ( - "key_lock_wait_nanos", - perf_context.metric(PerfMetric::KeyLockWaitTime) as i64, - i64 - ), - // number of times acquiring a lock was blocked by another transaction. - ( - "key_lock_wait_count", - perf_context.metric(PerfMetric::KeyLockWaitCount) as i64, - i64 - ), - ); - }); - } -} -use crate::blockstore_db::rocks_metrics_utils::{ - maybe_enable_perf, report_read_perf, report_write_perf, -}; - impl LedgerColumn where C: TypedColumn + ColumnName + ColumnMetrics, { pub fn get(&self, key: C::Index) -> Result> { let mut result = Ok(None); - let is_perf_enabled = maybe_enable_perf( + let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, &self.column_options.perf_read_counter, ); @@ -2617,13 +2324,13 @@ where } if is_perf_enabled { - report_read_perf(C::rocksdb_get_perf_metric_header(&self.column_options)); + report_rocksdb_read_perf(C::rocksdb_get_perf_metric_header(&self.column_options)); } result } pub fn put(&self, key: C::Index, value: &C::Type) -> Result<()> { - let is_perf_enabled = maybe_enable_perf( + let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, &self.column_options.perf_write_counter, ); @@ -2634,19 +2341,19 @@ where .put_cf(self.handle(), &C::key(key), &serialized_value); if is_perf_enabled { - report_write_perf(C::rocksdb_put_perf_metric_header(&self.column_options)); + report_rocksdb_write_perf(C::rocksdb_put_perf_metric_header(&self.column_options)); } result } pub fn delete(&self, key: C::Index) -> Result<()> { - let is_perf_enabled = maybe_enable_perf( + let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, &self.column_options.perf_write_counter, ); let result = self.backend.delete_cf(self.handle(), &C::key(key)); if is_perf_enabled { - report_write_perf(C::rocksdb_delete_perf_metric_header(&self.column_options)); + report_rocksdb_write_perf(C::rocksdb_delete_perf_metric_header(&self.column_options)); } result } @@ -2660,13 +2367,13 @@ where &self, key: C::Index, ) -> Result> { - let is_perf_enabled = maybe_enable_perf( + let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, &self.column_options.perf_read_counter, ); let result = self.backend.get_cf(self.handle(), &C::key(key)); if is_perf_enabled { - report_read_perf(C::rocksdb_get_perf_metric_header(&self.column_options)); + report_rocksdb_read_perf(C::rocksdb_get_perf_metric_header(&self.column_options)); } if let Some(serialized_value) = result? { @@ -2681,13 +2388,13 @@ where } pub fn get_protobuf(&self, key: C::Index) -> Result> { - let is_perf_enabled = maybe_enable_perf( + let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, &self.column_options.perf_read_counter, ); let result = self.backend.get_cf(self.handle(), &C::key(key)); if is_perf_enabled { - report_read_perf(C::rocksdb_get_perf_metric_header(&self.column_options)); + report_rocksdb_read_perf(C::rocksdb_get_perf_metric_header(&self.column_options)); } if let Some(serialized_value) = result? { @@ -2701,13 +2408,13 @@ where let mut buf = Vec::with_capacity(value.encoded_len()); value.encode(&mut buf)?; - let is_perf_enabled = maybe_enable_perf( + let is_perf_enabled = maybe_enable_rocksdb_perf( self.column_options.rocks_perf_sample_interval, &self.column_options.perf_write_counter, ); let result = self.backend.put_cf(self.handle(), &C::key(key), &buf); if is_perf_enabled { - report_write_perf(C::rocksdb_put_perf_metric_header(&self.column_options)); + report_rocksdb_write_perf(C::rocksdb_put_perf_metric_header(&self.column_options)); } result diff --git a/ledger/src/blockstore_metrics.rs b/ledger/src/blockstore_metrics.rs new file mode 100644 index 0000000000..e9f0c84c37 --- /dev/null +++ b/ledger/src/blockstore_metrics.rs @@ -0,0 +1,292 @@ +use { + rocksdb::{ + perf::{set_perf_stats, PerfMetric, PerfStatsLevel}, + PerfContext, + }, + std::{ + cell::RefCell, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + }, +}; + +// Thread local instance of RocksDB's PerfContext. +thread_local! {static PER_THREAD_ROCKS_PERF_CONTEXT: RefCell = RefCell::new(PerfContext::default());} + +/// The function enables RocksDB PerfContext once for every `sample_interval`. +/// +/// PerfContext is a thread-local struct defined in RocksDB for collecting +/// per-thread read / write performance metrics. +/// +/// When this function enables PerfContext, the function will return true, +/// and the PerfContext of the ubsequent RocksDB operation will be collected. +pub(crate) fn maybe_enable_rocksdb_perf( + sample_interval: usize, + perf_samples_counter: &Arc, +) -> bool { + if sample_interval == 0 { + return false; + } + + if perf_samples_counter.fetch_add(1, Ordering::Relaxed) < sample_interval { + return false; + } + // Ideally, fetch_sub(*sample_interval) should be used to keep it + // super precise. However, since we do not use Mutex to protect the + // above check and the below operation, we simply reset it to 0. + perf_samples_counter.store(0, Ordering::Relaxed); + + set_perf_stats(PerfStatsLevel::EnableTime); + PER_THREAD_ROCKS_PERF_CONTEXT.with(|perf_context| { + perf_context.borrow_mut().reset(); + }); + true +} + +/// Reports the collected PerfContext and disables the PerfContext after +/// reporting. +pub(crate) fn report_rocksdb_read_perf(metric_header: &'static str) { + PER_THREAD_ROCKS_PERF_CONTEXT.with(|perf_context_cell| { + set_perf_stats(PerfStatsLevel::Disable); + let perf_context = perf_context_cell.borrow(); + datapoint_info!( + metric_header, + ( + "user_key_comparison_count", + perf_context.metric(PerfMetric::UserKeyComparisonCount) as i64, + i64 + ), + ( + "block_cache_hit_count", + perf_context.metric(PerfMetric::BlockCacheHitCount) as i64, + i64 + ), + ( + "block_read_count", + perf_context.metric(PerfMetric::BlockReadCount) as i64, + i64 + ), + ( + "block_read_byte", + perf_context.metric(PerfMetric::BlockReadByte) as i64, + i64 + ), + ( + "block_read_nanos", + perf_context.metric(PerfMetric::BlockReadTime) as i64, + i64 + ), + ( + "block_checksum_nanos", + perf_context.metric(PerfMetric::BlockChecksumTime) as i64, + i64 + ), + ( + "block_decompress_nanos", + perf_context.metric(PerfMetric::BlockDecompressTime) as i64, + i64 + ), + ( + "get_read_bytes", + perf_context.metric(PerfMetric::GetReadBytes) as i64, + i64 + ), + ( + "multiget_read_bytes", + perf_context.metric(PerfMetric::MultigetReadBytes) as i64, + i64 + ), + ( + "get_snapshot_nanos", + perf_context.metric(PerfMetric::GetSnapshotTime) as i64, + i64 + ), + ( + "get_from_memtable_nanos", + perf_context.metric(PerfMetric::GetFromMemtableTime) as i64, + i64 + ), + ( + "get_from_memtable_count", + perf_context.metric(PerfMetric::GetFromMemtableCount) as i64, + i64 + ), + ( + // total nanos spent after Get() finds a key + "get_post_process_nanos", + perf_context.metric(PerfMetric::GetPostProcessTime) as i64, + i64 + ), + ( + // total nanos reading from output files + "get_from_output_files_nanos", + perf_context.metric(PerfMetric::GetFromOutputFilesTime) as i64, + i64 + ), + ( + // time spent on acquiring DB mutex + "db_mutex_lock_nanos", + perf_context.metric(PerfMetric::DbMutexLockNanos) as i64, + i64 + ), + ( + // time spent on waiting with a condition variable created with DB mutex. + "db_condition_wait_nanos", + perf_context.metric(PerfMetric::DbConditionWaitNanos) as i64, + i64 + ), + ( + "merge_operator_nanos", + perf_context.metric(PerfMetric::MergeOperatorTimeNanos) as i64, + i64 + ), + ( + "read_index_block_nanos", + perf_context.metric(PerfMetric::ReadIndexBlockNanos) as i64, + i64 + ), + ( + "read_filter_block_nanos", + perf_context.metric(PerfMetric::ReadFilterBlockNanos) as i64, + i64 + ), + ( + "new_table_block_iter_nanos", + perf_context.metric(PerfMetric::NewTableBlockIterNanos) as i64, + i64 + ), + ( + "block_seek_nanos", + perf_context.metric(PerfMetric::BlockSeekNanos) as i64, + i64 + ), + ( + "find_table_nanos", + perf_context.metric(PerfMetric::FindTableNanos) as i64, + i64 + ), + ( + "bloom_memtable_hit_count", + perf_context.metric(PerfMetric::BloomMemtableHitCount) as i64, + i64 + ), + ( + "bloom_memtable_miss_count", + perf_context.metric(PerfMetric::BloomMemtableMissCount) as i64, + i64 + ), + ( + "bloom_sst_hit_count", + perf_context.metric(PerfMetric::BloomSstHitCount) as i64, + i64 + ), + ( + "bloom_sst_miss_count", + perf_context.metric(PerfMetric::BloomSstMissCount) as i64, + i64 + ), + ( + "key_lock_wait_time", + perf_context.metric(PerfMetric::KeyLockWaitTime) as i64, + i64 + ), + ( + "key_lock_wait_count", + perf_context.metric(PerfMetric::KeyLockWaitCount) as i64, + i64 + ), + ( + "env_file_exists_nanos", + perf_context.metric(PerfMetric::EnvFileExistsNanos) as i64, + i64 + ), + ( + "env_get_children_nanos", + perf_context.metric(PerfMetric::EnvGetChildrenNanos) as i64, + i64 + ), + ( + "env_lock_file_nanos", + perf_context.metric(PerfMetric::EnvLockFileNanos) as i64, + i64 + ), + ( + "env_unlock_file_nanos", + perf_context.metric(PerfMetric::EnvUnlockFileNanos) as i64, + i64 + ), + ( + "total_metric_count", + perf_context.metric(PerfMetric::TotalMetricCount) as i64, + i64 + ), + ); + }); +} +/// Reports the collected PerfContext and disables the PerfContext after +/// reporting. +pub(crate) fn report_rocksdb_write_perf(metric_header: &'static str) { + PER_THREAD_ROCKS_PERF_CONTEXT.with(|perf_context_cell| { + set_perf_stats(PerfStatsLevel::Disable); + let perf_context = perf_context_cell.borrow(); + datapoint_info!( + metric_header, + // total nanos spent on writing to WAL + ( + "write_wal_nanos", + perf_context.metric(PerfMetric::WriteWalTime) as i64, + i64 + ), + // total nanos spent on writing to mem tables + ( + "write_memtable_nanos", + perf_context.metric(PerfMetric::WriteMemtableTime) as i64, + i64 + ), + // total nanos spent on delaying or throttling write + ( + "write_delay_nanos", + perf_context.metric(PerfMetric::WriteDelayTime) as i64, + i64 + ), + // total nanos spent on writing a record, excluding the above four things + ( + "write_pre_and_post_process_nanos", + perf_context.metric(PerfMetric::WritePreAndPostProcessTime) as i64, + i64 + ), + // time spent on acquiring DB mutex. + ( + "db_mutex_lock_nanos", + perf_context.metric(PerfMetric::DbMutexLockNanos) as i64, + i64 + ), + // Time spent on waiting with a condition variable created with DB mutex. + ( + "db_condition_wait_nanos", + perf_context.metric(PerfMetric::DbConditionWaitNanos) as i64, + i64 + ), + // Time spent on merge operator. + ( + "merge_operator_nanos_nanos", + perf_context.metric(PerfMetric::MergeOperatorTimeNanos) as i64, + i64 + ), + // Time spent waiting on key locks in transaction lock manager. + ( + "key_lock_wait_nanos", + perf_context.metric(PerfMetric::KeyLockWaitTime) as i64, + i64 + ), + // number of times acquiring a lock was blocked by another transaction. + ( + "key_lock_wait_count", + perf_context.metric(PerfMetric::KeyLockWaitCount) as i64, + i64 + ), + ); + }); +} diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index e4db0a94e8..e423723c0e 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -13,6 +13,7 @@ pub mod blockstore; pub mod ancestor_iterator; pub mod blockstore_db; pub mod blockstore_meta; +pub mod blockstore_metrics; pub mod blockstore_processor; pub mod builtins; pub mod genesis_utils;