get unique keys while finding dups (#33039)

* get unique keys while finding dups

* pr feedback
This commit is contained in:
Jeff Washington (jwash) 2023-08-29 08:06:35 -07:00 committed by GitHub
parent 9b4feddb55
commit 9bc09c9610
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 32 additions and 22 deletions

View File

@ -9263,6 +9263,9 @@ impl AccountsDb {
let mut index_flush_us = 0; let mut index_flush_us = 0;
let total_duplicate_slot_keys = AtomicU64::default(); let total_duplicate_slot_keys = AtomicU64::default();
let mut populate_duplicate_keys_us = 0; let mut populate_duplicate_keys_us = 0;
// outer vec is accounts index bin (determined by pubkey value)
// inner vec is the pubkeys within that bin that are present in > 1 slot
let unique_pubkeys_by_bin = Mutex::new(Vec::<Vec<Pubkey>>::default());
if pass == 0 { if pass == 0 {
// tell accounts index we are done adding the initial accounts at startup // tell accounts index we are done adding the initial accounts at startup
let mut m = Measure::start("accounts_index_idle_us"); let mut m = Measure::start("accounts_index_idle_us");
@ -9277,6 +9280,8 @@ impl AccountsDb {
.populate_and_retrieve_duplicate_keys_from_startup(|slot_keys| { .populate_and_retrieve_duplicate_keys_from_startup(|slot_keys| {
total_duplicate_slot_keys total_duplicate_slot_keys
.fetch_add(slot_keys.len() as u64, Ordering::Relaxed); .fetch_add(slot_keys.len() as u64, Ordering::Relaxed);
let unique_keys =
HashSet::<Pubkey>::from_iter(slot_keys.iter().map(|(_, key)| *key));
for (slot, key) in slot_keys { for (slot, key) in slot_keys {
match self.uncleaned_pubkeys.entry(slot) { match self.uncleaned_pubkeys.entry(slot) {
Occupied(mut occupied) => occupied.get_mut().push(key), Occupied(mut occupied) => occupied.get_mut().push(key),
@ -9285,10 +9290,18 @@ impl AccountsDb {
} }
} }
} }
let unique_pubkeys_by_bin_inner =
unique_keys.into_iter().collect::<Vec<_>>();
// does not matter that this is not ordered by slot
unique_pubkeys_by_bin
.lock()
.unwrap()
.push(unique_pubkeys_by_bin_inner);
}); });
}) })
.1; .1;
} }
let unique_pubkeys_by_bin = unique_pubkeys_by_bin.into_inner().unwrap();
let storage_info_timings = storage_info_timings.into_inner().unwrap(); let storage_info_timings = storage_info_timings.into_inner().unwrap();
let mut timings = GenerateIndexTimings { let mut timings = GenerateIndexTimings {
@ -9315,28 +9328,25 @@ impl AccountsDb {
Measure::start("handle accounts data len duplicates"); Measure::start("handle accounts data len duplicates");
let uncleaned_roots = Mutex::new(HashSet::<Slot>::default()); let uncleaned_roots = Mutex::new(HashSet::<Slot>::default());
if pass == 0 { if pass == 0 {
let mut unique_pubkeys = HashSet::<Pubkey>::default(); let accounts_data_len_from_duplicates = unique_pubkeys_by_bin
self.uncleaned_pubkeys.iter().for_each(|entry| { .par_iter()
entry.value().iter().for_each(|pubkey| { .map(|unique_keys| {
unique_pubkeys.insert(*pubkey); unique_keys
}) .par_chunks(4096)
}); .map(|pubkeys| {
let accounts_data_len_from_duplicates = unique_pubkeys let (count, uncleaned_roots_this_group) = self
.into_iter() .visit_duplicate_pubkeys_during_startup(
.collect::<Vec<_>>() pubkeys,
.par_chunks(4096) &rent_collector,
.map(|pubkeys| { &timings,
let (count, uncleaned_roots_this_group) = self );
.visit_duplicate_pubkeys_during_startup( let mut uncleaned_roots = uncleaned_roots.lock().unwrap();
pubkeys, uncleaned_roots_this_group.into_iter().for_each(|slot| {
&rent_collector, uncleaned_roots.insert(slot);
&timings, });
); count
let mut uncleaned_roots = uncleaned_roots.lock().unwrap(); })
uncleaned_roots_this_group.into_iter().for_each(|slot| { .sum::<u64>()
uncleaned_roots.insert(slot);
});
count
}) })
.sum(); .sum();
accounts_data_len.fetch_sub(accounts_data_len_from_duplicates, Ordering::Relaxed); accounts_data_len.fetch_sub(accounts_data_len_from_duplicates, Ordering::Relaxed);