Speedup ledger cleanup test (#15304)

Just clone to produce shreds and use a separate insert thread.
This commit is contained in:
sakridge 2021-02-17 08:59:25 -08:00 committed by GitHub
parent 4d16b2fcc5
commit b24cb9840e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 134 additions and 12 deletions

View File

@@ -2,15 +2,17 @@
#[cfg(test)]
mod tests {
use log::*;
use solana_core::ledger_cleanup_service::LedgerCleanupService;
use solana_ledger::blockstore::{make_many_slot_entries, Blockstore};
use solana_ledger::get_tmp_ledger_path;
use solana_ledger::shred::Shred;
use solana_measure::measure::Measure;
use std::collections::VecDeque;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::{Duration, Instant};
use systemstat::{CPULoad, Platform, System};
@@ -186,7 +188,7 @@ mod tests {
let (cpu_user, cpu_system, cpu_idle) = (cpu.cpu_user, cpu.cpu_system, cpu.cpu_idle);
println!(
"{},{},{},{},{},{},{},{},{},{},{}",
"{},{},{},{},{},{},{},{},{:.2},{:.2},{:.2}",
time_now.duration_since(time_initial).as_millis(),
time_now.duration_since(*time_previous).as_millis(),
start_slot,
@@ -206,6 +208,7 @@ mod tests {
#[test]
fn test_ledger_cleanup_compaction() {
solana_logger::setup();
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
let config = get_benchmark_config();
@@ -241,8 +244,8 @@ mod tests {
let t0 = Instant::now();
eprintln!("PRE_GENERATE_DATA: (this may take a while)");
for i in 0..batches {
let x = i * batch_size;
let (shreds, _) = make_many_slot_entries(x, batch_size, entries_per_slot);
let start_slot = i * batch_size;
let (shreds, _) = make_many_slot_entries(start_slot, batch_size, entries_per_slot);
generated_batches.push_back(shreds);
}
eprintln!("PRE_GENERATE_DATA: took {} ms", t0.elapsed().as_millis());
@@ -266,23 +269,118 @@ mod tests {
&sys.get_stats(),
);
let mut total_make = 0;
let mut num_slots = 0;
let mut total_slots = 0;
let mut time = Instant::now();
let mut start = Measure::start("start");
let shreds: Arc<Mutex<VecDeque<Vec<Shred>>>> = Arc::new(Mutex::new(VecDeque::new()));
let shreds1 = shreds.clone();
let insert_exit = Arc::new(AtomicBool::new(false));
let insert_exit1 = insert_exit.clone();
let blockstore1 = blockstore.clone();
let insert_thread = Builder::new()
.name("insert_shreds".to_string())
.spawn(move || {
let start = Instant::now();
let mut now = Instant::now();
let mut total = 0;
let mut total_batches = 0;
let mut total_inserted_shreds = 0;
let mut num_shreds = 0;
let mut max_speed = 0f32;
let mut min_speed = f32::MAX;
loop {
let (new_shreds, len) = {
let mut sl = shreds1.lock().unwrap();
(sl.pop_front(), sl.len())
};
if now.elapsed().as_secs() > 0 {
let shreds_per_second = num_shreds as f32 / now.elapsed().as_secs() as f32;
warn!(
"tried: {} inserted: {} batches: {} len: {} shreds_per_second: {}",
total, total_inserted_shreds, total_batches, len, shreds_per_second,
);
let average_speed =
total_inserted_shreds as f32 / start.elapsed().as_secs() as f32;
max_speed = max_speed.max(shreds_per_second);
min_speed = min_speed.min(shreds_per_second);
warn!(
"highest: {} lowest: {} avg: {}",
max_speed, min_speed, average_speed
);
now = Instant::now();
num_shreds = 0;
}
if let Some(new_shreds) = new_shreds {
total += new_shreds.len();
total_batches += 1;
let br = blockstore1.insert_shreds(new_shreds, None, false).unwrap();
total_inserted_shreds += br.1.len();
num_shreds += br.1.len();
} else {
thread::sleep(Duration::from_millis(200));
}
if insert_exit1.load(Ordering::Relaxed) {
info!(
"insert exiting... highest shreds/s: {} lowest shreds/s: {}",
max_speed, min_speed
);
break;
}
}
})
.unwrap();
let mut entries_batch = make_many_slot_entries(0, batch_size, entries_per_slot).0;
info!(
"batch size: {} entries_per_slot: {} shreds_per_slot: {}",
batch_size,
entries_per_slot,
entries_batch.len()
);
shreds.lock().unwrap().push_back(entries_batch.clone());
for i in 0..batches {
let x = i * batch_size;
let start_slot = i * batch_size;
let shreds = if pre_generate_data {
generated_batches.pop_front().unwrap()
if time.elapsed().as_secs() > 0 {
warn!(
"total slots: {} slots: {} make: {}ms {:.2}",
total_slots,
num_slots,
total_make / (1000),
num_slots as f32 / time.elapsed().as_secs() as f32,
);
num_slots = 0;
total_make = 0;
time = Instant::now();
}
if shreds.lock().unwrap().len() < 50 {
let mut make_time = Measure::start("make_entries");
let new_shreds = if pre_generate_data {
generated_batches.pop_front().unwrap()
} else {
num_slots += batch_size;
total_slots += batch_size;
entries_batch
.iter_mut()
.for_each(|shred| shred.set_slot(shred.slot() + batch_size));
entries_batch.clone()
};
shreds.lock().unwrap().push_back(new_shreds);
make_time.stop();
total_make += make_time.as_us();
} else {
make_many_slot_entries(x, batch_size, entries_per_slot).0
};
thread::sleep(Duration::from_millis(200));
}
blockstore.insert_shreds(shreds, None, false).unwrap();
sender.send(x).unwrap();
sender.send(start_slot).unwrap();
emit_stats(
time_initial,
&mut time_previous,
&mut storage_previous,
x,
start_slot,
batch_size,
batch_size,
max_ledger_shreds as i64,
@@ -302,7 +400,30 @@ mod tests {
}
}
}
start.stop();
let mut now = Instant::now();
loop {
if now.elapsed().as_secs() > 1 {
warn!(
"waiting for insert queue to clear.. {}",
shreds.lock().unwrap().len()
);
now = Instant::now();
}
if shreds.lock().unwrap().is_empty() {
break;
} else {
thread::sleep(Duration::from_millis(200));
}
}
insert_exit.store(true, Ordering::Relaxed);
insert_thread.join().unwrap();
info!(
"done {} {} shreds/s",
start,
(batches * batch_size) as f32 / start.as_s()
);
let u1 = storage_previous;
// send final `ledger_cleanup` notification (since iterations above are zero-based)
@@ -329,6 +450,7 @@ mod tests {
std::thread::sleep(Duration::from_millis(200));
}
info!("done polling");
emit_stats(
time_initial,
&mut time_previous,