Cleanup several blockstore functions (#23390)

* Rename excludes_from_compaction to should_exclude_from_compaction
* Make subfunction to create all cf descriptors
* Condense logic for when to disable compactions
This commit is contained in:
steviez 2022-03-10 02:08:38 -06:00 committed by GitHub
parent 37189f20c5
commit 58c0db9704
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 82 additions and 59 deletions

View File

@ -214,7 +214,7 @@ pub mod columns {
// When adding a new column ...
// - Add struct below and implement `Column` and `ColumnName` traits
// - Add descriptor in Rocks::open() and name in Rocks::columns()
// - Add descriptor in Rocks::cf_descriptors() and name in Rocks::columns()
// - Account for column in both `run_purge_with_stats()` and
// `compact_storage()` in ledger/src/blockstore/blockstore_purge.rs !!
// - Account for column in `analyze_storage()` in ledger-tool/src/main.rs
@ -296,65 +296,33 @@ struct Rocks(rocksdb::DB, ActualAccessType, OldestSlot);
impl Rocks {
fn open(path: &Path, options: BlockstoreOptions) -> Result<Rocks> {
use columns::*;
let access_type = options.access_type;
let recovery_mode = options.recovery_mode;
let access_type = &options.access_type;
let recovery_mode = options.recovery_mode.clone();
fs::create_dir_all(&path)?;
// Use default database options
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
if should_disable_auto_compactions(access_type) {
warn!("Disabling rocksdb's auto compaction for maintenance bulk ledger update...");
}
let mut db_options = get_db_options(&access_type);
let mut db_options = get_db_options(access_type);
if let Some(recovery_mode) = recovery_mode {
db_options.set_wal_recovery_mode(recovery_mode.into());
}
let oldest_slot = OldestSlot::default();
// Get column family descriptors and names
let (cf_descriptor_shred_data, cf_descriptor_shred_code) =
new_cf_descriptor_pair_shreds::<ShredData, ShredCode>(
&options.shred_storage_type,
&access_type,
&oldest_slot,
);
let cfs = vec![
new_cf_descriptor::<SlotMeta>(&access_type, &oldest_slot),
new_cf_descriptor::<DeadSlots>(&access_type, &oldest_slot),
new_cf_descriptor::<DuplicateSlots>(&access_type, &oldest_slot),
new_cf_descriptor::<ErasureMeta>(&access_type, &oldest_slot),
new_cf_descriptor::<Orphans>(&access_type, &oldest_slot),
new_cf_descriptor::<BankHash>(&access_type, &oldest_slot),
new_cf_descriptor::<Root>(&access_type, &oldest_slot),
new_cf_descriptor::<Index>(&access_type, &oldest_slot),
cf_descriptor_shred_data,
cf_descriptor_shred_code,
new_cf_descriptor::<TransactionStatus>(&access_type, &oldest_slot),
new_cf_descriptor::<AddressSignatures>(&access_type, &oldest_slot),
new_cf_descriptor::<TransactionMemos>(&access_type, &oldest_slot),
new_cf_descriptor::<TransactionStatusIndex>(&access_type, &oldest_slot),
new_cf_descriptor::<Rewards>(&access_type, &oldest_slot),
new_cf_descriptor::<Blocktime>(&access_type, &oldest_slot),
new_cf_descriptor::<PerfSamples>(&access_type, &oldest_slot),
new_cf_descriptor::<BlockHeight>(&access_type, &oldest_slot),
new_cf_descriptor::<ProgramCosts>(&access_type, &oldest_slot),
];
let cf_descriptors = Self::cf_descriptors(&options, &oldest_slot);
let cf_names = Self::columns();
// The names and descriptors don't have to be in the same
// order, but there should be the same number of each.
assert_eq!(cfs.len(), cf_names.len());
// Open the database
let db = match access_type {
AccessType::PrimaryOnly | AccessType::PrimaryOnlyForMaintenance => Rocks(
DB::open_cf_descriptors(&db_options, path, cfs)?,
DB::open_cf_descriptors(&db_options, path, cf_descriptors)?,
ActualAccessType::Primary,
oldest_slot,
),
AccessType::TryPrimaryThenSecondary => {
match DB::open_cf_descriptors(&db_options, path, cfs) {
match DB::open_cf_descriptors(&db_options, path, cf_descriptors) {
Ok(db) => Rocks(db, ActualAccessType::Primary, oldest_slot),
Err(err) => {
let secondary_path = path.join("solana-secondary");
@ -382,7 +350,7 @@ impl Rocks {
for cf_name in cf_names {
// these special column families must be excluded from LedgerCleanupService's rocksdb
// compactions
if excludes_from_compaction(cf_name) {
if should_exclude_from_compaction(cf_name) {
continue;
}
@ -437,6 +405,42 @@ impl Rocks {
Ok(db)
}
fn cf_descriptors(
options: &BlockstoreOptions,
oldest_slot: &OldestSlot,
) -> Vec<ColumnFamilyDescriptor> {
use columns::*;
let access_type = &options.access_type;
let (cf_descriptor_shred_data, cf_descriptor_shred_code) =
new_cf_descriptor_pair_shreds::<ShredData, ShredCode>(
&options.shred_storage_type,
access_type,
oldest_slot,
);
vec![
new_cf_descriptor::<SlotMeta>(access_type, oldest_slot),
new_cf_descriptor::<DeadSlots>(access_type, oldest_slot),
new_cf_descriptor::<DuplicateSlots>(access_type, oldest_slot),
new_cf_descriptor::<ErasureMeta>(access_type, oldest_slot),
new_cf_descriptor::<Orphans>(access_type, oldest_slot),
new_cf_descriptor::<BankHash>(access_type, oldest_slot),
new_cf_descriptor::<Root>(access_type, oldest_slot),
new_cf_descriptor::<Index>(access_type, oldest_slot),
cf_descriptor_shred_data,
cf_descriptor_shred_code,
new_cf_descriptor::<TransactionStatus>(access_type, oldest_slot),
new_cf_descriptor::<AddressSignatures>(access_type, oldest_slot),
new_cf_descriptor::<TransactionMemos>(access_type, oldest_slot),
new_cf_descriptor::<TransactionStatusIndex>(access_type, oldest_slot),
new_cf_descriptor::<Rewards>(access_type, oldest_slot),
new_cf_descriptor::<Blocktime>(access_type, oldest_slot),
new_cf_descriptor::<PerfSamples>(access_type, oldest_slot),
new_cf_descriptor::<BlockHeight>(access_type, oldest_slot),
new_cf_descriptor::<ProgramCosts>(access_type, oldest_slot),
]
}
fn columns() -> Vec<&'static str> {
use columns::*;
@ -1416,9 +1420,12 @@ fn get_cf_options<C: 'static + Column + ColumnName>(
options.set_max_bytes_for_level_base(total_size_base);
options.set_target_file_size_base(file_size_base);
// TransactionStatusIndex and ProgramCosts must be excluded from LedgerCleanupService's rocksdb
// compactions....
if matches!(access_type, AccessType::PrimaryOnly) && !excludes_from_compaction(C::NAME) {
let disable_auto_compactions = should_disable_auto_compactions(access_type);
if disable_auto_compactions {
options.set_disable_auto_compactions(true);
}
if !disable_auto_compactions && !should_exclude_from_compaction(C::NAME) {
options.set_compaction_filter_factory(PurgedSlotFilterFactory::<C> {
oldest_slot: oldest_slot.clone(),
name: CString::new(format!("purged_slot_filter_factory({})", C::NAME)).unwrap(),
@ -1426,11 +1433,6 @@ fn get_cf_options<C: 'static + Column + ColumnName>(
});
}
// Disable automatic compactions in maintenance mode to prevent accidental cleaning
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
options.set_disable_auto_compactions(true);
}
options
}
@ -1532,8 +1534,7 @@ fn get_db_options(access_type: &AccessType) -> Options {
// Set max total wal size to 4G.
options.set_max_total_wal_size(4 * 1024 * 1024 * 1024);
// Disable automatic compactions in maintenance mode to prevent accidental cleaning
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
if should_disable_auto_compactions(access_type) {
options.set_disable_auto_compactions(true);
}
@ -1545,8 +1546,15 @@ fn get_db_options(access_type: &AccessType) -> Options {
options
}
fn excludes_from_compaction(cf_name: &str) -> bool {
// list of Column Families must be excluded from compaction:
// Returns whether automatic compactions should be disabled based upon access type
fn should_disable_auto_compactions(access_type: &AccessType) -> bool {
// Disable automatic compactions in maintenance mode to prevent accidental cleaning
matches!(access_type, AccessType::PrimaryOnlyForMaintenance)
}
// Returns whether the supplied column (name) should be excluded from compaction
fn should_exclude_from_compaction(cf_name: &str) -> bool {
// List of column families to be excluded from compactions
let no_compaction_cfs: HashSet<&'static str> = vec![
columns::TransactionStatusIndex::NAME,
columns::ProgramCosts::NAME,
@ -1611,13 +1619,28 @@ pub mod tests {
}
#[test]
fn test_excludes_from_compaction() {
// currently there are two CFs are excluded from compaction:
assert!(excludes_from_compaction(
fn test_cf_names_and_descriptors_equal_length() {
let options = BlockstoreOptions::default();
let oldest_slot = OldestSlot::default();
// The names and descriptors don't need to be in the same order for our use cases;
// however, there should be the same number of each. For example, adding a new column
// should update both lists.
assert_eq!(
Rocks::columns().len(),
Rocks::cf_descriptors(&options, &oldest_slot).len()
);
}
#[test]
fn test_should_exclude_from_compaction() {
// currently there are three CFs excluded from compaction:
assert!(should_exclude_from_compaction(
columns::TransactionStatusIndex::NAME
));
assert!(excludes_from_compaction(columns::ProgramCosts::NAME));
assert!(excludes_from_compaction(columns::TransactionMemos::NAME));
assert!(!excludes_from_compaction("something else"));
assert!(should_exclude_from_compaction(columns::ProgramCosts::NAME));
assert!(should_exclude_from_compaction(
columns::TransactionMemos::NAME
));
assert!(!should_exclude_from_compaction("something else"));
}
}