From 39340ed25b5b928a945b7938d57180253a64c5c1 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Sat, 13 Nov 2021 14:00:37 -0600 Subject: [PATCH] throttle store_cached when cache size is too large (#21188) * throttle store_cached when cache size is too large * reduce max delay * 100ms max * 10ms max delay --- runtime/src/accounts_cache.rs | 81 +++++++++++++++++++++++++++-------- runtime/src/accounts_db.rs | 25 ++++++++++- 2 files changed, 88 insertions(+), 18 deletions(-) diff --git a/runtime/src/accounts_cache.rs b/runtime/src/accounts_cache.rs index ed00445e83..9cd0b152c3 100644 --- a/runtime/src/accounts_cache.rs +++ b/runtime/src/accounts_cache.rs @@ -17,15 +17,25 @@ use std::{ pub type SlotCache = Arc; -#[derive(Default, Debug)] +#[derive(Debug)] pub struct SlotCacheInner { cache: DashMap, same_account_writes: AtomicU64, same_account_writes_size: AtomicU64, unique_account_writes_size: AtomicU64, + size: AtomicU64, + total_size: Arc, is_frozen: AtomicBool, } +impl Drop for SlotCacheInner { + fn drop(&mut self) { + // broader cache no longer holds our size in memory + self.total_size + .fetch_sub(self.size.load(Ordering::Relaxed), Ordering::Relaxed); + } +} + impl SlotCacheInner { pub fn report_slot_store_metrics(&self) { datapoint_info!( @@ -44,7 +54,8 @@ impl SlotCacheInner { "unique_account_writes_size", self.unique_account_writes_size.load(Ordering::Relaxed), i64 - ) + ), + ("size", self.size.load(Ordering::Relaxed), i64) ); } @@ -59,21 +70,36 @@ impl SlotCacheInner { hash: Option>, slot: Slot, ) -> CachedAccount { - if self.cache.contains_key(pubkey) { - self.same_account_writes.fetch_add(1, Ordering::Relaxed); - self.same_account_writes_size - .fetch_add(account.data().len() as u64, Ordering::Relaxed); - } else { - self.unique_account_writes_size - .fetch_add(account.data().len() as u64, Ordering::Relaxed); - } + let data_len = account.data().len() as u64; let item = Arc::new(CachedAccountInner { account, hash: RwLock::new(hash.map(|h| *h.borrow())), slot, pubkey: *pubkey, }); - self.cache.insert(*pubkey, item.clone()); + if let Some(old) = self.cache.insert(*pubkey, item.clone()) { + self.same_account_writes.fetch_add(1, Ordering::Relaxed); + self.same_account_writes_size + .fetch_add(data_len, Ordering::Relaxed); + + let old_len = old.account.data().len() as u64; + let grow = old_len.saturating_sub(data_len); + if grow > 0 { + self.size.fetch_add(grow, Ordering::Relaxed); + self.total_size.fetch_add(grow, Ordering::Relaxed); + } else { + let shrink = data_len.saturating_sub(old_len); + if shrink > 0 { + self.size.fetch_add(shrink, Ordering::Relaxed); + self.total_size.fetch_sub(shrink, Ordering::Relaxed); + } + } + } else { + self.size.fetch_add(data_len, Ordering::Relaxed); + self.total_size.fetch_add(data_len, Ordering::Relaxed); + self.unique_account_writes_size + .fetch_add(data_len, Ordering::Relaxed); + } item } @@ -146,12 +172,23 @@ pub struct AccountsCache { // could have triggered a flush of this slot already maybe_unflushed_roots: RwLock>, max_flushed_root: AtomicU64, + total_size: Arc, } impl AccountsCache { - pub fn report_size(&self) { - let total_unique_writes_size: u64 = self - .cache + pub fn new_inner(&self) -> SlotCache { + Arc::new(SlotCacheInner { + cache: DashMap::default(), + same_account_writes: AtomicU64::default(), + same_account_writes_size: AtomicU64::default(), + unique_account_writes_size: AtomicU64::default(), + size: AtomicU64::default(), + total_size: Arc::clone(&self.total_size), + is_frozen: AtomicBool::default(), + }) + } + fn unique_account_writes_size(&self) -> u64 { + self.cache .iter() .map(|item| { let slot_cache = item.value(); @@ -159,7 +196,12 @@ impl AccountsCache { .unique_account_writes_size .load(Ordering::Relaxed) }) - .sum(); + .sum() + } + pub fn size(&self) -> u64 { + self.total_size.load(Ordering::Relaxed) + } + pub fn report_size(&self) { datapoint_info!( "accounts_cache_size", ( @@ -168,7 +210,12 @@ impl AccountsCache { i64 ), ("num_slots", self.cache.len(), i64), - ("total_unique_writes_size", total_unique_writes_size, i64), + ( + "total_unique_writes_size", + self.unique_account_writes_size(), + i64 + ), + ("total_size", self.size(), i64), ); } @@ -187,7 +234,7 @@ impl AccountsCache { self .cache .entry(slot) - .or_insert(Arc::new(SlotCacheInner::default())) + .or_insert(self.new_inner()) .clone()); slot_cache.insert(pubkey, account, hash, slot) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index d92f643e17..b8874f49fe 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -79,7 +79,6 @@ use std::{ }; use tempfile::TempDir; -#[cfg(test)] use std::{thread::sleep, time::Duration}; const PAGE_SIZE: u64 = 4 * 1024; @@ -6243,7 +6242,31 @@ impl AccountsDb { } } + /// sleep while accounts cache size has grown too large, up to a max wait + fn maybe_stall_store_cached(&self, accounts: &[(&Pubkey, &AccountSharedData)]) { + const MAX_DELAY_US: u128 = 10_000; + const CACHE_SIZE_TO_STALL: u64 = 10_000_000_000; + const DELAY_US: u64 = 1_000; + let start = Instant::now(); + let mut waited = false; + while self.accounts_cache.size() > CACHE_SIZE_TO_STALL { + waited = true; + sleep(Duration::from_micros(DELAY_US)); + if start.elapsed().as_micros() > MAX_DELAY_US { + break; + } + } + if waited { + datapoint_info!( + "accounts_db_store_cached_stall", + ("num_accounts", accounts.len(), i64), + ("delay_us", start.elapsed().as_micros(), i64), + ); + } + } + pub fn store_cached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) { + self.maybe_stall_store_cached(accounts); self.store(slot, accounts, self.caching_enabled); }