From f479ab7af28c309c6e507f709dd8d3dfa93f95a8 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang <93241502+yhchiang-sol@users.noreply.github.com> Date: Thu, 30 Dec 2021 20:18:47 -1000 Subject: [PATCH] ledger_cleanup test improvement (1/N) -- make the test lockless and simplify the logic (#22090) --- core/tests/ledger_cleanup.rs | 140 +++++++++++++---------------------- 1 file changed, 52 insertions(+), 88 deletions(-) diff --git a/core/tests/ledger_cleanup.rs b/core/tests/ledger_cleanup.rs index b376cb1bb..220c79418 100644 --- a/core/tests/ledger_cleanup.rs +++ b/core/tests/ledger_cleanup.rs @@ -204,7 +204,7 @@ mod tests { let storage_now = blockstore.storage_size().unwrap_or(0); let (cpu_user, cpu_system, cpu_idle) = (cpu.cpu_user, cpu.cpu_system, cpu.cpu_idle); - println!( + info!( "{},{},{},{},{},{},{},{},{:.2},{:.2},{:.2}", time_now.duration_since(time_initial).as_millis(), time_now.duration_since(*time_previous).as_millis(), @@ -287,7 +287,6 @@ mod tests { } let shreds = Arc::new(Mutex::new(shreds)); - let (mut shreds_batch, _) = make_many_slot_shreds(0, batch_size_slots, shreds_per_slot); info!( "Bench info num_batches: {}, batch size (slots): {}, shreds_per_slot: {}, num_shreds_total: {}", num_batches, @@ -314,20 +313,20 @@ mod tests { &sys.get_stats(), ); - let mut total_make = 0; - let mut num_slots = 0; - let mut total_slots = 0; - let mut time = Instant::now(); let mut insert_threads = vec![]; let insert_exit = Arc::new(AtomicBool::new(false)); info!("Begin inserting shreds ..."); let mut insert_timer = Measure::start("Shred insertion"); + let current_batch_id = Arc::new(AtomicU64::new(0)); + let finished_batch_count = Arc::new(AtomicU64::new(0)); for i in 0..num_writers { let cloned_insert_exit = insert_exit.clone(); let cloned_blockstore = blockstore.clone(); let cloned_shreds = shreds.clone(); + let shared_batch_id = current_batch_id.clone(); + let shared_finished_count = finished_batch_count.clone(); let insert_thread = Builder::new() .name(format!("insert_shreds-{}", i)) .spawn(move || { @@ -340,10 +339,26 @@ mod tests { let mut max_speed = 0f32; let mut min_speed = f32::MAX; loop { - let (new_shreds, len) = { + let batch_id = shared_batch_id.fetch_add(1, Ordering::SeqCst); + let start_slot = batch_id * batch_size_slots; + let len = batch_id; + if start_slot >= benchmark_slots { + break; + } + + let new_shreds = if pre_generate_data { let mut sl = cloned_shreds.lock().unwrap(); - (sl.pop_front(), sl.len()) + if let Some(shreds_from_queue) = sl.pop_front() { + shreds_from_queue + } else { + break; + } + } else { + let (generated_shreds, _) = make_many_slot_shreds( + start_slot, batch_size_slots, shreds_per_slot); + generated_shreds }; + // as_secs() returns whole number of seconds, so this runs every second if now.elapsed().as_secs() > 0 { let shreds_per_second = num_shreds as f32 / now.elapsed().as_secs() as f32; @@ -362,17 +377,15 @@ mod tests { now = Instant::now(); num_shreds = 0; } - if let Some(new_shreds) = new_shreds { - total += new_shreds.len(); - total_batches += 1; - let br = cloned_blockstore.insert_shreds( - new_shreds, None, false).unwrap(); - total_inserted_shreds += br.1.len(); - num_shreds += br.1.len(); - } else { - warn!("insert-{} sleeping for 200ms", i); - thread::sleep(Duration::from_millis(200)); - } + + total += new_shreds.len(); + total_batches += 1; + let br = cloned_blockstore.insert_shreds( + new_shreds, None, false).unwrap(); + total_inserted_shreds += br.1.len(); + num_shreds += br.1.len(); + shared_finished_count.fetch_add(1, Ordering::Relaxed); + if cloned_insert_exit.load(Ordering::Relaxed) { if max_speed > 0.0 { info!( @@ -386,7 +399,6 @@ mod tests { i ); } - break; } } @@ -395,42 +407,17 @@ mod tests { insert_threads.push(insert_thread); } - for i in 0..num_batches { - let start_slot = i * batch_size_slots; + loop { + let finished_batch = finished_batch_count.load(Ordering::Relaxed); + let finished_slot = (finished_batch + 1) * batch_size_slots - 1; - if time.elapsed().as_secs() > 0 { - warn!( - "total slots: {}, slots: {}, make: {}ms {:.2}", - total_slots, - num_slots, - total_make / (1000), - num_slots as f32 / time.elapsed().as_secs() as f32, - ); - num_slots = 0; - total_make = 0; - time = Instant::now(); - } - - if !pre_generate_data && shreds.lock().unwrap().len() < 50 { - let mut make_time = Measure::start("make_entries"); - num_slots += batch_size_slots; - total_slots += batch_size_slots; - shreds_batch - .iter_mut() - .for_each(|shred| shred.set_slot(shred.slot() + batch_size_slots)); - let new_shreds = shreds_batch.clone(); - shreds.lock().unwrap().push_back(new_shreds); - make_time.stop(); - total_make += make_time.as_us(); - } - - sender.send(start_slot).unwrap(); + sender.send(finished_slot).unwrap(); emit_stats( time_initial, &mut time_previous, &mut storage_previous, - start_slot, + finished_slot, batch_size_slots, shreds_per_slot, max_ledger_shreds as i64, @@ -449,22 +436,26 @@ mod tests { break; } } - } - let mut now = Instant::now(); - loop { - if now.elapsed().as_secs() > 1 { - warn!( - "Waiting for insert queue to clear ... {}", - shreds.lock().unwrap().len() - ); - now = Instant::now(); + + if stop_size_bytes > 0 { + if storage_previous >= stop_size_bytes { + stop_size_bytes_exceeded_iterations += 1; + } else { + stop_size_bytes_exceeded_iterations = 0; + } + + if stop_size_bytes_exceeded_iterations > stop_size_iterations { + break; + } } - if shreds.lock().unwrap().is_empty() { + + if finished_batch >= num_batches { break; } else { - thread::sleep(Duration::from_millis(200)); + thread::sleep(Duration::from_millis(500)); } } + // Send exit signal to stop all the writer threads. insert_exit.store(true, Ordering::Relaxed); while let Some(thread) = insert_threads.pop() { @@ -479,21 +470,6 @@ mod tests { ); let u1 = storage_previous; - // send final `ledger_cleanup` notification (since iterations above are zero-based) - sender.send(benchmark_slots).unwrap(); - - emit_stats( - time_initial, - &mut time_previous, - &mut storage_previous, - benchmark_slots, - 0, - 0, - max_ledger_shreds as i64, - &blockstore, - &sys.get_stats(), - ); - // Poll on some compaction happening info!("Begin polling for compaction ..."); let start_poll = Instant::now(); @@ -508,18 +484,6 @@ mod tests { start_poll.elapsed().as_secs_f32() ); - emit_stats( - time_initial, - &mut time_previous, - &mut storage_previous, - benchmark_slots, - 0, - 0, - max_ledger_shreds as i64, - &blockstore, - &sys.get_stats(), - ); - let u2 = storage_previous; exit.store(true, Ordering::SeqCst);