diff --git a/runtime/src/read_only_accounts_cache.rs b/runtime/src/read_only_accounts_cache.rs index 876da9b548..71a0947158 100644 --- a/runtime/src/read_only_accounts_cache.rs +++ b/runtime/src/read_only_accounts_cache.rs @@ -1,23 +1,24 @@ //! ReadOnlyAccountsCache used to store accounts, such as executable accounts, //! which can be large, loaded many times, and rarely change. -use dashmap::DashMap; -use std::{ - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, RwLock, - }, - time::Instant, -}; - +use dashmap::{mapref::entry::Entry, DashMap}; +//use mapref::entry::{Entry, OccupiedEntry, VacantEntry}; use solana_sdk::{ account::{AccountSharedData, ReadableAccount}, clock::Slot, pubkey::Pubkey, }; +use std::thread::{sleep, Builder, JoinHandle}; +use std::time::Duration; +use std::{ + sync::{ + atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, + Arc, RwLock, + }, + time::Instant, +}; type ReadOnlyCacheKey = (Pubkey, Slot); -type LruEntry = (Instant, usize, ReadOnlyCacheKey); -type LruList = Arc>>; +type LruEntry = (Instant, ReadOnlyCacheKey); #[derive(Debug)] pub struct ReadOnlyAccountCacheEntry { @@ -27,25 +28,62 @@ pub struct ReadOnlyAccountCacheEntry { #[derive(Debug)] pub struct ReadOnlyAccountsCache { - cache: DashMap, + cache: Arc>, max_data_size: usize, - data_size: Arc>, + data_size: Arc, hits: AtomicU64, misses: AtomicU64, - lru: LruList, per_account_size: usize, + stop: Arc, + background: Option>, +} + +impl Drop for ReadOnlyAccountsCache { + fn drop(&mut self) { + self.stop.store(true, Ordering::Relaxed); + if let Some(background) = self.background.take() { + background.join().unwrap(); + } + } } impl ReadOnlyAccountsCache { pub fn new(max_data_size: usize) -> Self { - Self { + let mut result = Self::new_test(max_data_size); + + let bg = Self { max_data_size, - cache: DashMap::default(), - data_size: Arc::new(RwLock::new(0)), + cache: result.cache.clone(), + data_size: result.data_size.clone(), hits: AtomicU64::new(0), misses: AtomicU64::new(0), - lru: Arc::new(RwLock::new(Vec::new())), per_account_size: Self::per_account_size(), + stop: result.stop.clone(), + background: None, + }; + + result.background = Some( + Builder::new() + .name("solana-readonly-accounts-cache".to_string()) + .spawn(move || { + bg.bg_purge_lru_items(false); + }) + .unwrap(), + ); + + result + } + + fn new_test(max_data_size: usize) -> Self { + Self { + max_data_size, + cache: Arc::new(DashMap::default()), + data_size: Arc::new(AtomicUsize::new(0)), + hits: AtomicU64::new(0), + misses: AtomicU64::new(0), + per_account_size: Self::per_account_size(), + stop: Arc::new(AtomicBool::new(false)), + background: None, } } @@ -71,19 +109,37 @@ impl ReadOnlyAccountsCache { }) } + fn account_size(&self, account: &AccountSharedData) -> usize { + account.data().len() + self.per_account_size + } + pub fn store(&self, pubkey: &Pubkey, slot: Slot, account: &AccountSharedData) { - let len = account.data().len() + self.per_account_size; - self.cache.insert( + let len = self.account_size(account); + let previous_len = if let Some(previous) = self.cache.insert( (*pubkey, slot), ReadOnlyAccountCacheEntry { account: account.clone(), last_used: Arc::new(RwLock::new(Instant::now())), }, - ); + ) { + self.account_size(&previous.account) + } else { + 0 + }; - // maybe purge after we insert. Insert may have replaced. - let new_size = self.maybe_purge_lru_items(len); - *self.data_size.write().unwrap() = new_size; + match len.cmp(&previous_len) { + std::cmp::Ordering::Greater => { + self.data_size + .fetch_add(len - previous_len, Ordering::Relaxed); + } + std::cmp::Ordering::Less => { + self.data_size + .fetch_sub(previous_len - len, Ordering::Relaxed); + } + std::cmp::Ordering::Equal => { + // no change in size + } + }; } pub fn remove(&self, pubkey: &Pubkey, slot: Slot) { @@ -92,78 +148,78 @@ impl ReadOnlyAccountsCache { self.cache.remove(&(*pubkey, slot)); } - fn purge_lru_list( - &self, - lru: &mut Vec, - verify_timestamp: bool, - mut current_size: usize, - ) -> usize { - let mut processed = 0; - for lru_item in lru.iter() { - let (timestamp, size, key) = lru_item; - processed += 1; - let mut try_remove = true; - if verify_timestamp { - let item = self.cache.get(key); - match item { - Some(item) => { - if *timestamp != *item.last_used.read().unwrap() { - // this item was used more recently than our list indicates, so skip it - continue; - } - // item is as old as we thought, so fall through and delete it - } - None => { - try_remove = false; + fn purge_lru_list(&self, lru: &[LruEntry], lru_index: &mut usize) -> bool { + let mut freed_bytes = 0; + let start = *lru_index; + let mut done = false; + let current_size = self.data_size.load(Ordering::Relaxed); + for (timestamp, key) in lru.iter().skip(start) { + if current_size.saturating_sub(freed_bytes) <= self.max_data_size { + done = true; + break; + } + *lru_index += 1; + match self.cache.entry(*key) { + Entry::Vacant(_entry) => (), + Entry::Occupied(entry) => { + if *timestamp == *entry.get().last_used.read().unwrap() { + let size = self.account_size(&entry.get().account); + freed_bytes += size; + entry.remove(); } } } - - if try_remove { - self.cache.remove(&key); - } - current_size = current_size.saturating_sub(*size); // we don't subtract on remove, so subtract now - if current_size <= self.max_data_size { - break; - } } - lru.drain(0..processed); - current_size + if freed_bytes > 0 { + // if this overflows, we'll have a really big data size, so we'll clean everything, scan all, and reset the size. Not ideal, but not terrible. + self.data_size.fetch_sub(freed_bytes, Ordering::Relaxed); + } + done } fn calculate_lru_list(&self, lru: &mut Vec) -> usize { - // purge in lru order + lru.clear(); + lru.reserve(self.cache.len()); let mut new_size = 0; for item in self.cache.iter() { let value = item.value(); - let item_len = value.account.data().len() + self.per_account_size; + let item_len = self.account_size(&value.account); new_size += item_len; - lru.push((*value.last_used.read().unwrap(), item_len, *item.key())); + lru.push((*value.last_used.read().unwrap(), *item.key())); } new_size } - fn maybe_purge_lru_items(&self, new_item_len: usize) -> usize { - let mut new_size = *self.data_size.read().unwrap() + new_item_len; - if new_size <= self.max_data_size { - return new_size; - } + fn bg_purge_lru_items(&self, once: bool) { + let mut lru = Vec::new(); + let mut lru_index = 0; + let mut stop = false; + loop { + if !once { + sleep(Duration::from_millis(200)); + } else { + if stop { + break; + } + stop = true; + } - // purge from the lru list we last made - let mut list = self.lru.write().unwrap(); - new_size = self.purge_lru_list(&mut list, true, new_size); - if new_size <= self.max_data_size { - return new_size; - } + if self.stop.load(Ordering::Relaxed) { + break; + } - // we didn't get enough, so calculate a new list and keep purging - new_size = self.calculate_lru_list(&mut list); - if new_size > self.max_data_size { - list.sort(); - new_size = self.purge_lru_list(&mut list, false, new_size); - // the list is stored in self so we use it to purge next time + // purge from the lru list we last made + if self.purge_lru_list(&lru, &mut lru_index) { + continue; + } + + // we didn't get enough, so calculate a new list and keep purging + let new_size = self.calculate_lru_list(&mut lru); + lru_index = 0; + self.data_size.store(new_size, Ordering::Relaxed); + lru.sort(); + self.purge_lru_list(&lru, &mut lru_index); } - new_size } pub fn cache_len(&self) -> usize { @@ -171,7 +227,7 @@ impl ReadOnlyAccountsCache { } pub fn data_size(&self) -> usize { - *self.data_size.read().unwrap() + self.data_size.load(Ordering::Relaxed) } pub fn get_and_reset_stats(&self) -> (u64, u64) { @@ -192,13 +248,22 @@ pub mod tests { assert!(std::mem::size_of::>() == std::mem::size_of::>()); } + #[test] + fn test_read_only_accounts_cache_drop() { + solana_logger::setup(); + let cache = ReadOnlyAccountsCache::new_test(100); + let stop = cache.stop.clone(); + drop(cache); + assert!(stop.load(Ordering::Relaxed)); + } + #[test] fn test_read_only_accounts_cache() { solana_logger::setup(); let per_account_size = ReadOnlyAccountsCache::per_account_size(); let data_size = 100; let max = data_size + per_account_size; - let cache = ReadOnlyAccountsCache::new(max); + let cache = ReadOnlyAccountsCache::new_test(max); let slot = 0; assert!(cache.load(&Pubkey::default(), slot).is_none()); assert_eq!(0, cache.cache_len()); @@ -220,6 +285,7 @@ pub mod tests { assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1)); assert_eq!(1, cache.cache_len()); cache.store(&key2, slot, &account2); + cache.bg_purge_lru_items(true); assert_eq!(100 + per_account_size, cache.data_size()); assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account2)); assert_eq!(1, cache.cache_len()); @@ -233,7 +299,7 @@ pub mod tests { // can store 2 items, 3rd item kicks oldest item out let max = (data_size + per_account_size) * 2; - let cache = ReadOnlyAccountsCache::new(max); + let cache = ReadOnlyAccountsCache::new_test(max); cache.store(&key1, slot, &account1); assert_eq!(100 + per_account_size, cache.data_size()); assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1)); @@ -249,6 +315,7 @@ pub mod tests { assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1)); assert_eq!(2, cache.cache_len()); cache.store(&key3, slot, &account3); + cache.bg_purge_lru_items(true); assert_eq!(max, cache.data_size()); assert!(cache.load(&key1, slot).is_none()); // was lru purged assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1));