diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 5ed16af65d..c563186697 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -34,7 +34,7 @@ use { }, solana_storage_proto::convert::generated, std::{ - collections::{HashMap, HashSet}, + collections::HashMap, ffi::{CStr, CString}, fs, marker::PhantomData, @@ -52,6 +52,14 @@ const BLOCKSTORE_METRICS_ERROR: i64 = -1; const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB const FIFO_WRITE_BUFFER_SIZE: u64 = 2 * MAX_WRITE_BUFFER_SIZE; +// SST files older than this value will be picked up for compaction. This value +// was chosen to be one day to strike a balance between storage getting +// reclaimed in a timely manner and the additional I/O that compaction incurs. +// For more details on this property, see +// https://github.com/facebook/rocksdb/blob/749b179c041347d150fa6721992ae8398b7d2b39/ +// include/rocksdb/advanced_options.h#L908C30-L908C30 +const PERIODIC_COMPACTION_SECONDS: u64 = 60 * 60 * 24; + // Column family for metadata about a leader slot const META_CF: &str = "meta"; // Column family for slots that have been marked as dead @@ -362,9 +370,6 @@ impl Rocks { fs::create_dir_all(path)?; // Use default database options - if should_disable_auto_compactions(&access_type) { - info!("Disabling rocksdb's automatic compactions..."); - } let mut db_options = get_db_options(&access_type); if let Some(recovery_mode) = recovery_mode { db_options.set_wal_recovery_mode(recovery_mode.into()); @@ -408,6 +413,7 @@ impl Rocks { } } }; + db.configure_compaction(); Ok(db) } @@ -471,6 +477,53 @@ impl Rocks { ] } + // Configure compaction on a per-column basis + fn configure_compaction(&self) { + // If compactions are disabled altogether, no need to tune values + if should_disable_auto_compactions(&self.access_type) { + info!( + "Rocks's automatic compactions are disabled due to {:?} access", + self.access_type + ); + return; + } + + // Some columns make use of rocksdb's 
compaction to help in cleaning + // the database. See comments in should_enable_cf_compaction() for more + // details on why some columns need compaction and why others do not. + // + // More specifically, periodic (automatic) compaction is used as + // opposed to manual compaction requests on a range. + // - Periodic compaction operates on individual files once the file + // has reached a certain (configurable) age. See comments at + // PERIODIC_COMPACTION_SECONDS for some more detail. + // - Manual compaction operates on a range and could end up propagating + // through several files and/or levels of the db. + // + // Given that data is inserted into the db at a somewhat steady rate, + // the age of the individual files will be fairly evenly distributed + // over time as well. Thus, the I/O to perform cleanup with periodic + // compaction is also evenly distributed over time. On the other hand, + // a manual compaction spanning a large number of files could cause + // a sudden burst in I/O. Such a burst could potentially cause a write + // stall in addition to negatively impacting other parts of the system. + // Thus, the choice to use periodic compactions is fairly easy. + for cf_name in Self::columns() { + if should_enable_cf_compaction(cf_name) { + let cf_handle = self.cf_handle(cf_name); + self.db + .set_options_cf( + &cf_handle, + &[( + "periodic_compaction_seconds", + &PERIODIC_COMPACTION_SECONDS.to_string(), + )], + ) + .unwrap(); + } + } + } + fn destroy(path: &Path) -> Result<()> { DB::destroy(&Options::default(), path)?; @@ -1610,7 +1663,9 @@ impl<'a> WriteBatch<'a> { } } +/// A CompactionFilter implementation to remove keys older than a given slot. 
struct PurgedSlotFilter { + /// The oldest slot to keep; any slot < oldest_slot will be removed oldest_slot: Slot, name: CString, _phantom: PhantomData, @@ -1621,8 +1676,6 @@ impl CompactionFilter for PurgedSlotFilter { use rocksdb::CompactionDecision::*; let slot_in_key = C::slot(C::index(key)); - // Refer to a comment about periodic_compaction_seconds, especially regarding implicit - // periodic execution of compaction_filters if slot_in_key >= self.oldest_slot { Keep } else { @@ -1693,7 +1746,7 @@ fn get_cf_options( cf_options.set_disable_auto_compactions(true); } - if !disable_auto_compactions && !should_exclude_from_compaction(C::NAME) { + if !disable_auto_compactions && should_enable_cf_compaction(C::NAME) { cf_options.set_compaction_filter_factory(PurgedSlotFilterFactory:: { oldest_slot: oldest_slot.clone(), name: CString::new(format!("purged_slot_filter_factory({})", C::NAME)).unwrap(), @@ -1841,25 +1894,36 @@ fn get_db_options(access_type: &AccessType) -> Options { options } -// Returns whether automatic compactions should be disabled based upon access type +// Returns whether automatic compactions should be disabled for the entire +// database based upon the given access type. fn should_disable_auto_compactions(access_type: &AccessType) -> bool { // Leave automatic compactions enabled (do not disable) in Primary mode; // disable in all other modes to prevent accidental cleaning !matches!(access_type, AccessType::Primary) } -// Returns whether the supplied column (name) should be excluded from compaction -fn should_exclude_from_compaction(cf_name: &str) -> bool { - // List of column families to be excluded from compactions - let no_compaction_cfs: HashSet<&'static str> = vec![ - columns::TransactionStatusIndex::NAME, - columns::ProgramCosts::NAME, - columns::TransactionMemos::NAME, - ] - .into_iter() - .collect(); - - no_compaction_cfs.get(cf_name).is_some() +// Returns whether compactions should be enabled for the given column (name). 
+fn should_enable_cf_compaction(cf_name: &str) -> bool { + // In order to keep the ledger storage footprint within a desired size, + // LedgerCleanupService removes data in FIFO order by slot. + // + // Several columns do not contain slot in their key. These columns must + // be manually managed to avoid unbounded storage growth. + // + // Columns where slot is the primary index can be efficiently cleaned via + // Database::delete_range_cf() and Database::delete_file_in_range_cf(). + // + // Columns where a slot is part of the key but not the primary index + // cannot be range deleted like above. Instead, the individual key/value pairs + // must be iterated over and a decision to keep or discard that pair is + // made. The comparison logic is implemented in PurgedSlotFilter which is + // configured to run as part of rocksdb's automatic compactions. Storage + // space is reclaimed on this class of columns once compaction has + // completed on a given range or file. + matches!( + cf_name, + columns::TransactionStatus::NAME | columns::AddressSignatures::NAME + ) } // Returns true if the column family enables compression. @@ -1942,15 +2006,14 @@ pub mod tests { } #[test] - fn test_should_exclude_from_compaction() { - // currently there are three CFs excluded from compaction: - assert!(should_exclude_from_compaction( - columns::TransactionStatusIndex::NAME - )); - assert!(should_exclude_from_compaction(columns::ProgramCosts::NAME)); - assert!(should_exclude_from_compaction( - columns::TransactionMemos::NAME - )); - assert!(!should_exclude_from_compaction("something else")); + fn test_should_enable_cf_compaction() { + let columns_to_compact = vec![ + columns::TransactionStatus::NAME, + columns::AddressSignatures::NAME, + ]; + columns_to_compact.iter().for_each(|cf_name| { + assert!(should_enable_cf_compaction(cf_name)); + }); + assert!(!should_enable_cf_compaction("something else")); } }