reduce contention on startup index generation (#31006)
This commit is contained in:
parent
fc2bcdffe2
commit
9600643860
|
@ -103,7 +103,7 @@ pub struct InMemAccountsIndex<T: IndexValue, U: DiskIndexValue + From<T> + Into<
|
|||
flushing_active: AtomicBool,
|
||||
|
||||
/// info to streamline initial index generation
|
||||
startup_info: Mutex<StartupInfo<T>>,
|
||||
startup_info: StartupInfo<T>,
|
||||
|
||||
/// possible evictions for next few slots coming up
|
||||
possible_evictions: RwLock<PossibleEvictions<T>>,
|
||||
|
@ -131,9 +131,7 @@ pub enum InsertNewEntryResults {
|
|||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
struct StartupInfo<T: IndexValue> {
|
||||
/// entries to add next time we are flushing to disk
|
||||
insert: Vec<(Slot, Pubkey, T)>,
|
||||
struct StartupInfoDuplicates<T: IndexValue> {
|
||||
/// entries that were found to have duplicate index entries.
|
||||
/// When all entries have been inserted, these can be resolved and held in memory.
|
||||
duplicates: Vec<(Slot, Pubkey, T)>,
|
||||
|
@ -141,6 +139,14 @@ struct StartupInfo<T: IndexValue> {
|
|||
duplicates_put_on_disk: HashSet<(Slot, Pubkey)>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
struct StartupInfo<T: IndexValue> {
|
||||
/// entries to add next time we are flushing to disk
|
||||
insert: Mutex<Vec<(Slot, Pubkey, T)>>,
|
||||
/// pubkeys with more than 1 entry
|
||||
duplicates: Mutex<StartupInfoDuplicates<T>>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
/// result from scanning in-mem index during flush
|
||||
struct FlushScanResult<T> {
|
||||
|
@ -168,7 +174,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
|
|||
flushing_active: AtomicBool::default(),
|
||||
// initialize this to max, to make it clear we have not flushed at age 0, the starting age
|
||||
last_age_flushed: AtomicU8::new(Age::MAX),
|
||||
startup_info: Mutex::default(),
|
||||
startup_info: StartupInfo::default(),
|
||||
possible_evictions: RwLock::new(PossibleEvictions::new(1)),
|
||||
// Spread out the scanning across all ages within the window.
|
||||
// This causes us to scan 1/N of the bins each 'Age'
|
||||
|
@ -663,7 +669,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
|
|||
assert!(self.storage.get_startup());
|
||||
assert!(self.bucket.is_some());
|
||||
|
||||
let insert = &mut self.startup_info.lock().unwrap().insert;
|
||||
let mut insert = self.startup_info.insert.lock().unwrap();
|
||||
items
|
||||
.into_iter()
|
||||
.for_each(|(k, v)| insert.push((slot, k, v)));
|
||||
|
@ -1033,7 +1039,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
|
|||
}
|
||||
|
||||
fn write_startup_info_to_disk(&self) {
|
||||
let insert = std::mem::take(&mut self.startup_info.lock().unwrap().insert);
|
||||
let insert = std::mem::take(&mut *self.startup_info.insert.lock().unwrap());
|
||||
if insert.is_empty() {
|
||||
// nothing to insert for this bin
|
||||
return;
|
||||
|
@ -1050,7 +1056,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
|
|||
drop(map_internal);
|
||||
|
||||
// this fn should only be called from a single thread, so holding the lock is fine
|
||||
let mut startup_info = self.startup_info.lock().unwrap();
|
||||
let mut duplicates = self.startup_info.duplicates.lock().unwrap();
|
||||
|
||||
// merge all items into the disk index now
|
||||
let disk = self.bucket.as_ref().unwrap();
|
||||
|
@ -1062,12 +1068,12 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
|
|||
match current {
|
||||
Some((current_slot_list, ref_count)) => {
|
||||
// already on disk, so remember the new (slot, info) for later
|
||||
startup_info.duplicates.push((slot, k, entry.1));
|
||||
duplicates.duplicates.push((slot, k, entry.1));
|
||||
if let Some((slot, _)) = current_slot_list.first() {
|
||||
// accurately account for there being a duplicate for the first entry that was previously added to the disk index.
|
||||
// That entry could not have known yet that it was a duplicate.
|
||||
// It is important to capture each slot with a duplicate because of slot limits applied to clean.
|
||||
startup_info.duplicates_put_on_disk.insert((*slot, k));
|
||||
duplicates.duplicates_put_on_disk.insert((*slot, k));
|
||||
}
|
||||
Some((current_slot_list.to_vec(), ref_count))
|
||||
}
|
||||
|
@ -1087,13 +1093,16 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
|
|||
/// These were collected for this bin when we did batch inserts in the bg flush threads.
|
||||
/// Insert these into the in-mem index, then return the duplicate (Slot, Pubkey)
|
||||
pub(crate) fn populate_and_retrieve_duplicate_keys_from_startup(&self) -> Vec<(Slot, Pubkey)> {
|
||||
let mut write = self.startup_info.lock().unwrap();
|
||||
let inserts = self.startup_info.insert.lock().unwrap();
|
||||
// in order to return accurate and complete duplicates, we must have nothing left remaining to insert
|
||||
assert!(write.insert.is_empty());
|
||||
assert!(inserts.is_empty());
|
||||
drop(inserts);
|
||||
|
||||
let mut duplicate_items = self.startup_info.duplicates.lock().unwrap();
|
||||
let duplicates = std::mem::take(&mut duplicate_items.duplicates);
|
||||
let duplicates_put_on_disk = std::mem::take(&mut duplicate_items.duplicates_put_on_disk);
|
||||
drop(duplicate_items);
|
||||
|
||||
let duplicates = std::mem::take(&mut write.duplicates);
|
||||
let duplicates_put_on_disk = std::mem::take(&mut write.duplicates_put_on_disk);
|
||||
drop(write);
|
||||
duplicates_put_on_disk
|
||||
.into_iter()
|
||||
.chain(duplicates.into_iter().map(|(slot, key, info)| {
|
||||
|
|
Loading…
Reference in New Issue