solana/core/tests/ledger_cleanup.rs

725 lines
27 KiB
Rust

#![allow(clippy::integer_arithmetic)]
// Long-running ledger_cleanup tests
#[cfg(test)]
mod tests {
use {
crossbeam_channel::unbounded,
log::*,
solana_core::ledger_cleanup_service::LedgerCleanupService,
solana_ledger::{
blockstore::{make_many_slot_shreds, Blockstore},
blockstore_db::{
BlockstoreOptions, BlockstoreRocksFifoOptions, LedgerColumnOptions,
ShredStorageType,
},
get_tmp_ledger_path,
},
solana_measure::measure::Measure,
std::{
collections::VecDeque,
str::FromStr,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
systemstat::{CPULoad, Platform, System},
};
const DEFAULT_BENCHMARK_SLOTS: u64 = 50;
const DEFAULT_BATCH_SIZE_SLOTS: u64 = 1;
const DEFAULT_MAX_LEDGER_SHREDS: u64 = 50;
const DEFAULT_SHREDS_PER_SLOT: u64 = 25;
const DEFAULT_STOP_SIZE_BYTES: u64 = 0;
const DEFAULT_STOP_SIZE_ITERATIONS: u64 = 0;
const DEFAULT_STOP_SIZE_CF_DATA_BYTES: u64 = 0;
const DEFAULT_SHRED_DATA_CF_SIZE_BYTES: u64 = 125 * 1024 * 1024 * 1024;
const ROCKSDB_FLUSH_GRACE_PERIOD_SECS: u64 = 20;
#[derive(Debug)]
struct BenchmarkConfig {
benchmark_slots: u64,
batch_size_slots: u64,
max_ledger_shreds: u64,
shreds_per_slot: u64,
stop_size_bytes: u64,
stop_size_iterations: u64,
stop_size_cf_data_bytes: u64,
pre_generate_data: bool,
cleanup_blockstore: bool,
assert_compaction: bool,
compaction_interval: Option<u64>,
no_compaction: bool,
num_writers: u64,
cleanup_service: bool,
fifo_compaction: bool,
shred_data_cf_size: u64,
}
#[derive(Clone, Copy, Debug)]
struct CpuStatsInner {
cpu_user: f32,
cpu_system: f32,
cpu_idle: f32,
}
impl From<CPULoad> for CpuStatsInner {
fn from(cpu: CPULoad) -> Self {
Self {
cpu_user: cpu.user * 100.0,
cpu_system: cpu.system * 100.0,
cpu_idle: cpu.idle * 100.0,
}
}
}
impl Default for CpuStatsInner {
fn default() -> Self {
Self {
cpu_user: 0.0,
cpu_system: 0.0,
cpu_idle: 0.0,
}
}
}
struct CpuStats {
stats: RwLock<CpuStatsInner>,
sys: System,
}
impl Default for CpuStats {
fn default() -> Self {
Self {
stats: RwLock::new(CpuStatsInner::default()),
sys: System::new(),
}
}
}
impl CpuStats {
fn update(&self) {
if let Ok(cpu) = self.sys.cpu_load_aggregate() {
std::thread::sleep(Duration::from_millis(400));
let cpu_new = CpuStatsInner::from(cpu.done().unwrap());
*self.stats.write().unwrap() = cpu_new;
}
}
fn get_stats(&self) -> CpuStatsInner {
*self.stats.read().unwrap()
}
}
struct CpuStatsUpdater {
cpu_stats: Arc<CpuStats>,
t_cleanup: JoinHandle<()>,
}
impl CpuStatsUpdater {
pub fn new(exit: &Arc<AtomicBool>) -> Self {
let exit = exit.clone();
let cpu_stats = Arc::new(CpuStats::default());
let cpu_stats_clone = cpu_stats.clone();
let t_cleanup = Builder::new()
.name("cpu_info".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
cpu_stats_clone.update();
})
.unwrap();
Self {
cpu_stats,
t_cleanup,
}
}
pub fn get_stats(&self) -> CpuStatsInner {
self.cpu_stats.get_stats()
}
pub fn join(self) -> std::thread::Result<()> {
self.t_cleanup.join()
}
}
fn read_env<T>(key: &str, default: T) -> T
where
T: FromStr,
{
match std::env::var(key) {
Ok(val) => val.parse().unwrap_or(default),
Err(_e) => default,
}
}
/// Obtains the benchmark config from the following environmental arguments:
///
/// Basic benchmark settings:
/// - `BENCHMARK_SLOTS`: the number of slots in the benchmark.
/// - `BATCH_SIZE`: the number of slots in each write batch.
/// - `SHREDS_PER_SLOT`: the number of shreds in each slot. Together with
/// the `BATCH_SIZE` and `BENCHMARK_SLOTS`, it means:
/// - the number of shreds in one write batch is `BATCH_SIZE` * `SHREDS_PER_SLOT`.
/// - the total number of batches is `BENCHMARK_SLOTS` / `BATCH_SIZE`.
/// - the total number of shreds is `BENCHMARK_SLOTS` * `SHREDS_PER_SLOT`.
/// - `NUM_WRITERS`: controls the number of concurrent threads performing
/// shred insertion. Default: 1.
///
/// Advanced benchmark settings:
/// - `STOP_SIZE_BYTES`: if specified, the benchmark will count how
/// many times the ledger store size exceeds the specified threshold.
/// - `STOP_SIZE_CF_DATA_BYTES`: if specified, the benchmark will count how
/// many times the storage size of `cf::ShredData` which stores data shred
/// exceeds the specified threshold.
/// - `STOP_SIZE_ITERATIONS`: when any of the stop size is specified, the
/// benchmark will stop immediately when the number of consecutive times
/// where the ledger store size exceeds the configured `STOP_SIZE_BYTES`.
/// These configs are used to make sure the benchmark runs successfully
/// under the storage limitation.
/// - `CLEANUP_BLOCKSTORE`: if true, the ledger store created in the current
/// benchmark run will be deleted. Default: true.
/// - `NO_COMPACTION`: whether to stop rocksdb's background compaction
/// completely. Default: false.
///
/// Cleanup-service related settings:
/// - `MAX_LEDGER_SHREDS`: when the clean-up service is on, the service will
/// clean up the ledger store when the number of shreds exceeds this value.
/// - `COMPACTION_INTERVAL`: if set, the clean-up service will compact all
/// slots that are older than the specified interval. The interval is
/// measured by slots.
/// Default: the number of slots per day (`TICKS_PER_DAY` / `DEFAULT_TICKS_PER_SLOT`)
/// - `ASSERT_COMPACTION`: if true, then the benchmark will perform a sanity check
/// on whether clean-up service triggers the expected compaction at the end of
/// the benchmark run. Default: false.
/// - `CLEANUP_SERVICE`: whether to enable the background cleanup service.
/// If set to false, the ledger store in the benchmark will be purely relied
/// on RocksDB's compaction. Default: true.
///
/// Fifo-compaction settings:
/// - `FIFO_COMPACTION`: if true, then RocksDB's Fifo compaction will be
/// used for storing data shreds. Default: false.
/// - `SHRED_DATA_CF_SIZE_BYTES`: the maximum size of the data-shred column family.
/// Default: 125 * 1024 * 1024 * 1024.
fn get_benchmark_config() -> BenchmarkConfig {
let benchmark_slots = read_env("BENCHMARK_SLOTS", DEFAULT_BENCHMARK_SLOTS);
let batch_size_slots = read_env("BATCH_SIZE", DEFAULT_BATCH_SIZE_SLOTS);
let max_ledger_shreds = read_env("MAX_LEDGER_SHREDS", DEFAULT_MAX_LEDGER_SHREDS);
let shreds_per_slot = read_env("SHREDS_PER_SLOT", DEFAULT_SHREDS_PER_SLOT);
let stop_size_bytes = read_env("STOP_SIZE_BYTES", DEFAULT_STOP_SIZE_BYTES);
let stop_size_iterations = read_env("STOP_SIZE_ITERATIONS", DEFAULT_STOP_SIZE_ITERATIONS);
let stop_size_cf_data_bytes =
read_env("STOP_SIZE_CF_DATA_BYTES", DEFAULT_STOP_SIZE_CF_DATA_BYTES);
let pre_generate_data = read_env("PRE_GENERATE_DATA", false);
let cleanup_blockstore = read_env("CLEANUP_BLOCKSTORE", true);
// set default to `true` once compaction is merged
let assert_compaction = read_env("ASSERT_COMPACTION", false);
let compaction_interval = match read_env("COMPACTION_INTERVAL", 0) {
maybe_zero if maybe_zero == 0 => None,
non_zero => Some(non_zero),
};
let no_compaction = read_env("NO_COMPACTION", false);
let num_writers = read_env("NUM_WRITERS", 1);
// A flag indicating whether to have a background clean-up service.
// If set to false, the ledger store will purely rely on RocksDB's
// compaction to perform the clean-up.
let cleanup_service = read_env("CLEANUP_SERVICE", true);
let fifo_compaction = read_env("FIFO_COMPACTION", false);
let shred_data_cf_size =
read_env("SHRED_DATA_CF_SIZE_BYTES", DEFAULT_SHRED_DATA_CF_SIZE_BYTES);
BenchmarkConfig {
benchmark_slots,
batch_size_slots,
max_ledger_shreds,
shreds_per_slot,
stop_size_bytes,
stop_size_iterations,
stop_size_cf_data_bytes,
pre_generate_data,
cleanup_blockstore,
assert_compaction,
compaction_interval,
no_compaction,
num_writers,
cleanup_service,
fifo_compaction,
shred_data_cf_size,
}
}
fn emit_header() {
println!("TIME_MS,DELTA_MS,START_SLOT,BATCH_SIZE,SHREDS,MAX,SIZE,DELTA_SIZE,DATA_SHRED_SIZE,DATA_SHRED_SIZE_DELTA,CPU_USER,CPU_SYSTEM,CPU_IDLE");
}
#[allow(clippy::too_many_arguments)]
fn emit_stats(
time_initial: Instant,
time_previous: &mut Instant,
storage_previous: &mut u64,
data_shred_storage_previous: &mut u64,
start_slot: u64,
batch_size: u64,
num_shreds: u64,
max_shreds: i64,
blockstore: &Blockstore,
cpu: &CpuStatsInner,
) {
let time_now = Instant::now();
let storage_now = blockstore.storage_size().unwrap_or(0);
let data_shred_storage_now = blockstore.total_data_shred_storage_size().unwrap();
let (cpu_user, cpu_system, cpu_idle) = (cpu.cpu_user, cpu.cpu_system, cpu.cpu_idle);
info!(
"{},{},{},{},{},{},{},{},{},{},{:.2},{:.2},{:.2}",
time_now.duration_since(time_initial).as_millis(),
time_now.duration_since(*time_previous).as_millis(),
start_slot,
batch_size,
num_shreds,
max_shreds,
storage_now,
storage_now as i64 - *storage_previous as i64,
data_shred_storage_now,
data_shred_storage_now as i64 - *data_shred_storage_previous as i64,
cpu_user,
cpu_system,
cpu_idle,
);
*time_previous = time_now;
*storage_previous = storage_now;
*data_shred_storage_previous = data_shred_storage_now.try_into().unwrap();
}
/// Helper function of the benchmark `test_ledger_cleanup_compaction` which
/// returns true if the benchmark fails the size limitation check.
fn is_exceeded_stop_size_iterations(
storage_size: u64,
stop_size: u64,
exceeded_iterations: &mut u64,
iteration_limit: u64,
storage_desc: &str,
) -> bool {
if stop_size > 0 {
if storage_size >= stop_size {
*exceeded_iterations += 1;
warn!(
"{} size {} exceeds the stop size {} for {} times!",
storage_desc, storage_size, stop_size, exceeded_iterations
);
} else {
*exceeded_iterations = 0;
}
if *exceeded_iterations >= iteration_limit {
error!(
"{} size exceeds the configured limit {} for {} times",
storage_desc, stop_size, exceeded_iterations,
);
return true;
}
}
false
}
/// The ledger cleanup compaction test which can also be used as a benchmark
/// measuring shred insertion performance of the blockstore.
///
/// The benchmark is controlled by several environmental arguments.
/// Check [`get_benchmark_config`] for the full list of arguments.
///
/// Example command:
/// BENCHMARK_SLOTS=1000000 BATCH_SIZE=1 SHREDS_PER_SLOT=25 NUM_WRITERS=8 \
/// PRE_GENERATE_DATA=false cargo test --release tests::test_ledger_cleanup_compaction \
/// -- --exact --nocapture
#[test]
fn test_ledger_cleanup_compaction() {
solana_logger::setup_with("error,ledger_cleanup::tests=info");
let ledger_path = get_tmp_ledger_path!();
let config = get_benchmark_config();
let mut blockstore = Blockstore::open_with_options(
&ledger_path,
if config.fifo_compaction {
BlockstoreOptions {
column_options: LedgerColumnOptions {
shred_storage_type: ShredStorageType::RocksFifo(
BlockstoreRocksFifoOptions {
shred_data_cf_size: config.shred_data_cf_size,
..BlockstoreRocksFifoOptions::default()
},
),
..LedgerColumnOptions::default()
},
..BlockstoreOptions::default()
}
} else {
BlockstoreOptions::default()
},
)
.unwrap();
if config.no_compaction {
blockstore.set_no_compaction(true);
}
let blockstore = Arc::new(blockstore);
info!("Benchmark configuration: {:#?}", config);
info!("Ledger path: {:?}", &ledger_path);
let benchmark_slots = config.benchmark_slots;
let batch_size_slots = config.batch_size_slots;
let max_ledger_shreds = config.max_ledger_shreds;
let shreds_per_slot = config.shreds_per_slot;
let stop_size_bytes = config.stop_size_bytes;
let stop_size_iterations = config.stop_size_iterations;
let stop_size_cf_data_bytes = config.stop_size_cf_data_bytes;
let pre_generate_data = config.pre_generate_data;
let compaction_interval = config.compaction_interval;
let num_writers = config.num_writers;
let cleanup_service = config.cleanup_service;
let num_batches = benchmark_slots / batch_size_slots;
let num_shreds_total = benchmark_slots * shreds_per_slot;
let (sender, receiver) = unbounded();
let exit = Arc::new(AtomicBool::new(false));
let cleaner = if cleanup_service {
Some(LedgerCleanupService::new(
receiver,
blockstore.clone(),
max_ledger_shreds,
&exit,
compaction_interval,
None,
))
} else {
None
};
let exit_cpu = Arc::new(AtomicBool::new(false));
let sys = CpuStatsUpdater::new(&exit_cpu);
let mut shreds = VecDeque::new();
if pre_generate_data {
let mut pre_generate_data_timer = Measure::start("Pre-generate data");
info!("Pre-generate data ... this may take a while");
for i in 0..num_batches {
let start_slot = i * batch_size_slots;
let (new_shreds, _) =
make_many_slot_shreds(start_slot, batch_size_slots, shreds_per_slot);
shreds.push_back(new_shreds);
}
pre_generate_data_timer.stop();
info!("{}", pre_generate_data_timer);
}
let shreds = Arc::new(Mutex::new(shreds));
info!(
"Bench info num_batches: {}, batch size (slots): {}, shreds_per_slot: {}, num_shreds_total: {}",
num_batches,
batch_size_slots,
shreds_per_slot,
num_shreds_total
);
let time_initial = Instant::now();
let mut time_previous = time_initial;
let mut storage_previous = 0;
let mut data_shred_storage_previous = 0;
let mut stop_size_bytes_exceeded_iterations = 0;
let mut stop_size_cf_data_exceeded_iterations = 0;
emit_header();
emit_stats(
time_initial,
&mut time_previous,
&mut storage_previous,
&mut data_shred_storage_previous,
0,
0,
0,
0,
&blockstore,
&sys.get_stats(),
);
let mut insert_threads = vec![];
let insert_exit = Arc::new(AtomicBool::new(false));
info!("Begin inserting shreds ...");
let mut insert_timer = Measure::start("Shred insertion");
let current_batch_id = Arc::new(AtomicU64::new(0));
let finished_batch_count = Arc::new(AtomicU64::new(0));
for i in 0..num_writers {
let cloned_insert_exit = insert_exit.clone();
let cloned_blockstore = blockstore.clone();
let cloned_shreds = shreds.clone();
let shared_batch_id = current_batch_id.clone();
let shared_finished_count = finished_batch_count.clone();
let insert_thread = Builder::new()
.name(format!("insert_shreds-{}", i))
.spawn(move || {
let start = Instant::now();
let mut now = Instant::now();
let mut total = 0;
let mut total_batches = 0;
let mut total_inserted_shreds = 0;
let mut num_shreds = 0;
let mut max_speed = 0f32;
let mut min_speed = f32::MAX;
let (mut shreds_with_parent, _) = make_many_slot_shreds(
1, batch_size_slots, shreds_per_slot);
let (first_shreds, _) = make_many_slot_shreds(
0, batch_size_slots, shreds_per_slot);
loop {
let batch_id = shared_batch_id.fetch_add(1, Ordering::Relaxed);
let start_slot = batch_id * batch_size_slots;
if start_slot >= benchmark_slots {
break;
}
let len = batch_id;
let br = if pre_generate_data {
let mut sl = cloned_shreds.lock().unwrap();
if let Some(shreds_from_queue) = sl.pop_front() {
total += shreds_from_queue.len();
cloned_blockstore.insert_shreds(
shreds_from_queue, None, false).unwrap()
} else {
// If the queue is empty, we're done!
break;
}
} else {
let mut slot_id = start_slot;
if slot_id > 0 {
for shred in shreds_with_parent.iter_mut() {
shred.set_slot(slot_id);
if shred.index() as u64 == shreds_per_slot - 1 {
slot_id += 1;
}
}
total += shreds_with_parent.len();
cloned_blockstore.insert_shreds(
shreds_with_parent.clone(), None, false).unwrap()
} else {
total += first_shreds.len();
cloned_blockstore.insert_shreds(
first_shreds.clone(), None, false).unwrap()
}
};
total_batches += 1;
total_inserted_shreds += br.1.len();
num_shreds += br.1.len();
shared_finished_count.fetch_add(1, Ordering::Relaxed);
// as_secs() returns whole number of seconds, so this runs every second
if now.elapsed().as_secs() > 0 {
let shreds_per_second = num_shreds as f32 / now.elapsed().as_secs() as f32;
warn!(
"insert-{} tried: {} inserted: {} batches: {} len: {} shreds_per_second: {}",
i, total, total_inserted_shreds, total_batches, len, shreds_per_second,
);
let average_speed =
total_inserted_shreds as f32 / start.elapsed().as_secs() as f32;
max_speed = max_speed.max(shreds_per_second);
min_speed = min_speed.min(shreds_per_second);
warn!(
"highest: {} lowest: {} avg: {}",
max_speed, min_speed, average_speed
);
now = Instant::now();
num_shreds = 0;
}
if cloned_insert_exit.load(Ordering::Relaxed) {
if max_speed > 0.0 {
info!(
"insert-{} exiting highest shreds/s: {}, lowest shreds/s: {}",
i, max_speed, min_speed
);
} else {
// Not enough time elapsed to sample
info!(
"insert-{} exiting",
i
);
}
break;
}
}
})
.unwrap();
insert_threads.push(insert_thread);
}
loop {
let finished_batch = finished_batch_count.load(Ordering::Relaxed);
let finished_slot = (finished_batch + 1) * batch_size_slots - 1;
if cleanup_service {
sender.send(finished_slot).unwrap();
}
emit_stats(
time_initial,
&mut time_previous,
&mut storage_previous,
&mut data_shred_storage_previous,
finished_slot,
batch_size_slots,
shreds_per_slot,
max_ledger_shreds as i64,
&blockstore,
&sys.get_stats(),
);
if is_exceeded_stop_size_iterations(
storage_previous,
stop_size_bytes,
&mut stop_size_bytes_exceeded_iterations,
stop_size_iterations,
"Storage",
) {
break;
}
if is_exceeded_stop_size_iterations(
data_shred_storage_previous,
stop_size_cf_data_bytes,
&mut stop_size_cf_data_exceeded_iterations,
stop_size_iterations,
"cf::ShredData",
) {
break;
}
if finished_batch >= num_batches {
break;
} else {
thread::sleep(Duration::from_millis(500));
}
}
// Send exit signal to stop all the writer threads.
insert_exit.store(true, Ordering::Relaxed);
while let Some(thread) = insert_threads.pop() {
thread.join().unwrap();
}
insert_timer.stop();
info!(
"Done inserting shreds: {}, {} shreds/s",
insert_timer,
num_shreds_total as f32 / insert_timer.as_s(),
);
let u1 = storage_previous;
// Poll on some compaction happening
info!("Begin polling for compaction ...");
let start_poll = Instant::now();
while blockstore.storage_size().unwrap_or(0) >= u1 {
if start_poll.elapsed().as_secs() > ROCKSDB_FLUSH_GRACE_PERIOD_SECS {
break;
}
std::thread::sleep(Duration::from_millis(200));
}
info!(
"Done polling for compaction after {}s",
start_poll.elapsed().as_secs_f32()
);
let u2 = storage_previous;
exit.store(true, Ordering::SeqCst);
if cleanup_service {
cleaner.unwrap().join().unwrap();
}
exit_cpu.store(true, Ordering::SeqCst);
sys.join().unwrap();
if config.assert_compaction {
assert!(u2 < u1, "expected compaction! pre={},post={}", u1, u2);
}
if config.cleanup_blockstore {
drop(blockstore);
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
}
#[test]
fn test_compaction() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
let n = 10_000;
let batch_size_slots = 100;
let num_batches = n / batch_size_slots;
let max_ledger_shreds = 100;
for i in 0..num_batches {
let start_slot = i * batch_size_slots;
let (shreds, _) = make_many_slot_shreds(start_slot, batch_size_slots, 1);
blockstore.insert_shreds(shreds, None, false).unwrap();
}
let u1 = blockstore.storage_size().unwrap() as f64;
// send signal to cleanup slots
let (sender, receiver) = unbounded();
sender.send(n).unwrap();
let mut last_purge_slot = 0;
let highest_compact_slot = Arc::new(AtomicU64::new(0));
LedgerCleanupService::cleanup_ledger(
&receiver,
&blockstore,
max_ledger_shreds,
&mut last_purge_slot,
10,
&highest_compact_slot,
)
.unwrap();
let mut compaction_jitter = 0;
let mut last_compaction_slot = 0;
LedgerCleanupService::compact_ledger(
&blockstore,
&mut last_compaction_slot,
10,
&highest_compact_slot,
&mut compaction_jitter,
None,
);
thread::sleep(Duration::from_secs(2));
let u2 = blockstore.storage_size().unwrap() as f64;
assert!(u2 < u1, "insufficient compaction! pre={},post={}", u1, u2,);
// check that early slots don't exist
let max_slot = n - max_ledger_shreds - 1;
blockstore
.slot_meta_iterator(0)
.unwrap()
.for_each(|(slot, _)| assert!(slot > max_slot));
drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
}