Clean up test_ledger_cleanup_compaction prints (#21875)

- Use info!()/warn!() over println!()/eprintln!()
- Make status prints consistent
- Add default RUST_LOG filter to see test printouts
- Adjust reported data to show shreds and rates we care about
This commit is contained in:
steviez 2021-12-16 11:24:29 -06:00 committed by GitHub
parent e97da0ea15
commit e83ca4bb28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 49 additions and 31 deletions

View File

@ -231,17 +231,18 @@ mod tests {
*/
#[test]
fn test_ledger_cleanup_compaction() {
solana_logger::setup();
let blockstore_path = get_tmp_ledger_path!();
let mut blockstore = Blockstore::open(&blockstore_path).unwrap();
solana_logger::setup_with("error,ledger_cleanup::tests=info");
let ledger_path = get_tmp_ledger_path!();
let mut blockstore = Blockstore::open(&ledger_path).unwrap();
let config = get_benchmark_config();
if config.no_compaction {
blockstore.set_no_compaction(true);
}
let blockstore = Arc::new(blockstore);
eprintln!("BENCHMARK CONFIG: {:?}", config);
eprintln!("LEDGER_PATH: {:?}", &blockstore_path);
info!("Benchmark configuration: {:#?}", config);
info!("Ledger path: {:?}", &ledger_path);
let benchmark_slots = config.benchmark_slots;
let batch_size_slots = config.batch_size_slots;
@ -254,6 +255,7 @@ mod tests {
let num_writers = config.num_writers;
let num_batches = benchmark_slots / batch_size_slots;
let num_shreds_total = benchmark_slots * shreds_per_slot;
let (sender, receiver) = channel();
let exit = Arc::new(AtomicBool::new(false));
@ -272,25 +274,26 @@ mod tests {
let mut shreds = VecDeque::new();
if pre_generate_data {
let t0 = Instant::now();
eprintln!("PRE_GENERATE_DATA: (this may take a while)");
let mut pre_generate_data_timer = Measure::start("Pre-generate data");
info!("Pre-generate data ... this may take a while");
for i in 0..num_batches {
let start_slot = i * batch_size_slots;
let (new_shreds, _) =
make_many_slot_shreds(start_slot, batch_size_slots, shreds_per_slot);
shreds.push_back(new_shreds);
}
eprintln!("PRE_GENERATE_DATA: took {} ms", t0.elapsed().as_millis());
pre_generate_data_timer.stop();
info!("{}", pre_generate_data_timer);
}
let shreds = Arc::new(Mutex::new(shreds));
let (mut shreds_batch, entries) =
make_many_slot_shreds(0, batch_size_slots, shreds_per_slot);
let (mut shreds_batch, _) = make_many_slot_shreds(0, batch_size_slots, shreds_per_slot);
info!(
"batch size (slots): {}, entries_per_slot: {}, shreds_per_slot: {}",
"Bench info num_batches: {}, batch size (slots): {}, shreds_per_slot: {}, num_shreds_total: {}",
num_batches,
batch_size_slots,
entries.len() as u64 / batch_size_slots,
shreds_per_slot
shreds_per_slot,
num_shreds_total
);
let time_initial = Instant::now();
@ -315,9 +318,11 @@ mod tests {
let mut num_slots = 0;
let mut total_slots = 0;
let mut time = Instant::now();
let mut start = Measure::start("start");
let insert_exit = Arc::new(AtomicBool::new(false));
let mut insert_threads = vec![];
let insert_exit = Arc::new(AtomicBool::new(false));
info!("Begin inserting shreds ...");
let mut insert_timer = Measure::start("Shred insertion");
for i in 0..num_writers {
let cloned_insert_exit = insert_exit.clone();
@ -339,10 +344,11 @@ mod tests {
let mut sl = cloned_shreds.lock().unwrap();
(sl.pop_front(), sl.len())
};
// as_secs() returns whole number of seconds, so this runs every second
if now.elapsed().as_secs() > 0 {
let shreds_per_second = num_shreds as f32 / now.elapsed().as_secs() as f32;
warn!(
"T{} tried: {} inserted: {} batches: {} len: {} shreds_per_second: {}",
"insert-{} tried: {} inserted: {} batches: {} len: {} shreds_per_second: {}",
i, total, total_inserted_shreds, total_batches, len, shreds_per_second,
);
let average_speed =
@ -364,14 +370,23 @@ mod tests {
total_inserted_shreds += br.1.len();
num_shreds += br.1.len();
} else {
warn!("Thread {} sleeps for 200 millis", i);
warn!("insert-{} sleeping for 200ms", i);
thread::sleep(Duration::from_millis(200));
}
if cloned_insert_exit.load(Ordering::Relaxed) {
info!(
"insert exiting... highest shreds/s: {} lowest shreds/s: {}",
max_speed, min_speed
);
if max_speed > 0.0 {
info!(
"insert-{} exiting highest shreds/s: {}, lowest shreds/s: {}",
i, max_speed, min_speed
);
} else {
// Not enough time elapsed to sample
info!(
"insert-{} exiting",
i
);
}
break;
}
}
@ -385,7 +400,7 @@ mod tests {
if time.elapsed().as_secs() > 0 {
warn!(
"total slots: {} slots: {} make: {}ms {:.2}",
"total slots: {}, slots: {}, make: {}ms {:.2}",
total_slots,
num_slots,
total_make / (1000),
@ -439,7 +454,7 @@ mod tests {
loop {
if now.elapsed().as_secs() > 1 {
warn!(
"waiting for insert queue to clear.. {}",
"Waiting for insert queue to clear ... {}",
shreds.lock().unwrap().len()
);
now = Instant::now();
@ -455,12 +470,12 @@ mod tests {
while let Some(thread) = insert_threads.pop() {
thread.join().unwrap();
}
start.stop();
insert_timer.stop();
eprintln!(
"done {} {} slots/s",
start,
benchmark_slots as f32 / start.as_s()
info!(
"Done inserting shreds: {}, {} shreds/s",
insert_timer,
num_shreds_total as f32 / insert_timer.as_s(),
);
let u1 = storage_previous;
@ -480,6 +495,7 @@ mod tests {
);
// Poll on some compaction happening
info!("Begin polling for compaction ...");
let start_poll = Instant::now();
while blockstore.storage_size().unwrap_or(0) >= u1 {
if start_poll.elapsed().as_secs() > ROCKSDB_FLUSH_GRACE_PERIOD_SECS {
@ -487,8 +503,11 @@ mod tests {
}
std::thread::sleep(Duration::from_millis(200));
}
info!(
"Done polling for compaction after {}s",
start_poll.elapsed().as_secs_f32()
);
info!("done polling");
emit_stats(
time_initial,
&mut time_previous,
@ -515,8 +534,7 @@ mod tests {
if config.cleanup_blockstore {
drop(blockstore);
Blockstore::destroy(&blockstore_path)
.expect("Expected successful database destruction");
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
}