diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs index c0977e2515..64b4d8783e 100644 --- a/core/tests/ledger_cleanup.rs +++ b/core/tests/ledger_cleanup.rs @@ -2,15 +2,17 @@ #[cfg(test)] mod tests { + use log::*; use solana_core::ledger_cleanup_service::LedgerCleanupService; use solana_ledger::blockstore::{make_many_slot_entries, Blockstore}; use solana_ledger::get_tmp_ledger_path; use solana_ledger::shred::Shred; + use solana_measure::measure::Measure; use std::collections::VecDeque; use std::str::FromStr; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::mpsc::channel; - use std::sync::{Arc, RwLock}; + use std::sync::{Arc, Mutex, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::{Duration, Instant}; use systemstat::{CPULoad, Platform, System}; @@ -186,7 +188,7 @@ mod tests { let (cpu_user, cpu_system, cpu_idle) = (cpu.cpu_user, cpu.cpu_system, cpu.cpu_idle); println!( - "{},{},{},{},{},{},{},{},{},{},{}", + "{},{},{},{},{},{},{},{},{:.2},{:.2},{:.2}", time_now.duration_since(time_initial).as_millis(), time_now.duration_since(*time_previous).as_millis(), start_slot, @@ -206,6 +208,7 @@ mod tests { #[test] fn test_ledger_cleanup_compaction() { + solana_logger::setup(); let blockstore_path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap()); let config = get_benchmark_config(); @@ -241,8 +244,8 @@ mod tests { let t0 = Instant::now(); eprintln!("PRE_GENERATE_DATA: (this may take a while)"); for i in 0..batches { - let x = i * batch_size; - let (shreds, _) = make_many_slot_entries(x, batch_size, entries_per_slot); + let start_slot = i * batch_size; + let (shreds, _) = make_many_slot_entries(start_slot, batch_size, entries_per_slot); generated_batches.push_back(shreds); } eprintln!("PRE_GENERATE_DATA: took {} ms", t0.elapsed().as_millis()); @@ -266,23 +269,118 @@ mod tests { &sys.get_stats(), ); + let mut total_make = 0; + let mut num_slots = 0; + let mut total_slots = 0; + let mut time = Instant::now(); + let mut start = Measure::start("start"); + let shreds: Arc>>> = Arc::new(Mutex::new(VecDeque::new())); + let shreds1 = shreds.clone(); + let insert_exit = Arc::new(AtomicBool::new(false)); + let insert_exit1 = insert_exit.clone(); + let blockstore1 = blockstore.clone(); + let insert_thread = Builder::new() + .name("insert_shreds".to_string()) + .spawn(move || { + let start = Instant::now(); + let mut now = Instant::now(); + let mut total = 0; + let mut total_batches = 0; + let mut total_inserted_shreds = 0; + let mut num_shreds = 0; + let mut max_speed = 0f32; + let mut min_speed = f32::MAX; + loop { + let (new_shreds, len) = { + let mut sl = shreds1.lock().unwrap(); + (sl.pop_front(), sl.len()) + }; + if now.elapsed().as_secs() > 0 { + let shreds_per_second = num_shreds as f32 / now.elapsed().as_secs() as f32; + warn!( + "tried: {} inserted: {} batches: {} len: {} shreds_per_second: {}", + total, total_inserted_shreds, total_batches, len, shreds_per_second, + ); + let average_speed = + total_inserted_shreds as f32 / start.elapsed().as_secs() as f32; + max_speed = max_speed.max(shreds_per_second); + min_speed = min_speed.min(shreds_per_second); + warn!( + "highest: {} lowest: {} avg: {}", + max_speed, min_speed, average_speed + ); + now = Instant::now(); + num_shreds = 0; + } + if let Some(new_shreds) = new_shreds { + total += new_shreds.len(); + total_batches += 1; + let br = blockstore1.insert_shreds(new_shreds, None, false).unwrap(); + total_inserted_shreds += br.1.len(); + num_shreds += br.1.len(); + } else { + thread::sleep(Duration::from_millis(200)); + } + if insert_exit1.load(Ordering::Relaxed) { + info!( + "insert exiting... highest shreds/s: {} lowest shreds/s: {}", + max_speed, min_speed + ); + break; + } + } + }) + .unwrap(); + let mut entries_batch = make_many_slot_entries(0, batch_size, entries_per_slot).0; + info!( + "batch size: {} entries_per_slot: {} shreds_per_slot: {}", + batch_size, + entries_per_slot, + entries_batch.len() + ); + shreds.lock().unwrap().push_back(entries_batch.clone()); for i in 0..batches { - let x = i * batch_size; + let start_slot = i * batch_size; - let shreds = if pre_generate_data { - generated_batches.pop_front().unwrap() + if time.elapsed().as_secs() > 0 { + warn!( + "total slots: {} slots: {} make: {}ms {:.2}", + total_slots, + num_slots, + total_make / (1000), + num_slots as f32 / time.elapsed().as_secs() as f32, + ); + num_slots = 0; + total_make = 0; + time = Instant::now(); + } + + if shreds.lock().unwrap().len() < 50 { + let mut make_time = Measure::start("make_entries"); + let new_shreds = if pre_generate_data { + generated_batches.pop_front().unwrap() + } else { + num_slots += batch_size; + total_slots += batch_size; + entries_batch + .iter_mut() + .for_each(|shred| shred.set_slot(shred.slot() + batch_size)); + entries_batch.clone() + }; + shreds.lock().unwrap().push_back(new_shreds); + make_time.stop(); + total_make += make_time.as_us(); } else { - make_many_slot_entries(x, batch_size, entries_per_slot).0 - }; + thread::sleep(Duration::from_millis(200)); + } - blockstore.insert_shreds(shreds, None, false).unwrap(); - sender.send(x).unwrap(); + sender.send(start_slot).unwrap(); emit_stats( time_initial, &mut time_previous, &mut storage_previous, - x, + start_slot, batch_size, batch_size, max_ledger_shreds as i64, @@ -302,7 +400,30 @@ mod tests { } } } + start.stop(); + let mut now = Instant::now(); + loop { + if now.elapsed().as_secs() > 1 { + warn!( + "waiting for insert queue to clear.. {}", + shreds.lock().unwrap().len() + ); + now = Instant::now(); + } + if shreds.lock().unwrap().is_empty() { + break; + } else { + thread::sleep(Duration::from_millis(200)); + } + } + insert_exit.store(true, Ordering::Relaxed); + insert_thread.join().unwrap(); + info!( + "done {} {} shreds/s", + start, + (batches * batch_size) as f32 / start.as_s() + ); let u1 = storage_previous; // send final `ledger_cleanup` notification (since iterations above are zero-based) @@ -329,6 +450,7 @@ mod tests { std::thread::sleep(Duration::from_millis(200)); } + info!("done polling"); emit_stats( time_initial, &mut time_previous,