ledger_cleanup test improvement (1/N) -- make the test lockless and simplify the logic (#22090)

This commit is contained in:
Yueh-Hsuan Chiang 2021-12-30 20:18:47 -10:00 committed by GitHub
parent d06e6c7425
commit f479ab7af2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 52 additions and 88 deletions

View File

@@ -204,7 +204,7 @@ mod tests {
let storage_now = blockstore.storage_size().unwrap_or(0);
let (cpu_user, cpu_system, cpu_idle) = (cpu.cpu_user, cpu.cpu_system, cpu.cpu_idle);
println!(
info!(
"{},{},{},{},{},{},{},{},{:.2},{:.2},{:.2}",
time_now.duration_since(time_initial).as_millis(),
time_now.duration_since(*time_previous).as_millis(),
@@ -287,7 +287,6 @@ mod tests {
}
let shreds = Arc::new(Mutex::new(shreds));
let (mut shreds_batch, _) = make_many_slot_shreds(0, batch_size_slots, shreds_per_slot);
info!(
"Bench info num_batches: {}, batch size (slots): {}, shreds_per_slot: {}, num_shreds_total: {}",
num_batches,
@@ -314,20 +313,20 @@ mod tests {
&sys.get_stats(),
);
let mut total_make = 0;
let mut num_slots = 0;
let mut total_slots = 0;
let mut time = Instant::now();
let mut insert_threads = vec![];
let insert_exit = Arc::new(AtomicBool::new(false));
info!("Begin inserting shreds ...");
let mut insert_timer = Measure::start("Shred insertion");
let current_batch_id = Arc::new(AtomicU64::new(0));
let finished_batch_count = Arc::new(AtomicU64::new(0));
for i in 0..num_writers {
let cloned_insert_exit = insert_exit.clone();
let cloned_blockstore = blockstore.clone();
let cloned_shreds = shreds.clone();
let shared_batch_id = current_batch_id.clone();
let shared_finished_count = finished_batch_count.clone();
let insert_thread = Builder::new()
.name(format!("insert_shreds-{}", i))
.spawn(move || {
@@ -340,10 +339,26 @@ mod tests {
let mut max_speed = 0f32;
let mut min_speed = f32::MAX;
loop {
let (new_shreds, len) = {
let batch_id = shared_batch_id.fetch_add(1, Ordering::SeqCst);
let start_slot = batch_id * batch_size_slots;
let len = batch_id;
if start_slot >= benchmark_slots {
break;
}
let new_shreds = if pre_generate_data {
let mut sl = cloned_shreds.lock().unwrap();
(sl.pop_front(), sl.len())
if let Some(shreds_from_queue) = sl.pop_front() {
shreds_from_queue
} else {
break;
}
} else {
let (generated_shreds, _) = make_many_slot_shreds(
start_slot, batch_size_slots, shreds_per_slot);
generated_shreds
};
// as_secs() returns whole number of seconds, so this runs every second
if now.elapsed().as_secs() > 0 {
let shreds_per_second = num_shreds as f32 / now.elapsed().as_secs() as f32;
@@ -362,17 +377,15 @@ mod tests {
now = Instant::now();
num_shreds = 0;
}
if let Some(new_shreds) = new_shreds {
total += new_shreds.len();
total_batches += 1;
let br = cloned_blockstore.insert_shreds(
new_shreds, None, false).unwrap();
total_inserted_shreds += br.1.len();
num_shreds += br.1.len();
} else {
warn!("insert-{} sleeping for 200ms", i);
thread::sleep(Duration::from_millis(200));
}
total += new_shreds.len();
total_batches += 1;
let br = cloned_blockstore.insert_shreds(
new_shreds, None, false).unwrap();
total_inserted_shreds += br.1.len();
num_shreds += br.1.len();
shared_finished_count.fetch_add(1, Ordering::Relaxed);
if cloned_insert_exit.load(Ordering::Relaxed) {
if max_speed > 0.0 {
info!(
@@ -386,7 +399,6 @@ mod tests {
i
);
}
break;
}
}
@@ -395,42 +407,17 @@ mod tests {
insert_threads.push(insert_thread);
}
for i in 0..num_batches {
let start_slot = i * batch_size_slots;
loop {
let finished_batch = finished_batch_count.load(Ordering::Relaxed);
let finished_slot = (finished_batch + 1) * batch_size_slots - 1;
if time.elapsed().as_secs() > 0 {
warn!(
"total slots: {}, slots: {}, make: {}ms {:.2}",
total_slots,
num_slots,
total_make / (1000),
num_slots as f32 / time.elapsed().as_secs() as f32,
);
num_slots = 0;
total_make = 0;
time = Instant::now();
}
if !pre_generate_data && shreds.lock().unwrap().len() < 50 {
let mut make_time = Measure::start("make_entries");
num_slots += batch_size_slots;
total_slots += batch_size_slots;
shreds_batch
.iter_mut()
.for_each(|shred| shred.set_slot(shred.slot() + batch_size_slots));
let new_shreds = shreds_batch.clone();
shreds.lock().unwrap().push_back(new_shreds);
make_time.stop();
total_make += make_time.as_us();
}
sender.send(start_slot).unwrap();
sender.send(finished_slot).unwrap();
emit_stats(
time_initial,
&mut time_previous,
&mut storage_previous,
start_slot,
finished_slot,
batch_size_slots,
shreds_per_slot,
max_ledger_shreds as i64,
@@ -449,22 +436,26 @@ mod tests {
break;
}
}
}
let mut now = Instant::now();
loop {
if now.elapsed().as_secs() > 1 {
warn!(
"Waiting for insert queue to clear ... {}",
shreds.lock().unwrap().len()
);
now = Instant::now();
if stop_size_bytes > 0 {
if storage_previous >= stop_size_bytes {
stop_size_bytes_exceeded_iterations += 1;
} else {
stop_size_bytes_exceeded_iterations = 0;
}
if stop_size_bytes_exceeded_iterations > stop_size_iterations {
break;
}
}
if shreds.lock().unwrap().is_empty() {
if finished_batch >= num_batches {
break;
} else {
thread::sleep(Duration::from_millis(200));
thread::sleep(Duration::from_millis(500));
}
}
// Send exit signal to stop all the writer threads.
insert_exit.store(true, Ordering::Relaxed);
while let Some(thread) = insert_threads.pop() {
@@ -479,21 +470,6 @@ mod tests {
);
let u1 = storage_previous;
// send final `ledger_cleanup` notification (since iterations above are zero-based)
sender.send(benchmark_slots).unwrap();
emit_stats(
time_initial,
&mut time_previous,
&mut storage_previous,
benchmark_slots,
0,
0,
max_ledger_shreds as i64,
&blockstore,
&sys.get_stats(),
);
// Poll on some compaction happening
info!("Begin polling for compaction ...");
let start_poll = Instant::now();
@@ -508,18 +484,6 @@ mod tests {
start_poll.elapsed().as_secs_f32()
);
emit_stats(
time_initial,
&mut time_previous,
&mut storage_previous,
benchmark_slots,
0,
0,
max_ledger_shreds as i64,
&blockstore,
&sys.get_stats(),
);
let u2 = storage_previous;
exit.store(true, Ordering::SeqCst);