(LedgerStore) Create ColumnMetrics trait for CF metric reporting (#23763)

This PR refactors column-family-related metrics reporting. As metrics are
reported on a per-column-family basis, the PR creates a ColumnMetrics trait
and moves the metric reporting logic into it.

This refactoring will make future column metric reporting (such as
read PerfContext) much cleaner.
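
Roughly, the new flow looks like this (a simplified sketch assembled from the
diff below; the trait, the columns::SlotMeta impl, and the
rocksdb_metric_header! macro all live in ledger/src/blockstore_db.rs):

    // Each column family type implements ColumnMetrics, so it knows how to
    // tag and submit its own BlockstoreRocksDbColumnFamilyMetrics.
    pub trait ColumnMetrics {
        fn report_cf_metrics(
            cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
            column_options: &Arc<LedgerColumnOptions>,
        );
    }

    impl ColumnMetrics for columns::SlotMeta {
        fn report_cf_metrics(
            cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
            column_options: &Arc<LedgerColumnOptions>,
        ) {
            // rocksdb_metric_header! selects, based on the column options, one
            // of several compile-time tag strings, e.g.
            // "blockstore_rocksdb_cfs,cf_name=slot_meta,storage=rocks_level,compression=None".
            cf_metrics.report_metrics(rocksdb_metric_header!(
                "blockstore_rocksdb_cfs",
                "slot_meta",
                column_options
            ));
        }
    }

With this in place, Blockstore::submit_rocksdb_cf_metrics_for_all_cfs() reduces
to one submit_rocksdb_cf_metrics() call per LedgerColumn, e.g.
self.meta_cf.submit_rocksdb_cf_metrics().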
Yueh-Hsuan Chiang 2022-03-23 20:51:49 -07:00 committed by GitHub
parent 5a892af2fe
commit c83c95b56b
2 changed files with 548 additions and 398 deletions

ledger/src/blockstore.rs

@@ -5,9 +5,8 @@ use {
crate::{
ancestor_iterator::AncestorIterator,
blockstore_db::{
columns as cf, AccessType, BlockstoreCompressionType, BlockstoreOptions, Column,
ColumnName, Database, IteratorDirection, IteratorMode, LedgerColumn,
LedgerColumnOptions, Result, ShredStorageType, WriteBatch,
columns as cf, AccessType, BlockstoreOptions, Column, Database, IteratorDirection,
IteratorMode, LedgerColumn, LedgerColumnOptions, Result, ShredStorageType, WriteBatch,
},
blockstore_meta::*,
leader_schedule_cache::LeaderScheduleCache,
@@ -75,7 +74,6 @@ pub mod blockstore_purge;
pub const BLOCKSTORE_DIRECTORY_ROCKS_LEVEL: &str = "rocksdb";
pub const BLOCKSTORE_DIRECTORY_ROCKS_FIFO: &str = "rocksdb_fifo";
pub const BLOCKSTORE_METRICS_ERROR: i64 = -1;
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
@@ -178,7 +176,6 @@ pub struct Blockstore {
pub lowest_cleanup_slot: RwLock<Slot>,
no_compaction: bool,
slots_stats: Mutex<SlotsStats>,
column_options: LedgerColumnOptions,
}
struct SlotsStats {
@@ -258,101 +255,6 @@ pub struct BlockstoreInsertionMetrics {
num_coding_shreds_inserted: usize,
}
#[derive(Default)]
/// A metrics struct that exposes RocksDB's column family properties.
///
/// Here we only expose a subset of all the internal properties which are
/// relevant to the ledger store performance.
///
/// The complete list of RocksDB internal properties can be found
/// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
pub struct BlockstoreRocksDbColumnFamilyMetrics {
// Size related
// The storage size occupied by the column family.
// RocksDB's internal property key: "rocksdb.total-sst-files-size"
pub total_sst_files_size: i64,
// The memory size occupied by the column family's in-memory buffer.
// RocksDB's internal property key: "rocksdb.size-all-mem-tables"
pub size_all_mem_tables: i64,
// Snapshot related
// Number of snapshots held for the column family.
// RocksDB's internal property key: "rocksdb.num-snapshots"
pub num_snapshots: i64,
// Unix timestamp of the oldest unreleased snapshot.
// RocksDB's internal property key: "rocksdb.oldest-snapshot-time"
pub oldest_snapshot_time: i64,
// Write related
// The current actual delayed write rate. 0 means no delay.
// RocksDB's internal property key: "rocksdb.actual-delayed-write-rate"
pub actual_delayed_write_rate: i64,
// A flag indicating whether writes are stopped on this column family.
// 1 indicates writes have been stopped.
// RocksDB's internal property key: "rocksdb.is-write-stopped"
pub is_write_stopped: i64,
// Memory / block cache related
// The block cache capacity of the column family.
// RocksDB's internal property key: "rocksdb.block-cache-capacity"
pub block_cache_capacity: i64,
// The memory size used by the column family in the block cache.
// RocksDB's internal property key: "rocksdb.block-cache-usage"
pub block_cache_usage: i64,
// The memory size used by the column family in the block cache where
// entries are pinned.
// RocksDB's internal property key: "rocksdb.block-cache-pinned-usage"
pub block_cache_pinned_usage: i64,
// The estimated memory size used for reading SST tables in this column
// family such as filters and index blocks. Note that this number does not
// include the memory used in block cache.
// RocksDB's internal property key: "rocksdb.estimate-table-readers-mem"
pub estimate_table_readers_mem: i64,
// Flush and compaction
// A 1 or 0 flag indicating whether a memtable flush is pending.
// If this number is 1, it means a memtable is waiting to be flushed,
// but there might be too many L0 files that prevent it from being flushed.
// RocksDB's internal property key: "rocksdb.mem-table-flush-pending"
pub mem_table_flush_pending: i64,
// A 1 or 0 flag indicating whether a compaction job is pending.
// If this number is 1, it means some part of the column family requires
// compaction in order to maintain the shape of the LSM tree, but the compaction
// is pending because the desired compaction job is either waiting for
// other dependent compactions to be finished or waiting for an available
// compaction thread.
// RocksDB's internal property key: "rocksdb.compaction-pending"
pub compaction_pending: i64,
// The number of compactions that are currently running for the column family.
// RocksDB's internal property key: "rocksdb.num-running-compactions"
pub num_running_compactions: i64,
// The number of flushes that are currently running for the column family.
// RocksDB's internal property key: "rocksdb.num-running-flushes"
pub num_running_flushes: i64,
// FIFO Compaction related
// An estimation of the oldest key timestamp in the DB. Only available
// for FIFO compaction with compaction_options_fifo.allow_compaction = false.
// RocksDB's internal property key: "rocksdb.estimate-oldest-key-time"
pub estimate_oldest_key_time: i64,
// Misc
// The accumulated number of RocksDB background errors.
// RocksDB's internal property key: "rocksdb.background-errors"
pub background_errors: i64,
}
impl SlotMetaWorkingSetEntry {
/// Construct a new SlotMetaWorkingSetEntry with the specified `new_slot_meta`
/// and `old_slot_meta`. `did_insert_occur` is set to false.
@@ -448,127 +350,6 @@ impl BlockstoreInsertionMetrics {
}
}
impl BlockstoreRocksDbColumnFamilyMetrics {
/// Report metrics with the specified metric name and column family tag.
/// The metric name and the column family tag are embedded in the parameter
/// `metric_name_and_cf_tag` in the following format.
///
/// For example, "blockstore_rocksdb_cfs,cf_name=shred_data".
pub fn report_metrics(&self, metric_name_and_cf_tag: &'static str) {
datapoint_info!(
metric_name_and_cf_tag,
// Size related
(
"total_sst_files_size",
self.total_sst_files_size as i64,
i64
),
("size_all_mem_tables", self.size_all_mem_tables as i64, i64),
// Snapshot related
("num_snapshots", self.num_snapshots as i64, i64),
(
"oldest_snapshot_time",
self.oldest_snapshot_time as i64,
i64
),
// Write related
(
"actual_delayed_write_rate",
self.actual_delayed_write_rate as i64,
i64
),
("is_write_stopped", self.is_write_stopped as i64, i64),
// Memory / block cache related
(
"block_cache_capacity",
self.block_cache_capacity as i64,
i64
),
("block_cache_usage", self.block_cache_usage as i64, i64),
(
"block_cache_pinned_usage",
self.block_cache_pinned_usage as i64,
i64
),
(
"estimate_table_readers_mem",
self.estimate_table_readers_mem as i64,
i64
),
// Flush and compaction
(
"mem_table_flush_pending",
self.mem_table_flush_pending as i64,
i64
),
("compaction_pending", self.compaction_pending as i64, i64),
(
"num_running_compactions",
self.num_running_compactions as i64,
i64
),
("num_running_flushes", self.num_running_flushes as i64, i64),
// FIFO Compaction related
(
"estimate_oldest_key_time",
self.estimate_oldest_key_time as i64,
i64
),
// Misc
("background_errors", self.background_errors as i64, i64),
);
}
}
macro_rules! rocksdb_metric_header {
($metric_name:literal, $cf_name:literal, $column_options:expr) => {
match $column_options.shred_storage_type {
ShredStorageType::RocksLevel =>
rocksdb_metric_header!(@compression_type $metric_name, $cf_name, $column_options, "rocks_level"),
ShredStorageType::RocksFifo(_) =>
rocksdb_metric_header!(@compression_type $metric_name, $cf_name, $column_options, "rocks_fifo"),
}
};
(@compression_type $metric_name:literal, $cf_name:literal, $column_options:expr, $storage_type:literal) => {
match $column_options.compression_type {
BlockstoreCompressionType::None => rocksdb_metric_header!(@all_fields
$metric_name,
$cf_name,
$storage_type,
"None"
),
BlockstoreCompressionType::Snappy => rocksdb_metric_header!(@all_fields
$metric_name,
$cf_name,
$storage_type,
"Snappy"
),
BlockstoreCompressionType::Lz4 => rocksdb_metric_header!(@all_fields
$metric_name,
$cf_name,
$storage_type,
"Lz4"
),
BlockstoreCompressionType::Zlib => rocksdb_metric_header!(@all_fields
$metric_name,
$cf_name,
$storage_type,
"Zlib"
),
}
};
(@all_fields $metric_name:literal, $cf_name:literal, $storage_type:literal, $compression_type:literal) => {
concat!($metric_name,
",cf_name=", $cf_name,
",storage=", $storage_type,
",compression=", $compression_type,
)
};
}
use rocksdb_metric_header;
impl Blockstore {
pub fn db(self) -> Arc<Database> {
self.db
@@ -601,7 +382,6 @@ impl Blockstore {
let blockstore_path = ledger_path.join(Self::blockstore_directory(
&options.column_options.shred_storage_type,
));
let column_options = options.column_options.clone();
adjust_ulimit_nofile(options.enforce_ulimit_nofile)?;
@@ -694,7 +474,6 @@ impl Blockstore {
lowest_cleanup_slot: RwLock::<Slot>::default(),
no_compaction: false,
slots_stats: Mutex::<SlotsStats>::default(),
column_options,
};
if initialize_transaction_status_index {
blockstore.initialize_transaction_status_index()?;
@@ -991,162 +770,24 @@ impl Blockstore {
/// Collects and reports [`BlockstoreRocksDbColumnFamilyMetrics`] for
/// all the column families.
pub fn submit_rocksdb_cf_metrics_for_all_cfs(&self) {
let column_options = &self.column_options;
self.submit_rocksdb_cf_metrics::<cf::SlotMeta>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"slot_meta",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::DeadSlots>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"dead_slots",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::DuplicateSlots>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"duplicate_slots",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::ErasureMeta>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"erasure_meta",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::Orphans>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"orphans",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::BankHash>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"bank_hash",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::Root>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"root",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::Index>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"index",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::ShredData>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"shred_data",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::ShredCode>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"shred_code",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::TransactionStatus>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"transaction_status",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::AddressSignatures>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"address_signature",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::TransactionMemos>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"transaction_memos",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::TransactionStatusIndex>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"transaction_status_index",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::Rewards>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"rewards",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::Blocktime>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"blocktime",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::PerfSamples>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"perf_sample",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::BlockHeight>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"block_height",
column_options
));
self.submit_rocksdb_cf_metrics::<cf::ProgramCosts>(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"program_costs",
column_options
));
}
/// Collects and reports [`BlockstoreRocksDbColumnFamilyMetrics`] for the
/// given column family.
fn submit_rocksdb_cf_metrics<C: 'static + Column + ColumnName>(
&self,
metric_name_and_cf_tag: &'static str,
) {
let cf = self.db.column::<C>();
let cf_rocksdb_metrics = BlockstoreRocksDbColumnFamilyMetrics {
total_sst_files_size: cf
.get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
size_all_mem_tables: cf
.get_int_property(RocksProperties::SIZE_ALL_MEM_TABLES)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
num_snapshots: cf
.get_int_property(RocksProperties::NUM_SNAPSHOTS)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
oldest_snapshot_time: cf
.get_int_property(RocksProperties::OLDEST_SNAPSHOT_TIME)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
actual_delayed_write_rate: cf
.get_int_property(RocksProperties::ACTUAL_DELAYED_WRITE_RATE)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
is_write_stopped: cf
.get_int_property(RocksProperties::IS_WRITE_STOPPED)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
block_cache_capacity: cf
.get_int_property(RocksProperties::BLOCK_CACHE_CAPACITY)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
block_cache_usage: cf
.get_int_property(RocksProperties::BLOCK_CACHE_USAGE)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
block_cache_pinned_usage: cf
.get_int_property(RocksProperties::BLOCK_CACHE_PINNED_USAGE)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
estimate_table_readers_mem: cf
.get_int_property(RocksProperties::ESTIMATE_TABLE_READERS_MEM)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
mem_table_flush_pending: cf
.get_int_property(RocksProperties::MEM_TABLE_FLUSH_PENDING)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
compaction_pending: cf
.get_int_property(RocksProperties::COMPACTION_PENDING)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
num_running_compactions: cf
.get_int_property(RocksProperties::NUM_RUNNING_COMPACTIONS)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
num_running_flushes: cf
.get_int_property(RocksProperties::NUM_RUNNING_FLUSHES)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
estimate_oldest_key_time: cf
.get_int_property(RocksProperties::ESTIMATE_OLDEST_KEY_TIME)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
background_errors: cf
.get_int_property(RocksProperties::BACKGROUND_ERRORS)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
};
cf_rocksdb_metrics.report_metrics(metric_name_and_cf_tag);
self.meta_cf.submit_rocksdb_cf_metrics();
self.dead_slots_cf.submit_rocksdb_cf_metrics();
self.duplicate_slots_cf.submit_rocksdb_cf_metrics();
self.erasure_meta_cf.submit_rocksdb_cf_metrics();
self.orphans_cf.submit_rocksdb_cf_metrics();
self.index_cf.submit_rocksdb_cf_metrics();
self.data_shred_cf.submit_rocksdb_cf_metrics();
self.code_shred_cf.submit_rocksdb_cf_metrics();
self.transaction_status_cf.submit_rocksdb_cf_metrics();
self.address_signatures_cf.submit_rocksdb_cf_metrics();
self.transaction_memos_cf.submit_rocksdb_cf_metrics();
self.transaction_status_index_cf.submit_rocksdb_cf_metrics();
self.rewards_cf.submit_rocksdb_cf_metrics();
self.blocktime_cf.submit_rocksdb_cf_metrics();
self.perf_samples_cf.submit_rocksdb_cf_metrics();
self.block_height_cf.submit_rocksdb_cf_metrics();
self.program_costs_cf.submit_rocksdb_cf_metrics();
self.bank_hash_cf.submit_rocksdb_cf_metrics();
}
fn try_shred_recovery(

ledger/src/blockstore_db.rs

@@ -9,10 +9,10 @@ use {
self,
compaction_filter::CompactionFilter,
compaction_filter_factory::{CompactionFilterContext, CompactionFilterFactory},
ColumnFamily, ColumnFamilyDescriptor, CompactionDecision, DBCompactionStyle,
DBCompressionType as RocksCompressionType, DBIterator, DBRawIterator, DBRecoveryMode,
FifoCompactOptions, IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch,
DB,
properties as RocksProperties, ColumnFamily, ColumnFamilyDescriptor, CompactionDecision,
DBCompactionStyle, DBCompressionType as RocksCompressionType, DBIterator, DBRawIterator,
DBRecoveryMode, FifoCompactOptions, IteratorMode as RocksIteratorMode, Options,
WriteBatch as RWriteBatch, DB,
},
serde::{de::DeserializeOwned, Serialize},
solana_runtime::hardened_unpack::UnpackError,
@@ -36,6 +36,8 @@ use {
thiserror::Error,
};
const BLOCKSTORE_METRICS_ERROR: i64 = -1;
// The default storage size for storing shreds when `rocksdb-shred-compaction`
// is set to `fifo` in the validator arguments. This amount of storage size
// in bytes will be equally allocated to both data shreds and coding shreds.
@@ -99,6 +101,222 @@ const PROGRAM_COSTS_CF: &str = "program_costs";
// 1 day is chosen for the same reasoning of DEFAULT_COMPACTION_SLOT_INTERVAL
const PERIODIC_COMPACTION_SECONDS: u64 = 60 * 60 * 24;
#[derive(Default)]
/// A metrics struct that exposes RocksDB's column family properties.
///
/// Here we only expose a subset of all the internal properties which are
/// relevant to the ledger store performance.
///
/// The complete list of RocksDB internal properties can be found
/// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
pub struct BlockstoreRocksDbColumnFamilyMetrics {
// Size related
// The storage size occupied by the column family.
// RocksDB's internal property key: "rocksdb.total-sst-files-size"
pub total_sst_files_size: i64,
// The memory size occupied by the column family's in-memory buffer.
// RocksDB's internal property key: "rocksdb.size-all-mem-tables"
pub size_all_mem_tables: i64,
// Snapshot related
// Number of snapshots held for the column family.
// RocksDB's internal property key: "rocksdb.num-snapshots"
pub num_snapshots: i64,
// Unix timestamp of the oldest unreleased snapshot.
// RocksDB's internal property key: "rocksdb.oldest-snapshot-time"
pub oldest_snapshot_time: i64,
// Write related
// The current actual delayed write rate. 0 means no delay.
// RocksDB's internal property key: "rocksdb.actual-delayed-write-rate"
pub actual_delayed_write_rate: i64,
// A flag indicating whether writes are stopped on this column family.
// 1 indicates writes have been stopped.
// RocksDB's internal property key: "rocksdb.is-write-stopped"
pub is_write_stopped: i64,
// Memory / block cache related
// The block cache capacity of the column family.
// RocksDB's internal property key: "rocksdb.block-cache-capacity"
pub block_cache_capacity: i64,
// The memory size used by the column family in the block cache.
// RocksDB's internal property key: "rocksdb.block-cache-usage"
pub block_cache_usage: i64,
// The memory size used by the column family in the block cache where
// entries are pinned.
// RocksDB's internal property key: "rocksdb.block-cache-pinned-usage"
pub block_cache_pinned_usage: i64,
// The estimated memory size used for reading SST tables in this column
// family such as filters and index blocks. Note that this number does not
// include the memory used in block cache.
// RocksDB's internal property key: "rocksdb.estimate-table-readers-mem"
pub estimate_table_readers_mem: i64,
// Flush and compaction
// A 1 or 0 flag indicating whether a memtable flush is pending.
// If this number is 1, it means a memtable is waiting to be flushed,
// but there might be too many L0 files that prevent it from being flushed.
// RocksDB's internal property key: "rocksdb.mem-table-flush-pending"
pub mem_table_flush_pending: i64,
// A 1 or 0 flag indicating whether a compaction job is pending.
// If this number is 1, it means some part of the column family requires
// compaction in order to maintain the shape of the LSM tree, but the compaction
// is pending because the desired compaction job is either waiting for
// other dependent compactions to be finished or waiting for an available
// compaction thread.
// RocksDB's internal property key: "rocksdb.compaction-pending"
pub compaction_pending: i64,
// The number of compactions that are currently running for the column family.
// RocksDB's internal property key: "rocksdb.num-running-compactions"
pub num_running_compactions: i64,
// The number of flushes that are currently running for the column family.
// RocksDB's internal property key: "rocksdb.num-running-flushes"
pub num_running_flushes: i64,
// FIFO Compaction related
// An estimation of the oldest key timestamp in the DB. Only available
// for FIFO compaction with compaction_options_fifo.allow_compaction = false.
// RocksDB's internal property key: "rocksdb.estimate-oldest-key-time"
pub estimate_oldest_key_time: i64,
// Misc
// The accumulated number of RocksDB background errors.
// RocksDB's internal property key: "rocksdb.background-errors"
pub background_errors: i64,
}
impl BlockstoreRocksDbColumnFamilyMetrics {
/// Report metrics with the specified metric name and column family tag.
/// The metric name and the column family tag are embedded in the parameter
/// `metric_name_and_cf_tag` in the following format.
///
/// For example, "blockstore_rocksdb_cfs,cf_name=shred_data".
pub fn report_metrics(&self, metric_name_and_cf_tag: &'static str) {
datapoint_info!(
metric_name_and_cf_tag,
// Size related
(
"total_sst_files_size",
self.total_sst_files_size as i64,
i64
),
("size_all_mem_tables", self.size_all_mem_tables as i64, i64),
// Snapshot related
("num_snapshots", self.num_snapshots as i64, i64),
(
"oldest_snapshot_time",
self.oldest_snapshot_time as i64,
i64
),
// Write related
(
"actual_delayed_write_rate",
self.actual_delayed_write_rate as i64,
i64
),
("is_write_stopped", self.is_write_stopped as i64, i64),
// Memory / block cache related
(
"block_cache_capacity",
self.block_cache_capacity as i64,
i64
),
("block_cache_usage", self.block_cache_usage as i64, i64),
(
"block_cache_pinned_usage",
self.block_cache_pinned_usage as i64,
i64
),
(
"estimate_table_readers_mem",
self.estimate_table_readers_mem as i64,
i64
),
// Flush and compaction
(
"mem_table_flush_pending",
self.mem_table_flush_pending as i64,
i64
),
("compaction_pending", self.compaction_pending as i64, i64),
(
"num_running_compactions",
self.num_running_compactions as i64,
i64
),
("num_running_flushes", self.num_running_flushes as i64, i64),
// FIFO Compaction related
(
"estimate_oldest_key_time",
self.estimate_oldest_key_time as i64,
i64
),
// Misc
("background_errors", self.background_errors as i64, i64),
);
}
}
macro_rules! rocksdb_metric_header {
($metric_name:literal, $cf_name:literal, $column_options:expr) => {
match $column_options.shred_storage_type {
ShredStorageType::RocksLevel =>
rocksdb_metric_header!(@compression_type $metric_name, $cf_name, $column_options, "rocks_level"),
ShredStorageType::RocksFifo(_) =>
rocksdb_metric_header!(@compression_type $metric_name, $cf_name, $column_options, "rocks_fifo"),
}
};
(@compression_type $metric_name:literal, $cf_name:literal, $column_options:expr, $storage_type:literal) => {
match $column_options.compression_type {
BlockstoreCompressionType::None => rocksdb_metric_header!(@all_fields
$metric_name,
$cf_name,
$storage_type,
"None"
),
BlockstoreCompressionType::Snappy => rocksdb_metric_header!(@all_fields
$metric_name,
$cf_name,
$storage_type,
"Snappy"
),
BlockstoreCompressionType::Lz4 => rocksdb_metric_header!(@all_fields
$metric_name,
$cf_name,
$storage_type,
"Lz4"
),
BlockstoreCompressionType::Zlib => rocksdb_metric_header!(@all_fields
$metric_name,
$cf_name,
$storage_type,
"Zlib"
),
}
};
(@all_fields $metric_name:literal, $cf_name:literal, $storage_type:literal, $compression_type:literal) => {
concat!($metric_name,
",cf_name=", $cf_name,
",storage=", $storage_type,
",compression=", $compression_type,
)
};
}
use rocksdb_metric_header;
#[derive(Error, Debug)]
pub enum BlockstoreError {
ShredForIndexExists,
@@ -555,6 +773,13 @@ pub trait Column {
}
}
pub trait ColumnMetrics {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
);
}
pub trait ColumnName {
const NAME: &'static str;
}
@@ -639,7 +864,18 @@ impl Column for columns::TransactionStatus {
(index, Signature::default(), 0)
}
}
impl ColumnMetrics for columns::TransactionStatus {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"transaction_status",
column_options
));
}
}
impl ColumnName for columns::TransactionStatus {
const NAME: &'static str = TRANSACTION_STATUS_CF;
}
@@ -680,7 +916,18 @@ impl Column for columns::AddressSignatures {
(index, Pubkey::default(), 0, Signature::default())
}
}
impl ColumnMetrics for columns::AddressSignatures {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"address_signatures",
column_options
));
}
}
impl ColumnName for columns::AddressSignatures {
const NAME: &'static str = ADDRESS_SIGNATURES_CF;
}
@@ -711,7 +958,18 @@ impl Column for columns::TransactionMemos {
Signature::default()
}
}
impl ColumnMetrics for columns::TransactionMemos {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"transaction_memos",
column_options
));
}
}
impl ColumnName for columns::TransactionMemos {
const NAME: &'static str = TRANSACTION_MEMOS_CF;
}
@@ -742,12 +1000,35 @@ impl Column for columns::TransactionStatusIndex {
slot
}
}
impl ColumnMetrics for columns::TransactionStatusIndex {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"transaction_status_index",
column_options
));
}
}
impl ColumnName for columns::TransactionStatusIndex {
const NAME: &'static str = TRANSACTION_STATUS_INDEX_CF;
}
impl SlotColumn for columns::Rewards {}
impl ColumnMetrics for columns::Rewards {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"rewards",
column_options
));
}
}
impl ColumnName for columns::Rewards {
const NAME: &'static str = REWARDS_CF;
}
@@ -756,6 +1037,18 @@ impl ProtobufColumn for columns::Rewards {
}
impl SlotColumn for columns::Blocktime {}
impl ColumnMetrics for columns::Blocktime {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"blocktime",
column_options
));
}
}
impl ColumnName for columns::Blocktime {
const NAME: &'static str = BLOCKTIME_CF;
}
@@ -764,6 +1057,18 @@ impl TypedColumn for columns::Blocktime {
}
impl SlotColumn for columns::PerfSamples {}
impl ColumnMetrics for columns::PerfSamples {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"perf_samples",
column_options
));
}
}
impl ColumnName for columns::PerfSamples {
const NAME: &'static str = PERF_SAMPLES_CF;
}
@@ -772,6 +1077,18 @@ impl TypedColumn for columns::PerfSamples {
}
impl SlotColumn for columns::BlockHeight {}
impl ColumnMetrics for columns::BlockHeight {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"block_height",
column_options
));
}
}
impl ColumnName for columns::BlockHeight {
const NAME: &'static str = BLOCK_HEIGHT_CF;
}
@@ -779,6 +1096,19 @@ impl TypedColumn for columns::BlockHeight {
type Type = u64;
}
impl ColumnMetrics for columns::ProgramCosts {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"program_costs",
column_options
));
}
}
impl ColumnName for columns::ProgramCosts {
const NAME: &'static str = PROGRAM_COSTS_CF;
}
@@ -832,7 +1162,18 @@ impl Column for columns::ShredCode {
(slot, 0)
}
}
impl ColumnMetrics for columns::ShredCode {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"shred_code",
column_options
));
}
}
impl ColumnName for columns::ShredCode {
const NAME: &'static str = CODE_SHRED_CF;
}
@@ -862,12 +1203,35 @@ impl Column for columns::ShredData {
(slot, 0)
}
}
impl ColumnMetrics for columns::ShredData {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"shred_data",
column_options
));
}
}
impl ColumnName for columns::ShredData {
const NAME: &'static str = DATA_SHRED_CF;
}
impl SlotColumn for columns::Index {}
impl ColumnMetrics for columns::Index {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"index",
column_options
));
}
}
impl ColumnName for columns::Index {
const NAME: &'static str = INDEX_CF;
}
@@ -876,6 +1240,18 @@ impl TypedColumn for columns::Index {
}
impl SlotColumn for columns::DeadSlots {}
impl ColumnMetrics for columns::DeadSlots {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"dead_slots",
column_options
));
}
}
impl ColumnName for columns::DeadSlots {
const NAME: &'static str = DEAD_SLOTS_CF;
}
@@ -884,6 +1260,18 @@ impl TypedColumn for columns::DeadSlots {
}
impl SlotColumn for columns::DuplicateSlots {}
impl ColumnMetrics for columns::DuplicateSlots {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"duplicate_slots",
column_options
));
}
}
impl ColumnName for columns::DuplicateSlots {
const NAME: &'static str = DUPLICATE_SLOTS_CF;
}
@@ -892,6 +1280,18 @@ impl TypedColumn for columns::DuplicateSlots {
}
impl SlotColumn for columns::Orphans {}
impl ColumnMetrics for columns::Orphans {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"orphans",
column_options
));
}
}
impl ColumnName for columns::Orphans {
const NAME: &'static str = ORPHANS_CF;
}
@@ -900,6 +1300,18 @@ impl TypedColumn for columns::Orphans {
}
impl SlotColumn for columns::BankHash {}
impl ColumnMetrics for columns::BankHash {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"bank_hash",
column_options
));
}
}
impl ColumnName for columns::BankHash {
const NAME: &'static str = BANK_HASH_CF;
}
@@ -908,6 +1320,18 @@ impl TypedColumn for columns::BankHash {
}
impl SlotColumn for columns::Root {}
impl ColumnMetrics for columns::Root {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"root",
column_options
));
}
}
impl ColumnName for columns::Root {
const NAME: &'static str = ROOT_CF;
}
@@ -916,6 +1340,18 @@ impl TypedColumn for columns::Root {
}
impl SlotColumn for columns::SlotMeta {}
impl ColumnMetrics for columns::SlotMeta {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"slot_meta",
column_options
));
}
}
impl ColumnName for columns::SlotMeta {
const NAME: &'static str = META_CF;
}
@@ -949,6 +1385,18 @@ impl Column for columns::ErasureMeta {
(slot, 0)
}
}
impl ColumnMetrics for columns::ErasureMeta {
fn report_cf_metrics(
cf_metrics: BlockstoreRocksDbColumnFamilyMetrics,
column_options: &Arc<LedgerColumnOptions>,
) {
cf_metrics.report_metrics(rocksdb_metric_header!(
"blockstore_rocksdb_cfs",
"erasure_meta",
column_options
));
}
}
impl ColumnName for columns::ErasureMeta {
const NAME: &'static str = ERASURE_META_CF;
}
@@ -960,15 +1408,73 @@ impl TypedColumn for columns::ErasureMeta {
pub struct Database {
backend: Arc<Rocks>,
path: Arc<Path>,
column_options: Arc<LedgerColumnOptions>,
}
#[derive(Debug, Clone)]
pub struct LedgerColumn<C>
where
C: Column,
C: Column + ColumnName + ColumnMetrics,
{
backend: Arc<Rocks>,
column: PhantomData<C>,
pub column_options: Arc<LedgerColumnOptions>,
}
impl<C: Column + ColumnName + ColumnMetrics> LedgerColumn<C> {
pub fn submit_rocksdb_cf_metrics(&self) {
let cf_rocksdb_metrics = BlockstoreRocksDbColumnFamilyMetrics {
total_sst_files_size: self
.get_int_property(RocksProperties::TOTAL_SST_FILES_SIZE)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
size_all_mem_tables: self
.get_int_property(RocksProperties::SIZE_ALL_MEM_TABLES)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
num_snapshots: self
.get_int_property(RocksProperties::NUM_SNAPSHOTS)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
oldest_snapshot_time: self
.get_int_property(RocksProperties::OLDEST_SNAPSHOT_TIME)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
actual_delayed_write_rate: self
.get_int_property(RocksProperties::ACTUAL_DELAYED_WRITE_RATE)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
is_write_stopped: self
.get_int_property(RocksProperties::IS_WRITE_STOPPED)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
block_cache_capacity: self
.get_int_property(RocksProperties::BLOCK_CACHE_CAPACITY)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
block_cache_usage: self
.get_int_property(RocksProperties::BLOCK_CACHE_USAGE)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
block_cache_pinned_usage: self
.get_int_property(RocksProperties::BLOCK_CACHE_PINNED_USAGE)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
estimate_table_readers_mem: self
.get_int_property(RocksProperties::ESTIMATE_TABLE_READERS_MEM)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
mem_table_flush_pending: self
.get_int_property(RocksProperties::MEM_TABLE_FLUSH_PENDING)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
compaction_pending: self
.get_int_property(RocksProperties::COMPACTION_PENDING)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
num_running_compactions: self
.get_int_property(RocksProperties::NUM_RUNNING_COMPACTIONS)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
num_running_flushes: self
.get_int_property(RocksProperties::NUM_RUNNING_FLUSHES)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
estimate_oldest_key_time: self
.get_int_property(RocksProperties::ESTIMATE_OLDEST_KEY_TIME)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
background_errors: self
.get_int_property(RocksProperties::BACKGROUND_ERRORS)
.unwrap_or(BLOCKSTORE_METRICS_ERROR),
};
C::report_cf_metrics(cf_rocksdb_metrics, &self.column_options);
}
}
pub struct WriteBatch<'a> {
@@ -976,7 +1482,7 @@ pub struct WriteBatch<'a> {
map: HashMap<&'static str, &'a ColumnFamily>,
}
#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum ShredStorageType {
// Stores shreds under RocksDB's default compaction (level).
RocksLevel,
@@ -992,7 +1498,7 @@ impl Default for ShredStorageType {
}
}
#[derive(Clone)]
#[derive(Debug, Clone)]
pub enum BlockstoreCompressionType {
None,
Snappy,
@@ -1020,7 +1526,7 @@ impl BlockstoreCompressionType {
/// Options for LedgerColumn.
/// Each field might also be used as a tag that supports group-by operations
/// when reporting metrics.
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct LedgerColumnOptions {
// Determine how to store both data and coding shreds. Default: RocksLevel.
pub shred_storage_type: ShredStorageType,
@@ -1061,7 +1567,7 @@ impl Default for BlockstoreOptions {
}
}
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct BlockstoreRocksFifoOptions {
// The maximum storage size for storing data shreds in column family
// [`cf::DataShred`]. Typically, data shreds contribute around 25% of the
@@ -1096,11 +1602,13 @@ impl Default for BlockstoreRocksFifoOptions {
impl Database {
pub fn open(path: &Path, options: BlockstoreOptions) -> Result<Self> {
let column_options = Arc::new(options.column_options.clone());
let backend = Arc::new(Rocks::open(path, options)?);
Ok(Database {
backend,
path: Arc::from(path),
column_options,
})
}
@@ -1145,11 +1653,12 @@ impl Database {
pub fn column<C>(&self) -> LedgerColumn<C>
where
C: Column + ColumnName,
C: Column + ColumnName + ColumnMetrics,
{
LedgerColumn {
backend: Arc::clone(&self.backend),
column: PhantomData,
column_options: Arc::clone(&self.column_options),
}
}
@@ -1198,7 +1707,7 @@ impl Database {
impl<C> LedgerColumn<C>
where
C: Column + ColumnName,
C: Column + ColumnName + ColumnMetrics,
{
pub fn get_bytes(&self, key: C::Index) -> Result<Option<Vec<u8>>> {
self.backend.get_cf(self.handle(), &C::key(key))
@@ -1286,7 +1795,7 @@ where
impl<C> LedgerColumn<C>
where
C: TypedColumn + ColumnName,
C: TypedColumn + ColumnName + ColumnMetrics,
{
pub fn get(&self, key: C::Index) -> Result<Option<C::Type>> {
if let Some(serialized_value) = self.backend.get_cf(self.handle(), &C::key(key))? {
@@ -1312,7 +1821,7 @@ where
impl<C> LedgerColumn<C>
where
C: ProtobufColumn + ColumnName,
C: ProtobufColumn + ColumnName + ColumnMetrics,
{
pub fn get_protobuf_or_bincode<T: DeserializeOwned + Into<C::Type>>(
&self,