read only account cache for executable accounts - improve replay (#16150)

* read only account cache

* tests

* clippy

* cleanup

* new file, add tests

* remove copy/paste code from test

* remove dead code

* all loads use cache

* remove stale comments

* add metrics logging for read only cache size

* report read only cache hits and misses

* consistency

* formatting

* rename, add comment

* u64

* better interaction with existing cache

* lru list saved between cleans
This commit is contained in:
Jeff Washington (jwash) 2021-04-01 07:16:34 -05:00 committed by GitHub
parent badf224460
commit 3996b699dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 360 additions and 2 deletions

View File

@ -27,6 +27,7 @@ use crate::{
},
append_vec::{AppendVec, StoredAccountMeta, StoredMeta},
contains::Contains,
read_only_accounts_cache::ReadOnlyAccountsCache,
};
use blake3::traits::digest::Digest;
use crossbeam_channel::{unbounded, Receiver, Sender};
@ -287,6 +288,13 @@ impl<'a> LoadedAccount<'a> {
},
}
}
/// Returns `true` for the `Cached` variant, `false` for `Stored`,
/// i.e. whether this account came from the in-memory cache path.
pub fn is_cached(&self) -> bool {
    matches!(self, LoadedAccount::Cached(_))
}
}
#[derive(Clone, Default, Debug)]
@ -694,6 +702,7 @@ pub struct AccountsDb {
pub accounts_cache: AccountsCache,
sender_bg_hasher: Option<Sender<CachedAccount>>,
pub read_only_accounts_cache: ReadOnlyAccountsCache,
recycle_stores: RwLock<RecycleStores>,
@ -1065,6 +1074,7 @@ impl solana_frozen_abi::abi_example::AbiExample for AccountsDb {
impl Default for AccountsDb {
fn default() -> Self {
let num_threads = get_thread_count();
const MAX_READ_ONLY_CACHE_DATA_SIZE: usize = 200_000_000;
let mut bank_hashes = HashMap::new();
bank_hashes.insert(0, BankHashInfo::default());
@ -1073,6 +1083,7 @@ impl Default for AccountsDb {
storage: AccountStorage::default(),
accounts_cache: AccountsCache::default(),
sender_bg_hasher: None,
read_only_accounts_cache: ReadOnlyAccountsCache::new(MAX_READ_ONLY_CACHE_DATA_SIZE),
recycle_stores: RwLock::new(RecycleStores::default()),
uncleaned_pubkeys: DashMap::new(),
next_id: AtomicUsize::new(0),
@ -2271,10 +2282,46 @@ impl AccountsDb {
// `lock` released here
};
if self.caching_enabled && store_id != CACHE_VIRTUAL_STORAGE_ID {
let result = self.read_only_accounts_cache.load(pubkey, slot);
if let Some(account) = result {
return Some((account, slot));
}
}
//TODO: thread this as a ref
self.get_account_accessor_from_cache_or_storage(slot, pubkey, store_id, offset)
let mut is_cached = false;
let loaded_account = self
.get_account_accessor_from_cache_or_storage(slot, pubkey, store_id, offset)
.get_loaded_account()
.map(|loaded_account| (loaded_account.account(), slot))
.map(|loaded_account| {
is_cached = loaded_account.is_cached();
(loaded_account.account(), slot)
});
if self.caching_enabled && !is_cached {
match loaded_account {
Some((account, slot)) => {
/*
We show this store into the read-only cache for account 'A' and future loads of 'A' from the read-only cache are
safe/reflect 'A''s latest state on this fork.
This safety holds if during replay of slot 'S', we show we only read 'A' from the write cache,
not the read-only cache, after it's been updated in replay of slot 'S'.
Assume for contradiction this is not true, and we read 'A' from the read-only cache *after* it had been updated in 'S'.
This means an entry '(S, A)' was added to the read-only cache after 'A' had been updated in 'S'.
Now when '(S, A)' was being added to the read-only cache, it must have been true that 'is_cache == false',
which means '(S', A)' does not exist in the write cache yet.
However, by the assumption for contradiction above , 'A' has already been updated in 'S' which means '(S, A)'
must exist in the write cache, which is a contradiction.
*/
self.read_only_accounts_cache.store(pubkey, slot, &account);
Some((account, slot))
}
_ => None,
}
} else {
loaded_account
}
}
pub fn load_account_hash(&self, ancestors: &Ancestors, pubkey: &Pubkey) -> Hash {
@ -3439,6 +3486,7 @@ impl AccountsDb {
let accounts_and_meta_to_store: Vec<(StoredMeta, &AccountSharedData)> = accounts
.iter()
.map(|(pubkey, account)| {
self.read_only_accounts_cache.remove(pubkey, slot);
let account = if account.lamports == 0 {
&default_account
} else {
@ -4288,6 +4336,8 @@ impl AccountsDb {
Ordering::Relaxed,
) == Ok(last)
{
let (read_only_cache_hits, read_only_cache_misses) =
self.read_only_accounts_cache.get_and_reset_stats();
datapoint_info!(
"accounts_db_store_timings",
(
@ -4330,6 +4380,22 @@ impl AccountsDb {
self.stats.store_total_data.swap(0, Ordering::Relaxed),
i64
),
(
"read_only_accounts_cache_entries",
self.read_only_accounts_cache.cache_len(),
i64
),
(
"read_only_accounts_cache_data_size",
self.read_only_accounts_cache.data_size(),
i64
),
("read_only_accounts_cache_hits", read_only_cache_hits, i64),
(
"read_only_accounts_cache_misses",
read_only_cache_misses,
i64
),
);
let recycle_stores = self.recycle_stores.read().unwrap();
@ -8521,6 +8587,53 @@ pub mod tests {
.unwrap_or_default()
}
#[test]
fn test_read_only_accounts_cache() {
    let caching_enabled = true;
    let db = Arc::new(AccountsDb::new_with_config(
        Vec::new(),
        &ClusterType::Development,
        HashSet::new(),
        caching_enabled,
    ));
    let account_key = Pubkey::new_unique();
    let zero_lamport_account =
        AccountSharedData::new(0, 0, &AccountSharedData::default().owner);
    let slot1_account = AccountSharedData::new(1, 1, &AccountSharedData::default().owner);
    // Store the key in slots 0 and 1, then root/clean/flush so slot 1's
    // version lands in storage (not the write cache).
    db.store_cached(0, &[(&account_key, &zero_lamport_account)]);
    db.store_cached(1, &[(&account_key, &slot1_account)]);
    db.add_root(0);
    db.add_root(1);
    db.clean_accounts(None);
    db.flush_accounts_cache(true, None);
    db.clean_accounts(None);
    db.add_root(2);
    // Nothing has been loaded yet, so the read-only cache is empty.
    assert_eq!(db.read_only_accounts_cache.cache_len(), 0);
    // First load comes from storage and should populate the read-only cache.
    let account = db
        .load(&Ancestors::default(), &account_key)
        .map(|(account, _)| account)
        .unwrap();
    assert_eq!(account.lamports, 1);
    assert_eq!(db.read_only_accounts_cache.cache_len(), 1);
    // Second load is a read-only-cache hit; cache size is unchanged.
    let account = db
        .load(&Ancestors::default(), &account_key)
        .map(|(account, _)| account)
        .unwrap();
    assert_eq!(account.lamports, 1);
    assert_eq!(db.read_only_accounts_cache.cache_len(), 1);
    // Storing a newer version (slot 2, rooted above) evicts/replaces the
    // stale read-only entry so the next load observes lamports == 0.
    db.store_cached(2, &[(&account_key, &zero_lamport_account)]);
    assert_eq!(db.read_only_accounts_cache.cache_len(), 1);
    let account = db
        .load(&Ancestors::default(), &account_key)
        .map(|(account, _)| account)
        .unwrap();
    assert_eq!(account.lamports, 0);
    assert_eq!(db.read_only_accounts_cache.cache_len(), 1);
}
#[test]
fn test_flush_cache_clean() {
let caching_enabled = true;
@ -8547,6 +8660,8 @@ pub mod tests {
.do_load(&Ancestors::default(), &account_key, Some(0))
.unwrap();
assert_eq!(account.0.lamports, 0);
// since this item is in the cache, it should not be in the read only cache
assert_eq!(db.read_only_accounts_cache.cache_len(), 0);
// Flush, then clean again. Should not need another root to initiate the cleaning
// because `accounts_index.uncleaned_roots` should be correct

View File

@ -25,6 +25,7 @@ pub mod loader_utils;
pub mod log_collector;
pub mod message_processor;
mod native_loader;
mod read_only_accounts_cache;
pub mod rent_collector;
pub mod secondary_index;
pub mod serde_snapshot;

View File

@ -0,0 +1,242 @@
//! ReadOnlyAccountsCache used to store accounts, such as executable accounts,
//! which can be large, loaded many times, and rarely change.
use dashmap::DashMap;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
time::Instant,
};
use solana_sdk::{
account::{AccountSharedData, ReadableAccount},
clock::Slot,
pubkey::Pubkey,
};
/// Cache key: account address plus the slot the account was stored at.
type ReadOnlyCacheKey = (Pubkey, Slot);
/// Snapshot used for LRU eviction:
/// (last-used timestamp at snapshot time, account data length, cache key).
type LruEntry = (Instant, usize, ReadOnlyCacheKey);
/// Shared, lockable eviction list; kept between purges so a later purge
/// can resume from the previous snapshot.
type LruList = Arc<RwLock<Vec<LruEntry>>>;
/// One cached account plus the bookkeeping needed for LRU eviction.
#[derive(Debug)]
pub struct ReadOnlyAccountCacheEntry {
    // the cached account data itself (cloned out on every hit)
    pub account: AccountSharedData,
    // timestamp of the most recent `load`; compared against the lru-list
    // snapshot to decide whether the entry is safe to evict
    pub last_used: Arc<RwLock<Instant>>,
}
/// Concurrent cache of read-only accounts keyed by `(Pubkey, Slot)`.
/// Total account-data size is soft-capped at `max_data_size`; eviction
/// is approximate LRU driven by `last_used` timestamps.
#[derive(Debug)]
pub struct ReadOnlyAccountsCache {
    // concurrent map of cached entries
    cache: DashMap<ReadOnlyCacheKey, ReadOnlyAccountCacheEntry>,
    // soft cap, in bytes of account data, that `store` purges back toward
    max_data_size: usize,
    // tracked total data size; intentionally approximate (see `remove`)
    data_size: Arc<RwLock<usize>>,
    // load stats, drained by `get_and_reset_stats`
    hits: AtomicU64,
    misses: AtomicU64,
    // eviction snapshot saved between purges
    lru: LruList,
}
impl ReadOnlyAccountsCache {
    /// Create an empty cache whose total account-data size is purged back
    /// toward `max_data_size` bytes on `store`.
    pub fn new(max_data_size: usize) -> Self {
        Self {
            max_data_size,
            cache: DashMap::default(),
            data_size: Arc::new(RwLock::new(0)),
            hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            lru: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Fetch the account stored under `(pubkey, slot)`, if any.
    /// A hit refreshes the entry's `last_used` timestamp (so the timestamp-
    /// verifying purge pass will skip it) and bumps `hits`; a miss bumps
    /// `misses`. Returns a clone of the cached account.
    pub fn load(&self, pubkey: &Pubkey, slot: Slot) -> Option<AccountSharedData> {
        self.cache
            .get(&(*pubkey, slot))
            .map(|account_ref| {
                self.hits.fetch_add(1, Ordering::Relaxed);
                let value = account_ref.value();
                // remember last use
                let now = Instant::now();
                *value.last_used.write().unwrap() = now;
                value.account.clone()
            })
            .or_else(|| {
                self.misses.fetch_add(1, Ordering::Relaxed);
                None
            })
    }

    /// Insert (or overwrite) the entry for `(pubkey, slot)`, then, if the
    /// tracked size now exceeds `max_data_size`, purge LRU entries.
    /// NOTE(review): an overwrite adds the new data length without
    /// subtracting the replaced entry's; the total is corrected whenever a
    /// purge recalculates the list from the live cache.
    pub fn store(&self, pubkey: &Pubkey, slot: Slot, account: &AccountSharedData) {
        let len = account.data().len();
        self.cache.insert(
            (*pubkey, slot),
            ReadOnlyAccountCacheEntry {
                account: account.clone(),
                last_used: Arc::new(RwLock::new(Instant::now())),
            },
        );
        // maybe purge after we insert. Insert may have replaced.
        let new_size = self.maybe_purge_lru_items(len);
        *self.data_size.write().unwrap() = new_size;
    }

    /// Drop the entry for `(pubkey, slot)` if present.
    pub fn remove(&self, pubkey: &Pubkey, slot: Slot) {
        // does not keep track of data size reduction here.
        // data size will be recomputed the next time we store and we think we may now be too large.
        self.cache.remove(&(*pubkey, slot));
    }

    /// Walk `lru` from the front (oldest snapshot entries first), evicting
    /// entries until `current_size` drops to `max_data_size` or the list is
    /// exhausted. With `verify_timestamp`, an entry whose live `last_used`
    /// differs from the snapshot (it was loaded since the snapshot was
    /// taken) is skipped rather than evicted. All visited entries —
    /// including skipped ones — are drained from `lru`, so a stale list
    /// shrinks toward empty. Returns the updated size estimate.
    fn purge_lru_list(
        &self,
        lru: &mut Vec<LruEntry>,
        verify_timestamp: bool,
        mut current_size: usize,
    ) -> usize {
        let mut processed = 0;
        for lru_item in lru.iter() {
            let (timestamp, size, key) = lru_item;
            processed += 1;
            let mut try_remove = true;
            if verify_timestamp {
                let item = self.cache.get(key);
                match item {
                    Some(item) => {
                        if *timestamp != *item.last_used.read().unwrap() {
                            // this item was used more recently than our list indicates, so skip it
                            continue;
                        }
                        // item is as old as we thought, so fall through and delete it
                    }
                    None => {
                        // already removed from the cache (e.g. by `remove`),
                        // but its size was never subtracted — still subtract below
                        try_remove = false;
                    }
                }
            }
            if try_remove {
                self.cache.remove(&key);
            }
            current_size = current_size.saturating_sub(*size); // we don't subtract on remove, so subtract now
            if current_size <= self.max_data_size {
                break;
            }
        }
        // drop everything we visited; the remainder is saved for next time
        lru.drain(0..processed);
        current_size
    }

    /// Rebuild `lru` as an (unsorted) snapshot of every live cache entry
    /// and return the recomputed total data size.
    fn calculate_lru_list(&self, lru: &mut Vec<LruEntry>) -> usize {
        // purge in lru order
        let mut new_size = 0;
        for item in self.cache.iter() {
            let value = item.value();
            let item_len = value.account.data().len();
            new_size += item_len;
            lru.push((*value.last_used.read().unwrap(), item_len, *item.key()));
        }
        new_size
    }

    /// Called after an insert of `new_item_len` bytes. If the size estimate
    /// exceeds `max_data_size`: first purge from the saved lru snapshot
    /// (verifying timestamps, since it may be stale); if still too big,
    /// rebuild the snapshot from the live cache, sort it oldest-first
    /// (LruEntry tuples order by their leading Instant), and purge again
    /// without verification. Returns the resulting size estimate.
    fn maybe_purge_lru_items(&self, new_item_len: usize) -> usize {
        let mut new_size = *self.data_size.read().unwrap() + new_item_len;
        if new_size <= self.max_data_size {
            return new_size;
        }
        // purge from the lru list we last made
        let mut list = self.lru.write().unwrap();
        new_size = self.purge_lru_list(&mut list, true, new_size);
        if new_size <= self.max_data_size {
            return new_size;
        }
        // we didn't get enough, so calculate a new list and keep purging
        new_size = self.calculate_lru_list(&mut list);
        if new_size > self.max_data_size {
            list.sort();
            new_size = self.purge_lru_list(&mut list, false, new_size);
            // the list is stored in self so we use it to purge next time
        }
        new_size
    }

    /// Number of entries currently cached.
    pub fn cache_len(&self) -> usize {
        self.cache.len()
    }

    /// Tracked total account-data size in bytes (approximate; see `remove`).
    pub fn data_size(&self) -> usize {
        *self.data_size.read().unwrap()
    }

    /// Return (hits, misses) accumulated since the last call and reset
    /// both counters to zero.
    pub fn get_and_reset_stats(&self) -> (u64, u64) {
        let hits = self.hits.swap(0, Ordering::Relaxed);
        let misses = self.misses.swap(0, Ordering::Relaxed);
        (hits, misses)
    }
}
#[cfg(test)]
pub mod tests {
    use super::*;
    use solana_sdk::account::{accounts_equal, Account};

    /// Exercises load/store/remove and LRU eviction: first with a cache
    /// that can hold only one 100-byte account, then with one that holds
    /// two, where storing a third evicts the least recently used.
    #[test]
    fn test_read_only_accounts_cache() {
        solana_logger::setup();
        let max = 100;
        let cache = ReadOnlyAccountsCache::new(max);
        let slot = 0;
        // empty-cache behavior
        assert!(cache.load(&Pubkey::default(), slot).is_none());
        assert_eq!(0, cache.cache_len());
        assert_eq!(0, cache.data_size());
        cache.remove(&Pubkey::default(), slot); // assert no panic
        let key1 = Pubkey::new_unique();
        let key2 = Pubkey::new_unique();
        let key3 = Pubkey::new_unique();
        // three 100-byte accounts, distinguishable only by lamports
        let account1 = AccountSharedData::from(Account {
            data: vec![0; max],
            ..Account::default()
        });
        let mut account2 = account1.clone();
        account2.lamports += 1; // so they compare differently
        let mut account3 = account1.clone();
        account3.lamports += 4; // so they compare differently
        cache.store(&key1, slot, &account1);
        assert_eq!(100, cache.data_size());
        assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1));
        assert_eq!(1, cache.cache_len());
        // cache only fits one entry: storing key2 evicts key1
        cache.store(&key2, slot, &account2);
        assert_eq!(100, cache.data_size());
        assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account2));
        assert_eq!(1, cache.cache_len());
        // overwriting an existing key keeps len and size stable
        cache.store(&key2, slot, &account1); // overwrite key2 with account1
        assert_eq!(100, cache.data_size());
        assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1));
        assert_eq!(1, cache.cache_len());
        // remove drops the entry but, by design, not the tracked data size
        cache.remove(&key2, slot);
        assert_eq!(100, cache.data_size());
        assert_eq!(0, cache.cache_len());
        // can store 2 items, 3rd item kicks oldest item out
        let max = 200;
        let cache = ReadOnlyAccountsCache::new(max);
        cache.store(&key1, slot, &account1);
        assert_eq!(100, cache.data_size());
        assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1));
        assert_eq!(1, cache.cache_len());
        cache.store(&key2, slot, &account2);
        assert_eq!(200, cache.data_size());
        assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1));
        assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account2));
        assert_eq!(2, cache.cache_len());
        cache.store(&key2, slot, &account1); // overwrite key2 with account1
        assert_eq!(200, cache.data_size());
        assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1));
        assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1));
        assert_eq!(2, cache.cache_len());
        // key1 was loaded least recently just above key2's loads? — key1 was
        // loaded, then key2; storing key3 purges the oldest by last_used
        cache.store(&key3, slot, &account3);
        assert_eq!(200, cache.data_size());
        assert!(cache.load(&key1, slot).is_none()); // was lru purged
        assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1));
        assert!(accounts_equal(&cache.load(&key3, slot).unwrap(), &account3));
        assert_eq!(2, cache.cache_len());
    }
}