From aa6832964c31b62aff541a3b09575f5bc71f8eaa Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Thu, 4 Jun 2020 21:06:06 -0700 Subject: [PATCH] ledger_cleanup_service: compact at a slower rate than purging (#10414) --- core/src/ledger_cleanup_service.rs | 59 +++++++++++++++++++---- core/tests/ledger_cleanup.rs | 7 ++- ledger/src/blockstore/blockstore_purge.rs | 24 ++++----- 3 files changed, 67 insertions(+), 23 deletions(-) diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index ffcb7facb5..9b111e2e44 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -3,7 +3,7 @@ use solana_ledger::blockstore::{Blockstore, PurgeType}; use solana_ledger::blockstore_db::Result as BlockstoreResult; use solana_measure::measure::Measure; -use solana_sdk::clock::Slot; +use solana_sdk::clock::{Slot, DEFAULT_TICKS_PER_SLOT, TICKS_PER_DAY}; use std::string::ToString; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError}; @@ -32,6 +32,10 @@ pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512; // Delay between purges to cooperate with other blockstore users pub const DEFAULT_DELAY_BETWEEN_PURGES: Duration = Duration::from_millis(500); +// Compacting at a slower interval than purging helps keep IOPS down. +// Once a day should be ample +const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = TICKS_PER_DAY / DEFAULT_TICKS_PER_SLOT; + pub struct LedgerCleanupService { t_cleanup: JoinHandle<()>, } @@ -49,6 +53,8 @@ impl LedgerCleanupService { ); let exit = exit.clone(); let mut last_purge_slot = 0; + let mut last_compaction_slot = 0; + let t_cleanup = Builder::new() .name("solana-ledger-cleanup".to_string()) .spawn(move || loop { @@ -62,6 +68,8 @@ impl LedgerCleanupService { &mut last_purge_slot, DEFAULT_PURGE_SLOT_INTERVAL, Some(DEFAULT_DELAY_BETWEEN_PURGES), + &mut last_compaction_slot, + DEFAULT_COMPACTION_SLOT_INTERVAL, ) { match e { RecvTimeoutError::Disconnected => break, @@ -116,7 +124,7 @@ impl LedgerCleanupService { } } - (true, lowest_cleanup_slot, first_slot, total_shreds) + (true, first_slot, lowest_cleanup_slot, total_shreds) } fn receive_new_roots(new_root_receiver: &Receiver) -> Result { @@ -135,6 +143,8 @@ impl LedgerCleanupService { last_purge_slot: &mut u64, purge_interval: u64, delay_between_purges: Option, + last_compaction_slot: &mut u64, + compaction_interval: u64, ) -> Result<(), RecvTimeoutError> { let root = Self::receive_new_roots(new_root_receiver)?; if root - *last_purge_slot <= purge_interval { @@ -143,19 +153,20 @@ impl LedgerCleanupService { let disk_utilization_pre = blockstore.storage_size(); info!( - "purge: last_root={}, last_purge_slot={}, purge_interval={}, disk_utilization={:?}", - root, last_purge_slot, purge_interval, disk_utilization_pre + "purge: last_root={}, last_purge_slot={}, purge_interval={}, last_compaction_slot={}, disk_utilization={:?}", + root, last_purge_slot, purge_interval, last_compaction_slot, disk_utilization_pre ); *last_purge_slot = root; - let (slots_to_clean, lowest_cleanup_slot, first_slot, total_shreds) = + let (slots_to_clean, purge_first_slot, lowest_cleanup_slot, total_shreds) = Self::find_slots_to_clean(&blockstore, root, max_ledger_shreds); if slots_to_clean { - info!( - "purging data from slots {} to {}", - first_slot, lowest_cleanup_slot - ); + let mut compact_first_slot = std::u64::MAX; + if lowest_cleanup_slot.saturating_sub(*last_compaction_slot) > compaction_interval { + compact_first_slot = *last_compaction_slot; + *last_compaction_slot = lowest_cleanup_slot; + } let purge_complete = Arc::new(AtomicBool::new(false)); let blockstore = blockstore.clone(); @@ -167,15 +178,37 @@ impl LedgerCleanupService { *blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot; slot_update_time.stop(); + info!( + "purging data from slots {} to {}", + purge_first_slot, lowest_cleanup_slot + ); + let mut purge_time = Measure::start("purge_slots_with_delay"); blockstore.purge_slots_with_delay( - first_slot, + purge_first_slot, lowest_cleanup_slot, delay_between_purges, PurgeType::PrimaryIndex, ); purge_time.stop(); info!("{}", purge_time); + + if compact_first_slot < lowest_cleanup_slot { + info!( + "compacting data from slots {} to {}", + compact_first_slot, lowest_cleanup_slot + ); + if let Err(err) = + blockstore.compact_storage(compact_first_slot, lowest_cleanup_slot) + { + // This error is not fatal and indicates an internal error? + error!( + "Error: {:?}; Couldn't compact storage from {:?} to {:?}", + err, compact_first_slot, lowest_cleanup_slot + ); + } + } + purge_complete1.store(true, Ordering::Relaxed); }) .unwrap(); @@ -234,6 +267,7 @@ mod tests { //send a signal to kill all but 5 shreds, which will be in the newest slots let mut last_purge_slot = 0; + let mut last_compaction_slot = 0; sender.send(50).unwrap(); LedgerCleanupService::cleanup_ledger( &receiver, @@ -242,6 +276,8 @@ mod tests { &mut last_purge_slot, 10, None, + &mut last_compaction_slot, + 10, ) .unwrap(); @@ -273,6 +309,7 @@ mod tests { info!("{}", first_insert); let mut last_purge_slot = 0; + let mut last_compaction_slot = 0; let mut slot = initial_slots; let mut num_slots = 6; for _ in 0..5 { @@ -297,6 +334,8 @@ mod tests { &mut last_purge_slot, 10, None, + &mut last_compaction_slot, + 10, ) .unwrap(); time.stop(); diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs index d7f8a2ad06..ab0029b2fe 100644 --- a/core/tests/ledger_cleanup.rs +++ b/core/tests/ledger_cleanup.rs @@ -374,14 +374,17 @@ mod tests { // send signal to cleanup slots let (sender, receiver) = channel(); sender.send(n).unwrap(); - let mut next_purge_batch = 0; + let mut last_purge_slot = 0; + let mut last_compaction_slot = 0; LedgerCleanupService::cleanup_ledger( &receiver, &blockstore, max_ledger_shreds, - &mut next_purge_batch, + &mut last_purge_slot, 10, None, + &mut last_compaction_slot, + 10, ) .unwrap(); diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index 7e705195f9..bfef9aa6dc 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -35,20 +35,18 @@ impl Blockstore { } } } - - if !self.no_compaction { - if let Err(e) = self.compact_storage(from_slot, to_slot) { - // This error is not fatal and indicates an internal error - error!( - "Error: {:?}; Couldn't compact storage from {:?} to {:?}", - e, from_slot, to_slot - ); - } - } } + // TODO: rename purge_slots() to purge_and_compact_slots() pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot) { - self.purge_slots_with_delay(from_slot, to_slot, None, PurgeType::Exact) + self.purge_slots_with_delay(from_slot, to_slot, None, PurgeType::Exact); + if let Err(e) = self.compact_storage(from_slot, to_slot) { + // This error is not fatal and indicates an internal error? + error!( + "Error: {:?}; Couldn't compact storage from {:?} to {:?}", + e, from_slot, to_slot + ); + } } /// Ensures that the SlotMeta::next_slots vector for all slots contain no references in the @@ -169,6 +167,10 @@ impl Blockstore { } pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result { + if self.no_compaction { + info!("compact_storage: compaction disabled"); + return Ok(false); + } info!("compact_storage: from {} to {}", from_slot, to_slot); let mut compact_timer = Measure::start("compact_range"); let result = self