optimizes ReadOnlyAccountsCache LRU eviction implementation (#22403)

ReadOnlyAccountsCache is using a background thread, table scan and sort
to implement LRU eviction policy:
https://github.com/solana-labs/solana/blob/eaa52bc93/runtime/src/read_only_accounts_cache.rs#L66-L73
https://github.com/solana-labs/solana/blob/eaa52bc93/runtime/src/read_only_accounts_cache.rs#L186-L191
https://github.com/solana-labs/solana/blob/eaa52bc93/runtime/src/read_only_accounts_cache.rs#L222

DashMap internally locks each shard when accessed, so a table scan in
the background thread can create a lot of lock contention.

This commit adds an index-list queue containing cached keys in the order
that they are accessed. Each hash-map entry also includes its index into
this queue.
When an item is first entered into the cache, it is added to the end of
the queue. Also each time an entry is looked up from the cache it is
moved to the end of the queue. As a result, items in the queue are always
sorted in the order that they have last been accessed. When doing LRU
eviction, cache entries are evicted from the front of the queue.
Using index-list, all of the queue operations above are O(1) with low
overhead, so the above achieves an efficient implementation of the LRU
cache eviction policy.
This commit is contained in:
behzad nouri 2022-01-11 17:25:28 +00:00 committed by GitHub
parent 8b66625c95
commit a49ef49f87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 196 additions and 227 deletions

8
Cargo.lock generated
View File

@ -2065,6 +2065,12 @@ version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9007da9cacbd3e6343da136e98b0d2df013f553d35bdec8b518f07bea768e19c"
[[package]]
name = "index_list"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a9d968042a4902e08810946fc7cd5851eb75e80301342305af755ca06cb82ce"
[[package]]
name = "indexmap"
version = "1.8.0"
@ -5702,6 +5708,7 @@ dependencies = [
"ed25519-dalek",
"flate2",
"fnv",
"index_list",
"itertools 0.10.3",
"lazy_static",
"libsecp256k1 0.6.0",
@ -5712,6 +5719,7 @@ dependencies = [
"num_cpus",
"ouroboros",
"rand 0.7.3",
"rand_chacha 0.2.2",
"rayon",
"regex",
"rustc_version 0.4.0",

View File

@ -1363,6 +1363,12 @@ dependencies = [
"unicode-normalization",
]
[[package]]
name = "index_list"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a9d968042a4902e08810946fc7cd5851eb75e80301342305af755ca06cb82ce"
[[package]]
name = "indexmap"
version = "1.8.0"
@ -3447,6 +3453,7 @@ dependencies = [
"dir-diff",
"flate2",
"fnv",
"index_list",
"itertools 0.10.3",
"lazy_static",
"log",

View File

@ -21,6 +21,7 @@ crossbeam-channel = "0.5"
dir-diff = "0.3.2"
flate2 = "1.0.22"
fnv = "1.0.7"
index_list = "0.2.7"
itertools = "0.10.3"
lazy_static = "1.4.0"
log = "0.4.14"
@ -60,9 +61,10 @@ crate-type = ["lib"]
name = "solana_runtime"
[dev-dependencies]
assert_matches = "1.5.0"
ed25519-dalek = "=1.0.1"
libsecp256k1 = "0.6.0"
assert_matches = "1.5.0"
rand_chacha = "0.2.2"
solana-logger = { path = "../logger", version = "=1.10.0" }
[package.metadata.docs.rs]

View File

@ -963,7 +963,7 @@ pub struct AccountsDb {
write_cache_limit_bytes: Option<u64>,
sender_bg_hasher: Option<Sender<CachedAccount>>,
pub read_only_accounts_cache: ReadOnlyAccountsCache,
read_only_accounts_cache: ReadOnlyAccountsCache,
recycle_stores: RwLock<RecycleStores>,
@ -3649,7 +3649,7 @@ impl AccountsDb {
// Notice the subtle `?` at previous line, we bail out pretty early if missing.
if self.caching_enabled && !storage_location.is_cached() {
let result = self.read_only_accounts_cache.load(pubkey, slot);
let result = self.read_only_accounts_cache.load(*pubkey, slot);
if let Some(account) = result {
return Some((account, slot));
}
@ -3680,7 +3680,8 @@ impl AccountsDb {
However, by the assumption for contradiction above, 'A' has already been updated in 'S', which means '(S, A)'
must exist in the write cache, which is a contradiction.
*/
self.read_only_accounts_cache.store(pubkey, slot, &account);
self.read_only_accounts_cache
.store(*pubkey, slot, account.clone());
}
Some((account, slot))
}
@ -4928,7 +4929,7 @@ impl AccountsDb {
let accounts_and_meta_to_store: Vec<_> = accounts
.iter()
.map(|(pubkey, account)| {
self.read_only_accounts_cache.remove(pubkey, slot);
self.read_only_accounts_cache.remove(**pubkey, slot);
// this is the source of Some(Account) or None.
// Some(Account) = store 'Account'
// None = store a default/empty account with 0 lamports

View File

@ -1,238 +1,136 @@
//! ReadOnlyAccountsCache used to store accounts, such as executable accounts,
//! which can be large, loaded many times, and rarely change.
//use mapref::entry::{Entry, OccupiedEntry, VacantEntry};
use {
dashmap::{mapref::entry::Entry, DashMap},
index_list::{Index, IndexList},
solana_sdk::{
account::{AccountSharedData, ReadableAccount},
clock::Slot,
pubkey::Pubkey,
},
std::{
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, RwLock,
},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
std::sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Mutex,
},
};
const CACHE_ENTRY_SIZE: usize =
std::mem::size_of::<ReadOnlyAccountCacheEntry>() + 2 * std::mem::size_of::<ReadOnlyCacheKey>();
type ReadOnlyCacheKey = (Pubkey, Slot);
type LruEntry = (Instant, ReadOnlyCacheKey);
#[derive(Debug)]
pub struct ReadOnlyAccountCacheEntry {
pub account: AccountSharedData,
pub last_used: Arc<RwLock<Instant>>,
struct ReadOnlyAccountCacheEntry {
account: AccountSharedData,
index: Index, // Index of the entry in the eviction queue.
}
#[derive(Debug)]
pub struct ReadOnlyAccountsCache {
cache: Arc<DashMap<ReadOnlyCacheKey, ReadOnlyAccountCacheEntry>>,
pub(crate) struct ReadOnlyAccountsCache {
cache: DashMap<ReadOnlyCacheKey, ReadOnlyAccountCacheEntry>,
// When an item is first entered into the cache, it is added to the end of
// the queue. Also each time an entry is looked up from the cache it is
// moved to the end of the queue. As a result, items in the queue are
// always sorted in the order that they have last been accessed. When doing
// LRU eviction, cache entries are evicted from the front of the queue.
queue: Mutex<IndexList<ReadOnlyCacheKey>>,
max_data_size: usize,
data_size: Arc<AtomicUsize>,
data_size: AtomicUsize,
hits: AtomicU64,
misses: AtomicU64,
per_account_size: usize,
stop: Arc<AtomicBool>,
background: Option<JoinHandle<()>>,
}
impl Drop for ReadOnlyAccountsCache {
fn drop(&mut self) {
self.stop.store(true, Ordering::Relaxed);
if let Some(background) = self.background.take() {
background.join().unwrap();
}
}
}
impl ReadOnlyAccountsCache {
pub fn new(max_data_size: usize) -> Self {
let mut result = Self::new_test(max_data_size);
let bg = Self {
max_data_size,
cache: result.cache.clone(),
data_size: result.data_size.clone(),
hits: AtomicU64::new(0),
misses: AtomicU64::new(0),
per_account_size: Self::per_account_size(),
stop: result.stop.clone(),
background: None,
};
result.background = Some(
Builder::new()
.name("solana-readonly-accounts-cache".to_string())
.spawn(move || {
bg.bg_purge_lru_items(false);
})
.unwrap(),
);
result
}
fn new_test(max_data_size: usize) -> Self {
pub(crate) fn new(max_data_size: usize) -> Self {
Self {
max_data_size,
cache: Arc::new(DashMap::default()),
data_size: Arc::new(AtomicUsize::new(0)),
hits: AtomicU64::new(0),
misses: AtomicU64::new(0),
per_account_size: Self::per_account_size(),
stop: Arc::new(AtomicBool::new(false)),
background: None,
cache: DashMap::default(),
queue: Mutex::<IndexList<ReadOnlyCacheKey>>::default(),
data_size: AtomicUsize::default(),
hits: AtomicU64::default(),
misses: AtomicU64::default(),
}
}
fn per_account_size() -> usize {
// size_of(arc(x)) does not return the size of x, so we have to add the size of RwLock...
std::mem::size_of::<ReadOnlyAccountCacheEntry>() + std::mem::size_of::<RwLock<Instant>>()
}
pub fn load(&self, pubkey: &Pubkey, slot: Slot) -> Option<AccountSharedData> {
self.cache
.get(&(*pubkey, slot))
.map(|account_ref| {
self.hits.fetch_add(1, Ordering::Relaxed);
let value = account_ref.value();
// remember last use
let now = Instant::now();
*value.last_used.write().unwrap() = now;
value.account.clone()
})
.or_else(|| {
pub(crate) fn load(&self, pubkey: Pubkey, slot: Slot) -> Option<AccountSharedData> {
let key = (pubkey, slot);
let mut entry = match self.cache.get_mut(&key) {
None => {
self.misses.fetch_add(1, Ordering::Relaxed);
None
})
return None;
}
Some(entry) => entry,
};
self.hits.fetch_add(1, Ordering::Relaxed);
// Move the entry to the end of the queue.
// self.queue is modified while holding a reference to the cache entry;
// so that another thread cannot write to the same key.
{
let mut queue = self.queue.lock().unwrap();
queue.remove(entry.index);
entry.index = queue.insert_last(key);
}
Some(entry.account.clone())
}
fn account_size(&self, account: &AccountSharedData) -> usize {
account.data().len() + self.per_account_size
CACHE_ENTRY_SIZE + account.data().len()
}
pub fn store(&self, pubkey: &Pubkey, slot: Slot, account: &AccountSharedData) {
let len = self.account_size(account);
let previous_len = if let Some(previous) = self.cache.insert(
(*pubkey, slot),
ReadOnlyAccountCacheEntry {
account: account.clone(),
last_used: Arc::new(RwLock::new(Instant::now())),
},
) {
self.account_size(&previous.account)
} else {
0
};
match len.cmp(&previous_len) {
std::cmp::Ordering::Greater => {
self.data_size
.fetch_add(len - previous_len, Ordering::Relaxed);
pub(crate) fn store(&self, pubkey: Pubkey, slot: Slot, account: AccountSharedData) {
let key = (pubkey, slot);
let account_size = self.account_size(&account);
self.data_size.fetch_add(account_size, Ordering::Relaxed);
// self.queue is modified while holding a reference to the cache entry;
// so that another thread cannot write to the same key.
match self.cache.entry(key) {
Entry::Vacant(entry) => {
// Insert the entry at the end of the queue.
let mut queue = self.queue.lock().unwrap();
let index = queue.insert_last(key);
entry.insert(ReadOnlyAccountCacheEntry { account, index });
}
std::cmp::Ordering::Less => {
self.data_size
.fetch_sub(previous_len - len, Ordering::Relaxed);
}
std::cmp::Ordering::Equal => {
// no change in size
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
let account_size = self.account_size(&entry.account);
self.data_size.fetch_sub(account_size, Ordering::Relaxed);
entry.account = account;
// Move the entry to the end of the queue.
let mut queue = self.queue.lock().unwrap();
queue.remove(entry.index);
entry.index = queue.insert_last(key);
}
};
}
pub fn remove(&self, pubkey: &Pubkey, slot: Slot) {
if let Some((_, value)) = self.cache.remove(&(*pubkey, slot)) {
self.data_size
.fetch_sub(self.account_size(&value.account), Ordering::Relaxed);
// Evict entries from the front of the queue.
while self.data_size.load(Ordering::Relaxed) > self.max_data_size {
let (pubkey, slot) = match self.queue.lock().unwrap().get_first() {
None => break,
Some(key) => *key,
};
self.remove(pubkey, slot);
}
}
fn purge_lru_list(&self, lru: &[LruEntry], lru_index: &mut usize) -> bool {
let mut freed_bytes = 0;
let start = *lru_index;
let mut done = false;
let current_size = self.data_size.load(Ordering::Relaxed);
for (timestamp, key) in lru.iter().skip(start) {
if current_size.saturating_sub(freed_bytes) <= self.max_data_size {
done = true;
break;
}
*lru_index += 1;
match self.cache.entry(*key) {
Entry::Vacant(_entry) => (),
Entry::Occupied(entry) => {
if *timestamp == *entry.get().last_used.read().unwrap() {
let size = self.account_size(&entry.get().account);
freed_bytes += size;
entry.remove();
}
}
}
}
if freed_bytes > 0 {
// if this overflows, we'll have a really big data size, so we'll clean everything, scan all, and reset the size. Not ideal, but not terrible.
self.data_size.fetch_sub(freed_bytes, Ordering::Relaxed);
}
done
/// Removes the entry keyed by `(pubkey, slot)` from the cache, if present.
///
/// Returns the removed account, or `None` when the key was not cached.
/// The entry is also unlinked from the LRU eviction queue and its size is
/// subtracted from the running `data_size` total.
pub(crate) fn remove(&self, pubkey: Pubkey, slot: Slot) -> Option<AccountSharedData> {
let (_, entry) = self.cache.remove(&(pubkey, slot))?;
// self.queue should be modified only after removing the entry from the
// cache, so that this is still safe if another thread writes to the
// same key.
self.queue.lock().unwrap().remove(entry.index);
// Reclaim this entry's contribution to the cached-bytes accounting.
let account_size = self.account_size(&entry.account);
self.data_size.fetch_sub(account_size, Ordering::Relaxed);
Some(entry.account)
}
fn calculate_lru_list(&self, lru: &mut Vec<LruEntry>) -> usize {
lru.clear();
lru.reserve(self.cache.len());
let mut new_size = 0;
for item in self.cache.iter() {
let value = item.value();
let item_len = self.account_size(&value.account);
new_size += item_len;
lru.push((*value.last_used.read().unwrap(), *item.key()));
}
new_size
}
fn bg_purge_lru_items(&self, once: bool) {
let mut lru = Vec::new();
let mut lru_index = 0;
let mut stop = false;
loop {
if !once {
sleep(Duration::from_millis(200));
} else {
if stop {
break;
}
stop = true;
}
if self.stop.load(Ordering::Relaxed) {
break;
}
// purge from the lru list we last made
if self.purge_lru_list(&lru, &mut lru_index) {
continue;
}
// we didn't get enough, so calculate a new list and keep purging
let new_size = self.calculate_lru_list(&mut lru);
lru_index = 0;
self.data_size.store(new_size, Ordering::Relaxed);
lru.sort();
self.purge_lru_list(&lru, &mut lru_index);
}
}
pub fn cache_len(&self) -> usize {
pub(crate) fn cache_len(&self) -> usize {
self.cache.len()
}
pub fn data_size(&self) -> usize {
pub(crate) fn data_size(&self) -> usize {
self.data_size.load(Ordering::Relaxed)
}
pub fn get_and_reset_stats(&self) -> (u64, u64) {
pub(crate) fn get_and_reset_stats(&self) -> (u64, u64) {
let hits = self.hits.swap(0, Ordering::Relaxed);
let misses = self.misses.swap(0, Ordering::Relaxed);
(hits, misses)
@ -240,10 +138,16 @@ impl ReadOnlyAccountsCache {
}
#[cfg(test)]
pub mod tests {
mod tests {
use {
super::*,
rand::{
seq::{IteratorRandom, SliceRandom},
Rng, SeedableRng,
},
rand_chacha::ChaChaRng,
solana_sdk::account::{accounts_equal, Account, WritableAccount},
std::{collections::HashMap, iter::repeat_with, sync::Arc},
};
#[test]
fn test_accountsdb_sizeof() {
@ -252,27 +156,18 @@ pub mod tests {
assert!(std::mem::size_of::<Arc<u64>>() == std::mem::size_of::<Arc<[u8; 32]>>());
}
#[test]
fn test_read_only_accounts_cache_drop() {
solana_logger::setup();
let cache = ReadOnlyAccountsCache::new_test(100);
let stop = cache.stop.clone();
drop(cache);
assert!(stop.load(Ordering::Relaxed));
}
#[test]
fn test_read_only_accounts_cache() {
solana_logger::setup();
let per_account_size = ReadOnlyAccountsCache::per_account_size();
let per_account_size = CACHE_ENTRY_SIZE;
let data_size = 100;
let max = data_size + per_account_size;
let cache = ReadOnlyAccountsCache::new_test(max);
let cache = ReadOnlyAccountsCache::new(max);
let slot = 0;
assert!(cache.load(&Pubkey::default(), slot).is_none());
assert!(cache.load(Pubkey::default(), slot).is_none());
assert_eq!(0, cache.cache_len());
assert_eq!(0, cache.data_size());
cache.remove(&Pubkey::default(), slot); // assert no panic
cache.remove(Pubkey::default(), slot); // assert no panic
let key1 = Pubkey::new_unique();
let key2 = Pubkey::new_unique();
let key3 = Pubkey::new_unique();
@ -284,46 +179,102 @@ pub mod tests {
account2.checked_add_lamports(1).unwrap(); // so they compare differently
let mut account3 = account1.clone();
account3.checked_add_lamports(4).unwrap(); // so they compare differently
cache.store(&key1, slot, &account1);
cache.store(key1, slot, account1.clone());
assert_eq!(100 + per_account_size, cache.data_size());
assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1));
assert!(accounts_equal(&cache.load(key1, slot).unwrap(), &account1));
assert_eq!(1, cache.cache_len());
cache.store(&key2, slot, &account2);
cache.bg_purge_lru_items(true);
cache.store(key2, slot, account2.clone());
assert_eq!(100 + per_account_size, cache.data_size());
assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account2));
assert!(accounts_equal(&cache.load(key2, slot).unwrap(), &account2));
assert_eq!(1, cache.cache_len());
cache.store(&key2, slot, &account1); // overwrite key2 with account1
cache.store(key2, slot, account1.clone()); // overwrite key2 with account1
assert_eq!(100 + per_account_size, cache.data_size());
assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1));
assert!(accounts_equal(&cache.load(key2, slot).unwrap(), &account1));
assert_eq!(1, cache.cache_len());
cache.remove(&key2, slot);
cache.remove(key2, slot);
assert_eq!(0, cache.data_size());
assert_eq!(0, cache.cache_len());
// can store 2 items, 3rd item kicks oldest item out
let max = (data_size + per_account_size) * 2;
let cache = ReadOnlyAccountsCache::new_test(max);
cache.store(&key1, slot, &account1);
let cache = ReadOnlyAccountsCache::new(max);
cache.store(key1, slot, account1.clone());
assert_eq!(100 + per_account_size, cache.data_size());
assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1));
assert!(accounts_equal(&cache.load(key1, slot).unwrap(), &account1));
assert_eq!(1, cache.cache_len());
cache.store(&key2, slot, &account2);
cache.store(key2, slot, account2.clone());
assert_eq!(max, cache.data_size());
assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1));
assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account2));
assert!(accounts_equal(&cache.load(key1, slot).unwrap(), &account1));
assert!(accounts_equal(&cache.load(key2, slot).unwrap(), &account2));
assert_eq!(2, cache.cache_len());
cache.store(&key2, slot, &account1); // overwrite key2 with account1
cache.store(key2, slot, account1.clone()); // overwrite key2 with account1
assert_eq!(max, cache.data_size());
assert!(accounts_equal(&cache.load(&key1, slot).unwrap(), &account1));
assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1));
assert!(accounts_equal(&cache.load(key1, slot).unwrap(), &account1));
assert!(accounts_equal(&cache.load(key2, slot).unwrap(), &account1));
assert_eq!(2, cache.cache_len());
cache.store(&key3, slot, &account3);
cache.bg_purge_lru_items(true);
cache.store(key3, slot, account3.clone());
assert_eq!(max, cache.data_size());
assert!(cache.load(&key1, slot).is_none()); // was lru purged
assert!(accounts_equal(&cache.load(&key2, slot).unwrap(), &account1));
assert!(accounts_equal(&cache.load(&key3, slot).unwrap(), &account3));
assert!(cache.load(key1, slot).is_none()); // was lru purged
assert!(accounts_equal(&cache.load(key2, slot).unwrap(), &account1));
assert!(accounts_equal(&cache.load(key3, slot).unwrap(), &account3));
assert_eq!(2, cache.cache_len());
}
#[test]
fn test_read_only_accounts_cache_random() {
// Fixed seed so the randomized test is reproducible.
const SEED: [u8; 32] = [0xdb; 32];
const DATA_SIZE: usize = 19;
// Sized so that exactly 17 entries fit before LRU eviction kicks in.
const MAX_CACHE_SIZE: usize = 17 * (CACHE_ENTRY_SIZE + DATA_SIZE);
let mut rng = ChaChaRng::from_seed(SEED);
let cache = ReadOnlyAccountsCache::new(MAX_CACHE_SIZE);
// 5 slots x 7 pubkeys => 35 possible distinct cache keys.
let slots: Vec<Slot> = repeat_with(|| rng.gen_range(0, 1000)).take(5).collect();
let pubkeys: Vec<Pubkey> = repeat_with(|| {
let mut arr = [0u8; 32];
rng.fill(&mut arr[..]);
Pubkey::new_from_array(arr)
})
.take(7)
.collect();
// Shadow model: for each key, the last stored account and the iteration
// index (`ix`) at which the key was last stored or loaded.
let mut hash_map = HashMap::<ReadOnlyCacheKey, (AccountSharedData, usize)>::new();
for ix in 0..1000 {
if rng.gen_bool(0.1) {
// With 10% probability: load a random key currently in the cache,
// check it against the shadow model, and record the access time.
let key = *cache.cache.iter().choose(&mut rng).unwrap().key();
let (pubkey, slot) = key;
let account = cache.load(pubkey, slot).unwrap();
let (other, index) = hash_map.get_mut(&key).unwrap();
assert_eq!(account, *other);
*index = ix;
} else {
// Otherwise: store a random account under a random (pubkey, slot).
let mut data = vec![0u8; DATA_SIZE];
rng.fill(&mut data[..]);
let account = AccountSharedData::from(Account {
lamports: rng.gen(),
data,
executable: rng.gen(),
rent_epoch: rng.gen(),
owner: Pubkey::default(),
});
let slot = *slots.choose(&mut rng).unwrap();
let pubkey = *pubkeys.choose(&mut rng).unwrap();
let key = (pubkey, slot);
hash_map.insert(key, (account.clone(), ix));
cache.store(pubkey, slot, account);
}
}
// The cache holds exactly as many entries as MAX_CACHE_SIZE allows.
assert_eq!(cache.cache_len(), 17);
// All 35 (= 5 * 7) possible keys were touched over 1000 iterations.
assert_eq!(hash_map.len(), 35);
// Oldest access time among keys still cached: under LRU eviction, every
// key last accessed before this must have been evicted, and every key
// accessed at or after it must still be present.
let index = hash_map
.iter()
.filter(|(k, _)| cache.cache.contains_key(k))
.map(|(_, (_, ix))| *ix)
.min()
.unwrap();
for (key, (account, ix)) in hash_map {
let (pubkey, slot) = key;
assert_eq!(
cache.load(pubkey, slot),
if ix < index { None } else { Some(account) }
);
}
}
}