diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 607f9f82bd..b149dda0a8 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -1,15 +1,16 @@ //! The `ledger_cleanup_service` drops older ledger data to limit disk space usage +use rand::{thread_rng, Rng}; use solana_ledger::blockstore::{Blockstore, PurgeType}; use solana_ledger::blockstore_db::Result as BlockstoreResult; use solana_measure::measure::Measure; use solana_sdk::clock::{Slot, DEFAULT_TICKS_PER_SLOT, TICKS_PER_DAY}; use std::string::ToString; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError}; use std::sync::Arc; use std::thread; -use std::thread::{Builder, JoinHandle}; +use std::thread::{sleep, Builder, JoinHandle}; use std::time::Duration; // - To try and keep the RocksDB size under 400GB: @@ -35,6 +36,7 @@ const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = TICKS_PER_DAY / DEFAULT_TICKS_PER_ pub struct LedgerCleanupService { t_cleanup: JoinHandle<()>, + t_compact: JoinHandle<()>, } impl LedgerCleanupService { @@ -43,6 +45,8 @@ impl LedgerCleanupService { blockstore: Arc, max_ledger_shreds: u64, exit: &Arc, + compaction_interval: Option, + max_compaction_jitter: Option, ) -> Self { info!( "LedgerCleanupService active. 
Max Ledger Slots {}", @@ -51,9 +55,16 @@ impl LedgerCleanupService { let exit = exit.clone(); let mut last_purge_slot = 0; let mut last_compaction_slot = 0; + let mut compaction_jitter = 0; + let compaction_interval = compaction_interval.unwrap_or(DEFAULT_COMPACTION_SLOT_INTERVAL); + let last_compact_slot = Arc::new(AtomicU64::new(0)); + let last_compact_slot2 = last_compact_slot.clone(); + + let exit_compact = exit.clone(); + let blockstore_compact = blockstore.clone(); let t_cleanup = Builder::new() - .name("solana-ledger-cleanup".to_string()) + .name("sol-led-cleanup".to_string()) .spawn(move || loop { if exit.load(Ordering::Relaxed) { break; @@ -64,8 +75,7 @@ impl LedgerCleanupService { max_ledger_shreds, &mut last_purge_slot, DEFAULT_PURGE_SLOT_INTERVAL, - &mut last_compaction_slot, - DEFAULT_COMPACTION_SLOT_INTERVAL, + &last_compact_slot, ) { match e { RecvTimeoutError::Disconnected => break, @@ -74,7 +84,29 @@ impl LedgerCleanupService { } }) .unwrap(); - Self { t_cleanup } + + let t_compact = Builder::new() + .name("sol-led-compact".to_string()) + .spawn(move || loop { + if exit_compact.load(Ordering::Relaxed) { + break; + } + Self::compact_ledger( + &blockstore_compact, + &mut last_compaction_slot, + compaction_interval, + &last_compact_slot2, + &mut compaction_jitter, + max_compaction_jitter, + ); + sleep(Duration::from_secs(1)); + }) + .unwrap(); + + Self { + t_cleanup, + t_compact, + } } fn find_slots_to_clean( @@ -138,8 +170,7 @@ impl LedgerCleanupService { max_ledger_shreds: u64, last_purge_slot: &mut u64, purge_interval: u64, - last_compaction_slot: &mut u64, - compaction_interval: u64, + last_compact_slot: &Arc, ) -> Result<(), RecvTimeoutError> { let root = Self::receive_new_roots(new_root_receiver)?; if root - *last_purge_slot <= purge_interval { @@ -148,8 +179,8 @@ impl LedgerCleanupService { let disk_utilization_pre = blockstore.storage_size(); info!( - "purge: last_root={}, last_purge_slot={}, purge_interval={}, last_compaction_slot={}, 
disk_utilization={:?}", - root, last_purge_slot, purge_interval, last_compaction_slot, disk_utilization_pre + "purge: last_root={}, last_purge_slot={}, purge_interval={}, disk_utilization={:?}", + root, last_purge_slot, purge_interval, disk_utilization_pre ); *last_purge_slot = root; @@ -158,15 +189,10 @@ impl LedgerCleanupService { Self::find_slots_to_clean(&blockstore, root, max_ledger_shreds); if slots_to_clean { - let mut compact_first_slot = std::u64::MAX; - if lowest_cleanup_slot.saturating_sub(*last_compaction_slot) > compaction_interval { - compact_first_slot = *last_compaction_slot; - *last_compaction_slot = lowest_cleanup_slot; - } - let purge_complete = Arc::new(AtomicBool::new(false)); let blockstore = blockstore.clone(); let purge_complete1 = purge_complete.clone(); + let last_compact_slot1 = last_compact_slot.clone(); let _t_purge = Builder::new() .name("solana-ledger-purge".to_string()) .spawn(move || { @@ -188,21 +214,7 @@ impl LedgerCleanupService { purge_time.stop(); info!("{}", purge_time); - if compact_first_slot < lowest_cleanup_slot { - info!( - "compacting data from slots {} to {}", - compact_first_slot, lowest_cleanup_slot - ); - if let Err(err) = - blockstore.compact_storage(compact_first_slot, lowest_cleanup_slot) - { - // This error is not fatal and indicates an internal error? 
- error!( - "Error: {:?}; Couldn't compact storage from {:?} to {:?}", - err, compact_first_slot, lowest_cleanup_slot - ); - } - } + last_compact_slot1.store(lowest_cleanup_slot, Ordering::Relaxed); purge_complete1.store(true, Ordering::Relaxed); }) @@ -223,6 +235,39 @@ impl LedgerCleanupService { Ok(()) } + pub fn compact_ledger( + blockstore: &Arc, + last_compaction_slot: &mut u64, + compaction_interval: u64, + highest_compact_slot: &Arc, + compaction_jitter: &mut u64, + max_jitter: Option, + ) { + let highest_compaction_slot = highest_compact_slot.load(Ordering::Relaxed); + if highest_compaction_slot.saturating_sub(*last_compaction_slot) + > (compaction_interval + *compaction_jitter) + { + info!( + "compacting data from slots {} to {}", + *last_compaction_slot, highest_compaction_slot, + ); + if let Err(err) = + blockstore.compact_storage(*last_compaction_slot, highest_compaction_slot) + { + // This error is not fatal and indicates an internal error? + error!( + "Error: {:?}; Couldn't compact storage from {:?} to {:?}", + err, last_compaction_slot, highest_compaction_slot, + ); + } + *last_compaction_slot = highest_compaction_slot; + let jitter = max_jitter.unwrap_or(0); + if jitter > 0 { + *compaction_jitter = thread_rng().gen_range(0, jitter); + } + } + } + fn report_disk_metrics( pre: BlockstoreResult, post: BlockstoreResult, @@ -240,7 +285,8 @@ impl LedgerCleanupService { } pub fn join(self) -> thread::Result<()> { - self.t_cleanup.join() + self.t_cleanup.join()?; + self.t_compact.join() } } #[cfg(test)] @@ -251,7 +297,7 @@ mod tests { use std::sync::mpsc::channel; #[test] - fn test_cleanup() { + fn test_cleanup1() { solana_logger::setup(); let blockstore_path = get_tmp_ledger_path!(); let blockstore = Blockstore::open(&blockstore_path).unwrap(); @@ -262,7 +308,7 @@ mod tests { //send a signal to kill all but 5 shreds, which will be in the newest slots let mut last_purge_slot = 0; - let mut last_compaction_slot = 0; + let highest_compaction_slot = 
Arc::new(AtomicU64::new(0)); sender.send(50).unwrap(); LedgerCleanupService::cleanup_ledger( &receiver, @@ -270,10 +316,11 @@ mod tests { 5, &mut last_purge_slot, 10, - &mut last_compaction_slot, - 10, + &highest_compaction_slot, ) .unwrap(); + assert_eq!(last_purge_slot, 50); + assert_eq!(highest_compaction_slot.load(Ordering::Relaxed), 44); //check that 0-40 don't exist blockstore @@ -281,6 +328,18 @@ mod tests { .unwrap() .for_each(|(slot, _)| assert!(slot > 40)); + let mut last_compaction_slot = 0; + let mut jitter = 0; + LedgerCleanupService::compact_ledger( + &blockstore, + &mut last_compaction_slot, + 10, + &highest_compaction_slot, + &mut jitter, + None, + ); + assert_eq!(jitter, 0); + drop(blockstore); Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } @@ -303,7 +362,7 @@ mod tests { info!("{}", first_insert); let mut last_purge_slot = 0; - let mut last_compaction_slot = 0; + let last_compaction_slot = Arc::new(AtomicU64::new(0)); let mut slot = initial_slots; let mut num_slots = 6; for _ in 0..5 { @@ -327,8 +386,7 @@ mod tests { initial_slots, &mut last_purge_slot, 10, - &mut last_compaction_slot, - 10, + &last_compaction_slot, ) .unwrap(); time.stop(); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 4b471f4a55..b6ab5c557e 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -80,6 +80,8 @@ pub struct TvuConfig { pub accounts_hash_fault_injection_slots: u64, pub accounts_db_caching_enabled: bool, pub test_hash_calculation: bool, + pub rocksdb_compaction_interval: Option, + pub rocksdb_max_compaction_jitter: Option, } impl Tvu { @@ -151,6 +153,8 @@ impl Tvu { let cluster_slots = Arc::new(ClusterSlots::default()); let (duplicate_slots_reset_sender, duplicate_slots_reset_receiver) = unbounded(); + let compaction_interval = tvu_config.rocksdb_compaction_interval; + let max_compaction_jitter = tvu_config.rocksdb_max_compaction_jitter; let retransmit_stage = RetransmitStage::new( bank_forks.clone(), 
leader_schedule_cache, @@ -267,6 +271,8 @@ impl Tvu { blockstore.clone(), max_ledger_shreds, &exit, + compaction_interval, + max_compaction_jitter, ) }); diff --git a/core/src/validator.rs b/core/src/validator.rs index 11db515fe9..5bbd6c6c99 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -104,6 +104,8 @@ pub struct ValidatorConfig { pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection pub frozen_accounts: Vec, pub no_rocksdb_compaction: bool, + pub rocksdb_compaction_interval: Option, + pub rocksdb_max_compaction_jitter: Option, pub accounts_hash_interval_slots: u64, pub max_genesis_archive_unpacked_size: u64, pub wal_recovery_mode: Option, @@ -152,6 +154,8 @@ impl Default for ValidatorConfig { accounts_hash_fault_injection_slots: 0, frozen_accounts: vec![], no_rocksdb_compaction: false, + rocksdb_compaction_interval: None, + rocksdb_max_compaction_jitter: None, accounts_hash_interval_slots: std::u64::MAX, max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, wal_recovery_mode: None, @@ -644,6 +648,8 @@ impl Validator { accounts_hash_fault_injection_slots: config.accounts_hash_fault_injection_slots, accounts_db_caching_enabled: config.accounts_db_caching_enabled, test_hash_calculation: config.accounts_db_test_hash_calculation, + rocksdb_compaction_interval: config.rocksdb_compaction_interval, + rocksdb_max_compaction_jitter: config.rocksdb_max_compaction_jitter, }, ); diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs index 8d314541b1..c0977e2515 100644 --- a/core/tests/ledger_cleanup.rs +++ b/core/tests/ledger_cleanup.rs @@ -8,7 +8,7 @@ mod tests { use solana_ledger::shred::Shred; use std::collections::VecDeque; use std::str::FromStr; - use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; @@ -223,8 +223,14 @@ mod tests { let (sender, 
receiver) = channel(); let exit = Arc::new(AtomicBool::new(false)); - let cleaner = - LedgerCleanupService::new(receiver, blockstore.clone(), max_ledger_shreds, &exit); + let cleaner = LedgerCleanupService::new( + receiver, + blockstore.clone(), + max_ledger_shreds, + &exit, + None, + None, + ); let exit_cpu = Arc::new(AtomicBool::new(false)); let sys = CpuStatsUpdater::new(&exit_cpu); @@ -375,18 +381,28 @@ mod tests { let (sender, receiver) = channel(); sender.send(n).unwrap(); let mut last_purge_slot = 0; - let mut last_compaction_slot = 0; + let highest_compact_slot = Arc::new(AtomicU64::new(0)); LedgerCleanupService::cleanup_ledger( &receiver, &blockstore, max_ledger_shreds, &mut last_purge_slot, 10, - &mut last_compaction_slot, - 10, + &highest_compact_slot, ) .unwrap(); + let mut compaction_jitter = 0; + let mut last_compaction_slot = 0; + LedgerCleanupService::compact_ledger( + &blockstore, + &mut last_compaction_slot, + 10, + &highest_compact_slot, + &mut compaction_jitter, + None, + ); + thread::sleep(Duration::from_secs(2)); let u2 = blockstore.storage_size().unwrap() as f64; diff --git a/validator/src/main.rs b/validator/src/main.rs index a1dc21a20d..fbd5499c5d 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1235,6 +1235,20 @@ pub fn main() { .takes_value(false) .help("Disable manual compaction of the ledger database. 
May increase storage requirements.") ) + .arg( + Arg::with_name("rocksdb_compaction_interval") + .long("rocksdb-compaction-interval-slots") + .value_name("ROCKSDB_COMPACTION_INTERVAL_SLOTS") + .takes_value(true) + .help("Number of slots between compacting ledger"), + ) + .arg( + Arg::with_name("rocksdb_max_compaction_jitter") + .long("rocksdb-max-compaction-jitter-slots") + .value_name("ROCKSDB_MAX_COMPACTION_JITTER_SLOTS") + .takes_value(true) + .help("Introduce jitter into the compaction to offset compaction operation"), + ) .arg( Arg::with_name("bind_address") .long("bind-address") @@ -1486,6 +1500,9 @@ pub fn main() { let private_rpc = matches.is_present("private_rpc"); let no_port_check = matches.is_present("no_port_check"); let no_rocksdb_compaction = matches.is_present("no_rocksdb_compaction"); + let rocksdb_compaction_interval = value_t!(matches, "rocksdb_compaction_interval", u64).ok(); + let rocksdb_max_compaction_jitter = + value_t!(matches, "rocksdb_max_compaction_jitter", u64).ok(); let wal_recovery_mode = matches .value_of("wal_recovery_mode") .map(BlockstoreRecoveryMode::from); @@ -1620,6 +1637,8 @@ pub fn main() { gossip_validators, frozen_accounts: values_t!(matches, "frozen_accounts", Pubkey).unwrap_or_default(), no_rocksdb_compaction, + rocksdb_compaction_interval, + rocksdb_max_compaction_jitter, wal_recovery_mode, poh_verify: !matches.is_present("skip_poh_verify"), debug_keys,