generate_index straight to disk and batched (#25947)

* generate_index straight to disk and batched

* renames and comments

* handle in-mem case correctly

* use mutex
This commit is contained in:
Jeff Washington (jwash) 2022-06-15 18:14:39 -05:00 committed by GitHub
parent 631ea93259
commit b02c412d5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 191 additions and 33 deletions

View File

@ -8217,6 +8217,30 @@ impl AccountsDb {
})
.sum();
let mut index_flush_us = 0;
if pass == 0 {
// tell accounts index we are done adding the initial accounts at startup
let mut m = Measure::start("accounts_index_idle_us");
self.accounts_index.set_startup(Startup::Normal);
m.stop();
index_flush_us = m.as_us();
// this has to happen before pubkeys_to_duplicate_accounts_data_len below
// get duplicate keys from acct idx. We have to wait until we've finished flushing.
for (slot, key) in self
.accounts_index
.retrieve_duplicate_keys_from_startup()
.into_iter()
.flatten()
{
match self.uncleaned_pubkeys.entry(slot) {
Occupied(mut occupied) => occupied.get_mut().push(key),
Vacant(vacant) => {
vacant.insert(vec![key]);
}
}
}
}
// subtract data.len() from accounts_data_len for all old accounts that are in the index twice
let mut accounts_data_len_dedup_timer =
Measure::start("handle accounts data len duplicates");
@ -8243,15 +8267,6 @@ impl AccountsDb {
let storage_info_timings = storage_info_timings.into_inner().unwrap();
let mut index_flush_us = 0;
if pass == 0 {
// tell accounts index we are done adding the initial accounts at startup
let mut m = Measure::start("accounts_index_idle_us");
self.accounts_index.set_startup(Startup::Normal);
m.stop();
index_flush_us = m.as_us();
}
let mut timings = GenerateIndexTimings {
index_flush_us,
scan_time,

View File

@ -4,7 +4,7 @@ use {
ancestors::Ancestors,
bucket_map_holder::{Age, BucketMapHolder},
contains::Contains,
in_mem_accounts_index::{InMemAccountsIndex, InsertNewEntryResults},
in_mem_accounts_index::InMemAccountsIndex,
inline_spl_token::{self, GenericTokenAccount},
inline_spl_token_2022,
pubkey_bins::PubkeyBinCalculator24,
@ -1523,7 +1523,7 @@ impl<T: IndexValue> AccountsIndex<T> {
(pubkey_bin, Vec::with_capacity(expected_items_per_bin))
})
.collect::<Vec<_>>();
let mut dirty_pubkeys = items
let dirty_pubkeys = items
.filter_map(|(pubkey, account_info)| {
let pubkey_bin = self.bin_calculator.bin_from_pubkey(&pubkey);
let binned_index = (pubkey_bin + random_offset) % bins;
@ -1531,13 +1531,7 @@ impl<T: IndexValue> AccountsIndex<T> {
let is_zero_lamport = account_info.is_zero_lamport();
let result = if is_zero_lamport { Some(pubkey) } else { None };
let info = PreAllocatedAccountMapEntry::new(
slot,
account_info,
&self.storage.storage,
use_disk,
);
binned[binned_index].1.push((pubkey, info));
binned[binned_index].1.push((pubkey, account_info));
result
})
.collect::<Vec<_>>();
@ -1548,14 +1542,21 @@ impl<T: IndexValue> AccountsIndex<T> {
binned.into_iter().for_each(|(pubkey_bin, items)| {
let w_account_maps = self.account_maps[pubkey_bin].write().unwrap();
let mut insert_time = Measure::start("insert_into_primary_index");
items.into_iter().for_each(|(pubkey, new_item)| {
if let InsertNewEntryResults::ExistedNewEntryNonZeroLamports =
w_account_maps.insert_new_entry_if_missing_with_lock(pubkey, new_item)
{
// zero lamports were already added to dirty_pubkeys above
dirty_pubkeys.push(pubkey);
}
});
if use_disk {
w_account_maps.startup_insert_only(slot, items.into_iter());
} else {
// not using disk buckets, so just write to in-mem
// this is no longer the default case
items.into_iter().for_each(|(pubkey, account_info)| {
let new_entry = PreAllocatedAccountMapEntry::new(
slot,
account_info,
&self.storage.storage,
use_disk,
);
w_account_maps.insert_new_entry_if_missing_with_lock(pubkey, new_entry);
});
}
insert_time.stop();
insertion_time.fetch_add(insert_time.as_us(), Ordering::Relaxed);
});
@ -1563,6 +1564,17 @@ impl<T: IndexValue> AccountsIndex<T> {
(dirty_pubkeys, insertion_time.load(Ordering::Relaxed))
}
/// return Vec<Vec<>> because the internal vecs are already allocated per bin
pub fn retrieve_duplicate_keys_from_startup(&self) -> Vec<Vec<(Slot, Pubkey)>> {
(0..self.bins())
.into_iter()
.map(|pubkey_bin| {
let r_account_maps = self.account_maps[pubkey_bin].read().unwrap();
r_account_maps.retrieve_duplicate_keys_from_startup()
})
.collect()
}
/// Updates the given pubkey at the given slot with the new account information.
/// on return, the index's previous account info may be returned in 'reclaims' depending on 'previous_slot_entry_was_cached'
pub fn upsert(
@ -2325,7 +2337,9 @@ pub mod tests {
let index = AccountsIndex::<bool>::default_for_tests();
let account_info = true;
let items = vec![(*pubkey, account_info)];
index.set_startup(Startup::Startup);
index.insert_new_if_missing_into_primary_index(slot, items.len(), items.into_iter());
index.set_startup(Startup::Normal);
let mut ancestors = Ancestors::default();
assert!(index
@ -2358,7 +2372,9 @@ pub mod tests {
let index = AccountsIndex::<AccountInfoTest>::default_for_tests();
let account_info: AccountInfoTest = 0 as AccountInfoTest;
let items = vec![(*pubkey, account_info)];
index.set_startup(Startup::Startup);
index.insert_new_if_missing_into_primary_index(slot, items.len(), items.into_iter());
index.set_startup(Startup::Normal);
let mut ancestors = Ancestors::default();
assert!(index
@ -2462,8 +2478,10 @@ pub mod tests {
let index = AccountsIndex::<bool>::default_for_tests();
let account_infos = [true, false];
index.set_startup(Startup::Startup);
let items = vec![(key0, account_infos[0]), (key1, account_infos[1])];
index.insert_new_if_missing_into_primary_index(slot0, items.len(), items.into_iter());
index.set_startup(Startup::Normal);
for (i, key) in [key0, key1].iter().enumerate() {
let entry = index.get_account_read_entry(key).unwrap();
@ -2476,12 +2494,26 @@ pub mod tests {
account_infos: [T; 2],
is_cached: bool,
upsert: bool,
use_disk: bool,
) {
if is_cached && !upsert {
// This is an illegal combination when we are using queued lazy inserts.
// Cached items don't ever leave the in-mem cache.
// But the queued lazy insert code relies on there being nothing in the in-mem cache.
return;
}
let slot0 = 0;
let slot1 = 1;
let key = Keypair::new().pubkey();
let index = AccountsIndex::<T>::default_for_tests();
let mut config = ACCOUNTS_INDEX_CONFIG_FOR_TESTING;
config.index_limit_mb = if use_disk {
IndexLimitMb::Limit(10_000)
} else {
IndexLimitMb::InMemOnly // in-mem only
};
let index = AccountsIndex::<T>::new(Some(config));
let mut gc = Vec::new();
if upsert {
@ -2498,7 +2530,9 @@ pub mod tests {
);
} else {
let items = vec![(key, account_infos[0])];
index.set_startup(Startup::Startup);
index.insert_new_if_missing_into_primary_index(slot0, items.len(), items.into_iter());
index.set_startup(Startup::Normal);
}
assert!(gc.is_empty());
@ -2534,8 +2568,16 @@ pub mod tests {
UPSERT_PREVIOUS_SLOT_ENTRY_WAS_CACHED_FALSE,
);
} else {
// this has the effect of aging out everything in the in-mem cache
for _ in 0..5 {
index.set_startup(Startup::Startup);
index.set_startup(Startup::Normal);
}
let items = vec![(key, account_infos[1])];
index.set_startup(Startup::Startup);
index.insert_new_if_missing_into_primary_index(slot1, items.len(), items.into_iter());
index.set_startup(Startup::Normal);
}
assert!(gc.is_empty());
@ -2572,12 +2614,14 @@ pub mod tests {
#[test]
fn test_new_entry_and_update_code_paths() {
for is_upsert in &[false, true] {
// account_info type that IS cached
test_new_entry_code_paths_helper([1.0, 2.0], true, *is_upsert);
for use_disk in [false, true] {
for is_upsert in &[false, true] {
// account_info type that IS cached
test_new_entry_code_paths_helper([1.0, 2.0], true, *is_upsert, use_disk);
// account_info type that is NOT cached
test_new_entry_code_paths_helper([true, false], false, *is_upsert);
// account_info type that is NOT cached
test_new_entry_code_paths_helper([true, false], false, *is_upsert, use_disk);
}
}
}

View File

@ -21,7 +21,7 @@ use {
ops::{Bound, RangeBounds, RangeInclusive},
sync::{
atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering},
Arc, RwLock, RwLockWriteGuard,
Arc, Mutex, RwLock, RwLockWriteGuard,
},
},
};
@ -49,6 +49,9 @@ pub struct InMemAccountsIndex<T: IndexValue> {
stop_evictions: AtomicU64,
// set to true while this bin is being actively flushed
flushing_active: AtomicBool,
/// info to streamline initial index generation
startup_info: Mutex<StartupInfo<T>>,
}
impl<T: IndexValue> Debug for InMemAccountsIndex<T> {
@ -63,6 +66,14 @@ pub enum InsertNewEntryResults {
ExistedNewEntryNonZeroLamports,
}
#[derive(Default, Debug)]
struct StartupInfo<T: IndexValue> {
/// entries to add next time we are flushing to disk
insert: Vec<(Slot, Pubkey, T)>,
/// pubkeys that were found to have duplicate index entries
duplicates: Vec<(Slot, Pubkey)>,
}
/// result from scanning in-mem index during flush
struct FlushScanResult<T> {
/// pubkeys whose age indicates they may be evicted now, pending further checks.
@ -88,6 +99,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
flushing_active: AtomicBool::default(),
// initialize this to max, to make it clear we have not flushed at age 0, the starting age
last_age_flushed: AtomicU8::new(Age::MAX),
startup_info: Mutex::default(),
}
}
@ -577,6 +589,18 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
self.stats().count_in_bucket(self.bin)
}
/// Queue up these insertions for when the flush thread is dealing with this bin.
/// This is very fast and requires no lookups or disk access.
pub fn startup_insert_only(&self, slot: Slot, items: impl Iterator<Item = (Pubkey, T)>) {
assert!(self.storage.get_startup());
assert!(self.bucket.is_some());
let insert = &mut self.startup_info.lock().unwrap().insert;
items
.into_iter()
.for_each(|(k, v)| insert.push((slot, k, v)));
}
pub fn insert_new_entry_if_missing_with_lock(
&self,
pubkey: Pubkey,
@ -982,6 +1006,77 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
}
}
fn write_startup_info_to_disk(&self) {
let mut insert = vec![];
{
let mut lock = self.startup_info.lock().unwrap();
std::mem::swap(&mut insert, &mut lock.insert);
}
if insert.is_empty() {
// nothing to insert for this bin
return;
}
// during startup, nothing should be in the in-mem map
assert!(
self.map_internal.read().unwrap().is_empty(),
"len: {}, first: {:?}",
self.map_internal.read().unwrap().len(),
self.map_internal
.read()
.unwrap()
.iter()
.take(1)
.collect::<Vec<_>>()
);
let mut duplicates = vec![];
// merge all items into the disk index now
let disk = self.bucket.as_ref().unwrap();
let mut duplicate = vec![];
insert.into_iter().for_each(|(slot, k, v)| {
let entry = (slot, v);
let new_ref_count = if v.is_cached() { 0 } else { 1 };
disk.update(&k, |current| {
match current {
Some((current_slot_list, mut ref_count)) => {
// merge this in, mark as conflict
let mut slot_list = Vec::with_capacity(current_slot_list.len() + 1);
slot_list.extend_from_slice(current_slot_list);
slot_list.push(entry); // will never be from the same slot that already exists in the list
ref_count += new_ref_count;
duplicate.push((slot, k));
Some((slot_list, ref_count))
}
None => {
// not on disk, insert it
Some((vec![entry], new_ref_count))
}
}
});
});
self.startup_info
.lock()
.unwrap()
.duplicates
.append(&mut duplicates);
}
/// pull out all duplicate pubkeys from 'startup_info'
/// duplicate pubkeys have a slot list with len > 1
/// These were collected for this bin when we did batch inserts in the bg flush threads.
pub fn retrieve_duplicate_keys_from_startup(&self) -> Vec<(Slot, Pubkey)> {
let mut write = self.startup_info.lock().unwrap();
// in order to return accurate and complete duplicates, we must have nothing left remaining to insert
assert!(write.insert.is_empty());
let write = &mut write.duplicates;
let mut duplicates = vec![];
std::mem::swap(&mut duplicates, write);
duplicates
}
/// synchronize the in-mem index with the disk index
fn flush_internal(&self, flush_guard: &FlushGuard, can_advance_age: bool) {
let current_age = self.storage.current_age();
@ -999,6 +1094,10 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
mut evictions_random,
} = self.flush_scan(current_age, startup, flush_guard);
if startup {
self.write_startup_info_to_disk();
}
// write to disk outside in-mem map read lock
{
let mut evictions_age = Vec::with_capacity(evictions_age_possible.len());