Remove Blockstore manual compaction code (#28409)

The manual Blockstore compaction that was being initiated from
LedgerCleanupService has been disabled for quite some time in favor of
several optimizations.

Co-authored-by: Ryo Onodera <ryoqun@gmail.com>
This commit is contained in:
steviez 2022-10-28 10:39:00 +02:00 committed by GitHub
parent 2579c0fcb5
commit 2272fd807e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 34 additions and 412 deletions

View File

@ -6,20 +6,19 @@
use { use {
crossbeam_channel::{Receiver, RecvTimeoutError}, crossbeam_channel::{Receiver, RecvTimeoutError},
rand::{thread_rng, Rng},
solana_ledger::{ solana_ledger::{
blockstore::{Blockstore, PurgeType}, blockstore::{Blockstore, PurgeType},
blockstore_db::Result as BlockstoreResult, blockstore_db::Result as BlockstoreResult,
}, },
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_sdk::clock::{Slot, DEFAULT_TICKS_PER_SLOT, TICKS_PER_DAY}, solana_sdk::clock::Slot,
std::{ std::{
string::ToString, string::ToString,
sync::{ sync::{
atomic::{AtomicBool, AtomicU64, Ordering}, atomic::{AtomicBool, Ordering},
Arc, Arc,
}, },
thread::{self, sleep, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
time::Duration, time::Duration,
}, },
}; };
@ -41,13 +40,8 @@ pub const DEFAULT_MIN_MAX_LEDGER_SHREDS: u64 = 50_000_000;
// and starve other blockstore users. // and starve other blockstore users.
pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512; pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512;
// Compacting at a slower interval than purging helps keep IOPS down.
// Once a day should be ample
const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = TICKS_PER_DAY / DEFAULT_TICKS_PER_SLOT;
pub struct LedgerCleanupService { pub struct LedgerCleanupService {
t_cleanup: JoinHandle<()>, t_cleanup: JoinHandle<()>,
t_compact: JoinHandle<()>,
} }
impl LedgerCleanupService { impl LedgerCleanupService {
@ -56,25 +50,15 @@ impl LedgerCleanupService {
blockstore: Arc<Blockstore>, blockstore: Arc<Blockstore>,
max_ledger_shreds: u64, max_ledger_shreds: u64,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
compaction_interval: Option<u64>,
max_compaction_jitter: Option<u64>,
) -> Self { ) -> Self {
let exit = exit.clone(); let exit = exit.clone();
let mut last_purge_slot = 0; let mut last_purge_slot = 0;
let mut last_compaction_slot = 0;
let mut compaction_jitter = 0;
let compaction_interval = compaction_interval.unwrap_or(DEFAULT_COMPACTION_SLOT_INTERVAL);
let last_compact_slot = Arc::new(AtomicU64::new(0));
let last_compact_slot2 = last_compact_slot.clone();
info!( info!(
"LedgerCleanupService active. max ledger shreds={}, compaction interval={}", "LedgerCleanupService active. max ledger shreds={}",
max_ledger_shreds, compaction_interval, max_ledger_shreds
); );
let exit_compact = exit.clone();
let blockstore_compact = blockstore.clone();
let t_cleanup = Builder::new() let t_cleanup = Builder::new()
.name("solLedgerClean".to_string()) .name("solLedgerClean".to_string())
.spawn(move || loop { .spawn(move || loop {
@ -87,7 +71,6 @@ impl LedgerCleanupService {
max_ledger_shreds, max_ledger_shreds,
&mut last_purge_slot, &mut last_purge_slot,
DEFAULT_PURGE_SLOT_INTERVAL, DEFAULT_PURGE_SLOT_INTERVAL,
&last_compact_slot,
) { ) {
match e { match e {
RecvTimeoutError::Disconnected => break, RecvTimeoutError::Disconnected => break,
@ -97,28 +80,7 @@ impl LedgerCleanupService {
}) })
.unwrap(); .unwrap();
let t_compact = Builder::new() Self { t_cleanup }
.name("solLedgerComp".to_string())
.spawn(move || loop {
if exit_compact.load(Ordering::Relaxed) {
break;
}
Self::compact_ledger(
&blockstore_compact,
&mut last_compaction_slot,
compaction_interval,
&last_compact_slot2,
&mut compaction_jitter,
max_compaction_jitter,
);
sleep(Duration::from_secs(1));
})
.unwrap();
Self {
t_cleanup,
t_compact,
}
} }
/// A helper function to `cleanup_ledger` which returns a tuple of the /// A helper function to `cleanup_ledger` which returns a tuple of the
@ -202,10 +164,6 @@ impl LedgerCleanupService {
/// `last_purge_slot` is fewer than `purge_interval`, the function will /// `last_purge_slot` is fewer than `purge_interval`, the function will
/// simply return `Ok` without actually running the ledger cleanup. /// simply return `Ok` without actually running the ledger cleanup.
/// In this case, `purge_interval` will remain unchanged. /// In this case, `purge_interval` will remain unchanged.
/// - `last_compact_slot`: an output value which indicates the most recent
/// slot which has been cleaned up after this call. If this parameter is
/// updated after this function call, it means the ledger cleanup has
/// been performed.
/// ///
/// Also see `blockstore::purge_slot`. /// Also see `blockstore::purge_slot`.
pub fn cleanup_ledger( pub fn cleanup_ledger(
@ -214,7 +172,6 @@ impl LedgerCleanupService {
max_ledger_shreds: u64, max_ledger_shreds: u64,
last_purge_slot: &mut u64, last_purge_slot: &mut u64,
purge_interval: u64, purge_interval: u64,
last_compact_slot: &Arc<AtomicU64>,
) -> Result<(), RecvTimeoutError> { ) -> Result<(), RecvTimeoutError> {
let root = Self::receive_new_roots(new_root_receiver)?; let root = Self::receive_new_roots(new_root_receiver)?;
if root - *last_purge_slot <= purge_interval { if root - *last_purge_slot <= purge_interval {
@ -236,7 +193,6 @@ impl LedgerCleanupService {
let purge_complete = Arc::new(AtomicBool::new(false)); let purge_complete = Arc::new(AtomicBool::new(false));
let blockstore = blockstore.clone(); let blockstore = blockstore.clone();
let purge_complete1 = purge_complete.clone(); let purge_complete1 = purge_complete.clone();
let last_compact_slot1 = last_compact_slot.clone();
let _t_purge = Builder::new() let _t_purge = Builder::new()
.name("solLedgerPurge".to_string()) .name("solLedgerPurge".to_string())
.spawn(move || { .spawn(move || {
@ -266,8 +222,6 @@ impl LedgerCleanupService {
purge_time.stop(); purge_time.stop();
info!("{}", purge_time); info!("{}", purge_time);
last_compact_slot1.store(lowest_cleanup_slot, Ordering::Relaxed);
purge_complete1.store(true, Ordering::Relaxed); purge_complete1.store(true, Ordering::Relaxed);
}) })
.unwrap(); .unwrap();
@ -287,39 +241,6 @@ impl LedgerCleanupService {
Ok(()) Ok(())
} }
pub fn compact_ledger(
blockstore: &Arc<Blockstore>,
last_compaction_slot: &mut u64,
compaction_interval: u64,
highest_compact_slot: &Arc<AtomicU64>,
compaction_jitter: &mut u64,
max_jitter: Option<u64>,
) {
let highest_compaction_slot = highest_compact_slot.load(Ordering::Relaxed);
if highest_compaction_slot.saturating_sub(*last_compaction_slot)
> (compaction_interval + *compaction_jitter)
{
info!(
"compacting data from slots {} to {}",
*last_compaction_slot, highest_compaction_slot,
);
if let Err(err) =
blockstore.compact_storage(*last_compaction_slot, highest_compaction_slot)
{
// This error is not fatal and indicates an internal error?
error!(
"Error: {:?}; Couldn't compact storage from {:?} to {:?}",
err, last_compaction_slot, highest_compaction_slot,
);
}
*last_compaction_slot = highest_compaction_slot;
let jitter = max_jitter.unwrap_or(0);
if jitter > 0 {
*compaction_jitter = thread_rng().gen_range(0, jitter);
}
}
}
fn report_disk_metrics( fn report_disk_metrics(
pre: BlockstoreResult<u64>, pre: BlockstoreResult<u64>,
post: BlockstoreResult<u64>, post: BlockstoreResult<u64>,
@ -337,8 +258,7 @@ impl LedgerCleanupService {
} }
pub fn join(self) -> thread::Result<()> { pub fn join(self) -> thread::Result<()> {
self.t_cleanup.join()?; self.t_cleanup.join()
self.t_compact.join()
} }
} }
#[cfg(test)] #[cfg(test)]
@ -361,19 +281,10 @@ mod tests {
//send a signal to kill all but 5 shreds, which will be in the newest slots //send a signal to kill all but 5 shreds, which will be in the newest slots
let mut last_purge_slot = 0; let mut last_purge_slot = 0;
let highest_compaction_slot = Arc::new(AtomicU64::new(0));
sender.send(50).unwrap(); sender.send(50).unwrap();
LedgerCleanupService::cleanup_ledger( LedgerCleanupService::cleanup_ledger(&receiver, &blockstore, 5, &mut last_purge_slot, 10)
&receiver, .unwrap();
&blockstore,
5,
&mut last_purge_slot,
10,
&highest_compaction_slot,
)
.unwrap();
assert_eq!(last_purge_slot, 50); assert_eq!(last_purge_slot, 50);
assert_eq!(highest_compaction_slot.load(Ordering::Relaxed), 44);
//check that 0-40 don't exist //check that 0-40 don't exist
blockstore blockstore
@ -381,18 +292,6 @@ mod tests {
.unwrap() .unwrap()
.for_each(|(slot, _)| assert!(slot > 40)); .for_each(|(slot, _)| assert!(slot > 40));
let mut last_compaction_slot = 0;
let mut jitter = 0;
LedgerCleanupService::compact_ledger(
&blockstore,
&mut last_compaction_slot,
10,
&highest_compaction_slot,
&mut jitter,
None,
);
assert_eq!(jitter, 0);
drop(blockstore); drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
} }
@ -401,9 +300,7 @@ mod tests {
fn test_cleanup_speed() { fn test_cleanup_speed() {
solana_logger::setup(); solana_logger::setup();
let blockstore_path = get_tmp_ledger_path!(); let blockstore_path = get_tmp_ledger_path!();
let mut blockstore = Blockstore::open(&blockstore_path).unwrap(); let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
blockstore.set_no_compaction(true);
let blockstore = Arc::new(blockstore);
let (sender, receiver) = unbounded(); let (sender, receiver) = unbounded();
let mut first_insert = Measure::start("first_insert"); let mut first_insert = Measure::start("first_insert");
@ -415,7 +312,6 @@ mod tests {
info!("{}", first_insert); info!("{}", first_insert);
let mut last_purge_slot = 0; let mut last_purge_slot = 0;
let last_compaction_slot = Arc::new(AtomicU64::new(0));
let mut slot = initial_slots; let mut slot = initial_slots;
let mut num_slots = 6; let mut num_slots = 6;
for _ in 0..5 { for _ in 0..5 {
@ -439,7 +335,6 @@ mod tests {
initial_slots, initial_slots,
&mut last_purge_slot, &mut last_purge_slot,
10, 10,
&last_compaction_slot,
) )
.unwrap(); .unwrap();
time.stop(); time.stop();

View File

@ -81,8 +81,6 @@ pub struct TvuConfig {
pub max_ledger_shreds: Option<u64>, pub max_ledger_shreds: Option<u64>,
pub shred_version: u16, pub shred_version: u16,
pub repair_validators: Option<HashSet<Pubkey>>, pub repair_validators: Option<HashSet<Pubkey>>,
pub rocksdb_compaction_interval: Option<u64>,
pub rocksdb_max_compaction_jitter: Option<u64>,
pub wait_for_vote_to_start_leader: bool, pub wait_for_vote_to_start_leader: bool,
pub replay_slots_concurrently: bool, pub replay_slots_concurrently: bool,
} }
@ -300,8 +298,6 @@ impl Tvu {
blockstore.clone(), blockstore.clone(),
max_ledger_shreds, max_ledger_shreds,
exit, exit,
tvu_config.rocksdb_compaction_interval,
tvu_config.rocksdb_max_compaction_jitter,
) )
}); });

View File

@ -142,9 +142,6 @@ pub struct ValidatorConfig {
pub gossip_validators: Option<HashSet<Pubkey>>, // None = gossip with all pub gossip_validators: Option<HashSet<Pubkey>>, // None = gossip with all
pub halt_on_known_validators_accounts_hash_mismatch: bool, pub halt_on_known_validators_accounts_hash_mismatch: bool,
pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection
pub no_rocksdb_compaction: bool,
pub rocksdb_compaction_interval: Option<u64>,
pub rocksdb_max_compaction_jitter: Option<u64>,
pub accounts_hash_interval_slots: u64, pub accounts_hash_interval_slots: u64,
pub max_genesis_archive_unpacked_size: u64, pub max_genesis_archive_unpacked_size: u64,
pub wal_recovery_mode: Option<BlockstoreRecoveryMode>, pub wal_recovery_mode: Option<BlockstoreRecoveryMode>,
@ -207,9 +204,6 @@ impl Default for ValidatorConfig {
gossip_validators: None, gossip_validators: None,
halt_on_known_validators_accounts_hash_mismatch: false, halt_on_known_validators_accounts_hash_mismatch: false,
accounts_hash_fault_injection_slots: 0, accounts_hash_fault_injection_slots: 0,
no_rocksdb_compaction: false,
rocksdb_compaction_interval: None,
rocksdb_max_compaction_jitter: None,
accounts_hash_interval_slots: std::u64::MAX, accounts_hash_interval_slots: std::u64::MAX,
max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
wal_recovery_mode: None, wal_recovery_mode: None,
@ -981,8 +975,6 @@ impl Validator {
max_ledger_shreds: config.max_ledger_shreds, max_ledger_shreds: config.max_ledger_shreds,
shred_version: node.info.shred_version, shred_version: node.info.shred_version,
repair_validators: config.repair_validators.clone(), repair_validators: config.repair_validators.clone(),
rocksdb_compaction_interval: config.rocksdb_compaction_interval,
rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval,
wait_for_vote_to_start_leader, wait_for_vote_to_start_leader,
replay_slots_concurrently: config.replay_slots_concurrently, replay_slots_concurrently: config.replay_slots_concurrently,
}, },
@ -1414,7 +1406,6 @@ fn load_blockstore(
}, },
) )
.expect("Failed to open ledger database"); .expect("Failed to open ledger database");
blockstore.set_no_compaction(config.no_rocksdb_compaction);
blockstore.shred_timing_point_sender = poh_timing_point_sender; blockstore.shred_timing_point_sender = poh_timing_point_sender;
// following boot sequence (esp BankForks) could set root. so stash the original value // following boot sequence (esp BankForks) could set root. so stash the original value
// of blockstore root away here as soon as possible. // of blockstore root away here as soon as possible.

View File

@ -38,8 +38,6 @@ mod tests {
const DEFAULT_STOP_SIZE_CF_DATA_BYTES: u64 = 0; const DEFAULT_STOP_SIZE_CF_DATA_BYTES: u64 = 0;
const DEFAULT_SHRED_DATA_CF_SIZE_BYTES: u64 = 125 * 1024 * 1024 * 1024; const DEFAULT_SHRED_DATA_CF_SIZE_BYTES: u64 = 125 * 1024 * 1024 * 1024;
const ROCKSDB_FLUSH_GRACE_PERIOD_SECS: u64 = 20;
#[derive(Debug)] #[derive(Debug)]
struct BenchmarkConfig { struct BenchmarkConfig {
benchmark_slots: u64, benchmark_slots: u64,
@ -51,9 +49,6 @@ mod tests {
stop_size_cf_data_bytes: u64, stop_size_cf_data_bytes: u64,
pre_generate_data: bool, pre_generate_data: bool,
cleanup_blockstore: bool, cleanup_blockstore: bool,
assert_compaction: bool,
compaction_interval: Option<u64>,
no_compaction: bool,
num_writers: u64, num_writers: u64,
cleanup_service: bool, cleanup_service: bool,
fifo_compaction: bool, fifo_compaction: bool,
@ -187,19 +182,10 @@ mod tests {
/// under the storage limitation. /// under the storage limitation.
/// - `CLEANUP_BLOCKSTORE`: if true, the ledger store created in the current /// - `CLEANUP_BLOCKSTORE`: if true, the ledger store created in the current
/// benchmark run will be deleted. Default: true. /// benchmark run will be deleted. Default: true.
/// - `NO_COMPACTION`: whether to stop rocksdb's background compaction
/// completely. Default: false.
/// ///
/// Cleanup-service related settings: /// Cleanup-service related settings:
/// - `MAX_LEDGER_SHREDS`: when the clean-up service is on, the service will /// - `MAX_LEDGER_SHREDS`: when the clean-up service is on, the service will
/// clean up the ledger store when the number of shreds exceeds this value. /// clean up the ledger store when the number of shreds exceeds this value.
/// - `COMPACTION_INTERVAL`: if set, the clean-up service will compact all
/// slots that are older than the specified interval. The interval is
/// measured by slots.
/// Default: the number of slots per day (`TICKS_PER_DAY` / `DEFAULT_TICKS_PER_SLOT`)
/// - `ASSERT_COMPACTION`: if true, then the benchmark will perform a sanity check
/// on whether clean-up service triggers the expected compaction at the end of
/// the benchmark run. Default: false.
/// - `CLEANUP_SERVICE`: whether to enable the background cleanup service. /// - `CLEANUP_SERVICE`: whether to enable the background cleanup service.
/// If set to false, the ledger store in the benchmark will be purely relied /// If set to false, the ledger store in the benchmark will be purely relied
/// on RocksDB's compaction. Default: true. /// on RocksDB's compaction. Default: true.
@ -220,13 +206,6 @@ mod tests {
read_env("STOP_SIZE_CF_DATA_BYTES", DEFAULT_STOP_SIZE_CF_DATA_BYTES); read_env("STOP_SIZE_CF_DATA_BYTES", DEFAULT_STOP_SIZE_CF_DATA_BYTES);
let pre_generate_data = read_env("PRE_GENERATE_DATA", false); let pre_generate_data = read_env("PRE_GENERATE_DATA", false);
let cleanup_blockstore = read_env("CLEANUP_BLOCKSTORE", true); let cleanup_blockstore = read_env("CLEANUP_BLOCKSTORE", true);
// set default to `true` once compaction is merged
let assert_compaction = read_env("ASSERT_COMPACTION", false);
let compaction_interval = match read_env("COMPACTION_INTERVAL", 0) {
maybe_zero if maybe_zero == 0 => None,
non_zero => Some(non_zero),
};
let no_compaction = read_env("NO_COMPACTION", false);
let num_writers = read_env("NUM_WRITERS", 1); let num_writers = read_env("NUM_WRITERS", 1);
// A flag indicating whether to have a background clean-up service. // A flag indicating whether to have a background clean-up service.
// If set to false, the ledger store will purely rely on RocksDB's // If set to false, the ledger store will purely rely on RocksDB's
@ -246,9 +225,6 @@ mod tests {
stop_size_cf_data_bytes, stop_size_cf_data_bytes,
pre_generate_data, pre_generate_data,
cleanup_blockstore, cleanup_blockstore,
assert_compaction,
compaction_interval,
no_compaction,
num_writers, num_writers,
cleanup_service, cleanup_service,
fifo_compaction, fifo_compaction,
@ -331,7 +307,7 @@ mod tests {
false false
} }
/// The ledger cleanup compaction test which can also be used as a benchmark /// The ledger cleanup test which can also be used as a benchmark
/// measuring shred insertion performance of the blockstore. /// measuring shred insertion performance of the blockstore.
/// ///
/// The benchmark is controlled by several environmental arguments. /// The benchmark is controlled by several environmental arguments.
@ -339,15 +315,15 @@ mod tests {
/// ///
/// Example command: /// Example command:
/// BENCHMARK_SLOTS=1000000 BATCH_SIZE=1 SHREDS_PER_SLOT=25 NUM_WRITERS=8 \ /// BENCHMARK_SLOTS=1000000 BATCH_SIZE=1 SHREDS_PER_SLOT=25 NUM_WRITERS=8 \
/// PRE_GENERATE_DATA=false cargo test --release tests::test_ledger_cleanup_compaction \ /// PRE_GENERATE_DATA=false cargo test --release tests::test_ledger_cleanup \
/// -- --exact --nocapture /// -- --exact --nocapture
#[test] #[test]
fn test_ledger_cleanup_compaction() { fn test_ledger_cleanup() {
solana_logger::setup_with("error,ledger_cleanup::tests=info"); solana_logger::setup_with("error,ledger_cleanup::tests=info");
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let config = get_benchmark_config(); let config = get_benchmark_config();
let mut blockstore = Blockstore::open_with_options( let blockstore = Blockstore::open_with_options(
&ledger_path, &ledger_path,
if config.fifo_compaction { if config.fifo_compaction {
BlockstoreOptions { BlockstoreOptions {
@ -367,9 +343,6 @@ mod tests {
}, },
) )
.unwrap(); .unwrap();
if config.no_compaction {
blockstore.set_no_compaction(true);
}
let blockstore = Arc::new(blockstore); let blockstore = Arc::new(blockstore);
info!("Benchmark configuration: {:#?}", config); info!("Benchmark configuration: {:#?}", config);
@ -383,7 +356,6 @@ mod tests {
let stop_size_iterations = config.stop_size_iterations; let stop_size_iterations = config.stop_size_iterations;
let stop_size_cf_data_bytes = config.stop_size_cf_data_bytes; let stop_size_cf_data_bytes = config.stop_size_cf_data_bytes;
let pre_generate_data = config.pre_generate_data; let pre_generate_data = config.pre_generate_data;
let compaction_interval = config.compaction_interval;
let num_writers = config.num_writers; let num_writers = config.num_writers;
let cleanup_service = config.cleanup_service; let cleanup_service = config.cleanup_service;
@ -399,8 +371,6 @@ mod tests {
blockstore.clone(), blockstore.clone(),
max_ledger_shreds, max_ledger_shreds,
&exit, &exit,
compaction_interval,
None,
)) ))
} else { } else {
None None
@ -619,23 +589,6 @@ mod tests {
insert_timer, insert_timer,
num_shreds_total as f32 / insert_timer.as_s(), num_shreds_total as f32 / insert_timer.as_s(),
); );
let u1 = storage_previous;
// Poll on some compaction happening
info!("Begin polling for compaction ...");
let start_poll = Instant::now();
while blockstore.storage_size().unwrap_or(0) >= u1 {
if start_poll.elapsed().as_secs() > ROCKSDB_FLUSH_GRACE_PERIOD_SECS {
break;
}
std::thread::sleep(Duration::from_millis(200));
}
info!(
"Done polling for compaction after {}s",
start_poll.elapsed().as_secs_f32()
);
let u2 = storage_previous;
exit.store(true, Ordering::SeqCst); exit.store(true, Ordering::SeqCst);
if cleanup_service { if cleanup_service {
@ -645,74 +598,9 @@ mod tests {
exit_cpu.store(true, Ordering::SeqCst); exit_cpu.store(true, Ordering::SeqCst);
sys.join().unwrap(); sys.join().unwrap();
if config.assert_compaction {
assert!(u2 < u1, "expected compaction! pre={},post={}", u1, u2);
}
if config.cleanup_blockstore { if config.cleanup_blockstore {
drop(blockstore); drop(blockstore);
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
} }
} }
#[test]
fn test_compaction() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
let n = 10_000;
let batch_size_slots = 100;
let num_batches = n / batch_size_slots;
let max_ledger_shreds = 100;
for i in 0..num_batches {
let start_slot = i * batch_size_slots;
let (shreds, _) = make_many_slot_shreds(start_slot, batch_size_slots, 1);
blockstore.insert_shreds(shreds, None, false).unwrap();
}
let u1 = blockstore.storage_size().unwrap() as f64;
// send signal to cleanup slots
let (sender, receiver) = unbounded();
sender.send(n).unwrap();
let mut last_purge_slot = 0;
let highest_compact_slot = Arc::new(AtomicU64::new(0));
LedgerCleanupService::cleanup_ledger(
&receiver,
&blockstore,
max_ledger_shreds,
&mut last_purge_slot,
10,
&highest_compact_slot,
)
.unwrap();
let mut compaction_jitter = 0;
let mut last_compaction_slot = 0;
LedgerCleanupService::compact_ledger(
&blockstore,
&mut last_compaction_slot,
10,
&highest_compact_slot,
&mut compaction_jitter,
None,
);
thread::sleep(Duration::from_secs(2));
let u2 = blockstore.storage_size().unwrap() as f64;
assert!(u2 < u1, "insufficient compaction! pre={},post={}", u1, u2,);
// check that early slots don't exist
let max_slot = n - max_ledger_shreds - 1;
blockstore
.slot_meta_iterator(0)
.unwrap()
.for_each(|(slot, _)| assert!(slot > max_slot));
drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
} }

View File

@ -177,7 +177,6 @@ pub struct Blockstore {
completed_slots_senders: Mutex<Vec<CompletedSlotsSender>>, completed_slots_senders: Mutex<Vec<CompletedSlotsSender>>,
pub shred_timing_point_sender: Option<PohTimingSender>, pub shred_timing_point_sender: Option<PohTimingSender>,
pub lowest_cleanup_slot: RwLock<Slot>, pub lowest_cleanup_slot: RwLock<Slot>,
no_compaction: bool,
pub slots_stats: SlotsStats, pub slots_stats: SlotsStats,
} }
@ -335,7 +334,6 @@ impl Blockstore {
insert_shreds_lock: Mutex::<()>::default(), insert_shreds_lock: Mutex::<()>::default(),
last_root, last_root,
lowest_cleanup_slot: RwLock::<Slot>::default(), lowest_cleanup_slot: RwLock::<Slot>::default(),
no_compaction: false,
slots_stats: SlotsStats::default(), slots_stats: SlotsStats::default(),
}; };
if initialize_transaction_status_index { if initialize_transaction_status_index {
@ -410,18 +408,6 @@ impl Blockstore {
} }
} }
/// Whether to disable compaction in [`Blockstore::compact_storage`], which is used
/// by the ledger cleanup service and `solana_core::validator::backup_and_clear_blockstore`.
///
/// Note that this setting is not related to the RocksDB's background
/// compaction.
///
/// To disable RocksDB's background compaction, open the Blockstore
/// with AccessType::PrimaryForMaintenance.
pub fn set_no_compaction(&mut self, no_compaction: bool) {
self.no_compaction = no_compaction;
}
/// Deletes the blockstore at the specified path. /// Deletes the blockstore at the specified path.
/// ///
/// Note that if the `ledger_path` has multiple rocksdb instances, this /// Note that if the `ledger_path` has multiple rocksdb instances, this
@ -7377,10 +7363,7 @@ pub mod tests {
assert_eq!(counter, 2); assert_eq!(counter, 2);
} }
fn do_test_lowest_cleanup_slot_and_special_cfs( fn do_test_lowest_cleanup_slot_and_special_cfs(simulate_ledger_cleanup_service: bool) {
simulate_compaction: bool,
simulate_ledger_cleanup_service: bool,
) {
solana_logger::setup(); solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!(); let ledger_path = get_tmp_ledger_path_auto_delete!();
@ -7496,20 +7479,12 @@ pub mod tests {
assert_eq!(are_missing, (false, false, false)); assert_eq!(are_missing, (false, false, false));
assert_existing_always(); assert_existing_always();
if simulate_compaction {
blockstore.set_max_expired_slot(lowest_cleanup_slot);
// force compaction filters to run across whole key range.
blockstore
.compact_storage(Slot::min_value(), Slot::max_value())
.unwrap();
}
if simulate_ledger_cleanup_service { if simulate_ledger_cleanup_service {
*blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot; *blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
} }
let are_missing = check_for_missing(); let are_missing = check_for_missing();
if simulate_compaction || simulate_ledger_cleanup_service { if simulate_ledger_cleanup_service {
// ... when either simulation (or both) is effective, we should observe to be missing // ... when either simulation (or both) is effective, we should observe to be missing
// consistently // consistently
assert_eq!(are_missing, (true, true, true)); assert_eq!(are_missing, (true, true, true));
@ -7521,27 +7496,13 @@ pub mod tests {
} }
#[test] #[test]
fn test_lowest_cleanup_slot_and_special_cfs_with_compact_with_ledger_cleanup_service_simulation( fn test_lowest_cleanup_slot_and_special_cfs_with_ledger_cleanup_service_simulation() {
) { do_test_lowest_cleanup_slot_and_special_cfs(true);
do_test_lowest_cleanup_slot_and_special_cfs(true, true);
} }
#[test] #[test]
fn test_lowest_cleanup_slot_and_special_cfs_with_compact_without_ledger_cleanup_service_simulation( fn test_lowest_cleanup_slot_and_special_cfs_without_ledger_cleanup_service_simulation() {
) { do_test_lowest_cleanup_slot_and_special_cfs(false);
do_test_lowest_cleanup_slot_and_special_cfs(true, false);
}
#[test]
fn test_lowest_cleanup_slot_and_special_cfs_without_compact_with_ledger_cleanup_service_simulation(
) {
do_test_lowest_cleanup_slot_and_special_cfs(false, true);
}
#[test]
fn test_lowest_cleanup_slot_and_special_cfs_without_compact_without_ledger_cleanup_service_simulation(
) {
do_test_lowest_cleanup_slot_and_special_cfs(false, false);
} }
#[test] #[test]

View File

@ -77,13 +77,6 @@ impl Blockstore {
pub fn purge_and_compact_slots(&self, from_slot: Slot, to_slot: Slot) { pub fn purge_and_compact_slots(&self, from_slot: Slot, to_slot: Slot) {
self.purge_slots(from_slot, to_slot, PurgeType::Exact); self.purge_slots(from_slot, to_slot, PurgeType::Exact);
if let Err(e) = self.compact_storage(from_slot, to_slot) {
// This error is not fatal and indicates an internal error?
error!(
"Error: {:?}; Couldn't compact storage from {:?} to {:?}",
e, from_slot, to_slot
);
}
} }
/// Ensures that the SlotMeta::next_slots vector for all slots contain no references in the /// Ensures that the SlotMeta::next_slots vector for all slots contain no references in the
@ -346,97 +339,6 @@ impl Blockstore {
.is_ok() .is_ok()
} }
pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
if self.no_compaction {
info!("compact_storage: compaction disabled");
return Ok(false);
}
info!("compact_storage: from {} to {}", from_slot, to_slot);
let mut compact_timer = Measure::start("compact_range");
let result = self
.meta_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.db
.column::<cf::Root>()
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.data_shred_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.code_shred_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.dead_slots_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.duplicate_slots_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.erasure_meta_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.orphans_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.bank_hash_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.index_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.transaction_status_cf
.compact_range(0, 2)
.unwrap_or(false)
&& self
.address_signatures_cf
.compact_range(0, 2)
.unwrap_or(false)
&& self
.transaction_status_index_cf
.compact_range(0, 2)
.unwrap_or(false)
&& self
.rewards_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.blocktime_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.perf_samples_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.block_height_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.optimistic_slots_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false);
compact_timer.stop();
if !result {
info!("compact_storage incomplete");
}
datapoint_info!(
"blockstore-compact",
("compact_range_us", compact_timer.as_us() as i64, i64),
);
Ok(result)
}
/// Purges special columns (using a non-Slot primary-index) exactly, by /// Purges special columns (using a non-Slot primary-index) exactly, by
/// deserializing each slot being purged and iterating through all /// deserializing each slot being purged and iterating through all
/// transactions to determine the keys of individual records. /// transactions to determine the keys of individual records.

View File

@ -31,9 +31,6 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
halt_on_known_validators_accounts_hash_mismatch: config halt_on_known_validators_accounts_hash_mismatch: config
.halt_on_known_validators_accounts_hash_mismatch, .halt_on_known_validators_accounts_hash_mismatch,
accounts_hash_fault_injection_slots: config.accounts_hash_fault_injection_slots, accounts_hash_fault_injection_slots: config.accounts_hash_fault_injection_slots,
no_rocksdb_compaction: config.no_rocksdb_compaction,
rocksdb_compaction_interval: config.rocksdb_compaction_interval,
rocksdb_max_compaction_jitter: config.rocksdb_max_compaction_jitter,
accounts_hash_interval_slots: config.accounts_hash_interval_slots, accounts_hash_interval_slots: config.accounts_hash_interval_slots,
max_genesis_archive_unpacked_size: config.max_genesis_archive_unpacked_size, max_genesis_archive_unpacked_size: config.max_genesis_archive_unpacked_size,
wal_recovery_mode: config.wal_recovery_mode.clone(), wal_recovery_mode: config.wal_recovery_mode.clone(),

View File

@ -818,7 +818,6 @@ impl TestValidator {
enforce_ulimit_nofile: false, enforce_ulimit_nofile: false,
warp_slot: config.warp_slot, warp_slot: config.warp_slot,
validator_exit: config.validator_exit.clone(), validator_exit: config.validator_exit.clone(),
rocksdb_compaction_interval: Some(100), // Compact every 100 slots
max_ledger_shreds: config.max_ledger_shreds, max_ledger_shreds: config.max_ledger_shreds,
no_wait_for_vote_to_start_leader: true, no_wait_for_vote_to_start_leader: true,
staked_nodes_overrides: config.staked_nodes_overrides.clone(), staked_nodes_overrides: config.staked_nodes_overrides.clone(),

View File

@ -1188,13 +1188,6 @@ pub fn main() {
will not push/pull from from validators outside this set. \ will not push/pull from from validators outside this set. \
[default: all validators]") [default: all validators]")
) )
.arg(
Arg::with_name("rocksdb_compaction_interval")
.long("rocksdb-compaction-interval-slots")
.value_name("ROCKSDB_COMPACTION_INTERVAL_SLOTS")
.takes_value(true)
.help("Number of slots between compacting ledger"),
)
.arg( .arg(
Arg::with_name("tpu_coalesce_ms") Arg::with_name("tpu_coalesce_ms")
.long("tpu-coalesce-ms") .long("tpu-coalesce-ms")
@ -1242,13 +1235,6 @@ pub fn main() {
number of QUIC streams permitted from the peer and vote packet sender stage. number of QUIC streams permitted from the peer and vote packet sender stage.
Format of the file: `staked_map_id: {<pubkey>: <SOL stake amount>}"), Format of the file: `staked_map_id: {<pubkey>: <SOL stake amount>}"),
) )
.arg(
Arg::with_name("rocksdb_max_compaction_jitter")
.long("rocksdb-max-compaction-jitter-slots")
.value_name("ROCKSDB_MAX_COMPACTION_JITTER_SLOTS")
.takes_value(true)
.help("Introduce jitter into the compaction to offset compaction operation"),
)
.arg( .arg(
Arg::with_name("bind_address") Arg::with_name("bind_address")
.long("bind-address") .long("bind-address")
@ -2308,10 +2294,6 @@ pub fn main() {
let private_rpc = matches.is_present("private_rpc"); let private_rpc = matches.is_present("private_rpc");
let do_port_check = !matches.is_present("no_port_check"); let do_port_check = !matches.is_present("no_port_check");
let no_rocksdb_compaction = true;
let rocksdb_compaction_interval = value_t!(matches, "rocksdb_compaction_interval", u64).ok();
let rocksdb_max_compaction_jitter =
value_t!(matches, "rocksdb_max_compaction_jitter", u64).ok();
let tpu_coalesce_ms = let tpu_coalesce_ms =
value_t!(matches, "tpu_coalesce_ms", u64).unwrap_or(DEFAULT_TPU_COALESCE_MS); value_t!(matches, "tpu_coalesce_ms", u64).unwrap_or(DEFAULT_TPU_COALESCE_MS);
let wal_recovery_mode = matches let wal_recovery_mode = matches
@ -2661,9 +2643,6 @@ pub fn main() {
known_validators, known_validators,
repair_validators, repair_validators,
gossip_validators, gossip_validators,
no_rocksdb_compaction,
rocksdb_compaction_interval,
rocksdb_max_compaction_jitter,
wal_recovery_mode, wal_recovery_mode,
poh_verify: !matches.is_present("skip_poh_verify"), poh_verify: !matches.is_present("skip_poh_verify"),
debug_keys, debug_keys,
@ -3294,6 +3273,18 @@ fn get_deprecated_arguments() -> Vec<Arg<'static, 'static>> {
.hidden(true) .hidden(true)
.takes_value(false) .takes_value(false)
.help("Disable manual compaction of the ledger database (this is ignored)."), .help("Disable manual compaction of the ledger database (this is ignored)."),
Arg::with_name("rocksdb_compaction_interval")
.long("rocksdb-compaction-interval-slots")
.hidden(true)
.value_name("ROCKSDB_COMPACTION_INTERVAL_SLOTS")
.takes_value(true)
.help("Number of slots between compacting ledger"),
Arg::with_name("rocksdb_max_compaction_jitter")
.long("rocksdb-max-compaction-jitter-slots")
.hidden(true)
.value_name("ROCKSDB_MAX_COMPACTION_JITTER_SLOTS")
.takes_value(true)
.help("Introduce jitter into the compaction to offset compaction operation"),
] ]
} }
@ -3328,6 +3319,8 @@ lazy_static! {
"Vote account sanity checks are no longer performed by default.", "Vote account sanity checks are no longer performed by default.",
), ),
("no_rocksdb_compaction", ""), ("no_rocksdb_compaction", ""),
("rocksdb_compaction_interval", ""),
("rocksdb_max_compaction_jitter", ""),
]; ];
} }