AcctIdx: generate index inserts/updates directly to disk (#21363)

* when initially creating account index, write directly to disk

* AcctIdx: generate index inserts/updates directly to disk
This commit is contained in:
Jeff Washington (jwash) 2021-11-19 17:17:07 -06:00 committed by GitHub
parent 0bb059185c
commit ebea3297f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 72 additions and 47 deletions

View File

@ -482,9 +482,9 @@ impl<T: Clone + Copy> Bucket<T> {
}
}
pub fn update<F>(&mut self, key: &Pubkey, updatefn: F)
pub fn update<F>(&mut self, key: &Pubkey, mut updatefn: F)
where
F: Fn(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
{
let current = self.read_value(key);
let new = updatefn(current);

View File

@ -128,7 +128,7 @@ impl<T: Clone + Copy> BucketApi<T> {
pub fn update<F>(&self, key: &Pubkey, updatefn: F)
where
F: Fn(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
{
let mut bucket = self.get_write_bucket();
bucket.as_mut().unwrap().update(key, updatefn)

View File

@ -148,7 +148,7 @@ impl<T: Clone + Copy + Debug> BucketMap<T> {
/// Update Pubkey `key`'s value with function `updatefn`
pub fn update<F>(&self, key: &Pubkey, updatefn: F)
where
F: Fn(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,
{
self.get_bucket(key).update(key, updatefn)
}

View File

@ -3,7 +3,7 @@ use crate::{
ancestors::Ancestors,
bucket_map_holder::{Age, BucketMapHolder},
contains::Contains,
in_mem_accounts_index::InMemAccountsIndex,
in_mem_accounts_index::{InMemAccountsIndex, InsertNewEntryResults},
inline_spl_token_v2_0::{self, SPL_TOKEN_ACCOUNT_MINT_OFFSET, SPL_TOKEN_ACCOUNT_OWNER_OFFSET},
pubkey_bins::PubkeyBinCalculator24,
secondary_index::*,
@ -1648,28 +1648,14 @@ impl<T: IndexValue> AccountsIndex<T> {
let insertion_time = AtomicU64::new(0);
binned.into_iter().for_each(|(pubkey_bin, items)| {
let mut _reclaims = SlotList::new();
// big enough so not likely to re-allocate, small enough to not over-allocate by too much
// this assumes 10% of keys are duplicates. This vector will be flattened below.
let w_account_maps = self.account_maps[pubkey_bin].write().unwrap();
let mut insert_time = Measure::start("insert_into_primary_index");
items.into_iter().for_each(|(pubkey, new_item)| {
let already_exists =
w_account_maps.insert_new_entry_if_missing_with_lock(pubkey, new_item);
if let Some((account_entry, account_info, pubkey)) = already_exists {
let is_zero_lamport = account_info.is_zero_lamport();
InMemAccountsIndex::lock_and_update_slot_list(
&account_entry,
(slot, account_info),
&mut _reclaims,
false,
);
if !is_zero_lamport {
// zero lamports were already added to dirty_pubkeys above
dirty_pubkeys.push(pubkey);
}
if let InsertNewEntryResults::ExistedNewEntryNonZeroLamports =
w_account_maps.insert_new_entry_if_missing_with_lock(pubkey, new_item)
{
// zero lamports were already added to dirty_pubkeys above
dirty_pubkeys.push(pubkey);
}
});
insert_time.stop();

View File

@ -47,6 +47,12 @@ impl<T: IndexValue> Debug for InMemAccountsIndex<T> {
}
}
pub enum InsertNewEntryResults {
DidNotExist,
ExistedNewEntryZeroLamports,
ExistedNewEntryNonZeroLamports,
}
#[allow(dead_code)] // temporary during staging
impl<T: IndexValue> InMemAccountsIndex<T> {
pub fn new(storage: &Arc<BucketMapHolder<T>>, bin: usize) -> Self {
@ -479,52 +485,85 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
)
}
// return None if item was created new
// if entry for pubkey already existed, return Some(entry). Caller needs to call entry.update.
pub fn insert_new_entry_if_missing_with_lock(
&self,
pubkey: Pubkey,
new_entry: PreAllocatedAccountMapEntry<T>,
) -> Option<(AccountMapEntry<T>, T, Pubkey)> {
) -> InsertNewEntryResults {
let mut m = Measure::start("entry");
let mut map = self.map().write().unwrap();
let entry = map.entry(pubkey);
m.stop();
let found = matches!(entry, Entry::Occupied(_));
let result = match entry {
Entry::Occupied(occupied) => Some(Self::insert_returner(
occupied.get(),
occupied.key(),
new_entry,
)),
let mut new_entry_zero_lamports = false;
let (found_in_mem, already_existed) = match entry {
Entry::Occupied(occupied) => {
// in cache, so merge into cache
let (slot, account_info) = new_entry.into();
new_entry_zero_lamports = account_info.is_zero_lamport();
InMemAccountsIndex::lock_and_update_slot_list(
occupied.get(),
(slot, account_info),
&mut Vec::default(),
false,
);
(
true, /* found in mem */
true, /* already existed */
)
}
Entry::Vacant(vacant) => {
// not in cache, look on disk
let disk_entry = self.load_account_entry_from_disk(vacant.key());
self.stats().insert_or_delete_mem(true, self.bin);
if let Some(disk_entry) = disk_entry {
// on disk, so insert into cache, then return cache value so caller will merge
let result = Some(Self::insert_returner(&disk_entry, vacant.key(), new_entry));
assert!(disk_entry.dirty());
vacant.insert(disk_entry);
result
let mut existed = false;
if let Some(disk) = self.bucket.as_ref() {
let (slot, account_info) = new_entry.into();
new_entry_zero_lamports = account_info.is_zero_lamport();
disk.update(vacant.key(), |current| {
if let Some((slot_list, mut ref_count)) = current {
// on disk, so merge and update disk
let mut slot_list = slot_list.to_vec();
let addref = Self::update_slot_list(
&mut slot_list,
slot,
account_info,
&mut Vec::default(),
false,
);
if addref {
ref_count += 1
};
existed = true;
Some((slot_list, ref_count))
} else {
// doesn't exist on disk yet, so insert it
let ref_count = if account_info.is_cached() { 0 } else { 1 };
Some((vec![(slot, account_info)], ref_count))
}
});
} else {
// not on disk, so insert new thing and we're done
// not using disk, so insert into mem
self.stats().insert_or_delete_mem(true, self.bin);
let new_entry: AccountMapEntry<T> = new_entry.into();
assert!(new_entry.dirty());
vacant.insert(new_entry);
None // returns None if item was created new
}
(false, existed)
}
};
drop(map);
self.update_entry_stats(m, found);
self.update_entry_stats(m, found_in_mem);
let stats = self.stats();
if result.is_none() {
if !already_existed {
stats.insert_or_delete(true, self.bin);
} else {
Self::update_stat(&stats.updates_in_mem, 1);
}
result
if !already_existed {
InsertNewEntryResults::DidNotExist
} else if new_entry_zero_lamports {
InsertNewEntryResults::ExistedNewEntryZeroLamports
} else {
InsertNewEntryResults::ExistedNewEntryNonZeroLamports
}
}
pub fn just_set_hold_range_in_memory<R>(&self, range: &R, start_holding: bool)