diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 719a18872d..9b76da9503 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -1,6 +1,7 @@ use crate::{ accounts_index_storage::AccountsIndexStorage, ancestors::Ancestors, + bucket_map_holder::{Age, BucketMapHolder}, contains::Contains, in_mem_accounts_index::InMemAccountsIndex, inline_spl_token_v2_0::{self, SPL_TOKEN_ACCOUNT_MINT_OFFSET, SPL_TOKEN_ACCOUNT_OWNER_OFFSET}, @@ -41,12 +42,14 @@ pub const ACCOUNTS_INDEX_CONFIG_FOR_TESTING: AccountsIndexConfig = AccountsIndex flush_threads: Some(FLUSH_THREADS_TESTING), drives: None, index_limit_mb: None, + ages_to_stay_in_cache: None, }; pub const ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS: AccountsIndexConfig = AccountsIndexConfig { bins: Some(BINS_FOR_BENCHMARKS), flush_threads: Some(FLUSH_THREADS_TESTING), drives: None, index_limit_mb: None, + ages_to_stay_in_cache: None, }; pub type ScanResult = Result; pub type SlotList = Vec<(Slot, T)>; @@ -104,6 +107,7 @@ pub struct AccountsIndexConfig { pub flush_threads: Option, pub drives: Option>, pub index_limit_mb: Option, + pub ages_to_stay_in_cache: Option, } #[derive(Debug, Default, Clone)] @@ -134,10 +138,10 @@ pub struct AccountMapEntryMeta { } impl AccountMapEntryMeta { - pub fn new_dirty() -> Self { + pub fn new_dirty(storage: &Arc>) -> Self { AccountMapEntryMeta { dirty: AtomicBool::new(true), - age: AtomicU8::default(), + age: AtomicU8::new(storage.future_age_to_flush()), } } } @@ -252,12 +256,16 @@ impl WriteAccountMapEntry { // 1. new empty (refcount=0, slot_list={}) // 2. update(slot, account_info) // This code is called when the first entry [ie. (slot,account_info)] for a pubkey is inserted into the index. - pub fn new_entry_after_update(slot: Slot, account_info: T) -> AccountMapEntry { + pub fn new_entry_after_update( + slot: Slot, + account_info: T, + storage: &Arc>, + ) -> AccountMapEntry { let ref_count = if account_info.is_cached() { 0 } else { 1 }; Arc::new(AccountMapEntryInner { ref_count: AtomicU64::new(ref_count), slot_list: RwLock::new(vec![(slot, account_info)]), - meta: AccountMapEntryMeta::new_dirty(), + meta: AccountMapEntryMeta::new_dirty(storage), }) } @@ -1531,7 +1539,11 @@ impl AccountsIndex { let is_zero_lamport = account_info.is_zero_lamport(); let result = if is_zero_lamport { Some(pubkey) } else { None }; - let info = WriteAccountMapEntry::new_entry_after_update(slot, account_info); + let info = WriteAccountMapEntry::new_entry_after_update( + slot, + account_info, + &self.storage.storage, + ); binned[bin].1.push((pubkey, info)); result }) @@ -1591,7 +1603,8 @@ impl AccountsIndex { // - The secondary index is never consulted as primary source of truth for gets/stores. // So, what the accounts_index sees alone is sufficient as a source of truth for other non-scan // account operations. - let new_item = WriteAccountMapEntry::new_entry_after_update(slot, account_info); + let new_item = + WriteAccountMapEntry::new_entry_after_update(slot, account_info, &self.storage.storage); let map = &self.account_maps[self.bin_calculator.bin_from_pubkey(pubkey)]; let r_account_maps = map.read().unwrap(); @@ -2769,8 +2782,13 @@ pub mod tests { let slot = 0; // account_info type that IS cached let account_info = AccountInfoTest::default(); + let index = AccountsIndex::default_for_tests(); - let new_entry = WriteAccountMapEntry::new_entry_after_update(slot, account_info); + let new_entry = WriteAccountMapEntry::new_entry_after_update( + slot, + account_info, + &index.storage.storage, + ); assert_eq!(new_entry.ref_count.load(Ordering::Relaxed), 0); assert_eq!(new_entry.slot_list.read().unwrap().capacity(), 1); assert_eq!( @@ -2780,8 +2798,13 @@ pub mod tests { // account_info type that is NOT cached let account_info = true; + let index = AccountsIndex::default_for_tests(); - let new_entry = WriteAccountMapEntry::new_entry_after_update(slot, account_info); + let new_entry = WriteAccountMapEntry::new_entry_after_update( + slot, + account_info, + &index.storage.storage, + ); assert_eq!(new_entry.ref_count.load(Ordering::Relaxed), 1); assert_eq!(new_entry.slot_list.read().unwrap().capacity(), 1); assert_eq!( @@ -2845,7 +2868,11 @@ pub mod tests { assert_eq!(entry.ref_count(), if is_cached { 0 } else { 1 }); let expected = vec![(slot0, account_infos[0])]; assert_eq!(entry.slot_list().to_vec(), expected); - let new_entry = WriteAccountMapEntry::new_entry_after_update(slot0, account_infos[0]); + let new_entry = WriteAccountMapEntry::new_entry_after_update( + slot0, + account_infos[0], + &index.storage.storage, + ); assert_eq!( entry.slot_list().to_vec(), new_entry.slot_list.read().unwrap().to_vec(), @@ -2891,7 +2918,11 @@ pub mod tests { vec![(slot0, account_infos[0]), (slot1, account_infos[1])] ); - let new_entry = WriteAccountMapEntry::new_entry_after_update(slot1, account_infos[1]); + let new_entry = WriteAccountMapEntry::new_entry_after_update( + slot1, + account_infos[1], + &index.storage.storage, + ); assert_eq!(entry.slot_list()[1], new_entry.slot_list.read().unwrap()[0],); } } @@ -2914,7 +2945,11 @@ pub mod tests { let slot = 0; let account_info = true; - let new_entry = WriteAccountMapEntry::new_entry_after_update(slot, account_info); + let new_entry = WriteAccountMapEntry::new_entry_after_update( + slot, + account_info, + &index.storage.storage, + ); assert_eq!(0, account_maps_len_expensive(&index)); // will fail because key doesn't exist diff --git a/runtime/src/bucket_map_holder.rs b/runtime/src/bucket_map_holder.rs index a43e4694b3..8c7b1fd2ad 100644 --- a/runtime/src/bucket_map_holder.rs +++ b/runtime/src/bucket_map_holder.rs @@ -20,6 +20,7 @@ pub struct BucketMapHolder { // how much mb are we allowed to keep in the in-mem index? // Rest goes to disk. pub mem_budget_mb: Option, + ages_to_stay_in_cache: Age, /// startup is a special time for flush to focus on moving everything to disk as fast and efficiently as possible /// with less thread count limitations. LRU and access patterns are not important. Freeing memory @@ -46,6 +47,9 @@ impl BucketMapHolder { assert!(previous >= self.bins); // we should not have increased age before previous age was fully flushed } + pub fn future_age_to_flush(&self) -> Age { + self.current_age().wrapping_add(self.ages_to_stay_in_cache) + } /// used by bg processes to determine # active threads and how aggressively to flush pub fn get_startup(&self) -> bool { self.startup.load(Ordering::Relaxed) @@ -76,7 +80,13 @@ impl BucketMapHolder { } pub fn new(bins: usize, config: &Option) -> Self { + const DEFAULT_AGE_TO_STAY_IN_CACHE: Age = 5; + let ages_to_stay_in_cache = config + .as_ref() + .and_then(|config| config.ages_to_stay_in_cache) + .unwrap_or(DEFAULT_AGE_TO_STAY_IN_CACHE); Self { + ages_to_stay_in_cache, count_ages_flushed: AtomicUsize::default(), age: AtomicU8::default(), stats: BucketMapHolderStats::default(),