diff --git a/runtime/src/accounts_cache.rs b/runtime/src/accounts_cache.rs index 9cd0b152c..1aec6a5c6 100644 --- a/runtime/src/accounts_cache.rs +++ b/runtime/src/accounts_cache.rs @@ -289,26 +289,21 @@ impl AccountsCache { removed_slots } - pub fn find_older_frozen_slots(&self, num_to_retain: usize) -> Vec<Slot> { - if self.cache.len() > num_to_retain { - let mut slots: Vec<_> = self - .cache - .iter() - .filter_map(|item| { - let (slot, slot_cache) = item.pair(); - if slot_cache.is_frozen() { - Some(*slot) - } else { - None - } - }) - .collect(); - slots.sort_unstable(); - slots.truncate(slots.len().saturating_sub(num_to_retain)); - slots - } else { - vec![] - } + pub fn cached_frozen_slots(&self) -> Vec<Slot> { + let mut slots: Vec<_> = self + .cache + .iter() + .filter_map(|item| { + let (slot, slot_cache) = item.pair(); + if slot_cache.is_frozen() { + Some(*slot) + } else { + None + } + }) + .collect(); + slots.sort_unstable(); + slots } pub fn num_slots(&self) -> usize { @@ -347,10 +342,10 @@ pub mod tests { } #[test] - fn test_find_older_frozen_slots() { + fn test_cached_frozen_slots() { let cache = AccountsCache::default(); // Cache is empty, should return nothing - assert!(cache.find_older_frozen_slots(0).is_empty()); + assert!(cache.cached_frozen_slots().is_empty()); let inserted_slot = 0; cache.store( inserted_slot, @@ -359,14 +354,11 @@ pub mod tests { Some(&Hash::default()), ); - // If the cache is told the size limit is 0, it should return nothing because there's only - // one cached slot - assert!(cache.find_older_frozen_slots(1).is_empty()); // The cached slot has not been frozen yet, so nothing should be returned - assert!(cache.find_older_frozen_slots(0).is_empty()); + assert!(cache.cached_frozen_slots().is_empty()); cache.slot_cache(inserted_slot).unwrap().mark_slot_frozen(); // Now that the one cached slot is frozen, it should be returned - assert_eq!(cache.find_older_frozen_slots(0), vec![inserted_slot]); + 
assert_eq!(cache.cached_frozen_slots(), vec![inserted_slot]); } } diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index d1693abd6..38af3072f 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -84,7 +84,9 @@ use std::{thread::sleep, time::Duration}; const PAGE_SIZE: u64 = 4 * 1024; const MAX_RECYCLE_STORES: usize = 1000; const STORE_META_OVERHEAD: usize = 256; -const MAX_CACHE_SLOTS: usize = 200; +// when the accounts write cache exceeds this many bytes, we will flush it +// this can be specified on the command line, too (--accounts-db-cache-limit-mb) +const WRITE_CACHE_LIMIT_BYTES_DEFAULT: u64 = 15_000_000_000; const FLUSH_CACHE_RANDOM_THRESHOLD: usize = MAX_LOCKOUT_HISTORY; const SCAN_SLOT_PAR_ITER_THRESHOLD: usize = 4000; @@ -129,12 +131,14 @@ pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig { accounts_hash_cache_path: None, filler_account_count: None, hash_calc_num_passes: None, + write_cache_limit_bytes: None, }; pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig { index: Some(ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS), accounts_hash_cache_path: None, filler_account_count: None, hash_calc_num_passes: None, + write_cache_limit_bytes: None, }; pub type BinnedHashData = Vec<Vec<CalculateHashIntermediate>>; @@ -151,6 +155,7 @@ pub struct AccountsDbConfig { pub accounts_hash_cache_path: Option<PathBuf>, pub filler_account_count: Option<usize>, pub hash_calc_num_passes: Option<usize>, + pub write_cache_limit_bytes: Option<u64>, } struct FoundStoredAccount<'a> { @@ -960,6 +965,8 @@ pub struct AccountsDb { pub accounts_cache: AccountsCache, + write_cache_limit_bytes: Option<u64>, + sender_bg_hasher: Option<Sender<CachedAccount>>, pub read_only_accounts_cache: ReadOnlyAccountsCache, @@ -1186,7 +1193,7 @@ impl PurgeStats { } } -#[derive(Debug)] +#[derive(Debug, Default)] struct FlushStats { #[allow(dead_code)] slot: Slot, @@ -1554,6 +1561,7 @@ impl AccountsDb { next_id: AtomicUsize::new(0), shrink_candidate_slots_v1: Mutex::new(Vec::new()), 
shrink_candidate_slots: Mutex::new(HashMap::new()), + write_cache_limit_bytes: None, write_version: AtomicU64::new(0), paths: vec![], accounts_hash_cache_path, @@ -1649,6 +1657,9 @@ impl AccountsDb { accounts_update_notifier, filler_account_count, filler_account_suffix, + write_cache_limit_bytes: accounts_db_config + .as_ref() + .and_then(|x| x.write_cache_limit_bytes), ..Self::default_with_accounts_index( accounts_index, accounts_hash_cache_path, @@ -4489,16 +4500,21 @@ impl AccountsDb { self.flush_slot_cache(slot, None::<&mut fn(&_, &_) -> bool>); } + /// true if write cache is too big + fn should_aggressively_flush_cache(&self) -> bool { + self.write_cache_limit_bytes + .unwrap_or(WRITE_CACHE_LIMIT_BYTES_DEFAULT) + < self.accounts_cache.size() + } + // `force_flush` flushes all the cached roots `<= requested_flush_root`. It also then // flushes: - // 1) Any remaining roots if there are > MAX_CACHE_SLOTS remaining slots in the cache, - // 2) It there are still > MAX_CACHE_SLOTS remaining slots in the cache, the excess - // unrooted slots + // 1) excess remaining roots or unrooted slots while 'should_aggressively_flush_cache' is true pub fn flush_accounts_cache(&self, force_flush: bool, requested_flush_root: Option) { #[cfg(not(test))] assert!(requested_flush_root.is_some()); - if !force_flush && self.accounts_cache.num_slots() <= MAX_CACHE_SLOTS { + if !force_flush && !self.should_aggressively_flush_cache() { return; } @@ -4522,9 +4538,9 @@ impl AccountsDb { // for those slot, let the Bank::drop() implementation do cleanup instead on dead // banks - // If there are > MAX_CACHE_SLOTS, then flush the excess ones to storage + // If 'should_aggressively_flush_cache', then flush the excess ones to storage let (total_new_excess_roots, num_excess_roots_flushed) = - if self.accounts_cache.num_slots() > MAX_CACHE_SLOTS { + if self.should_aggressively_flush_cache() { // Start by flushing the roots // // Cannot do any cleaning on roots past `requested_flush_root` because 
future @@ -4534,26 +4550,40 @@ impl AccountsDb { } else { (0, 0) }; - let old_slots = self.accounts_cache.find_older_frozen_slots(MAX_CACHE_SLOTS); - let excess_slot_count = old_slots.len(); + + let mut excess_slot_count = 0; let mut unflushable_unrooted_slot_count = 0; let max_flushed_root = self.accounts_cache.fetch_max_flush_root(); - let old_slot_flush_stats: Vec<_> = old_slots - .into_iter() - .filter_map(|old_slot| { + if self.should_aggressively_flush_cache() { + let old_slots = self.accounts_cache.cached_frozen_slots(); + excess_slot_count = old_slots.len(); + let mut flush_stats = FlushStats::default(); + old_slots.into_iter().for_each(|old_slot| { // Don't flush slots that are known to be unrooted if old_slot > max_flushed_root { - Some(self.flush_slot_cache(old_slot, None::<&mut fn(&_, &_) -> bool>)) + if self.should_aggressively_flush_cache() { + if let Some(stats) = + self.flush_slot_cache(old_slot, None::<&mut fn(&_, &_) -> bool>) + { + flush_stats.num_flushed += stats.num_flushed; + flush_stats.num_purged += stats.num_purged; + flush_stats.total_size += stats.total_size; + } + } } else { unflushable_unrooted_slot_count += 1; - None } - }) - .collect(); - info!( - "req_flush_root: {:?} old_slot_flushes: {:?}", - requested_flush_root, old_slot_flush_stats - ); + }); + datapoint_info!( + "accounts_db-flush_accounts_cache_aggressively", + ("num_flushed", flush_stats.num_flushed, i64), + ("num_purged", flush_stats.num_purged, i64), + ("total_flush_size", flush_stats.total_size, i64), + ("total_cache_size", self.accounts_cache.size(), i64), + ("total_frozen_slots", excess_slot_count, i64), + ("total_slots", self.accounts_cache.num_slots(), i64), + ); + } datapoint_info!( "accounts_db-flush_accounts_cache", @@ -4582,7 +4612,7 @@ impl AccountsDb { let num_slots_remaining = self.accounts_cache.num_slots(); if force_flush && num_slots_remaining >= FLUSH_CACHE_RANDOM_THRESHOLD { // Don't flush slots that are known to be unrooted - let mut frozen_slots = 
self.accounts_cache.find_older_frozen_slots(0); + let mut frozen_slots = self.accounts_cache.cached_frozen_slots(); frozen_slots.retain(|s| *s > max_flushed_root); // Remove a random index 0 <= i < `frozen_slots.len()` let rand_slot = frozen_slots.choose(&mut thread_rng()); @@ -11286,25 +11316,32 @@ pub mod tests { ); } + fn max_cache_slots() -> usize { + // this used to be the limiting factor - used here to facilitate tests. + 200 + } + #[test] fn test_flush_accounts_cache_if_needed() { - run_test_flush_accounts_cache_if_needed(0, 2 * MAX_CACHE_SLOTS); - run_test_flush_accounts_cache_if_needed(2 * MAX_CACHE_SLOTS, 0); - run_test_flush_accounts_cache_if_needed(MAX_CACHE_SLOTS - 1, 0); - run_test_flush_accounts_cache_if_needed(0, MAX_CACHE_SLOTS - 1); - run_test_flush_accounts_cache_if_needed(MAX_CACHE_SLOTS, 0); - run_test_flush_accounts_cache_if_needed(0, MAX_CACHE_SLOTS); - run_test_flush_accounts_cache_if_needed(2 * MAX_CACHE_SLOTS, 2 * MAX_CACHE_SLOTS); - run_test_flush_accounts_cache_if_needed(MAX_CACHE_SLOTS - 1, MAX_CACHE_SLOTS - 1); - run_test_flush_accounts_cache_if_needed(MAX_CACHE_SLOTS, MAX_CACHE_SLOTS); + run_test_flush_accounts_cache_if_needed(0, 2 * max_cache_slots()); + run_test_flush_accounts_cache_if_needed(2 * max_cache_slots(), 0); + run_test_flush_accounts_cache_if_needed(max_cache_slots() - 1, 0); + run_test_flush_accounts_cache_if_needed(0, max_cache_slots() - 1); + run_test_flush_accounts_cache_if_needed(max_cache_slots(), 0); + run_test_flush_accounts_cache_if_needed(0, max_cache_slots()); + run_test_flush_accounts_cache_if_needed(2 * max_cache_slots(), 2 * max_cache_slots()); + run_test_flush_accounts_cache_if_needed(max_cache_slots() - 1, max_cache_slots() - 1); + run_test_flush_accounts_cache_if_needed(max_cache_slots(), max_cache_slots()); } fn run_test_flush_accounts_cache_if_needed(num_roots: usize, num_unrooted: usize) { let mut db = AccountsDb::new(Vec::new(), &ClusterType::Development); + db.write_cache_limit_bytes = 
Some(max_cache_slots() as u64); db.caching_enabled = true; - let account0 = AccountSharedData::new(1, 0, &Pubkey::default()); + let space = 1; // # data bytes per account. write cache counts data len + let account0 = AccountSharedData::new(1, space, &Pubkey::default()); let mut keys = vec![]; - let num_slots = 2 * MAX_CACHE_SLOTS; + let num_slots = 2 * max_cache_slots(); for i in 0..num_roots + num_unrooted { let key = Pubkey::new_unique(); db.store_cached(i as Slot, &[(&key, &account0)]); @@ -11319,18 +11356,24 @@ pub mod tests { let total_slots = num_roots + num_unrooted; // If there's <= the max size, then nothing will be flushed from the slot - if total_slots <= MAX_CACHE_SLOTS { + if total_slots <= max_cache_slots() { assert_eq!(db.accounts_cache.num_slots(), total_slots); } else { - // Otherwise, all the roots are flushed, and only at most MAX_CACHE_SLOTS + // Otherwise, all the roots are flushed, and only at most max_cache_slots() // of the unrooted slots are kept in the cache - let expected_size = std::cmp::min(num_unrooted, MAX_CACHE_SLOTS); + let expected_size = std::cmp::min(num_unrooted, max_cache_slots()); if expected_size > 0 { - for unrooted_slot in total_slots - expected_size..total_slots { - assert!(db - .accounts_cache - .slot_cache(unrooted_slot as Slot) - .is_some()); + // +1: slot is 1-based. 
slot 1 has 1 byte of data + for unrooted_slot in (total_slots - expected_size + 1)..total_slots { + assert!( + db.accounts_cache + .slot_cache(unrooted_slot as Slot) + .is_some(), + "unrooted_slot: {}, total_slots: {}, expected_size: {}", + unrooted_slot, + total_slots, + expected_size + ); } } } @@ -11751,15 +11794,19 @@ pub mod tests { fn setup_accounts_db_cache_clean( num_slots: usize, scan_slot: Option<Slot>, + write_cache_limit_bytes: Option<u64>, ) -> (Arc<AccountsDb>, Vec<Pubkey>, Vec<Slot>, Option<ScanTracker>) { let caching_enabled = true; - let accounts_db = Arc::new(AccountsDb::new_with_config_for_tests( + let mut accounts_db = AccountsDb::new_with_config_for_tests( Vec::new(), &ClusterType::Development, AccountSecondaryIndexes::default(), caching_enabled, AccountShrinkThreshold::default(), - )); + ); + accounts_db.write_cache_limit_bytes = write_cache_limit_bytes; + let accounts_db = Arc::new(accounts_db); + let slots: Vec<_> = (0..num_slots as Slot).into_iter().collect(); let stall_slot = num_slots as Slot; let scan_stall_key = Pubkey::new_unique(); @@ -11781,9 +11828,10 @@ pub mod tests { let mut scan_tracker = None; for slot in &slots { for key in &keys[*slot as usize..] 
{ + let space = 1; // 1 byte allows us to track by size accounts_db.store_cached( *slot, - &[(key, &AccountSharedData::new(1, 0, &Pubkey::default()))], + &[(key, &AccountSharedData::new(1, space, &Pubkey::default()))], ); } accounts_db.add_root(*slot as Slot); @@ -11805,8 +11853,8 @@ pub mod tests { accounts_db.accounts_cache.remove_slot(stall_slot); - // If there's <= MAX_CACHE_SLOTS, no slots should be flushed - if accounts_db.accounts_cache.num_slots() <= MAX_CACHE_SLOTS { + // If there's <= max_cache_slots(), no slots should be flushed + if accounts_db.accounts_cache.num_slots() <= max_cache_slots() { accounts_db.flush_accounts_cache(false, None); assert_eq!(accounts_db.accounts_cache.num_slots(), num_slots); } @@ -11817,7 +11865,8 @@ pub mod tests { #[test] fn test_accounts_db_cache_clean_dead_slots() { let num_slots = 10; - let (accounts_db, keys, mut slots, _) = setup_accounts_db_cache_clean(num_slots, None); + let (accounts_db, keys, mut slots, _) = + setup_accounts_db_cache_clean(num_slots, None, None); let last_dead_slot = (num_slots - 1) as Slot; assert_eq!(*slots.last().unwrap(), last_dead_slot); let alive_slot = last_dead_slot as Slot + 1; @@ -11894,7 +11943,7 @@ pub mod tests { #[test] fn test_accounts_db_cache_clean() { - let (accounts_db, keys, slots, _) = setup_accounts_db_cache_clean(10, None); + let (accounts_db, keys, slots, _) = setup_accounts_db_cache_clean(10, None, None); // If no `max_clean_root` is specified, cleaning should purge all flushed slots accounts_db.flush_accounts_cache(true, None); @@ -11935,8 +11984,8 @@ pub mod tests { ) { assert!(requested_flush_root < (num_slots as Slot)); let (accounts_db, keys, slots, scan_tracker) = - setup_accounts_db_cache_clean(num_slots, scan_root); - let is_cache_at_limit = num_slots - requested_flush_root as usize - 1 > MAX_CACHE_SLOTS; + setup_accounts_db_cache_clean(num_slots, scan_root, Some(max_cache_slots() as u64)); + let is_cache_at_limit = num_slots - requested_flush_root as usize - 1 > 
max_cache_slots(); // If: // 1) `requested_flush_root` is specified, @@ -12068,10 +12117,10 @@ pub mod tests { #[test] fn test_accounts_db_cache_clean_max_root_with_cache_limit_hit() { let requested_flush_root = 5; - // Test that if there are > MAX_CACHE_SLOTS in the cache after flush, then more roots + // Test that if there are > max_cache_slots() in the cache after flush, then more roots // will be flushed run_test_accounts_db_cache_clean_max_root( - MAX_CACHE_SLOTS + requested_flush_root as usize + 2, + max_cache_slots() + requested_flush_root as usize + 2, requested_flush_root, None, ); @@ -12080,15 +12129,15 @@ pub mod tests { #[test] fn test_accounts_db_cache_clean_max_root_with_cache_limit_hit_and_scan() { let requested_flush_root = 5; - // Test that if there are > MAX_CACHE_SLOTS in the cache after flush, then more roots + // Test that if there are > max_cache_slots() in the cache after flush, then more roots // will be flushed run_test_accounts_db_cache_clean_max_root( - MAX_CACHE_SLOTS + requested_flush_root as usize + 2, + max_cache_slots() + requested_flush_root as usize + 2, requested_flush_root, Some(requested_flush_root - 1), ); run_test_accounts_db_cache_clean_max_root( - MAX_CACHE_SLOTS + requested_flush_root as usize + 2, + max_cache_slots() + requested_flush_root as usize + 2, requested_flush_root, Some(requested_flush_root + 1), ); @@ -12096,7 +12145,7 @@ pub mod tests { fn run_flush_rooted_accounts_cache(should_clean: bool) { let num_slots = 10; - let (accounts_db, keys, slots, _) = setup_accounts_db_cache_clean(num_slots, None); + let (accounts_db, keys, slots, _) = setup_accounts_db_cache_clean(num_slots, None, None); let mut cleaned_bytes = 0; let mut cleaned_accounts = 0; let should_clean_tracker = if should_clean { diff --git a/validator/src/main.rs b/validator/src/main.rs index c5264f19c..733bee6a0 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1497,6 +1497,14 @@ pub fn main() { .help("Enables faster starting of 
validators by skipping shrink. \ This option is for use during testing."), ) + .arg( + Arg::with_name("accounts_db_cache_limit_mb") + .long("accounts-db-cache-limit-mb") + .value_name("MEGABYTES") + .validator(is_parsable::<u64>) + .takes_value(true) + .help("How large the write cache for account data can become. If this is exceeded, the cache is flushed more aggressively."), + ) .arg( Arg::with_name("accounts_index_scan_results_limit_mb") .long("accounts-index-scan-results-limit-mb") @@ -2125,6 +2133,9 @@ pub fn main() { index: Some(accounts_index_config), accounts_hash_cache_path: Some(ledger_path.clone()), filler_account_count, + write_cache_limit_bytes: value_t!(matches, "accounts_db_cache_limit_mb", u64) + .ok() + .map(|mb| mb * MB as u64), ..AccountsDbConfig::default() };