From 99ef2184cc3e428e1e09a4493bf72cacc060d6aa Mon Sep 17 00:00:00 2001
From: Yueh-Hsuan Chiang <93241502+yhchiang-sol@users.noreply.github.com>
Date: Tue, 9 Aug 2022 00:48:06 +0800
Subject: [PATCH] Delete files older than the lowest_cleanup_slot in
 LedgerCleanupService::cleanup_ledger (#26651)

#### Problem
LedgerCleanupService requires compactions to propagate and digest range-delete
tombstones before disk space can eventually be reclaimed.

#### Summary of Changes
This PR makes LedgerCleanupService::cleanup_ledger delete any file whose
entire slot range is older than the lowest_cleanup_slot. This allows us to
reclaim disk space more often and with fewer IOPS.

Experimental results on mainnet validators show that this PR can reduce
ledger disk size by 33% to 40%.
---
 core/src/ledger_cleanup_service.rs        | 34 +++-----
 ledger/src/blockstore/blockstore_purge.rs | 94 ++++++++++++++++++++++-
 ledger/src/blockstore_db.rs               | 21 +++++
 3 files changed, 126 insertions(+), 23 deletions(-)

diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs
index 6607ba8ed..5c006c826 100644
--- a/core/src/ledger_cleanup_service.rs
+++ b/core/src/ledger_cleanup_service.rs
@@ -124,28 +124,25 @@ impl LedgerCleanupService {
     /// A helper function to `cleanup_ledger` which returns a tuple of the
-    /// following four elements suggesting whether to clean up the ledger:
+    /// following three elements suggesting whether to clean up the ledger:
     ///
-    /// Return value (bool, Slot, Slot, u64):
+    /// Return value (bool, Slot, u64):
     /// - `slots_to_clean` (bool): a boolean value indicating whether there
     ///   are any slots to clean.  If true, then `cleanup_ledger` function
     ///   will then proceed with the ledger cleanup.
-    /// - `first_slot_to_purge` (Slot): the first slot to purge.
-    /// - `lowest_slot_to_puerge` (Slot): the lowest slot to purge.  Together
-    ///   with `first_slot_to_purge`, the two Slot values represent the
-    ///   range of the clean up.
+    /// - `lowest_slot_to_purge` (Slot): the lowest slot to purge.  Any
+    ///   slot which is older than or equal to `lowest_slot_to_purge` will
+    ///   be cleaned up.
     /// - `total_shreds` (u64): the total estimated number of shreds before the
     ///   `root`.
     fn find_slots_to_clean(
         blockstore: &Arc<Blockstore>,
         root: Slot,
         max_ledger_shreds: u64,
-    ) -> (bool, Slot, Slot, u64) {
+    ) -> (bool, Slot, u64) {
         let mut total_slots = Vec::new();
         let mut iterate_time = Measure::start("iterate_time");
         let mut total_shreds = 0;
-        let mut first_slot = 0;
         for (i, (slot, meta)) in blockstore.slot_meta_iterator(0).unwrap().enumerate() {
             if i == 0 {
-                first_slot = slot;
                 debug!("purge: searching from slot: {}", slot);
             }
             // Not exact since non-full slots will have holes
@@ -157,15 +154,14 @@ impl LedgerCleanupService {
         }
         iterate_time.stop();
         info!(
-            "first_slot={} total_slots={} total_shreds={} max_ledger_shreds={}, {}",
-            first_slot,
+            "total_slots={} total_shreds={} max_ledger_shreds={}, {}",
             total_slots.len(),
             total_shreds,
             max_ledger_shreds,
             iterate_time
         );
         if (total_shreds as u64) < max_ledger_shreds {
-            return (false, 0, 0, total_shreds);
+            return (false, 0, total_shreds);
         }
         let mut num_shreds_to_clean = 0;
         let mut lowest_cleanup_slot = total_slots[0].0;
@@ -177,7 +173,7 @@ impl LedgerCleanupService {
             }
         }
 
-        (true, first_slot, lowest_cleanup_slot, total_shreds)
+        (true, lowest_cleanup_slot, total_shreds)
     }
 
     fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
@@ -233,7 +229,7 @@ impl LedgerCleanupService {
 
         *last_purge_slot = root;
 
-        let (slots_to_clean, purge_first_slot, lowest_cleanup_slot, total_shreds) =
+        let (slots_to_clean, lowest_cleanup_slot, total_shreds) =
             Self::find_slots_to_clean(blockstore, root, max_ledger_shreds);
 
         if slots_to_clean {
@@ -248,18 +244,12 @@ impl LedgerCleanupService {
             *blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
             slot_update_time.stop();
 
-            info!(
-                "purging data from slots {} to {}",
-                purge_first_slot, lowest_cleanup_slot
-            );
+            info!("purging data older than {}", lowest_cleanup_slot);
             let mut purge_time = Measure::start("purge_slots");
-            blockstore.purge_slots(
-                purge_first_slot,
-                lowest_cleanup_slot,
-                PurgeType::CompactionFilter,
-            );
+            // purge any slots older than or equal to lowest_cleanup_slot.
+            blockstore.purge_slots(0, lowest_cleanup_slot, PurgeType::CompactionFilter);
             // Update only after purge operation.
             // Safety: This value can be used by compaction_filters shared via Arc.
             // Compactions are async and run as a multi-threaded background job. However, this
diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs
index 640b338f2..1e79e4b81 100644
--- a/ledger/src/blockstore/blockstore_purge.rs
+++ b/ledger/src/blockstore/blockstore_purge.rs
@@ -4,6 +4,7 @@ use {super::*, solana_sdk::message::AccountKeys, std::time::Instant};
 pub struct PurgeStats {
     delete_range: u64,
     write_batch: u64,
+    delete_files_in_range: u64,
 }
 
 #[derive(Clone, Copy)]
@@ -46,7 +47,12 @@ impl Blockstore {
             ("from_slot", from_slot as i64, i64),
             ("to_slot", to_slot as i64, i64),
             ("delete_range_us", purge_stats.delete_range as i64, i64),
-            ("write_batch_us", purge_stats.write_batch as i64, i64)
+            ("write_batch_us", purge_stats.write_batch as i64, i64),
+            (
+                "delete_files_in_range_us",
+                purge_stats.delete_files_in_range as i64,
+                i64
+            )
         );
         if let Err(e) = purge_result {
             error!(
@@ -141,6 +147,9 @@ impl Blockstore {
 
     /// A helper function to `purge_slots` that executes the ledger clean up
     /// from `from_slot` to `to_slot`.
+    ///
+    /// When `from_slot` is 0, any sst file whose key range is completely older
+    /// than `to_slot` will also be deleted.
     pub(crate) fn run_purge_with_stats(
         &self,
         from_slot: Slot,
@@ -239,6 +248,7 @@ impl Blockstore {
             }
         }
         delete_range_timer.stop();
+
         let mut write_timer = Measure::start("write_batch");
         if let Err(e) = self.db.write(write_batch) {
             error!(
@@ -248,8 +258,28 @@ impl Blockstore {
             return Err(e);
         }
         write_timer.stop();
+
+        let mut purge_files_in_range_timer = Measure::start("delete_file_in_range");
+        // purge_files_in_range deletes any files whose slot range is within
+        // [from_slot, to_slot].  When from_slot is 0, it is safe to run
+        // purge_files_in_range because if purge_files_in_range deletes any
+        // sst file that contains any range-deletion tombstone, the deletion
+        // range of that tombstone will be completely covered by the new
+        // range-delete tombstone [0, to_slot] issued above.
+        //
+        // On the other hand, purge_files_in_range is more effective and
+        // efficient than the compaction filter (which runs key-by-key)
+        // because all the sst files whose key range falls below to_slot
+        // can be deleted immediately.
+        if columns_purged && from_slot == 0 {
+            self.purge_files_in_range(from_slot, to_slot);
+        }
+        purge_files_in_range_timer.stop();
+
         purge_stats.delete_range += delete_range_timer.as_us();
         purge_stats.write_batch += write_timer.as_us();
+        purge_stats.delete_files_in_range += purge_files_in_range_timer.as_us();
+
         // only drop w_active_transaction_status_index after we do db.write(write_batch);
         // otherwise, readers might be confused with inconsistent state between
         // self.active_transaction_status_index and RockDb's TransactionStatusIndex contents
@@ -257,6 +287,68 @@ impl Blockstore {
         Ok(columns_purged)
     }
 
+    fn purge_files_in_range(&self, from_slot: Slot, to_slot: Slot) -> bool {
+        self.db
+            .delete_file_in_range_cf::(from_slot, to_slot)
+            .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::(from_slot, to_slot)
+                .is_ok()
+            & self
+                .db
+                .delete_file_in_range_cf::(from_slot, to_slot)
+                .is_ok()
+    }
+
     pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
         if self.no_compaction {
             info!("compact_storage: compaction disabled");
diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs
index 94064df7a..10ccf3f94 100644
--- a/ledger/src/blockstore_db.rs
+++ b/ledger/src/blockstore_db.rs
@@ -455,6 +455,16 @@ impl Rocks {
         Ok(())
     }
 
+    fn delete_file_in_range_cf(
+        &self,
+        cf: &ColumnFamily,
+        from_key: &[u8],
+        to_key: &[u8],
+    ) -> Result<()> {
+        self.db.delete_file_in_range_cf(cf, from_key, to_key)?;
+        Ok(())
+    }
+
     fn iterator_cf<C>(&self, cf: &ColumnFamily, iterator_mode: IteratorMode<C::Index>) -> DBIterator
     where
         C: Column,
@@ -1117,6 +1127,17 @@ impl Database {
         batch.delete_range_cf::<C>(cf, from_index, to_index)
     }
 
+    pub fn delete_file_in_range_cf<C>(&self, from: Slot, to: Slot) -> Result<()>
+    where
+        C: Column + ColumnName,
+    {
+        self.backend.delete_file_in_range_cf(
+            self.cf_handle::<C>(),
+            &C::key(C::as_index(from)),
+            &C::key(C::as_index(to)),
+        )
+    }
+
     pub fn is_primary_access(&self) -> bool {
         self.backend.is_primary_access()
     }
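For reviewers who want the cleanup decision at a glance, below is a small, self-contained sketch of what the reworked `find_slots_to_clean` computes: given per-slot shred counts and a cap on total shreds, it reports whether anything needs purging and the highest slot that may be dropped, in the `(bool, Slot, u64)` shape this patch introduces. The function name, iteration order, and numbers are illustrative assumptions for the example, not the solana-ledger implementation.

```rust
// Illustrative sketch only: mirrors the shape of the cleanup decision, not
// the actual solana-ledger code.
type Slot = u64;

/// Given (slot, estimated shred count) pairs ordered from oldest to newest
/// and a cap on total shreds, return (slots_to_clean, lowest_slot_to_purge,
/// total_shreds).
fn find_cutoff(slots: &[(Slot, u64)], max_ledger_shreds: u64) -> (bool, Slot, u64) {
    let total_shreds: u64 = slots.iter().map(|&(_, shreds)| shreds).sum();
    if total_shreds < max_ledger_shreds {
        // Still under the cap; nothing to purge.
        return (false, 0, total_shreds);
    }
    // Walk from the oldest slot forward, accumulating shreds to purge until
    // what remains (the newest data) fits under the cap. Every slot at or
    // below the returned cutoff can then be dropped.
    let mut lowest_cleanup_slot = slots[0].0;
    let mut num_shreds_to_clean = 0;
    for &(slot, shreds) in slots {
        num_shreds_to_clean += shreds;
        lowest_cleanup_slot = slot;
        if total_shreds - num_shreds_to_clean <= max_ledger_shreds {
            break;
        }
    }
    (true, lowest_cleanup_slot, total_shreds)
}

fn main() {
    // Three slots of 100 shreds each with a cap of 150 shreds: the two
    // oldest slots must go, so the cutoff lands on slot 11.
    let slots = [(10, 100), (11, 100), (12, 100)];
    assert_eq!(find_cutoff(&slots, 150), (true, 11, 300));
    println!("lowest_cleanup_slot = 11");
}
```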
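The disk-space win comes from pairing the existing range-delete tombstone with RocksDB's file-level deletion. The sketch below shows that pairing directly against the `rocksdb` crate; the column-family name, database path, and key layout are assumptions made for this example (slot-indexed Blockstore columns key their data by a big-endian slot prefix, which is what lets a slot range map onto a contiguous key range), so treat it as an illustration of the technique rather than the Blockstore code.

```rust
// Illustrative sketch against the `rocksdb` crate directly. The column
// family name, path, and key layout are assumptions for this example and
// are NOT solana-ledger's configuration.
use rocksdb::{ColumnFamilyDescriptor, Options, DB};

type Slot = u64;

/// Encode a slot as a big-endian u64 so slot order matches key order.
fn slot_key(slot: Slot) -> [u8; 8] {
    slot.to_be_bytes()
}

/// Purge everything at or below `to_slot` in one column family.
fn purge_older_than(db: &DB, cf_name: &str, to_slot: Slot) -> Result<(), rocksdb::Error> {
    let cf = db.cf_handle(cf_name).expect("column family exists");
    // 1) Range-delete tombstone: readers stop seeing the purged keys right
    //    away (the end key of delete_range_cf is exclusive).
    db.delete_range_cf(cf, slot_key(0), slot_key(to_slot + 1))?;
    // 2) File-level deletion: SST files whose keys fall entirely inside
    //    [0, to_slot] are dropped immediately, reclaiming disk space without
    //    waiting for compaction to digest the tombstone.
    db.delete_file_in_range_cf(cf, slot_key(0), slot_key(to_slot))?;
    Ok(())
}

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);
    let cfs = vec![ColumnFamilyDescriptor::new("slot_meta", Options::default())];
    let db = DB::open_cf_descriptors(&opts, "/tmp/purge_example_db", cfs)?;

    let cf = db.cf_handle("slot_meta").expect("column family exists");
    for slot in 0..10_000u64 {
        db.put_cf(cf, slot_key(slot), b"dummy slot metadata")?;
    }
    // Flush so there are SST files on disk for the file-level delete to drop.
    db.flush()?;

    purge_older_than(&db, "slot_meta", 5_000)?;
    Ok(())
}
```

Because file-level deletion only drops SST files that lie entirely inside the key range, files straddling the cutoff are left behind and still rely on the tombstone plus compaction-filter path, which is why the patch keeps `PurgeType::CompactionFilter` in place rather than replacing it.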