better duplicate key stats during index generation (#30829)
This commit is contained in:
parent
473534f308
commit
2216647f7e
|
@ -620,8 +620,10 @@ struct GenerateIndexTimings {
|
|||
pub index_flush_us: u64,
|
||||
pub rent_paying: AtomicUsize,
|
||||
pub amount_to_top_off_rent: AtomicU64,
|
||||
pub total_duplicates: u64,
|
||||
pub total_including_duplicates: u64,
|
||||
pub accounts_data_len_dedup_time_us: u64,
|
||||
pub total_duplicate_slot_keys: u64,
|
||||
pub populate_duplicate_keys_us: u64,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, PartialEq, Eq)]
|
||||
|
@ -668,8 +670,8 @@ impl GenerateIndexTimings {
|
|||
i64
|
||||
),
|
||||
(
|
||||
"total_items_with_duplicates",
|
||||
self.total_duplicates as i64,
|
||||
"total_items_including_duplicates",
|
||||
self.total_including_duplicates as i64,
|
||||
i64
|
||||
),
|
||||
("total_items", self.total_items as i64, i64),
|
||||
|
@ -678,6 +680,16 @@ impl GenerateIndexTimings {
|
|||
self.accounts_data_len_dedup_time_us as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"total_duplicate_slot_keys",
|
||||
self.total_duplicate_slot_keys as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"populate_duplicate_keys_us",
|
||||
self.populate_duplicate_keys_us as i64,
|
||||
i64
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -8982,7 +8994,7 @@ impl AccountsDb {
|
|||
let insertion_time_us = AtomicU64::new(0);
|
||||
let rent_paying = AtomicUsize::new(0);
|
||||
let amount_to_top_off_rent = AtomicU64::new(0);
|
||||
let total_duplicates = AtomicU64::new(0);
|
||||
let total_including_duplicates = AtomicU64::new(0);
|
||||
let storage_info_timings = Mutex::new(GenerateIndexTimings::default());
|
||||
let scan_time: u64 = slots
|
||||
.par_chunks(chunk_size)
|
||||
|
@ -9025,7 +9037,8 @@ impl AccountsDb {
|
|||
rent_paying.fetch_add(rent_paying_this_slot, Ordering::Relaxed);
|
||||
amount_to_top_off_rent
|
||||
.fetch_add(amount_to_top_off_rent_this_slot, Ordering::Relaxed);
|
||||
total_duplicates.fetch_add(total_this_slot, Ordering::Relaxed);
|
||||
total_including_duplicates
|
||||
.fetch_add(total_this_slot, Ordering::Relaxed);
|
||||
accounts_data_len
|
||||
.fetch_add(accounts_data_len_this_slot, Ordering::Relaxed);
|
||||
let mut rent_paying_accounts_by_partition =
|
||||
|
@ -9088,6 +9101,8 @@ impl AccountsDb {
|
|||
.sum();
|
||||
|
||||
let mut index_flush_us = 0;
|
||||
let mut total_duplicate_slot_keys = 0;
|
||||
let mut populate_duplicate_keys_us = 0;
|
||||
if pass == 0 {
|
||||
// tell accounts index we are done adding the initial accounts at startup
|
||||
let mut m = Measure::start("accounts_index_idle_us");
|
||||
|
@ -9095,21 +9110,25 @@ impl AccountsDb {
|
|||
m.stop();
|
||||
index_flush_us = m.as_us();
|
||||
|
||||
// this has to happen before visit_duplicate_pubkeys_during_startup below
|
||||
// get duplicate keys from acct idx. We have to wait until we've finished flushing.
|
||||
for (slot, key) in self
|
||||
.accounts_index
|
||||
.retrieve_duplicate_keys_from_startup()
|
||||
.into_iter()
|
||||
.flatten()
|
||||
{
|
||||
match self.uncleaned_pubkeys.entry(slot) {
|
||||
Occupied(mut occupied) => occupied.get_mut().push(key),
|
||||
Vacant(vacant) => {
|
||||
vacant.insert(vec![key]);
|
||||
populate_duplicate_keys_us = measure_us!({
|
||||
// this has to happen before visit_duplicate_pubkeys_during_startup below
|
||||
// get duplicate keys from acct idx. We have to wait until we've finished flushing.
|
||||
for (slot, key) in self
|
||||
.accounts_index
|
||||
.retrieve_duplicate_keys_from_startup()
|
||||
.into_iter()
|
||||
.flatten()
|
||||
{
|
||||
total_duplicate_slot_keys += 1;
|
||||
match self.uncleaned_pubkeys.entry(slot) {
|
||||
Occupied(mut occupied) => occupied.get_mut().push(key),
|
||||
Vacant(vacant) => {
|
||||
vacant.insert(vec![key]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.1;
|
||||
}
|
||||
|
||||
let storage_info_timings = storage_info_timings.into_inner().unwrap();
|
||||
|
@ -9123,7 +9142,9 @@ impl AccountsDb {
|
|||
total_items,
|
||||
rent_paying,
|
||||
amount_to_top_off_rent,
|
||||
total_duplicates: total_duplicates.load(Ordering::Relaxed),
|
||||
total_duplicate_slot_keys,
|
||||
populate_duplicate_keys_us,
|
||||
total_including_duplicates: total_including_duplicates.load(Ordering::Relaxed),
|
||||
storage_size_accounts_map_us: storage_info_timings.storage_size_accounts_map_us,
|
||||
storage_size_accounts_map_flatten_us: storage_info_timings
|
||||
.storage_size_accounts_map_flatten_us,
|
||||
|
|
|
@ -1051,12 +1051,19 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
|
|||
disk.update(&k, |current| {
|
||||
match current {
|
||||
Some((current_slot_list, mut ref_count)) => {
|
||||
// merge this in, mark as conflict
|
||||
// merge this in, mark as duplicate
|
||||
duplicates.push((slot, k));
|
||||
if current_slot_list.len() == 1 {
|
||||
// accurately account for there being a duplicate for the first entry that was previously added to the disk index.
|
||||
// That entry could not have known yet that it was a duplicate.
|
||||
// It is important to capture each slot with a duplicate because of slot limits applied to clean.
|
||||
let first_entry_slot = current_slot_list[0].0;
|
||||
duplicates.push((first_entry_slot, k));
|
||||
}
|
||||
let mut slot_list = Vec::with_capacity(current_slot_list.len() + 1);
|
||||
slot_list.extend_from_slice(current_slot_list);
|
||||
slot_list.push((entry.0, entry.1.into())); // will never be from the same slot that already exists in the list
|
||||
ref_count += new_ref_count;
|
||||
duplicates.push((slot, k));
|
||||
Some((slot_list, ref_count))
|
||||
}
|
||||
None => {
|
||||
|
|
Loading…
Reference in New Issue