insert accounts in parallel when building initial index (#17040)

* insert accounts in parallel when building initial index

* rename nits from pr review

* rename nits from pr review

* rename nits from pr review

* rename nits from pr review
This commit is contained in:
Jeff Washington (jwash) 2021-05-05 12:08:45 -05:00 committed by GitHub
parent 3e0fed48e7
commit ab7c96aa81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 82 additions and 60 deletions

View File

@ -4908,12 +4908,33 @@ impl AccountsDb {
let mut slots = self.storage.all_slots();
#[allow(clippy::stable_sort_primitive)]
slots.sort();
let total_processed_slots_across_all_threads = AtomicU64::new(0);
let outer_slots_len = slots.len();
let chunk_size = (outer_slots_len / 7) + 1; // approximately 400k slots in a snapshot
slots.par_chunks(chunk_size).for_each(|slots| {
let mut last_log_update = Instant::now();
let mut my_last_reported_number_of_processed_slots = 0;
let mut was_first = false;
for (index, slot) in slots.iter().enumerate() {
let now = Instant::now();
if now.duration_since(last_log_update).as_secs() >= 2 {
info!("generating index: {}/{} slots...", index, slots.len());
let my_total_newly_processed_slots_since_last_report =
(index as u64) - my_last_reported_number_of_processed_slots;
my_last_reported_number_of_processed_slots = index as u64;
let previous_total_processed_slots_across_all_threads =
total_processed_slots_across_all_threads.fetch_add(
my_total_newly_processed_slots_since_last_report,
Ordering::Relaxed,
);
was_first = was_first || 0 == previous_total_processed_slots_across_all_threads;
if was_first {
info!(
"generating index: {}/{} slots...",
previous_total_processed_slots_across_all_threads
+ my_total_newly_processed_slots_since_last_report,
outer_slots_len
);
}
last_log_update = now;
}
let storage_maps: Vec<Arc<AccountStorageEntry>> = self
@ -4972,6 +4993,7 @@ impl AccountsDb {
}
}
}
});
// Need to add these last, otherwise older updates will be cleaned
for slot in slots {