(LedgerStore) Add compression type (#23578)
This PR adds `--rocksdb-ledger-compression` as a hidden argument to the validator for specifying the compression algorithm for TransactionStatus. Available compression algorithms include `lz4`, `snappy`, `zlib`. The default value is `none`. Experimental results show that with lz4 compression, we can achieve ~37% size-reduction on the TransactionStatus column family, or ~8% size-reduction of the ledger store size.
This commit is contained in:
parent
49228573f4
commit
ae75b1a25f
|
@ -358,6 +358,7 @@ mod tests {
|
|||
..BlockstoreRocksFifoOptions::default()
|
||||
},
|
||||
),
|
||||
..LedgerColumnOptions::default()
|
||||
},
|
||||
..BlockstoreOptions::default()
|
||||
}
|
||||
|
|
|
@ -5,9 +5,9 @@ use {
|
|||
crate::{
|
||||
ancestor_iterator::AncestorIterator,
|
||||
blockstore_db::{
|
||||
columns as cf, AccessType, BlockstoreOptions, Column, ColumnName, Database,
|
||||
IteratorDirection, IteratorMode, LedgerColumn, LedgerColumnOptions, Result,
|
||||
ShredStorageType, WriteBatch,
|
||||
columns as cf, AccessType, BlockstoreCompressionType, BlockstoreOptions, Column,
|
||||
ColumnName, Database, IteratorDirection, IteratorMode, LedgerColumn,
|
||||
LedgerColumnOptions, Result, ShredStorageType, WriteBatch,
|
||||
},
|
||||
blockstore_meta::*,
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
|
@ -524,16 +524,46 @@ macro_rules! rocksdb_metric_header {
|
|||
($metric_name:literal, $cf_name:literal, $column_options:expr) => {
|
||||
match $column_options.shred_storage_type {
|
||||
ShredStorageType::RocksLevel =>
|
||||
rocksdb_metric_header!(@all_fields $metric_name, $cf_name, "rocks_level"),
|
||||
rocksdb_metric_header!(@compression_type $metric_name, $cf_name, $column_options, "rocks_level"),
|
||||
ShredStorageType::RocksFifo(_) =>
|
||||
rocksdb_metric_header!(@all_fields $metric_name, $cf_name, "rocks_fifo"),
|
||||
rocksdb_metric_header!(@compression_type $metric_name, $cf_name, $column_options, "rocks_fifo"),
|
||||
}
|
||||
};
|
||||
|
||||
(@all_fields $metric_name:literal, $cf_name:literal, $storage_type:literal) => {
|
||||
(@compression_type $metric_name:literal, $cf_name:literal, $column_options:expr, $storage_type:literal) => {
|
||||
match $column_options.compression_type {
|
||||
BlockstoreCompressionType::None => rocksdb_metric_header!(@all_fields
|
||||
$metric_name,
|
||||
$cf_name,
|
||||
$storage_type,
|
||||
"None"
|
||||
),
|
||||
BlockstoreCompressionType::Snappy => rocksdb_metric_header!(@all_fields
|
||||
$metric_name,
|
||||
$cf_name,
|
||||
$storage_type,
|
||||
"Snappy"
|
||||
),
|
||||
BlockstoreCompressionType::Lz4 => rocksdb_metric_header!(@all_fields
|
||||
$metric_name,
|
||||
$cf_name,
|
||||
$storage_type,
|
||||
"Lz4"
|
||||
),
|
||||
BlockstoreCompressionType::Zlib => rocksdb_metric_header!(@all_fields
|
||||
$metric_name,
|
||||
$cf_name,
|
||||
$storage_type,
|
||||
"Zlib"
|
||||
),
|
||||
}
|
||||
};
|
||||
|
||||
(@all_fields $metric_name:literal, $cf_name:literal, $storage_type:literal, $compression_type:literal) => {
|
||||
concat!($metric_name,
|
||||
",cf_name=", $cf_name,
|
||||
",storage=", $storage_type,
|
||||
",compression=", $compression_type,
|
||||
)
|
||||
};
|
||||
}
|
||||
|
@ -4358,6 +4388,7 @@ macro_rules! create_new_tmp_ledger_fifo_auto_delete {
|
|||
shred_storage_type: $crate::blockstore_db::ShredStorageType::RocksFifo(
|
||||
$crate::blockstore_db::BlockstoreRocksFifoOptions::default(),
|
||||
),
|
||||
..$crate::blockstore_db::LedgerColumnOptions::default()
|
||||
},
|
||||
)
|
||||
};
|
||||
|
@ -4722,6 +4753,7 @@ pub mod tests {
|
|||
shred_storage_type: ShredStorageType::RocksFifo(
|
||||
BlockstoreRocksFifoOptions::default(),
|
||||
),
|
||||
..LedgerColumnOptions::default()
|
||||
},
|
||||
..BlockstoreOptions::default()
|
||||
},
|
||||
|
|
|
@ -9,9 +9,10 @@ use {
|
|||
self,
|
||||
compaction_filter::CompactionFilter,
|
||||
compaction_filter_factory::{CompactionFilterContext, CompactionFilterFactory},
|
||||
ColumnFamily, ColumnFamilyDescriptor, CompactionDecision, DBCompactionStyle, DBIterator,
|
||||
DBRawIterator, DBRecoveryMode, FifoCompactOptions, IteratorMode as RocksIteratorMode,
|
||||
Options, WriteBatch as RWriteBatch, DB,
|
||||
ColumnFamily, ColumnFamilyDescriptor, CompactionDecision, DBCompactionStyle,
|
||||
DBCompressionType as RocksCompressionType, DBIterator, DBRawIterator, DBRecoveryMode,
|
||||
FifoCompactOptions, IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch,
|
||||
DB,
|
||||
},
|
||||
serde::{de::DeserializeOwned, Serialize},
|
||||
solana_runtime::hardened_unpack::UnpackError,
|
||||
|
@ -991,6 +992,31 @@ impl Default for ShredStorageType {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum BlockstoreCompressionType {
|
||||
None,
|
||||
Snappy,
|
||||
Lz4,
|
||||
Zlib,
|
||||
}
|
||||
|
||||
impl Default for BlockstoreCompressionType {
|
||||
fn default() -> Self {
|
||||
Self::None
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockstoreCompressionType {
|
||||
fn to_rocksdb_compression_type(&self) -> RocksCompressionType {
|
||||
match self {
|
||||
Self::None => RocksCompressionType::None,
|
||||
Self::Snappy => RocksCompressionType::Snappy,
|
||||
Self::Lz4 => RocksCompressionType::Lz4,
|
||||
Self::Zlib => RocksCompressionType::Zlib,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Options for LedgerColumn.
|
||||
/// Each field might also be used as a tag that supports group-by operation when
|
||||
/// reporting metrics.
|
||||
|
@ -998,12 +1024,17 @@ impl Default for ShredStorageType {
|
|||
pub struct LedgerColumnOptions {
|
||||
// Determine how to store both data and coding shreds. Default: RocksLevel.
|
||||
pub shred_storage_type: ShredStorageType,
|
||||
|
||||
// Determine the way to compress column families which are eligible for
|
||||
// compression.
|
||||
pub compression_type: BlockstoreCompressionType,
|
||||
}
|
||||
|
||||
impl Default for LedgerColumnOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
shred_storage_type: ShredStorageType::RocksLevel,
|
||||
compression_type: BlockstoreCompressionType::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1444,9 +1475,24 @@ fn get_cf_options<C: 'static + Column + ColumnName>(
|
|||
});
|
||||
}
|
||||
|
||||
process_cf_options_advanced::<C>(&mut cf_options, &options.column_options);
|
||||
|
||||
cf_options
|
||||
}
|
||||
|
||||
fn process_cf_options_advanced<C: 'static + Column + ColumnName>(
|
||||
cf_options: &mut Options,
|
||||
column_options: &LedgerColumnOptions,
|
||||
) {
|
||||
if should_enable_compression::<C>() {
|
||||
cf_options.set_compression_type(
|
||||
column_options
|
||||
.compression_type
|
||||
.to_rocksdb_compression_type(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates and returns the column family descriptors for both data shreds and
|
||||
/// coding shreds column families.
|
||||
///
|
||||
|
@ -1465,17 +1511,21 @@ fn new_cf_descriptor_pair_shreds<
|
|||
new_cf_descriptor::<C>(options, oldest_slot),
|
||||
),
|
||||
ShredStorageType::RocksFifo(fifo_options) => (
|
||||
new_cf_descriptor_fifo::<D>(&fifo_options.shred_data_cf_size),
|
||||
new_cf_descriptor_fifo::<C>(&fifo_options.shred_code_cf_size),
|
||||
new_cf_descriptor_fifo::<D>(&fifo_options.shred_data_cf_size, &options.column_options),
|
||||
new_cf_descriptor_fifo::<C>(&fifo_options.shred_code_cf_size, &options.column_options),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
fn new_cf_descriptor_fifo<C: 'static + Column + ColumnName>(
|
||||
max_cf_size: &u64,
|
||||
column_options: &LedgerColumnOptions,
|
||||
) -> ColumnFamilyDescriptor {
|
||||
if *max_cf_size > FIFO_WRITE_BUFFER_SIZE {
|
||||
ColumnFamilyDescriptor::new(C::NAME, get_cf_options_fifo::<C>(max_cf_size))
|
||||
ColumnFamilyDescriptor::new(
|
||||
C::NAME,
|
||||
get_cf_options_fifo::<C>(max_cf_size, column_options),
|
||||
)
|
||||
} else {
|
||||
panic!(
|
||||
"{} cf_size must be greater than write buffer size {} when using ShredStorageType::RocksFifo.",
|
||||
|
@ -1495,7 +1545,10 @@ fn new_cf_descriptor_fifo<C: 'static + Column + ColumnName>(
|
|||
/// rocksdb will start deleting the oldest SST file when the column family
|
||||
/// size reaches `max_cf_size` - `FIFO_WRITE_BUFFER_SIZE` to strictly
|
||||
/// maintain the size limit.
|
||||
fn get_cf_options_fifo<C: 'static + Column + ColumnName>(max_cf_size: &u64) -> Options {
|
||||
fn get_cf_options_fifo<C: 'static + Column + ColumnName>(
|
||||
max_cf_size: &u64,
|
||||
column_options: &LedgerColumnOptions,
|
||||
) -> Options {
|
||||
let mut options = Options::default();
|
||||
|
||||
options.set_max_write_buffer_number(8);
|
||||
|
@ -1520,6 +1573,8 @@ fn get_cf_options_fifo<C: 'static + Column + ColumnName>(max_cf_size: &u64) -> O
|
|||
options.set_compaction_style(DBCompactionStyle::Fifo);
|
||||
options.set_fifo_compaction_options(&fifo_compact_options);
|
||||
|
||||
process_cf_options_advanced::<C>(&mut options, column_options);
|
||||
|
||||
options
|
||||
}
|
||||
|
||||
|
@ -1576,6 +1631,11 @@ fn should_exclude_from_compaction(cf_name: &str) -> bool {
|
|||
no_compaction_cfs.get(cf_name).is_some()
|
||||
}
|
||||
|
||||
// Returns true if the column family enables compression.
|
||||
fn should_enable_compression<C: 'static + Column + ColumnName>() -> bool {
|
||||
C::NAME == columns::TransactionStatus::NAME
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
use {super::*, crate::blockstore_db::columns::ShredData};
|
||||
|
|
|
@ -34,8 +34,8 @@ use {
|
|||
contact_info::ContactInfo,
|
||||
},
|
||||
solana_ledger::blockstore_db::{
|
||||
BlockstoreRecoveryMode, BlockstoreRocksFifoOptions, LedgerColumnOptions, ShredStorageType,
|
||||
DEFAULT_ROCKS_FIFO_SHRED_STORAGE_SIZE_BYTES,
|
||||
BlockstoreCompressionType, BlockstoreRecoveryMode, BlockstoreRocksFifoOptions,
|
||||
LedgerColumnOptions, ShredStorageType, DEFAULT_ROCKS_FIFO_SHRED_STORAGE_SIZE_BYTES,
|
||||
},
|
||||
solana_perf::recycler::enable_recycler_warming,
|
||||
solana_poh::poh_service,
|
||||
|
@ -1001,6 +1001,18 @@ pub fn main() {
|
|||
.help("The shred storage size in bytes. \
|
||||
The suggested value is 50% of your ledger storage size in bytes."),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("rocksdb_ledger_compression")
|
||||
.hidden(true)
|
||||
.long("rocksdb-ledger-compression")
|
||||
.value_name("COMPRESSION_TYPE")
|
||||
.takes_value(true)
|
||||
.possible_values(&["none", "lz4", "snappy", "zlib"])
|
||||
.default_value("none")
|
||||
.help("The compression alrogithm that is used to compress \
|
||||
transaction status data. \
|
||||
Turning on compression can save ~10% of the ledger size."),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("skip_poh_verify")
|
||||
.long("skip-poh-verify")
|
||||
|
@ -2601,6 +2613,19 @@ pub fn main() {
|
|||
}
|
||||
|
||||
validator_config.ledger_column_options = LedgerColumnOptions {
|
||||
compression_type: match matches.value_of("rocksdb_ledger_compression") {
|
||||
None => BlockstoreCompressionType::default(),
|
||||
Some(ledger_compression_string) => match ledger_compression_string {
|
||||
"none" => BlockstoreCompressionType::None,
|
||||
"snappy" => BlockstoreCompressionType::Snappy,
|
||||
"lz4" => BlockstoreCompressionType::Lz4,
|
||||
"zlib" => BlockstoreCompressionType::Zlib,
|
||||
_ => panic!(
|
||||
"Unsupported ledger_compression: {}",
|
||||
ledger_compression_string
|
||||
),
|
||||
},
|
||||
},
|
||||
shred_storage_type: match matches.value_of("rocksdb_shred_compaction") {
|
||||
None => ShredStorageType::default(),
|
||||
Some(shred_compaction_string) => match shred_compaction_string {
|
||||
|
|
Loading…
Reference in New Issue