Add NUM_WRITERS to ledger_cleanup to enable multiple writers. (#21729)

Summary:
* Add NUM_WRITERS to ledger_cleanup to enable multiple writers.
  (Note that our insert_shreds() is still single-threaded because
   it holds a lock that allows only one writer at a time.)

* Make pre-generated slots more performant by inserting them directly
  into the shared queue.  Otherwise, the main thread that prepares the
  slots will be slower than the writers.

* Correct the shred insertion time -- before this diff it did not
  wait for all writer threads to be joined.
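The diff below implements this with a shared Arc<Mutex<VecDeque<Vec<Shred>>>>
queue drained by NUM_WRITERS threads. As a minimal, self-contained sketch of
that pattern (illustrative only -- `Batch` stands in for Vec<Shred>, and the
counter stands in for Blockstore::insert_shreds(), which internally serializes
writers behind a lock):

use std::{
    collections::VecDeque,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Mutex,
    },
    thread,
    time::{Duration, Instant},
};

type Batch = Vec<u64>; // stand-in for Vec<Shred>

fn main() {
    // NUM_WRITERS controls how many insertion threads drain the queue.
    let num_writers: usize = std::env::var("NUM_WRITERS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(1);

    // Pre-generate batches directly into the queue the writers drain, so the
    // producer never lags behind the writers.
    let queue: Arc<Mutex<VecDeque<Batch>>> =
        Arc::new(Mutex::new((0..100u64).map(|i| vec![i; 1_000]).collect()));
    let exit = Arc::new(AtomicBool::new(false));

    let start = Instant::now();
    let writers: Vec<_> = (0..num_writers)
        .map(|i| {
            let queue = queue.clone();
            let exit = exit.clone();
            thread::Builder::new()
                .name(format!("insert_shreds-{}", i))
                .spawn(move || {
                    let mut inserted = 0usize;
                    loop {
                        let batch = queue.lock().unwrap().pop_front();
                        match batch {
                            // The real benchmark calls insert_shreds() here.
                            Some(b) => inserted += b.len(),
                            None if exit.load(Ordering::Relaxed) => break,
                            None => thread::sleep(Duration::from_millis(200)),
                        }
                    }
                    inserted
                })
                .unwrap()
        })
        .collect();

    exit.store(true, Ordering::Relaxed);
    // Join every writer *before* reading the clock -- the timing fix above.
    let total: usize = writers.into_iter().map(|w| w.join().unwrap()).sum();
    println!("inserted {} items in {:?}", total, start.elapsed());
}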
Yueh-Hsuan Chiang 2021-12-10 09:42:51 -08:00 committed by GitHub
parent 6c108c8fc3
commit 65194c7ae8
1 changed file with 94 additions and 79 deletions


@@ -9,7 +9,6 @@ mod tests {
         solana_ledger::{
             blockstore::{make_many_slot_entries, Blockstore},
             get_tmp_ledger_path,
-            shred::Shred,
         },
         solana_measure::measure::Measure,
         std::{
@@ -48,6 +47,7 @@ mod tests {
         assert_compaction: bool,
         compaction_interval: Option<u64>,
         no_compaction: bool,
+        num_writers: u64,
     }
 
     #[derive(Clone, Copy, Debug)]
@@ -167,6 +167,7 @@ mod tests {
             non_zero => Some(non_zero),
         };
         let no_compaction = read_env("NO_COMPACTION", false);
+        let num_writers = read_env("NUM_WRITERS", 1);
 
         BenchmarkConfig {
             benchmark_slots,
@@ -180,6 +181,7 @@ mod tests {
             assert_compaction,
             compaction_interval,
             no_compaction,
+            num_writers,
         }
     }
@@ -221,6 +223,12 @@ mod tests {
         *storage_previous = storage_now;
     }
 
+    /**
+     * Example run command:
+     * BENCHMARK_SLOTS=10000 BATCH_SIZE=10 ENTRIES_PER_SLOT=1000 NUM_WRITERS=1 \
+     * PRE_GENERATE_DATA=true cargo test --release tests::test_ledger_cleanup_compaction \
+     * -- --exact --nocapture
+     */
     #[test]
     fn test_ledger_cleanup_compaction() {
         solana_logger::setup();
@@ -243,6 +251,7 @@ mod tests {
         let stop_size_iterations = config.stop_size_iterations;
         let pre_generate_data = config.pre_generate_data;
         let compaction_interval = config.compaction_interval;
+        let num_writers = config.num_writers;
 
         let batches = benchmark_slots / batch_size;
@@ -260,18 +269,20 @@ mod tests {
         let exit_cpu = Arc::new(AtomicBool::new(false));
         let sys = CpuStatsUpdater::new(&exit_cpu);
 
-        let mut generated_batches = VecDeque::<Vec<Shred>>::new();
+        let mut shreds = VecDeque::new();
 
         if pre_generate_data {
             let t0 = Instant::now();
             eprintln!("PRE_GENERATE_DATA: (this may take a while)");
             for i in 0..batches {
                 let start_slot = i * batch_size;
-                let (shreds, _) = make_many_slot_entries(start_slot, batch_size, entries_per_slot);
-                generated_batches.push_back(shreds);
+                let (new_shreds, _) =
+                    make_many_slot_entries(start_slot, batch_size, entries_per_slot);
+                shreds.push_back(new_shreds);
             }
             eprintln!("PRE_GENERATE_DATA: took {} ms", t0.elapsed().as_millis());
-        };
+        }
+        let shreds = Arc::new(Mutex::new(shreds));
 
         let time_initial = Instant::now();
         let mut time_previous = time_initial;
@@ -296,63 +307,70 @@ mod tests {
         let mut total_slots = 0;
         let mut time = Instant::now();
         let mut start = Measure::start("start");
-        let shreds: Arc<Mutex<VecDeque<Vec<Shred>>>> = Arc::new(Mutex::new(VecDeque::new()));
-        let shreds1 = shreds.clone();
         let insert_exit = Arc::new(AtomicBool::new(false));
-        let insert_exit1 = insert_exit.clone();
-        let blockstore1 = blockstore.clone();
-        let insert_thread = Builder::new()
-            .name("insert_shreds".to_string())
-            .spawn(move || {
-                let start = Instant::now();
-                let mut now = Instant::now();
-                let mut total = 0;
-                let mut total_batches = 0;
-                let mut total_inserted_shreds = 0;
-                let mut num_shreds = 0;
-                let mut max_speed = 0f32;
-                let mut min_speed = f32::MAX;
-                loop {
-                    let (new_shreds, len) = {
-                        let mut sl = shreds1.lock().unwrap();
-                        (sl.pop_front(), sl.len())
-                    };
-                    if now.elapsed().as_secs() > 0 {
-                        let shreds_per_second = num_shreds as f32 / now.elapsed().as_secs() as f32;
-                        warn!(
-                            "tried: {} inserted: {} batches: {} len: {} shreds_per_second: {}",
-                            total, total_inserted_shreds, total_batches, len, shreds_per_second,
-                        );
-                        let average_speed =
-                            total_inserted_shreds as f32 / start.elapsed().as_secs() as f32;
-                        max_speed = max_speed.max(shreds_per_second);
-                        min_speed = min_speed.min(shreds_per_second);
-                        warn!(
-                            "highest: {} lowest: {} avg: {}",
-                            max_speed, min_speed, average_speed
-                        );
-                        now = Instant::now();
-                        num_shreds = 0;
+        let mut insert_threads = vec![];
+
+        for i in 0..num_writers {
+            let cloned_insert_exit = insert_exit.clone();
+            let cloned_blockstore = blockstore.clone();
+            let cloned_shreds = shreds.clone();
+            let insert_thread = Builder::new()
+                .name(format!("insert_shreds-{}", i))
+                .spawn(move || {
+                    let start = Instant::now();
+                    let mut now = Instant::now();
+                    let mut total = 0;
+                    let mut total_batches = 0;
+                    let mut total_inserted_shreds = 0;
+                    let mut num_shreds = 0;
+                    let mut max_speed = 0f32;
+                    let mut min_speed = f32::MAX;
+                    loop {
+                        let (new_shreds, len) = {
+                            let mut sl = cloned_shreds.lock().unwrap();
+                            (sl.pop_front(), sl.len())
+                        };
+                        if now.elapsed().as_secs() > 0 {
+                            let shreds_per_second = num_shreds as f32 / now.elapsed().as_secs() as f32;
+                            warn!(
+                                "T{} tried: {} inserted: {} batches: {} len: {} shreds_per_second: {}",
+                                i, total, total_inserted_shreds, total_batches, len, shreds_per_second,
+                            );
+                            let average_speed =
+                                total_inserted_shreds as f32 / start.elapsed().as_secs() as f32;
+                            max_speed = max_speed.max(shreds_per_second);
+                            min_speed = min_speed.min(shreds_per_second);
+                            warn!(
+                                "highest: {} lowest: {} avg: {}",
+                                max_speed, min_speed, average_speed
+                            );
+                            now = Instant::now();
+                            num_shreds = 0;
+                        }
+                        if let Some(new_shreds) = new_shreds {
+                            total += new_shreds.len();
+                            total_batches += 1;
+                            let br = cloned_blockstore
+                                .insert_shreds(new_shreds, None, false)
+                                .unwrap();
+                            total_inserted_shreds += br.1.len();
+                            num_shreds += br.1.len();
+                        } else {
+                            warn!("Thread {} sleeps for 200 millis", i);
+                            thread::sleep(Duration::from_millis(200));
+                        }
+                        if cloned_insert_exit.load(Ordering::Relaxed) {
+                            info!(
+                                "insert exiting... highest shreds/s: {} lowest shreds/s: {}",
+                                max_speed, min_speed
+                            );
+                            break;
+                        }
                     }
-                    if let Some(new_shreds) = new_shreds {
-                        total += new_shreds.len();
-                        total_batches += 1;
-                        let br = blockstore1.insert_shreds(new_shreds, None, false).unwrap();
-                        total_inserted_shreds += br.1.len();
-                        num_shreds += br.1.len();
-                    } else {
-                        thread::sleep(Duration::from_millis(200));
-                    }
-                    if insert_exit1.load(Ordering::Relaxed) {
-                        info!(
-                            "insert exiting... highest shreds/s: {} lowest shreds/s: {}",
-                            max_speed, min_speed
-                        );
-                        break;
-                    }
-                }
-            })
-            .unwrap();
+                })
+                .unwrap();
+            insert_threads.push(insert_thread);
+        }
+
         let mut entries_batch = make_many_slot_entries(0, batch_size, entries_per_slot).0;
         info!(
             "batch size: {} entries_per_slot: {} shreds_per_slot: {}",
@@ -360,7 +378,7 @@ mod tests {
             entries_per_slot,
             entries_batch.len()
         );
+        shreds.lock().unwrap().push_back(entries_batch.clone());
 
         for i in 0..batches {
            let start_slot = i * batch_size;
@@ -377,23 +395,17 @@ mod tests {
                 time = Instant::now();
             }
 
-            if shreds.lock().unwrap().len() < 50 {
+            if !pre_generate_data && shreds.lock().unwrap().len() < 50 {
                 let mut make_time = Measure::start("make_entries");
-                let new_shreds = if pre_generate_data {
-                    generated_batches.pop_front().unwrap()
-                } else {
-                    num_slots += batch_size;
-                    total_slots += batch_size;
-                    entries_batch
-                        .iter_mut()
-                        .for_each(|shred| shred.set_slot(shred.slot() + batch_size));
-                    entries_batch.clone()
-                };
+                num_slots += batch_size;
+                total_slots += batch_size;
+                entries_batch
+                    .iter_mut()
+                    .for_each(|shred| shred.set_slot(shred.slot() + batch_size));
+                let new_shreds = entries_batch.clone();
                 shreds.lock().unwrap().push_back(new_shreds);
                 make_time.stop();
                 total_make += make_time.as_us();
-            } else {
-                thread::sleep(Duration::from_millis(200));
             }
 
             sender.send(start_slot).unwrap();
@@ -422,7 +434,6 @@ mod tests {
                 }
             }
         }
-        start.stop();
 
         let mut now = Instant::now();
         loop {
             if now.elapsed().as_secs() > 1 {
@@ -439,12 +450,16 @@ mod tests {
             }
         }
 
         insert_exit.store(true, Ordering::Relaxed);
-        insert_thread.join().unwrap();
-        info!(
-            "done {} {} shreds/s",
+        while let Some(thread) = insert_threads.pop() {
+            thread.join().unwrap();
+        }
+        start.stop();
+
+        eprintln!(
+            "done {} {} slots/s",
             start,
-            (batches * batch_size) as f32 / start.as_s()
+            benchmark_slots as f32 / start.as_s()
         );
 
         let u1 = storage_previous;