diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs index cf43483d0c..c8e6c909c3 100644 --- a/core/tests/ledger_cleanup.rs +++ b/core/tests/ledger_cleanup.rs @@ -9,7 +9,6 @@ mod tests { solana_ledger::{ blockstore::{make_many_slot_entries, Blockstore}, get_tmp_ledger_path, - shred::Shred, }, solana_measure::measure::Measure, std::{ @@ -48,6 +47,7 @@ mod tests { assert_compaction: bool, compaction_interval: Option, no_compaction: bool, + num_writers: u64, } #[derive(Clone, Copy, Debug)] @@ -167,6 +167,7 @@ mod tests { non_zero => Some(non_zero), }; let no_compaction = read_env("NO_COMPACTION", false); + let num_writers = read_env("NUM_WRITERS", 1); BenchmarkConfig { benchmark_slots, @@ -180,6 +181,7 @@ mod tests { assert_compaction, compaction_interval, no_compaction, + num_writers, } } @@ -221,6 +223,12 @@ mod tests { *storage_previous = storage_now; } + /** + * Example run command: + * BENCHMARK_SLOTS=10000 BATCH_SIZE=10 ENTRIES_PER_SLOT=1000 NUM_WRITERS=1 \ + * PRE_GENERATE_DATA=true cargo test --release tests::test_ledger_cleanup_compaction \ + * -- --exact --nocapture + */ #[test] fn test_ledger_cleanup_compaction() { solana_logger::setup(); @@ -243,6 +251,7 @@ mod tests { let stop_size_iterations = config.stop_size_iterations; let pre_generate_data = config.pre_generate_data; let compaction_interval = config.compaction_interval; + let num_writers = config.num_writers; let batches = benchmark_slots / batch_size; @@ -260,18 +269,20 @@ mod tests { let exit_cpu = Arc::new(AtomicBool::new(false)); let sys = CpuStatsUpdater::new(&exit_cpu); - let mut generated_batches = VecDeque::>::new(); + let mut shreds = VecDeque::new(); if pre_generate_data { let t0 = Instant::now(); eprintln!("PRE_GENERATE_DATA: (this may take a while)"); for i in 0..batches { let start_slot = i * batch_size; - let (shreds, _) = make_many_slot_entries(start_slot, batch_size, entries_per_slot); - generated_batches.push_back(shreds); + let (new_shreds, _) = + make_many_slot_entries(start_slot, batch_size, entries_per_slot); + shreds.push_back(new_shreds); } eprintln!("PRE_GENERATE_DATA: took {} ms", t0.elapsed().as_millis()); - }; + } + let shreds = Arc::new(Mutex::new(shreds)); let time_initial = Instant::now(); let mut time_previous = time_initial; @@ -296,63 +307,70 @@ mod tests { let mut total_slots = 0; let mut time = Instant::now(); let mut start = Measure::start("start"); - let shreds: Arc>>> = Arc::new(Mutex::new(VecDeque::new())); - let shreds1 = shreds.clone(); let insert_exit = Arc::new(AtomicBool::new(false)); - let insert_exit1 = insert_exit.clone(); - let blockstore1 = blockstore.clone(); - let insert_thread = Builder::new() - .name("insert_shreds".to_string()) - .spawn(move || { - let start = Instant::now(); - let mut now = Instant::now(); - let mut total = 0; - let mut total_batches = 0; - let mut total_inserted_shreds = 0; - let mut num_shreds = 0; - let mut max_speed = 0f32; - let mut min_speed = f32::MAX; - loop { - let (new_shreds, len) = { - let mut sl = shreds1.lock().unwrap(); - (sl.pop_front(), sl.len()) - }; - if now.elapsed().as_secs() > 0 { - let shreds_per_second = num_shreds as f32 / now.elapsed().as_secs() as f32; - warn!( - "tried: {} inserted: {} batches: {} len: {} shreds_per_second: {}", - total, total_inserted_shreds, total_batches, len, shreds_per_second, - ); - let average_speed = - total_inserted_shreds as f32 / start.elapsed().as_secs() as f32; - max_speed = max_speed.max(shreds_per_second); - min_speed = min_speed.min(shreds_per_second); - warn!( - "highest: {} lowest: {} avg: {}", - max_speed, min_speed, average_speed - ); - now = Instant::now(); - num_shreds = 0; + let mut insert_threads = vec![]; + + for i in 0..num_writers { + let cloned_insert_exit = insert_exit.clone(); + let cloned_blockstore = blockstore.clone(); + let cloned_shreds = shreds.clone(); + let insert_thread = Builder::new() + .name(format!("insert_shreds-{}", i)) + .spawn(move || { + let start = Instant::now(); + let mut now = Instant::now(); + let mut total = 0; + let mut total_batches = 0; + let mut total_inserted_shreds = 0; + let mut num_shreds = 0; + let mut max_speed = 0f32; + let mut min_speed = f32::MAX; + loop { + let (new_shreds, len) = { + let mut sl = cloned_shreds.lock().unwrap(); + (sl.pop_front(), sl.len()) + }; + if now.elapsed().as_secs() > 0 { + let shreds_per_second = num_shreds as f32 / now.elapsed().as_secs() as f32; + warn!( + "T{} tried: {} inserted: {} batches: {} len: {} shreds_per_second: {}", + i, total, total_inserted_shreds, total_batches, len, shreds_per_second, + ); + let average_speed = + total_inserted_shreds as f32 / start.elapsed().as_secs() as f32; + max_speed = max_speed.max(shreds_per_second); + min_speed = min_speed.min(shreds_per_second); + warn!( + "highest: {} lowest: {} avg: {}", + max_speed, min_speed, average_speed + ); + now = Instant::now(); + num_shreds = 0; + } + if let Some(new_shreds) = new_shreds { + total += new_shreds.len(); + total_batches += 1; + let br = cloned_blockstore.insert_shreds( + new_shreds, None, false).unwrap(); + total_inserted_shreds += br.1.len(); + num_shreds += br.1.len(); + } else { + warn!("Thread {} sleeps for 200 millis", i); + thread::sleep(Duration::from_millis(200)); + } + if cloned_insert_exit.load(Ordering::Relaxed) { + info!( + "insert exiting... highest shreds/s: {} lowest shreds/s: {}", + max_speed, min_speed + ); + break; + } } - if let Some(new_shreds) = new_shreds { - total += new_shreds.len(); - total_batches += 1; - let br = blockstore1.insert_shreds(new_shreds, None, false).unwrap(); - total_inserted_shreds += br.1.len(); - num_shreds += br.1.len(); - } else { - thread::sleep(Duration::from_millis(200)); - } - if insert_exit1.load(Ordering::Relaxed) { - info!( - "insert exiting... highest shreds/s: {} lowest shreds/s: {}", - max_speed, min_speed - ); - break; - } - } - }) - .unwrap(); + }) + .unwrap(); + insert_threads.push(insert_thread); + } + let mut entries_batch = make_many_slot_entries(0, batch_size, entries_per_slot).0; info!( "batch size: {} entries_per_slot: {} shreds_per_slot: {}", @@ -360,7 +378,7 @@ mod tests { entries_per_slot, entries_batch.len() ); - shreds.lock().unwrap().push_back(entries_batch.clone()); + for i in 0..batches { let start_slot = i * batch_size; @@ -377,23 +395,17 @@ mod tests { time = Instant::now(); } - if shreds.lock().unwrap().len() < 50 { + if !pre_generate_data && shreds.lock().unwrap().len() < 50 { let mut make_time = Measure::start("make_entries"); - let new_shreds = if pre_generate_data { - generated_batches.pop_front().unwrap() - } else { - num_slots += batch_size; - total_slots += batch_size; - entries_batch - .iter_mut() - .for_each(|shred| shred.set_slot(shred.slot() + batch_size)); - entries_batch.clone() - }; + num_slots += batch_size; + total_slots += batch_size; + entries_batch + .iter_mut() + .for_each(|shred| shred.set_slot(shred.slot() + batch_size)); + let new_shreds = entries_batch.clone(); shreds.lock().unwrap().push_back(new_shreds); make_time.stop(); total_make += make_time.as_us(); - } else { - thread::sleep(Duration::from_millis(200)); } sender.send(start_slot).unwrap(); @@ -422,7 +434,6 @@ mod tests { } } } - start.stop(); let mut now = Instant::now(); loop { if now.elapsed().as_secs() > 1 { @@ -439,12 +450,16 @@ mod tests { } } insert_exit.store(true, Ordering::Relaxed); - insert_thread.join().unwrap(); - info!( - "done {} {} shreds/s", + while let Some(thread) = insert_threads.pop() { + thread.join().unwrap(); + } + start.stop(); + + eprintln!( + "done {} {} slots/s", start, - (batches * batch_size) as f32 / start.as_s() + benchmark_slots as f32 / start.as_s() ); let u1 = storage_previous;