From 58c0db97049990d5ba16aa867f7c085b24282624 Mon Sep 17 00:00:00 2001 From: steviez Date: Thu, 10 Mar 2022 02:08:38 -0600 Subject: [PATCH] Cleanup several blockstore functions (#23390) * Rename excludes_from_compaction to should_exclude_from_compaction * Make subfunction to create all cf descriptors * Condense logic for when to disable compactions --- ledger/src/blockstore_db.rs | 141 +++++++++++++++++++++--------------- 1 file changed, 82 insertions(+), 59 deletions(-) diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 92a39b832..ce6994193 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -214,7 +214,7 @@ pub mod columns { // When adding a new column ... // - Add struct below and implement `Column` and `ColumnName` traits - // - Add descriptor in Rocks::open() and name in Rocks::columns() + // - Add descriptor in Rocks::cf_descriptors() and name in Rocks::columns() // - Account for column in both `run_purge_with_stats()` and // `compact_storage()` in ledger/src/blockstore/blockstore_purge.rs !! // - Account for column in `analyze_storage()` in ledger-tool/src/main.rs @@ -296,65 +296,33 @@ struct Rocks(rocksdb::DB, ActualAccessType, OldestSlot); impl Rocks { fn open(path: &Path, options: BlockstoreOptions) -> Result { - use columns::*; - let access_type = options.access_type; - let recovery_mode = options.recovery_mode; + let access_type = &options.access_type; + let recovery_mode = options.recovery_mode.clone(); fs::create_dir_all(&path)?; // Use default database options - if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) { + if should_disable_auto_compactions(access_type) { warn!("Disabling rocksdb's auto compaction for maintenance bulk ledger update..."); } - let mut db_options = get_db_options(&access_type); + let mut db_options = get_db_options(access_type); if let Some(recovery_mode) = recovery_mode { db_options.set_wal_recovery_mode(recovery_mode.into()); } let oldest_slot = OldestSlot::default(); - - // Get column family descriptors and names - let (cf_descriptor_shred_data, cf_descriptor_shred_code) = - new_cf_descriptor_pair_shreds::( - &options.shred_storage_type, - &access_type, - &oldest_slot, - ); - let cfs = vec![ - new_cf_descriptor::(&access_type, &oldest_slot), - new_cf_descriptor::(&access_type, &oldest_slot), - new_cf_descriptor::(&access_type, &oldest_slot), - new_cf_descriptor::(&access_type, &oldest_slot), - new_cf_descriptor::(&access_type, &oldest_slot), - new_cf_descriptor::(&access_type, &oldest_slot), - new_cf_descriptor::(&access_type, &oldest_slot), - new_cf_descriptor::(&access_type, &oldest_slot), - cf_descriptor_shred_data, - cf_descriptor_shred_code, - new_cf_descriptor::(&access_type, &oldest_slot), - new_cf_descriptor::(&access_type, &oldest_slot), - new_cf_descriptor::(&access_type, &oldest_slot), - new_cf_descriptor::(&access_type, &oldest_slot), - new_cf_descriptor::(&access_type, &oldest_slot), - new_cf_descriptor::(&access_type, &oldest_slot), - new_cf_descriptor::(&access_type, &oldest_slot), - new_cf_descriptor::(&access_type, &oldest_slot), - new_cf_descriptor::(&access_type, &oldest_slot), - ]; + let cf_descriptors = Self::cf_descriptors(&options, &oldest_slot); let cf_names = Self::columns(); - // The names and descriptors don't have to be in the same - // order, but there should be the same number of each. - assert_eq!(cfs.len(), cf_names.len()); // Open the database let db = match access_type { AccessType::PrimaryOnly | AccessType::PrimaryOnlyForMaintenance => Rocks( - DB::open_cf_descriptors(&db_options, path, cfs)?, + DB::open_cf_descriptors(&db_options, path, cf_descriptors)?, ActualAccessType::Primary, oldest_slot, ), AccessType::TryPrimaryThenSecondary => { - match DB::open_cf_descriptors(&db_options, path, cfs) { + match DB::open_cf_descriptors(&db_options, path, cf_descriptors) { Ok(db) => Rocks(db, ActualAccessType::Primary, oldest_slot), Err(err) => { let secondary_path = path.join("solana-secondary"); @@ -382,7 +350,7 @@ impl Rocks { for cf_name in cf_names { // these special column families must be excluded from LedgerCleanupService's rocksdb // compactions - if excludes_from_compaction(cf_name) { + if should_exclude_from_compaction(cf_name) { continue; } @@ -437,6 +405,42 @@ impl Rocks { Ok(db) } + fn cf_descriptors( + options: &BlockstoreOptions, + oldest_slot: &OldestSlot, + ) -> Vec { + use columns::*; + let access_type = &options.access_type; + + let (cf_descriptor_shred_data, cf_descriptor_shred_code) = + new_cf_descriptor_pair_shreds::( + &options.shred_storage_type, + access_type, + oldest_slot, + ); + vec![ + new_cf_descriptor::(access_type, oldest_slot), + new_cf_descriptor::(access_type, oldest_slot), + new_cf_descriptor::(access_type, oldest_slot), + new_cf_descriptor::(access_type, oldest_slot), + new_cf_descriptor::(access_type, oldest_slot), + new_cf_descriptor::(access_type, oldest_slot), + new_cf_descriptor::(access_type, oldest_slot), + new_cf_descriptor::(access_type, oldest_slot), + cf_descriptor_shred_data, + cf_descriptor_shred_code, + new_cf_descriptor::(access_type, oldest_slot), + new_cf_descriptor::(access_type, oldest_slot), + new_cf_descriptor::(access_type, oldest_slot), + new_cf_descriptor::(access_type, oldest_slot), + new_cf_descriptor::(access_type, oldest_slot), + new_cf_descriptor::(access_type, oldest_slot), + new_cf_descriptor::(access_type, oldest_slot), + new_cf_descriptor::(access_type, oldest_slot), + new_cf_descriptor::(access_type, oldest_slot), + ] + } + fn columns() -> Vec<&'static str> { use columns::*; @@ -1416,9 +1420,12 @@ fn get_cf_options( options.set_max_bytes_for_level_base(total_size_base); options.set_target_file_size_base(file_size_base); - // TransactionStatusIndex and ProgramCosts must be excluded from LedgerCleanupService's rocksdb - // compactions.... - if matches!(access_type, AccessType::PrimaryOnly) && !excludes_from_compaction(C::NAME) { + let disable_auto_compactions = should_disable_auto_compactions(access_type); + if disable_auto_compactions { + options.set_disable_auto_compactions(true); + } + + if !disable_auto_compactions && !should_exclude_from_compaction(C::NAME) { options.set_compaction_filter_factory(PurgedSlotFilterFactory:: { oldest_slot: oldest_slot.clone(), name: CString::new(format!("purged_slot_filter_factory({})", C::NAME)).unwrap(), @@ -1426,11 +1433,6 @@ fn get_cf_options( }); } - // Disable automatic compactions in maintenance mode to prevent accidental cleaning - if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) { - options.set_disable_auto_compactions(true); - } - options } @@ -1532,8 +1534,7 @@ fn get_db_options(access_type: &AccessType) -> Options { // Set max total wal size to 4G. options.set_max_total_wal_size(4 * 1024 * 1024 * 1024); - // Disable automatic compactions in maintenance mode to prevent accidental cleaning - if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) { + if should_disable_auto_compactions(access_type) { options.set_disable_auto_compactions(true); } @@ -1545,8 +1546,15 @@ fn get_db_options(access_type: &AccessType) -> Options { options } -fn excludes_from_compaction(cf_name: &str) -> bool { - // list of Column Families must be excluded from compaction: +// Returns whether automatic compactions should be disabled based upon access type +fn should_disable_auto_compactions(access_type: &AccessType) -> bool { + // Disable automatic compactions in maintenance mode to prevent accidental cleaning + matches!(access_type, AccessType::PrimaryOnlyForMaintenance) +} + +// Returns whether the supplied column (name) should be excluded from compaction +fn should_exclude_from_compaction(cf_name: &str) -> bool { + // List of column families to be excluded from compactions let no_compaction_cfs: HashSet<&'static str> = vec![ columns::TransactionStatusIndex::NAME, columns::ProgramCosts::NAME, @@ -1611,13 +1619,28 @@ pub mod tests { } #[test] - fn test_excludes_from_compaction() { - // currently there are two CFs are excluded from compaction: - assert!(excludes_from_compaction( + fn test_cf_names_and_descriptors_equal_length() { + let options = BlockstoreOptions::default(); + let oldest_slot = OldestSlot::default(); + // The names and descriptors don't need to be in the same order for our use cases; + // however, there should be the same number of each. For example, adding a new column + // should update both lists. + assert_eq!( + Rocks::columns().len(), + Rocks::cf_descriptors(&options, &oldest_slot).len() + ); + } + + #[test] + fn test_should_exclude_from_compaction() { + // currently there are three CFs excluded from compaction: + assert!(should_exclude_from_compaction( columns::TransactionStatusIndex::NAME )); - assert!(excludes_from_compaction(columns::ProgramCosts::NAME)); - assert!(excludes_from_compaction(columns::TransactionMemos::NAME)); - assert!(!excludes_from_compaction("something else")); + assert!(should_exclude_from_compaction(columns::ProgramCosts::NAME)); + assert!(should_exclude_from_compaction( + columns::TransactionMemos::NAME + )); + assert!(!should_exclude_from_compaction("something else")); } }