Delete files older than the lowest_cleanup_slot in LedgerCleanupService::cleanup_ledger (#26651)

#### Problem
LedgerCleanupService relies on RocksDB compactions to propagate and digest range-delete tombstones before any disk space is actually reclaimed.

#### Summary of Changes
This PR makes LedgerCleanupService::cleanup_ledger delete any sst file whose slot range falls entirely at or below the lowest_cleanup_slot.  This allows us to reclaim disk space more promptly and with fewer IOPS.  Experimental results on mainnet validators show that the PR reduces ledger disk usage by 33% to 40%.
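For background, the two RocksDB mechanisms involved can be sketched directly against the `rocksdb` crate (a minimal illustration, not Solana code; the database path and column-family name below are made up). A range delete issued through a write batch only records a tombstone that hides the keys until compaction rewrites the affected sst files, whereas DeleteFilesInRange immediately drops every sst file whose key range lies entirely inside the given bounds:

```rust
use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);

    // Hypothetical path and column-family name, for illustration only.
    let cf_desc = ColumnFamilyDescriptor::new("data_shred", Options::default());
    let db = DB::open_cf_descriptors(&opts, "/tmp/tombstone-demo", vec![cf_desc])?;
    let cf = db.cf_handle("data_shred").unwrap();

    // Range delete: only writes a tombstone covering slots [0, 1000). The sst
    // files holding those keys stay on disk until compaction digests it.
    let mut batch = WriteBatch::default();
    batch.delete_range_cf(cf, 0u64.to_be_bytes(), 1000u64.to_be_bytes());
    db.write(batch)?;

    // File delete: immediately removes every sst file whose key range lies
    // entirely inside the bounds; files straddling the boundary are left for
    // compaction to clean up.
    db.delete_file_in_range_cf(cf, 0u64.to_be_bytes(), 1000u64.to_be_bytes())?;
    Ok(())
}
```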
Author: Yueh-Hsuan Chiang
Date: 2022-08-09 00:48:06 +08:00 (committed by GitHub)
Parent: 9b54b15016
Commit: 99ef2184cc
3 changed files with 126 additions and 23 deletions


@@ -124,28 +124,25 @@ impl LedgerCleanupService {
     /// A helper function to `cleanup_ledger` which returns a tuple of the
     /// following four elements suggesting whether to clean up the ledger:
     ///
-    /// Return value (bool, Slot, Slot, u64):
+    /// Return value (bool, Slot, u64):
     /// - `slots_to_clean` (bool): a boolean value indicating whether there
     ///   are any slots to clean.  If true, then `cleanup_ledger` function
     ///   will then proceed with the ledger cleanup.
-    /// - `first_slot_to_purge` (Slot): the first slot to purge.
-    /// - `lowest_slot_to_puerge` (Slot): the lowest slot to purge.  Together
-    ///   with `first_slot_to_purge`, the two Slot values represent the
-    ///   range of the clean up.
+    /// - `lowest_slot_to_purge` (Slot): the lowest slot to purge.  Any
+    ///   slot which is older or equal to `lowest_slot_to_purge` will be
+    ///   cleaned up.
     /// - `total_shreds` (u64): the total estimated number of shreds before the
     ///   `root`.
     fn find_slots_to_clean(
         blockstore: &Arc<Blockstore>,
         root: Slot,
         max_ledger_shreds: u64,
-    ) -> (bool, Slot, Slot, u64) {
+    ) -> (bool, Slot, u64) {
         let mut total_slots = Vec::new();
         let mut iterate_time = Measure::start("iterate_time");
         let mut total_shreds = 0;
-        let mut first_slot = 0;
         for (i, (slot, meta)) in blockstore.slot_meta_iterator(0).unwrap().enumerate() {
             if i == 0 {
-                first_slot = slot;
                 debug!("purge: searching from slot: {}", slot);
             }
             // Not exact since non-full slots will have holes
@@ -157,15 +154,14 @@ impl LedgerCleanupService {
         }
         iterate_time.stop();
         info!(
-            "first_slot={} total_slots={} total_shreds={} max_ledger_shreds={}, {}",
-            first_slot,
+            "total_slots={} total_shreds={} max_ledger_shreds={}, {}",
             total_slots.len(),
             total_shreds,
             max_ledger_shreds,
             iterate_time
         );
         if (total_shreds as u64) < max_ledger_shreds {
-            return (false, 0, 0, total_shreds);
+            return (false, 0, total_shreds);
         }
         let mut num_shreds_to_clean = 0;
         let mut lowest_cleanup_slot = total_slots[0].0;
@@ -177,7 +173,7 @@ impl LedgerCleanupService {
             }
         }
-        (true, first_slot, lowest_cleanup_slot, total_shreds)
+        (true, lowest_cleanup_slot, total_shreds)
     }

     fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
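The loop that actually picks `lowest_cleanup_slot` falls between the hunks above and is not part of this diff. For orientation only, here is a hedged sketch of how that selection plausibly works given the variables that are visible (`total_slots`, `max_ledger_shreds`, `lowest_cleanup_slot`); the helper name, iteration order, and exact arithmetic are assumptions rather than the committed code:

```rust
type Slot = u64;

/// Hedged sketch, not the committed implementation: walk the per-slot shred
/// counts from newest to oldest and, once the newest slots alone exceed
/// `max_ledger_shreds`, mark that slot and everything older for cleanup.
fn pick_lowest_cleanup_slot(total_slots: &[(Slot, u64)], max_ledger_shreds: u64) -> Slot {
    let mut num_shreds_kept = 0;
    let mut lowest_cleanup_slot = total_slots[0].0;
    for (slot, num_shreds) in total_slots.iter().rev() {
        num_shreds_kept += num_shreds;
        if num_shreds_kept > max_ledger_shreds {
            lowest_cleanup_slot = *slot;
            break;
        }
    }
    lowest_cleanup_slot
}

fn main() {
    // Three slots of 100 shreds each against a budget of 150 shreds: only the
    // newest slot fits, so slot 11 and everything older becomes purgeable.
    let total_slots = [(10, 100), (11, 100), (12, 100)];
    assert_eq!(pick_lowest_cleanup_slot(&total_slots, 150), 11);
}
```

Under that reading, `lowest_cleanup_slot` is the boundary slot: everything at or below it is purged so that what remains fits within `max_ledger_shreds`, which matches the updated doc comment ("older or equal to `lowest_slot_to_purge` will be cleaned up").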
@@ -233,7 +229,7 @@ impl LedgerCleanupService {
         *last_purge_slot = root;

-        let (slots_to_clean, purge_first_slot, lowest_cleanup_slot, total_shreds) =
+        let (slots_to_clean, lowest_cleanup_slot, total_shreds) =
             Self::find_slots_to_clean(blockstore, root, max_ledger_shreds);

         if slots_to_clean {
@@ -248,18 +244,12 @@ impl LedgerCleanupService {
             *blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
             slot_update_time.stop();

-            info!(
-                "purging data from slots {} to {}",
-                purge_first_slot, lowest_cleanup_slot
-            );
+            info!("purging data older than {}", lowest_cleanup_slot);

             let mut purge_time = Measure::start("purge_slots");

-            blockstore.purge_slots(
-                purge_first_slot,
-                lowest_cleanup_slot,
-                PurgeType::CompactionFilter,
-            );
+            // purge any slots older than lowest_cleanup_slot.
+            blockstore.purge_slots(0, lowest_cleanup_slot, PurgeType::CompactionFilter);
             // Update only after purge operation.
             // Safety: This value can be used by compaction_filters shared via Arc<AtomicU64>.
             // Compactions are async and run as a multi-threaded background job. However, this


@@ -4,6 +4,7 @@ use {super::*, solana_sdk::message::AccountKeys, std::time::Instant};
 pub struct PurgeStats {
     delete_range: u64,
     write_batch: u64,
+    delete_files_in_range: u64,
 }

 #[derive(Clone, Copy)]
@@ -46,7 +47,12 @@ impl Blockstore {
             ("from_slot", from_slot as i64, i64),
             ("to_slot", to_slot as i64, i64),
             ("delete_range_us", purge_stats.delete_range as i64, i64),
-            ("write_batch_us", purge_stats.write_batch as i64, i64)
+            ("write_batch_us", purge_stats.write_batch as i64, i64),
+            (
+                "delete_files_in_range_us",
+                purge_stats.delete_files_in_range as i64,
+                i64
+            )
         );
         if let Err(e) = purge_result {
             error!(
@@ -141,6 +147,9 @@ impl Blockstore {
     /// A helper function to `purge_slots` that executes the ledger clean up
     /// from `from_slot` to `to_slot`.
+    ///
+    /// When `from_slot` is 0, any sst-file with a key-range completely older
+    /// than `to_slot` will also be deleted.
     pub(crate) fn run_purge_with_stats(
         &self,
         from_slot: Slot,
@@ -239,6 +248,7 @@ impl Blockstore {
             }
         }
         delete_range_timer.stop();
+
         let mut write_timer = Measure::start("write_batch");
         if let Err(e) = self.db.write(write_batch) {
             error!(
@@ -248,8 +258,28 @@ impl Blockstore {
             return Err(e);
         }
         write_timer.stop();
+
+        let mut purge_files_in_range_timer = Measure::start("delete_file_in_range");
+        // purge_files_in_range deletes any file whose slot range is within
+        // [from_slot, to_slot].  When from_slot is 0, it is safe to run
+        // purge_files_in_range because if purge_files_in_range deletes any
+        // sst file that contains any range-deletion tombstone, the deletion
+        // range of that tombstone will be completely covered by the new
+        // range-delete tombstone (0, to_slot) issued above.
+        //
+        // On the other hand, purge_files_in_range is more effective and
+        // efficient than the compaction filter (which runs key-by-key)
+        // because all the sst files that have a key range below to_slot
+        // can be deleted immediately.
+        if columns_purged && from_slot == 0 {
+            self.purge_files_in_range(from_slot, to_slot);
+        }
+        purge_files_in_range_timer.stop();
+
         purge_stats.delete_range += delete_range_timer.as_us();
         purge_stats.write_batch += write_timer.as_us();
+        purge_stats.delete_files_in_range += purge_files_in_range_timer.as_us();
+
         // only drop w_active_transaction_status_index after we do db.write(write_batch);
         // otherwise, readers might be confused with inconsistent state between
         // self.active_transaction_status_index and RockDb's TransactionStatusIndex contents
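The safety argument in the comment above reduces to a containment check: with `from_slot == 0`, DeleteFilesInRange may only drop an sst file when every key in it is at or below `to_slot`, and any range-delete tombstones such a file held are re-covered by the `(0, to_slot)` tombstone written just before. A toy illustration of that predicate with concrete numbers (the helper is hypothetical, not upstream code):

```rust
type Slot = u64;

/// Hypothetical helper mirroring RocksDB's DeleteFilesInRange rule: with
/// `from_slot == 0` the lower bound is trivially satisfied, so an sst file is
/// dropped exactly when its highest slot does not exceed `to_slot`.
fn file_fully_covered(file_max_slot: Slot, to_slot: Slot) -> bool {
    file_max_slot <= to_slot
}

fn main() {
    let to_slot: Slot = 1_000;
    // Entirely below the purge boundary: the file is deleted outright.
    assert!(file_fully_covered(800, to_slot));
    // Straddles the boundary: the file stays, but its purged keys remain
    // hidden by the (0, to_slot) tombstone and are eventually reclaimed by
    // the pre-existing (slower) compaction-filter path.
    assert!(!file_fully_covered(1_100, to_slot));
}
```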
@@ -257,6 +287,68 @@ impl Blockstore {
         Ok(columns_purged)
     }

+    fn purge_files_in_range(&self, from_slot: Slot, to_slot: Slot) -> bool {
+        self.db
+            .delete_file_in_range_cf::<cf::SlotMeta>(from_slot, to_slot)
+            .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::<cf::BankHash>(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::<cf::Root>(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::<cf::ShredData>(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::<cf::ShredCode>(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::<cf::DeadSlots>(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::<cf::DuplicateSlots>(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::<cf::ErasureMeta>(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::<cf::Orphans>(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::<cf::Index>(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::<cf::Rewards>(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::<cf::Blocktime>(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::<cf::PerfSamples>(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::<cf::BlockHeight>(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::<cf::OptimisticSlots>(from_slot, to_slot)
+                .is_ok()
+    }
+
     pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
         if self.no_compaction {
             info!("compact_storage: compaction disabled");


@@ -455,6 +455,16 @@ impl Rocks {
         Ok(())
     }

+    fn delete_file_in_range_cf(
+        &self,
+        cf: &ColumnFamily,
+        from_key: &[u8],
+        to_key: &[u8],
+    ) -> Result<()> {
+        self.db.delete_file_in_range_cf(cf, from_key, to_key)?;
+        Ok(())
+    }
+
     fn iterator_cf<C>(&self, cf: &ColumnFamily, iterator_mode: IteratorMode<C::Index>) -> DBIterator
     where
         C: Column,
@@ -1117,6 +1127,17 @@ impl Database {
         batch.delete_range_cf::<C>(cf, from_index, to_index)
     }

+    pub fn delete_file_in_range_cf<C>(&self, from: Slot, to: Slot) -> Result<()>
+    where
+        C: Column + ColumnName,
+    {
+        self.backend.delete_file_in_range_cf(
+            self.cf_handle::<C>(),
+            &C::key(C::as_index(from)),
+            &C::key(C::as_index(to)),
+        )
+    }
+
     pub fn is_primary_access(&self) -> bool {
         self.backend.is_primary_access()
     }
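This wrapper converts the `Slot` bounds into each column's byte-key encoding via `C::key(C::as_index(..))` because RocksDB file deletion operates on raw keys, not slots. For slot-keyed columns the blockstore encodes the slot as a big-endian u64, so lexicographic key order matches numeric slot order; a minimal illustration of why that matters for range operations (not the exact upstream key layout, and some columns append further index fields):

```rust
type Slot = u64;

/// Big-endian encoding keeps lexicographic byte order aligned with numeric
/// slot order, so "every key for slots up to to_slot" is one contiguous range.
fn slot_key(slot: Slot) -> [u8; 8] {
    slot.to_be_bytes()
}

fn main() {
    assert!(slot_key(2) < slot_key(10)); // byte order matches numeric order
    // A little-endian encoding would not preserve order: 256 sorts before 2.
    assert!(256u64.to_le_bytes() < 2u64.to_le_bytes());
}
```

With an order-preserving encoding, the two keys passed to `delete_file_in_range_cf` really do bound every key for slots in `[from, to]`, which is what both the range-delete tombstone and the file deletion rely on.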