at startup, keep duplicates in in-memory index since they will be cleaned shortly (#30736)
at startup, keep duplicates in in-memory index since they will be cleaned soon
This commit is contained in:
parent
81ef2a0d75
commit
9a1d5ea95d
|
@ -9115,7 +9115,7 @@ impl AccountsDb {
|
|||
// get duplicate keys from acct idx. We have to wait until we've finished flushing.
|
||||
for (slot, key) in self
|
||||
.accounts_index
|
||||
.retrieve_duplicate_keys_from_startup()
|
||||
.populate_and_retrieve_duplicate_keys_from_startup()
|
||||
.into_iter()
|
||||
.flatten()
|
||||
{
|
||||
|
|
|
@ -1685,11 +1685,14 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
|
|||
}
|
||||
|
||||
/// return Vec<Vec<>> because the internal vecs are already allocated per bin
|
||||
pub fn retrieve_duplicate_keys_from_startup(&self) -> Vec<Vec<(Slot, Pubkey)>> {
|
||||
pub(crate) fn populate_and_retrieve_duplicate_keys_from_startup(
|
||||
&self,
|
||||
) -> Vec<Vec<(Slot, Pubkey)>> {
|
||||
(0..self.bins())
|
||||
.into_par_iter()
|
||||
.map(|pubkey_bin| {
|
||||
let r_account_maps = &self.account_maps[pubkey_bin];
|
||||
r_account_maps.retrieve_duplicate_keys_from_startup()
|
||||
r_account_maps.populate_and_retrieve_duplicate_keys_from_startup()
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
@ -2699,6 +2702,7 @@ pub mod tests {
|
|||
index.set_startup(Startup::Normal);
|
||||
}
|
||||
assert!(gc.is_empty());
|
||||
index.populate_and_retrieve_duplicate_keys_from_startup();
|
||||
|
||||
for lock in &[false, true] {
|
||||
let read_lock = if *lock {
|
||||
|
|
|
@ -13,7 +13,7 @@ use {
|
|||
solana_measure::measure::Measure,
|
||||
solana_sdk::{clock::Slot, pubkey::Pubkey},
|
||||
std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
collections::{hash_map::Entry, HashMap, HashSet},
|
||||
fmt::Debug,
|
||||
ops::{Bound, RangeBounds, RangeInclusive},
|
||||
sync::{
|
||||
|
@ -128,8 +128,11 @@ pub enum InsertNewEntryResults {
|
|||
struct StartupInfo<T: IndexValue> {
|
||||
/// entries to add next time we are flushing to disk
|
||||
insert: Vec<(Slot, Pubkey, T)>,
|
||||
/// pubkeys that were found to have duplicate index entries
|
||||
duplicates: Vec<(Slot, Pubkey)>,
|
||||
/// entries that were found to have duplicate index entries.
|
||||
/// When all entries have been inserted, these can be resolved and held in memory.
|
||||
duplicates: Vec<(Slot, Pubkey, T)>,
|
||||
/// pubkeys that were already added to disk and later found to be duplicates,
|
||||
duplicates_put_on_disk: HashSet<(Slot, Pubkey)>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
|
@ -1040,7 +1043,8 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
|
|||
);
|
||||
drop(map_internal);
|
||||
|
||||
let mut duplicates = vec![];
|
||||
// this fn should only be called from a single thread, so holding the lock is fine
|
||||
let mut startup_info = self.startup_info.lock().unwrap();
|
||||
|
||||
// merge all items into the disk index now
|
||||
let disk = self.bucket.as_ref().unwrap();
|
||||
|
@ -1050,21 +1054,16 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
|
|||
let new_ref_count = u64::from(!v.is_cached());
|
||||
disk.update(&k, |current| {
|
||||
match current {
|
||||
Some((current_slot_list, mut ref_count)) => {
|
||||
// merge this in, mark as duplicate
|
||||
duplicates.push((slot, k));
|
||||
if current_slot_list.len() == 1 {
|
||||
Some((current_slot_list, ref_count)) => {
|
||||
// already on disk, so remember the new (slot, info) for later
|
||||
startup_info.duplicates.push((slot, k, entry.1));
|
||||
if let Some((slot, _)) = current_slot_list.first() {
|
||||
// accurately account for there being a duplicate for the first entry that was previously added to the disk index.
|
||||
// That entry could not have known yet that it was a duplicate.
|
||||
// It is important to capture each slot with a duplicate because of slot limits applied to clean.
|
||||
let first_entry_slot = current_slot_list[0].0;
|
||||
duplicates.push((first_entry_slot, k));
|
||||
startup_info.duplicates_put_on_disk.insert((*slot, k));
|
||||
}
|
||||
let mut slot_list = Vec::with_capacity(current_slot_list.len() + 1);
|
||||
slot_list.extend_from_slice(current_slot_list);
|
||||
slot_list.push((entry.0, entry.1.into())); // will never be from the same slot that already exists in the list
|
||||
ref_count += new_ref_count;
|
||||
Some((slot_list, ref_count))
|
||||
Some((current_slot_list.to_vec(), ref_count))
|
||||
}
|
||||
None => {
|
||||
count += 1;
|
||||
|
@ -1075,22 +1074,28 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
|
|||
});
|
||||
});
|
||||
self.stats().inc_insert_count(count);
|
||||
self.startup_info
|
||||
.lock()
|
||||
.unwrap()
|
||||
.duplicates
|
||||
.append(&mut duplicates);
|
||||
}
|
||||
|
||||
/// pull out all duplicate pubkeys from 'startup_info'
|
||||
/// duplicate pubkeys have a slot list with len > 1
|
||||
/// These were collected for this bin when we did batch inserts in the bg flush threads.
|
||||
pub fn retrieve_duplicate_keys_from_startup(&self) -> Vec<(Slot, Pubkey)> {
|
||||
/// Insert these into the in-mem index, then return the duplicate (Slot, Pubkey)
|
||||
pub(crate) fn populate_and_retrieve_duplicate_keys_from_startup(&self) -> Vec<(Slot, Pubkey)> {
|
||||
let mut write = self.startup_info.lock().unwrap();
|
||||
// in order to return accurate and complete duplicates, we must have nothing left remaining to insert
|
||||
assert!(write.insert.is_empty());
|
||||
|
||||
std::mem::take(&mut write.duplicates)
|
||||
let duplicates = std::mem::take(&mut write.duplicates);
|
||||
let duplicates_put_on_disk = std::mem::take(&mut write.duplicates_put_on_disk);
|
||||
drop(write);
|
||||
duplicates_put_on_disk
|
||||
.into_iter()
|
||||
.chain(duplicates.into_iter().map(|(slot, key, info)| {
|
||||
let entry = PreAllocatedAccountMapEntry::new(slot, info, &self.storage, true);
|
||||
self.insert_new_entry_if_missing_with_lock(key, entry);
|
||||
(slot, key)
|
||||
}))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// synchronize the in-mem index with the disk index
|
||||
|
|
Loading…
Reference in New Issue