throttle store_cached when cache size is too large (#21188)

* throttle store_cached when cache size is too large

* reduce max delay

* 100ms max

* 10ms max delay
This commit is contained in:
Jeff Washington (jwash) 2021-11-13 14:00:37 -06:00 committed by GitHub
parent 3f4f05865d
commit 39340ed25b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 88 additions and 18 deletions

View File

@ -17,15 +17,25 @@ use std::{
pub type SlotCache = Arc<SlotCacheInner>;
#[derive(Default, Debug)]
#[derive(Debug)]
/// One slot's worth of write-cached accounts, plus per-slot write metrics
/// and a handle to the cache-wide byte counter.
pub struct SlotCacheInner {
// cached account state for this slot, keyed by pubkey
cache: DashMap<Pubkey, CachedAccount>,
// number of inserts that overwrote an already-cached pubkey
same_account_writes: AtomicU64,
// data bytes written by those overwriting inserts
same_account_writes_size: AtomicU64,
// data bytes written by first-time (unique-pubkey) inserts
unique_account_writes_size: AtomicU64,
// approximate data bytes currently held by this slot's cache
size: AtomicU64,
// shared with the owning AccountsCache; this slot's contribution is
// subtracted back out in Drop
total_size: Arc<AtomicU64>,
// presumably set once the slot is frozen; usage not visible in this hunk —
// TODO(review) confirm
is_frozen: AtomicBool,
}
impl Drop for SlotCacheInner {
    fn drop(&mut self) {
        // This slot's bytes leave memory with it, so deduct its contribution
        // from the whole-cache counter shared with `AccountsCache`.
        let bytes_freed = self.size.load(Ordering::Relaxed);
        self.total_size.fetch_sub(bytes_freed, Ordering::Relaxed);
    }
}
impl SlotCacheInner {
pub fn report_slot_store_metrics(&self) {
datapoint_info!(
@ -44,7 +54,8 @@ impl SlotCacheInner {
"unique_account_writes_size",
self.unique_account_writes_size.load(Ordering::Relaxed),
i64
)
),
("size", self.size.load(Ordering::Relaxed), i64)
);
}
@ -59,21 +70,36 @@ impl SlotCacheInner {
hash: Option<impl Borrow<Hash>>,
slot: Slot,
) -> CachedAccount {
if self.cache.contains_key(pubkey) {
self.same_account_writes.fetch_add(1, Ordering::Relaxed);
self.same_account_writes_size
.fetch_add(account.data().len() as u64, Ordering::Relaxed);
} else {
self.unique_account_writes_size
.fetch_add(account.data().len() as u64, Ordering::Relaxed);
}
let data_len = account.data().len() as u64;
let item = Arc::new(CachedAccountInner {
account,
hash: RwLock::new(hash.map(|h| *h.borrow())),
slot,
pubkey: *pubkey,
});
self.cache.insert(*pubkey, item.clone());
if let Some(old) = self.cache.insert(*pubkey, item.clone()) {
self.same_account_writes.fetch_add(1, Ordering::Relaxed);
self.same_account_writes_size
.fetch_add(data_len, Ordering::Relaxed);
let old_len = old.account.data().len() as u64;
let grow = old_len.saturating_sub(data_len);
if grow > 0 {
self.size.fetch_add(grow, Ordering::Relaxed);
self.total_size.fetch_add(grow, Ordering::Relaxed);
} else {
let shrink = data_len.saturating_sub(old_len);
if shrink > 0 {
self.size.fetch_add(shrink, Ordering::Relaxed);
self.total_size.fetch_sub(shrink, Ordering::Relaxed);
}
}
} else {
self.size.fetch_add(data_len, Ordering::Relaxed);
self.total_size.fetch_add(data_len, Ordering::Relaxed);
self.unique_account_writes_size
.fetch_add(data_len, Ordering::Relaxed);
}
item
}
@ -146,12 +172,23 @@ pub struct AccountsCache {
// could have triggered a flush of this slot already
maybe_unflushed_roots: RwLock<BTreeSet<Slot>>,
max_flushed_root: AtomicU64,
total_size: Arc<AtomicU64>,
}
impl AccountsCache {
pub fn report_size(&self) {
let total_unique_writes_size: u64 = self
.cache
/// Construct an empty per-slot cache whose `total_size` handle is shared
/// with this `AccountsCache`, so bytes inserted into (or dropped along with)
/// the slot roll up into the whole-cache size.
pub fn new_inner(&self) -> SlotCache {
    let inner = SlotCacheInner {
        cache: DashMap::new(),
        same_account_writes: AtomicU64::new(0),
        same_account_writes_size: AtomicU64::new(0),
        unique_account_writes_size: AtomicU64::new(0),
        size: AtomicU64::new(0),
        // one counter shared across every slot cache
        total_size: Arc::clone(&self.total_size),
        is_frozen: AtomicBool::new(false),
    };
    Arc::new(inner)
}
fn unique_account_writes_size(&self) -> u64 {
self.cache
.iter()
.map(|item| {
let slot_cache = item.value();
@ -159,7 +196,12 @@ impl AccountsCache {
.unique_account_writes_size
.load(Ordering::Relaxed)
})
.sum();
.sum()
}
/// Current total bytes held across all slot caches. Maintained by
/// `SlotCacheInner` on insert and in its `Drop` impl via the shared
/// `total_size` counter, so this is a cheap atomic read.
pub fn size(&self) -> u64 {
self.total_size.load(Ordering::Relaxed)
}
pub fn report_size(&self) {
datapoint_info!(
"accounts_cache_size",
(
@ -168,7 +210,12 @@ impl AccountsCache {
i64
),
("num_slots", self.cache.len(), i64),
("total_unique_writes_size", total_unique_writes_size, i64),
(
"total_unique_writes_size",
self.unique_account_writes_size(),
i64
),
("total_size", self.size(), i64),
);
}
@ -187,7 +234,7 @@ impl AccountsCache {
self
.cache
.entry(slot)
.or_insert(Arc::new(SlotCacheInner::default()))
.or_insert(self.new_inner())
.clone());
slot_cache.insert(pubkey, account, hash, slot)

View File

@ -79,7 +79,6 @@ use std::{
};
use tempfile::TempDir;
#[cfg(test)]
use std::{thread::sleep, time::Duration};
// 4 KiB page size; the uses of this constant fall outside this hunk
const PAGE_SIZE: u64 = 4 * 1024;
@ -6243,7 +6242,31 @@ impl AccountsDb {
}
}
/// Back-pressure for `store_cached`: while the shared accounts-cache size is
/// above `CACHE_SIZE_TO_STALL` bytes, sleep in `DELAY_US` increments until it
/// drains or roughly `MAX_DELAY_US` has elapsed, then report a datapoint if
/// any stall occurred.
fn maybe_stall_store_cached(&self, accounts: &[(&Pubkey, &AccountSharedData)]) {
    const MAX_DELAY_US: u128 = 10_000; // never stall a writer more than ~10ms
    const CACHE_SIZE_TO_STALL: u64 = 10_000_000_000; // ~10GB of cached accounts
    const DELAY_US: u64 = 1_000; // re-check the cache size every ~1ms
    let start = Instant::now();
    let mut stalled = false;
    loop {
        // cache has drained below the threshold — stop stalling
        if self.accounts_cache.size() <= CACHE_SIZE_TO_STALL {
            break;
        }
        stalled = true;
        sleep(Duration::from_micros(DELAY_US));
        // elapsed is checked after each sleep, so the total stall is bounded
        // by MAX_DELAY_US plus one sleep quantum
        if start.elapsed().as_micros() > MAX_DELAY_US {
            break;
        }
    }
    if stalled {
        datapoint_info!(
            "accounts_db_store_cached_stall",
            ("num_accounts", accounts.len(), i64),
            ("delay_us", start.elapsed().as_micros(), i64),
        );
    }
}
/// Store `accounts` for `slot` through the write cache, first applying
/// bounded back-pressure if the cache has grown too large.
pub fn store_cached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) {
// may sleep (bounded to ~10ms) while the accounts cache is oversized
self.maybe_stall_store_cached(accounts);
// NOTE(review): self.caching_enabled is forwarded as store()'s final flag —
// presumably "is_cached_store"; confirm store() handles the disabled case
self.store(slot, accounts, self.caching_enabled);
}