clean read_only_cache in the bg (#16722)

* clean read_only_cache in the bg

* reset lru_index

* cleanup

* cleanup

* drop for read only cache

* join bg thread

* reuse existing lru vec

* simplify reserve logic
This commit is contained in:
Jeff Washington (jwash) 2021-04-22 13:59:58 -05:00 committed by GitHub
parent 333998d008
commit 739f0698ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 148 additions and 81 deletions

View File

@ -1,23 +1,24 @@
//! ReadOnlyAccountsCache used to store accounts, such as executable accounts, //! ReadOnlyAccountsCache used to store accounts, such as executable accounts,
//! which can be large, loaded many times, and rarely change. //! which can be large, loaded many times, and rarely change.
use dashmap::DashMap; use dashmap::{mapref::entry::Entry, DashMap};
use std::{ //use mapref::entry::{Entry, OccupiedEntry, VacantEntry};
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
time::Instant,
};
use solana_sdk::{ use solana_sdk::{
account::{AccountSharedData, ReadableAccount}, account::{AccountSharedData, ReadableAccount},
clock::Slot, clock::Slot,
pubkey::Pubkey, pubkey::Pubkey,
}; };
use std::thread::{sleep, Builder, JoinHandle};
use std::time::Duration;
use std::{
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, RwLock,
},
time::Instant,
};
type ReadOnlyCacheKey = (Pubkey, Slot); type ReadOnlyCacheKey = (Pubkey, Slot);
type LruEntry = (Instant, usize, ReadOnlyCacheKey); type LruEntry = (Instant, ReadOnlyCacheKey);
type LruList = Arc<RwLock<Vec<LruEntry>>>;
#[derive(Debug)] #[derive(Debug)]
pub struct ReadOnlyAccountCacheEntry { pub struct ReadOnlyAccountCacheEntry {
@ -27,25 +28,62 @@ pub struct ReadOnlyAccountCacheEntry {
#[derive(Debug)] #[derive(Debug)]
pub struct ReadOnlyAccountsCache { pub struct ReadOnlyAccountsCache {
cache: DashMap<ReadOnlyCacheKey, ReadOnlyAccountCacheEntry>, cache: Arc<DashMap<ReadOnlyCacheKey, ReadOnlyAccountCacheEntry>>,
max_data_size: usize, max_data_size: usize,
data_size: Arc<RwLock<usize>>, data_size: Arc<AtomicUsize>,
hits: AtomicU64, hits: AtomicU64,
misses: AtomicU64, misses: AtomicU64,
lru: LruList,
per_account_size: usize, per_account_size: usize,
stop: Arc<AtomicBool>,
background: Option<JoinHandle<()>>,
}
impl Drop for ReadOnlyAccountsCache {
fn drop(&mut self) {
self.stop.store(true, Ordering::Relaxed);
if let Some(background) = self.background.take() {
background.join().unwrap();
}
}
} }
impl ReadOnlyAccountsCache { impl ReadOnlyAccountsCache {
pub fn new(max_data_size: usize) -> Self { pub fn new(max_data_size: usize) -> Self {
Self { let mut result = Self::new_test(max_data_size);
let bg = Self {
max_data_size, max_data_size,
cache: DashMap::default(), cache: result.cache.clone(),
data_size: Arc::new(RwLock::new(0)), data_size: result.data_size.clone(),
hits: AtomicU64::new(0), hits: AtomicU64::new(0),
misses: AtomicU64::new(0), misses: AtomicU64::new(0),
lru: Arc::new(RwLock::new(Vec::new())),
per_account_size: Self::per_account_size(), per_account_size: Self::per_account_size(),
stop: result.stop.clone(),
background: None,
};
result.background = Some(
Builder::new()
.name("solana-readonly-accounts-cache".to_string())
.spawn(move || {
bg.bg_purge_lru_items(false);
})
.unwrap(),
);
result
}
fn new_test(max_data_size: usize) -> Self {
Self {
max_data_size,
cache: Arc::new(DashMap::default()),
data_size: Arc::new(AtomicUsize::new(0)),
hits: AtomicU64::new(0),
misses: AtomicU64::new(0),
per_account_size: Self::per_account_size(),
stop: Arc::new(AtomicBool::new(false)),
background: None,
} }
} }
@ -71,19 +109,37 @@ impl ReadOnlyAccountsCache {
}) })
} }
fn account_size(&self, account: &AccountSharedData) -> usize {
account.data().len() + self.per_account_size
}
pub fn store(&self, pubkey: &Pubkey, slot: Slot, account: &AccountSharedData) { pub fn store(&self, pubkey: &Pubkey, slot: Slot, account: &AccountSharedData) {
let len = account.data().len() + self.per_account_size; let len = self.account_size(account);
self.cache.insert( let previous_len = if let Some(previous) = self.cache.insert(
(*pubkey, slot), (*pubkey, slot),
ReadOnlyAccountCacheEntry { ReadOnlyAccountCacheEntry {
account: account.clone(), account: account.clone(),
last_used: Arc::new(RwLock::new(Instant::now())), last_used: Arc::new(RwLock::new(Instant::now())),
}, },
); ) {
self.account_size(&previous.account)
} else {
0
};
// maybe purge after we insert. Insert may have replaced. match len.cmp(&previous_len) {
let new_size = self.maybe_purge_lru_items(len); std::cmp::Ordering::Greater => {
*self.data_size.write().unwrap() = new_size; self.data_size
.fetch_add(len - previous_len, Ordering::Relaxed);
}
std::cmp::Ordering::Less => {
self.data_size
.fetch_sub(previous_len - len, Ordering::Relaxed);
}
std::cmp::Ordering::Equal => {
// no change in size
}
};
} }
pub fn remove(&self, pubkey: &Pubkey, slot: Slot) { pub fn remove(&self, pubkey: &Pubkey, slot: Slot) {
@ -92,78 +148,78 @@ impl ReadOnlyAccountsCache {
self.cache.remove(&(*pubkey, slot)); self.cache.remove(&(*pubkey, slot));
} }
fn purge_lru_list( fn purge_lru_list(&self, lru: &[LruEntry], lru_index: &mut usize) -> bool {
&self, let mut freed_bytes = 0;
lru: &mut Vec<LruEntry>, let start = *lru_index;
verify_timestamp: bool, let mut done = false;
mut current_size: usize, let current_size = self.data_size.load(Ordering::Relaxed);
) -> usize { for (timestamp, key) in lru.iter().skip(start) {
let mut processed = 0; if current_size.saturating_sub(freed_bytes) <= self.max_data_size {
for lru_item in lru.iter() { done = true;
let (timestamp, size, key) = lru_item;
processed += 1;
let mut try_remove = true;
if verify_timestamp {
let item = self.cache.get(key);
match item {
Some(item) => {
if *timestamp != *item.last_used.read().unwrap() {
// this item was used more recently than our list indicates, so skip it
continue;
}
// item is as old as we thought, so fall through and delete it
}
None => {
try_remove = false;
}
}
}
if try_remove {
self.cache.remove(&key);
}
current_size = current_size.saturating_sub(*size); // we don't subtract on remove, so subtract now
if current_size <= self.max_data_size {
break; break;
} }
*lru_index += 1;
match self.cache.entry(*key) {
Entry::Vacant(_entry) => (),
Entry::Occupied(entry) => {
if *timestamp == *entry.get().last_used.read().unwrap() {
let size = self.account_size(&entry.get().account);
freed_bytes += size;
entry.remove();
} }
lru.drain(0..processed); }
current_size }
}
if freed_bytes > 0 {
// if this overflows, we'll have a really big data size, so we'll clean everything, scan all, and reset the size. Not ideal, but not terrible.
self.data_size.fetch_sub(freed_bytes, Ordering::Relaxed);
}
done
} }
fn calculate_lru_list(&self, lru: &mut Vec<LruEntry>) -> usize { fn calculate_lru_list(&self, lru: &mut Vec<LruEntry>) -> usize {
// purge in lru order lru.clear();
lru.reserve(self.cache.len());
let mut new_size = 0; let mut new_size = 0;
for item in self.cache.iter() { for item in self.cache.iter() {
let value = item.value(); let value = item.value();
let item_len = value.account.data().len() + self.per_account_size; let item_len = self.account_size(&value.account);
new_size += item_len; new_size += item_len;
lru.push((*value.last_used.read().unwrap(), item_len, *item.key())); lru.push((*value.last_used.read().unwrap(), *item.key()));
} }
new_size new_size
} }
fn maybe_purge_lru_items(&self, new_item_len: usize) -> usize { fn bg_purge_lru_items(&self, once: bool) {
let mut new_size = *self.data_size.read().unwrap() + new_item_len; let mut lru = Vec::new();
if new_size <= self.max_data_size { let mut lru_index = 0;
return new_size; let mut stop = false;
loop {
if !once {
sleep(Duration::from_millis(200));
} else {
if stop {
break;
}
stop = true;
}
if self.stop.load(Ordering::Relaxed) {
break;
} }
// purge from the lru list we last made // purge from the lru list we last made
let mut list = self.lru.write().unwrap(); if self.purge_lru_list(&lru, &mut lru_index) {
new_size = self.purge_lru_list(&mut list, true, new_size); continue;
if new_size <= self.max_data_size {
return new_size;
} }
// we didn't get enough, so calculate a new list and keep purging // we didn't get enough, so calculate a new list and keep purging
new_size = self.calculate_lru_list(&mut list); let new_size = self.calculate_lru_list(&mut lru);
if new_size > self.max_data_size { lru_index = 0;
list.sort(); self.data_size.store(new_size, Ordering::Relaxed);
new_size = self.purge_lru_list(&mut list, false, new_size); lru.sort();
// the list is stored in self so we use it to purge next time self.purge_lru_list(&lru, &mut lru_index);
} }
new_size
} }
pub fn cache_len(&self) -> usize { pub fn cache_len(&self) -> usize {
@ -171,7 +227,7 @@ impl ReadOnlyAccountsCache {
} }
pub fn data_size(&self) -> usize { pub fn data_size(&self) -> usize {
*self.data_size.read().unwrap() self.data_size.load(Ordering::Relaxed)
} }
pub fn get_and_reset_stats(&self) -> (u64, u64) { pub fn get_and_reset_stats(&self) -> (u64, u64) {
@ -192,13 +248,22 @@ pub mod tests {
assert!(std::mem::size_of::<Arc<u64>>() == std::mem::size_of::<Arc<[u8; 32]>>()); assert!(std::mem::size_of::<Arc<u64>>() == std::mem::size_of::<Arc<[u8; 32]>>());
} }
#[test]
fn test_read_only_accounts_cache_drop() {
solana_logger::setup();
let cache = ReadOnlyAccountsCache::new_test(100);
let stop = cache.stop.clone();
drop(cache);
assert!(stop.load(Ordering::Relaxed));
}
#[test] #[test]
fn test_read_only_accounts_cache() { fn test_read_only_accounts_cache() {
solana_logger::setup(); solana_logger::setup();
let per_account_size = ReadOnlyAccountsCache::per_account_size(); let per_account_size = ReadOnlyAccountsCache::per_account_size();
let data_size = 100; let data_size = 100;
let max = data_size + per_account_size; let max = data_size + per_account_size;
let cache = ReadOnlyAccountsCache::new(max); let cache = ReadOnlyAccountsCache::new_test(max);
let slot = 0; let slot = 0;
assert!(cache.load(&Pubkey::default(), slot).is_none()); assert!(cache.load(&Pubkey::default(), slot).is_none());
assert_eq!(0, cache.cache_len()); assert_eq!(0, cache.cache_len());
@ -220,6 +285,7 @@ pub mod tests {
assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1)); assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1));
assert_eq!(1, cache.cache_len()); assert_eq!(1, cache.cache_len());
cache.store(&key2, slot, &account2); cache.store(&key2, slot, &account2);
cache.bg_purge_lru_items(true);
assert_eq!(100 + per_account_size, cache.data_size()); assert_eq!(100 + per_account_size, cache.data_size());
assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account2)); assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account2));
assert_eq!(1, cache.cache_len()); assert_eq!(1, cache.cache_len());
@ -233,7 +299,7 @@ pub mod tests {
// can store 2 items, 3rd item kicks oldest item out // can store 2 items, 3rd item kicks oldest item out
let max = (data_size + per_account_size) * 2; let max = (data_size + per_account_size) * 2;
let cache = ReadOnlyAccountsCache::new(max); let cache = ReadOnlyAccountsCache::new_test(max);
cache.store(&key1, slot, &account1); cache.store(&key1, slot, &account1);
assert_eq!(100 + per_account_size, cache.data_size()); assert_eq!(100 + per_account_size, cache.data_size());
assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1)); assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1));
@ -249,6 +315,7 @@ pub mod tests {
assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1)); assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1));
assert_eq!(2, cache.cache_len()); assert_eq!(2, cache.cache_len());
cache.store(&key3, slot, &account3); cache.store(&key3, slot, &account3);
cache.bg_purge_lru_items(true);
assert_eq!(max, cache.data_size()); assert_eq!(max, cache.data_size());
assert!(cache.load(&key1, slot).is_none()); // was lru purged assert!(cache.load(&key1, slot).is_none()); // was lru purged
assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1)); assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1));