AcctIdx: initial index items have future age to flush (#20010)

This commit is contained in:
Jeff Washington (jwash) 2021-09-19 20:22:09 -05:00 committed by GitHub
parent c1d181add5
commit af5b0d42a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 56 additions and 11 deletions

View File

@ -1,6 +1,7 @@
use crate::{
accounts_index_storage::AccountsIndexStorage,
ancestors::Ancestors,
bucket_map_holder::{Age, BucketMapHolder},
contains::Contains,
in_mem_accounts_index::InMemAccountsIndex,
inline_spl_token_v2_0::{self, SPL_TOKEN_ACCOUNT_MINT_OFFSET, SPL_TOKEN_ACCOUNT_OWNER_OFFSET},
@ -41,12 +42,14 @@ pub const ACCOUNTS_INDEX_CONFIG_FOR_TESTING: AccountsIndexConfig = AccountsIndex
flush_threads: Some(FLUSH_THREADS_TESTING),
drives: None,
index_limit_mb: None,
ages_to_stay_in_cache: None,
};
pub const ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS: AccountsIndexConfig = AccountsIndexConfig {
bins: Some(BINS_FOR_BENCHMARKS),
flush_threads: Some(FLUSH_THREADS_TESTING),
drives: None,
index_limit_mb: None,
ages_to_stay_in_cache: None,
};
pub type ScanResult<T> = Result<T, ScanError>;
pub type SlotList<T> = Vec<(Slot, T)>;
@ -104,6 +107,7 @@ pub struct AccountsIndexConfig {
pub flush_threads: Option<usize>,
pub drives: Option<Vec<PathBuf>>,
pub index_limit_mb: Option<usize>,
pub ages_to_stay_in_cache: Option<Age>,
}
#[derive(Debug, Default, Clone)]
@ -134,10 +138,10 @@ pub struct AccountMapEntryMeta {
}
impl AccountMapEntryMeta {
pub fn new_dirty() -> Self {
pub fn new_dirty<T: IndexValue>(storage: &Arc<BucketMapHolder<T>>) -> Self {
AccountMapEntryMeta {
dirty: AtomicBool::new(true),
age: AtomicU8::default(),
age: AtomicU8::new(storage.future_age_to_flush()),
}
}
}
@ -252,12 +256,16 @@ impl<T: IndexValue> WriteAccountMapEntry<T> {
// 1. new empty (refcount=0, slot_list={})
// 2. update(slot, account_info)
// This code is called when the first entry [ie. (slot,account_info)] for a pubkey is inserted into the index.
pub fn new_entry_after_update(slot: Slot, account_info: T) -> AccountMapEntry<T> {
pub fn new_entry_after_update(
slot: Slot,
account_info: T,
storage: &Arc<BucketMapHolder<T>>,
) -> AccountMapEntry<T> {
let ref_count = if account_info.is_cached() { 0 } else { 1 };
Arc::new(AccountMapEntryInner {
ref_count: AtomicU64::new(ref_count),
slot_list: RwLock::new(vec![(slot, account_info)]),
meta: AccountMapEntryMeta::new_dirty(),
meta: AccountMapEntryMeta::new_dirty(storage),
})
}
@ -1531,7 +1539,11 @@ impl<T: IndexValue> AccountsIndex<T> {
let is_zero_lamport = account_info.is_zero_lamport();
let result = if is_zero_lamport { Some(pubkey) } else { None };
let info = WriteAccountMapEntry::new_entry_after_update(slot, account_info);
let info = WriteAccountMapEntry::new_entry_after_update(
slot,
account_info,
&self.storage.storage,
);
binned[bin].1.push((pubkey, info));
result
})
@ -1591,7 +1603,8 @@ impl<T: IndexValue> AccountsIndex<T> {
// - The secondary index is never consulted as primary source of truth for gets/stores.
// So, what the accounts_index sees alone is sufficient as a source of truth for other non-scan
// account operations.
let new_item = WriteAccountMapEntry::new_entry_after_update(slot, account_info);
let new_item =
WriteAccountMapEntry::new_entry_after_update(slot, account_info, &self.storage.storage);
let map = &self.account_maps[self.bin_calculator.bin_from_pubkey(pubkey)];
let r_account_maps = map.read().unwrap();
@ -2769,8 +2782,13 @@ pub mod tests {
let slot = 0;
// account_info type that IS cached
let account_info = AccountInfoTest::default();
let index = AccountsIndex::default_for_tests();
let new_entry = WriteAccountMapEntry::new_entry_after_update(slot, account_info);
let new_entry = WriteAccountMapEntry::new_entry_after_update(
slot,
account_info,
&index.storage.storage,
);
assert_eq!(new_entry.ref_count.load(Ordering::Relaxed), 0);
assert_eq!(new_entry.slot_list.read().unwrap().capacity(), 1);
assert_eq!(
@ -2780,8 +2798,13 @@ pub mod tests {
// account_info type that is NOT cached
let account_info = true;
let index = AccountsIndex::default_for_tests();
let new_entry = WriteAccountMapEntry::new_entry_after_update(slot, account_info);
let new_entry = WriteAccountMapEntry::new_entry_after_update(
slot,
account_info,
&index.storage.storage,
);
assert_eq!(new_entry.ref_count.load(Ordering::Relaxed), 1);
assert_eq!(new_entry.slot_list.read().unwrap().capacity(), 1);
assert_eq!(
@ -2845,7 +2868,11 @@ pub mod tests {
assert_eq!(entry.ref_count(), if is_cached { 0 } else { 1 });
let expected = vec![(slot0, account_infos[0])];
assert_eq!(entry.slot_list().to_vec(), expected);
let new_entry = WriteAccountMapEntry::new_entry_after_update(slot0, account_infos[0]);
let new_entry = WriteAccountMapEntry::new_entry_after_update(
slot0,
account_infos[0],
&index.storage.storage,
);
assert_eq!(
entry.slot_list().to_vec(),
new_entry.slot_list.read().unwrap().to_vec(),
@ -2891,7 +2918,11 @@ pub mod tests {
vec![(slot0, account_infos[0]), (slot1, account_infos[1])]
);
let new_entry = WriteAccountMapEntry::new_entry_after_update(slot1, account_infos[1]);
let new_entry = WriteAccountMapEntry::new_entry_after_update(
slot1,
account_infos[1],
&index.storage.storage,
);
assert_eq!(entry.slot_list()[1], new_entry.slot_list.read().unwrap()[0],);
}
}
@ -2914,7 +2945,11 @@ pub mod tests {
let slot = 0;
let account_info = true;
let new_entry = WriteAccountMapEntry::new_entry_after_update(slot, account_info);
let new_entry = WriteAccountMapEntry::new_entry_after_update(
slot,
account_info,
&index.storage.storage,
);
assert_eq!(0, account_maps_len_expensive(&index));
// will fail because key doesn't exist

View File

@ -20,6 +20,7 @@ pub struct BucketMapHolder<T: IndexValue> {
// how much mb are we allowed to keep in the in-mem index?
// Rest goes to disk.
pub mem_budget_mb: Option<usize>,
ages_to_stay_in_cache: Age,
/// startup is a special time for flush to focus on moving everything to disk as fast and efficiently as possible
/// with less thread count limitations. LRU and access patterns are not important. Freeing memory
@ -46,6 +47,9 @@ impl<T: IndexValue> BucketMapHolder<T> {
assert!(previous >= self.bins); // we should not have increased age before previous age was fully flushed
}
pub fn future_age_to_flush(&self) -> Age {
self.current_age().wrapping_add(self.ages_to_stay_in_cache)
}
/// used by bg processes to determine # active threads and how aggressively to flush
pub fn get_startup(&self) -> bool {
self.startup.load(Ordering::Relaxed)
@ -76,7 +80,13 @@ impl<T: IndexValue> BucketMapHolder<T> {
}
pub fn new(bins: usize, config: &Option<AccountsIndexConfig>) -> Self {
const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5;
let ages_to_stay_in_cache = config
.as_ref()
.and_then(|config| config.ages_to_stay_in_cache)
.unwrap_or(DEFAULT_AGE_TO_STAY_IN_CACHE);
Self {
ages_to_stay_in_cache,
count_ages_flushed: AtomicUsize::default(),
age: AtomicU8::default(),
stats: BucketMapHolderStats::default(),