Re-enable periodic compaction on several columns (#32548)
Periodic compaction was previously disabled on all columns in #27571 in favor of the delete_file_in_range() approach that #26651 introduced. However, several columns still rely on periodic compaction to reclaim storage: namely, the TransactionStatus and AddressSignatures columns, which contain a slot in their key, but not as the primary index. The result of periodic compaction not running on these columns is that no storage space is being reclaimed from them. This is obviously bad and would lead to a node eventually running out of storage space and crashing. This PR reintroduces periodic compaction, but only for the columns that need it.
This commit is contained in:
parent
80f708298b
commit
d73fa1b590
|
@ -34,7 +34,7 @@ use {
|
||||||
},
|
},
|
||||||
solana_storage_proto::convert::generated,
|
solana_storage_proto::convert::generated,
|
||||||
std::{
|
std::{
|
||||||
collections::{HashMap, HashSet},
|
collections::HashMap,
|
||||||
ffi::{CStr, CString},
|
ffi::{CStr, CString},
|
||||||
fs,
|
fs,
|
||||||
marker::PhantomData,
|
marker::PhantomData,
|
||||||
|
@ -52,6 +52,14 @@ const BLOCKSTORE_METRICS_ERROR: i64 = -1;
|
||||||
const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB
|
const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB
|
||||||
const FIFO_WRITE_BUFFER_SIZE: u64 = 2 * MAX_WRITE_BUFFER_SIZE;
|
const FIFO_WRITE_BUFFER_SIZE: u64 = 2 * MAX_WRITE_BUFFER_SIZE;
|
||||||
|
|
||||||
|
// SST files older than this value will be picked up for compaction. This value
|
||||||
|
// was chosen to be one day to strike a balance between storage getting
|
||||||
|
// reclaimed in a timely manner and the additional I/O that compaction incurs.
|
||||||
|
// For more details on this property, see
|
||||||
|
// https://github.com/facebook/rocksdb/blob/749b179c041347d150fa6721992ae8398b7d2b39/
|
||||||
|
// include/rocksdb/advanced_options.h#L908C30-L908C30
|
||||||
|
const PERIODIC_COMPACTION_SECONDS: u64 = 60 * 60 * 24;
|
||||||
|
|
||||||
// Column family for metadata about a leader slot
|
// Column family for metadata about a leader slot
|
||||||
const META_CF: &str = "meta";
|
const META_CF: &str = "meta";
|
||||||
// Column family for slots that have been marked as dead
|
// Column family for slots that have been marked as dead
|
||||||
|
@ -362,9 +370,6 @@ impl Rocks {
|
||||||
fs::create_dir_all(path)?;
|
fs::create_dir_all(path)?;
|
||||||
|
|
||||||
// Use default database options
|
// Use default database options
|
||||||
if should_disable_auto_compactions(&access_type) {
|
|
||||||
info!("Disabling rocksdb's automatic compactions...");
|
|
||||||
}
|
|
||||||
let mut db_options = get_db_options(&access_type);
|
let mut db_options = get_db_options(&access_type);
|
||||||
if let Some(recovery_mode) = recovery_mode {
|
if let Some(recovery_mode) = recovery_mode {
|
||||||
db_options.set_wal_recovery_mode(recovery_mode.into());
|
db_options.set_wal_recovery_mode(recovery_mode.into());
|
||||||
|
@ -408,6 +413,7 @@ impl Rocks {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
db.configure_compaction();
|
||||||
|
|
||||||
Ok(db)
|
Ok(db)
|
||||||
}
|
}
|
||||||
|
@ -471,6 +477,53 @@ impl Rocks {
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Configure compaction on a per-column basis
|
||||||
|
fn configure_compaction(&self) {
|
||||||
|
// If compactions are disabled altogether, no need to tune values
|
||||||
|
if should_disable_auto_compactions(&self.access_type) {
|
||||||
|
info!(
|
||||||
|
"Rocks's automatic compactions are disabled due to {:?} access",
|
||||||
|
self.access_type
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Some columns make use of rocksdb's compaction to help in cleaning
|
||||||
|
// the database. See comments in should_enable_cf_compaction() for more
|
||||||
|
// details on why some columns need compaction and why others do not.
|
||||||
|
//
|
||||||
|
// More specifically, periodic (automatic) compaction is used as
|
||||||
|
// opposed to manual compaction requests on a range.
|
||||||
|
// - Periodic compaction operates on individual files once the file
|
||||||
|
// has reached a certain (configurable) age. See comments at
|
||||||
|
// PERIODIC_COMPACTION_SECONDS for some more detail.
|
||||||
|
// - Manual compaction operates on a range and could end up propagating
|
||||||
|
// through several files and/or levels of the db.
|
||||||
|
//
|
||||||
|
// Given that data is inserted into the db at a somewhat steady rate,
|
||||||
|
// the age of the individual files will be fairly evenly distributed
|
||||||
|
// over time as well. Thus, the I/O to perform cleanup with periodic
|
||||||
|
// compaction is also evenly distributed over time. On the other hand,
|
||||||
|
// a manual compaction spanning a large number of files could cause
|
||||||
|
// a sudden burst in I/O. Such a burst could potentially cause a write
|
||||||
|
// stall in addition to negatively impacting other parts of the system.
|
||||||
|
// Thus, the choice to use periodic compactions is fairly easy.
|
||||||
|
for cf_name in Self::columns() {
|
||||||
|
if should_enable_cf_compaction(cf_name) {
|
||||||
|
let cf_handle = self.cf_handle(cf_name);
|
||||||
|
self.db
|
||||||
|
.set_options_cf(
|
||||||
|
&cf_handle,
|
||||||
|
&[(
|
||||||
|
"periodic_compaction_seconds",
|
||||||
|
&PERIODIC_COMPACTION_SECONDS.to_string(),
|
||||||
|
)],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn destroy(path: &Path) -> Result<()> {
|
fn destroy(path: &Path) -> Result<()> {
|
||||||
DB::destroy(&Options::default(), path)?;
|
DB::destroy(&Options::default(), path)?;
|
||||||
|
|
||||||
|
@ -1610,7 +1663,9 @@ impl<'a> WriteBatch<'a> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A CompactionFilter implementation to remove keys older than a given slot.
|
||||||
struct PurgedSlotFilter<C: Column + ColumnName> {
|
struct PurgedSlotFilter<C: Column + ColumnName> {
|
||||||
|
/// The oldest slot to keep; any slot < oldest_slot will be removed
|
||||||
oldest_slot: Slot,
|
oldest_slot: Slot,
|
||||||
name: CString,
|
name: CString,
|
||||||
_phantom: PhantomData<C>,
|
_phantom: PhantomData<C>,
|
||||||
|
@ -1621,8 +1676,6 @@ impl<C: Column + ColumnName> CompactionFilter for PurgedSlotFilter<C> {
|
||||||
use rocksdb::CompactionDecision::*;
|
use rocksdb::CompactionDecision::*;
|
||||||
|
|
||||||
let slot_in_key = C::slot(C::index(key));
|
let slot_in_key = C::slot(C::index(key));
|
||||||
// Refer to a comment about periodic_compaction_seconds, especially regarding implicit
|
|
||||||
// periodic execution of compaction_filters
|
|
||||||
if slot_in_key >= self.oldest_slot {
|
if slot_in_key >= self.oldest_slot {
|
||||||
Keep
|
Keep
|
||||||
} else {
|
} else {
|
||||||
|
@ -1693,7 +1746,7 @@ fn get_cf_options<C: 'static + Column + ColumnName>(
|
||||||
cf_options.set_disable_auto_compactions(true);
|
cf_options.set_disable_auto_compactions(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
if !disable_auto_compactions && !should_exclude_from_compaction(C::NAME) {
|
if !disable_auto_compactions && should_enable_cf_compaction(C::NAME) {
|
||||||
cf_options.set_compaction_filter_factory(PurgedSlotFilterFactory::<C> {
|
cf_options.set_compaction_filter_factory(PurgedSlotFilterFactory::<C> {
|
||||||
oldest_slot: oldest_slot.clone(),
|
oldest_slot: oldest_slot.clone(),
|
||||||
name: CString::new(format!("purged_slot_filter_factory({})", C::NAME)).unwrap(),
|
name: CString::new(format!("purged_slot_filter_factory({})", C::NAME)).unwrap(),
|
||||||
|
@ -1841,25 +1894,36 @@ fn get_db_options(access_type: &AccessType) -> Options {
|
||||||
options
|
options
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns whether automatic compactions should be disabled based upon access type
|
// Returns whether automatic compactions should be disabled for the entire
|
||||||
|
// database based upon the given access type.
|
||||||
fn should_disable_auto_compactions(access_type: &AccessType) -> bool {
|
fn should_disable_auto_compactions(access_type: &AccessType) -> bool {
|
||||||
// Leave automatic compactions enabled (do not disable) in Primary mode;
|
// Leave automatic compactions enabled (do not disable) in Primary mode;
|
||||||
// disable in all other modes to prevent accidental cleaning
|
// disable in all other modes to prevent accidental cleaning
|
||||||
!matches!(access_type, AccessType::Primary)
|
!matches!(access_type, AccessType::Primary)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns whether the supplied column (name) should be excluded from compaction
|
// Returns whether compactions should be enabled for the given column (name).
|
||||||
fn should_exclude_from_compaction(cf_name: &str) -> bool {
|
fn should_enable_cf_compaction(cf_name: &str) -> bool {
|
||||||
// List of column families to be excluded from compactions
|
// In order to keep the ledger storage footprint within a desired size,
|
||||||
let no_compaction_cfs: HashSet<&'static str> = vec![
|
// LedgerCleanupService removes data in FIFO order by slot.
|
||||||
columns::TransactionStatusIndex::NAME,
|
//
|
||||||
columns::ProgramCosts::NAME,
|
// Several columns do not contain slot in their key. These columns must
|
||||||
columns::TransactionMemos::NAME,
|
// be manually managed to avoid unbounded storage growth.
|
||||||
]
|
//
|
||||||
.into_iter()
|
// Columns where slot is the primary index can be efficiently cleaned via
|
||||||
.collect();
|
// Database::delete_range_cf() && Database::delete_file_in_range_cf().
|
||||||
|
//
|
||||||
no_compaction_cfs.get(cf_name).is_some()
|
// Columns where a slot is part of the key but not the primary index can
|
||||||
|
// not be range deleted like above. Instead, the individual key/value pairs
|
||||||
|
// must be iterated over and a decision to keep or discard that pair is
|
||||||
|
// made. The comparison logic is implemented in PurgedSlotFilter which is
|
||||||
|
// configured to run as part of rocksdb's automatic compactions. Storage
|
||||||
|
// space is reclaimed on this class of columns once compaction has
|
||||||
|
// completed on a given range or file.
|
||||||
|
matches!(
|
||||||
|
cf_name,
|
||||||
|
columns::TransactionStatus::NAME | columns::AddressSignatures::NAME
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns true if the column family enables compression.
|
// Returns true if the column family enables compression.
|
||||||
|
@ -1942,15 +2006,14 @@ pub mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_should_exclude_from_compaction() {
|
fn test_should_enable_cf_compaction() {
|
||||||
// currently there are three CFs excluded from compaction:
|
let columns_to_compact = vec![
|
||||||
assert!(should_exclude_from_compaction(
|
columns::TransactionStatus::NAME,
|
||||||
columns::TransactionStatusIndex::NAME
|
columns::AddressSignatures::NAME,
|
||||||
));
|
];
|
||||||
assert!(should_exclude_from_compaction(columns::ProgramCosts::NAME));
|
columns_to_compact.iter().for_each(|cf_name| {
|
||||||
assert!(should_exclude_from_compaction(
|
assert!(should_enable_cf_compaction(cf_name));
|
||||||
columns::TransactionMemos::NAME
|
});
|
||||||
));
|
assert!(!should_enable_cf_compaction("something else"));
|
||||||
assert!(!should_exclude_from_compaction("something else"));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue