aggressively flush cache based on cache size (#21332)

* aggressively flush cache based on cache size

* find_older_frozen_slots -> cached_frozen_slots

* remove 'WRITE_CACHE_LIMIT_BYTES_DEFAULT'

* tweaks to stats

* fix tests
This commit is contained in:
Jeff Washington (jwash) 2021-12-01 13:10:48 -06:00 committed by GitHub
parent 1eefdeba85
commit 308d7d40d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 135 additions and 83 deletions

View File

@ -289,26 +289,21 @@ impl AccountsCache {
removed_slots
}
pub fn find_older_frozen_slots(&self, num_to_retain: usize) -> Vec<Slot> {
if self.cache.len() > num_to_retain {
let mut slots: Vec<_> = self
.cache
.iter()
.filter_map(|item| {
let (slot, slot_cache) = item.pair();
if slot_cache.is_frozen() {
Some(*slot)
} else {
None
}
})
.collect();
slots.sort_unstable();
slots.truncate(slots.len().saturating_sub(num_to_retain));
slots
} else {
vec![]
}
pub fn cached_frozen_slots(&self) -> Vec<Slot> {
let mut slots: Vec<_> = self
.cache
.iter()
.filter_map(|item| {
let (slot, slot_cache) = item.pair();
if slot_cache.is_frozen() {
Some(*slot)
} else {
None
}
})
.collect();
slots.sort_unstable();
slots
}
pub fn num_slots(&self) -> usize {
@ -347,10 +342,10 @@ pub mod tests {
}
#[test]
fn test_find_older_frozen_slots() {
fn test_cached_frozen_slots() {
let cache = AccountsCache::default();
// Cache is empty, should return nothing
assert!(cache.find_older_frozen_slots(0).is_empty());
assert!(cache.cached_frozen_slots().is_empty());
let inserted_slot = 0;
cache.store(
inserted_slot,
@ -359,14 +354,11 @@ pub mod tests {
Some(&Hash::default()),
);
// If the cache is told the size limit is 0, it should return nothing because there's only
// one cached slot
assert!(cache.find_older_frozen_slots(1).is_empty());
// If the cache is told the size limit is 0, it should return nothing, because there's no
// frozen slots
assert!(cache.find_older_frozen_slots(0).is_empty());
assert!(cache.cached_frozen_slots().is_empty());
cache.slot_cache(inserted_slot).unwrap().mark_slot_frozen();
// If the cache is told the size limit is 0, it should return the one frozen slot
assert_eq!(cache.find_older_frozen_slots(0), vec![inserted_slot]);
assert_eq!(cache.cached_frozen_slots(), vec![inserted_slot]);
}
}

View File

@ -84,7 +84,9 @@ use std::{thread::sleep, time::Duration};
const PAGE_SIZE: u64 = 4 * 1024;
const MAX_RECYCLE_STORES: usize = 1000;
const STORE_META_OVERHEAD: usize = 256;
const MAX_CACHE_SLOTS: usize = 200;
// when the accounts write cache exceeds this many bytes, we will flush it
// this can be specified on the command line, too (--accounts-db-cache-limit-mb)
const WRITE_CACHE_LIMIT_BYTES_DEFAULT: u64 = 15_000_000_000;
const FLUSH_CACHE_RANDOM_THRESHOLD: usize = MAX_LOCKOUT_HISTORY;
const SCAN_SLOT_PAR_ITER_THRESHOLD: usize = 4000;
@ -129,12 +131,14 @@ pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig {
accounts_hash_cache_path: None,
filler_account_count: None,
hash_calc_num_passes: None,
write_cache_limit_bytes: None,
};
pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig {
index: Some(ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS),
accounts_hash_cache_path: None,
filler_account_count: None,
hash_calc_num_passes: None,
write_cache_limit_bytes: None,
};
pub type BinnedHashData = Vec<Vec<CalculateHashIntermediate>>;
@ -151,6 +155,7 @@ pub struct AccountsDbConfig {
pub accounts_hash_cache_path: Option<PathBuf>,
pub filler_account_count: Option<usize>,
pub hash_calc_num_passes: Option<usize>,
pub write_cache_limit_bytes: Option<u64>,
}
struct FoundStoredAccount<'a> {
@ -960,6 +965,8 @@ pub struct AccountsDb {
pub accounts_cache: AccountsCache,
write_cache_limit_bytes: Option<u64>,
sender_bg_hasher: Option<Sender<CachedAccount>>,
pub read_only_accounts_cache: ReadOnlyAccountsCache,
@ -1186,7 +1193,7 @@ impl PurgeStats {
}
}
#[derive(Debug)]
#[derive(Debug, Default)]
struct FlushStats {
#[allow(dead_code)]
slot: Slot,
@ -1554,6 +1561,7 @@ impl AccountsDb {
next_id: AtomicUsize::new(0),
shrink_candidate_slots_v1: Mutex::new(Vec::new()),
shrink_candidate_slots: Mutex::new(HashMap::new()),
write_cache_limit_bytes: None,
write_version: AtomicU64::new(0),
paths: vec![],
accounts_hash_cache_path,
@ -1649,6 +1657,9 @@ impl AccountsDb {
accounts_update_notifier,
filler_account_count,
filler_account_suffix,
write_cache_limit_bytes: accounts_db_config
.as_ref()
.and_then(|x| x.write_cache_limit_bytes),
..Self::default_with_accounts_index(
accounts_index,
accounts_hash_cache_path,
@ -4489,16 +4500,21 @@ impl AccountsDb {
self.flush_slot_cache(slot, None::<&mut fn(&_, &_) -> bool>);
}
/// true if write cache is too big
fn should_aggressively_flush_cache(&self) -> bool {
self.write_cache_limit_bytes
.unwrap_or(WRITE_CACHE_LIMIT_BYTES_DEFAULT)
< self.accounts_cache.size()
}
// `force_flush` flushes all the cached roots `<= requested_flush_root`. It also then
// flushes:
// 1) Any remaining roots if there are > MAX_CACHE_SLOTS remaining slots in the cache,
// 2) It there are still > MAX_CACHE_SLOTS remaining slots in the cache, the excess
// unrooted slots
// 1) excess remaining roots or unrooted slots while 'should_aggressively_flush_cache' is true
pub fn flush_accounts_cache(&self, force_flush: bool, requested_flush_root: Option<Slot>) {
#[cfg(not(test))]
assert!(requested_flush_root.is_some());
if !force_flush && self.accounts_cache.num_slots() <= MAX_CACHE_SLOTS {
if !force_flush && !self.should_aggressively_flush_cache() {
return;
}
@ -4522,9 +4538,9 @@ impl AccountsDb {
// for those slot, let the Bank::drop() implementation do cleanup instead on dead
// banks
// If there are > MAX_CACHE_SLOTS, then flush the excess ones to storage
// If 'should_aggressively_flush_cache', then flush the excess ones to storage
let (total_new_excess_roots, num_excess_roots_flushed) =
if self.accounts_cache.num_slots() > MAX_CACHE_SLOTS {
if self.should_aggressively_flush_cache() {
// Start by flushing the roots
//
// Cannot do any cleaning on roots past `requested_flush_root` because future
@ -4534,26 +4550,40 @@ impl AccountsDb {
} else {
(0, 0)
};
let old_slots = self.accounts_cache.find_older_frozen_slots(MAX_CACHE_SLOTS);
let excess_slot_count = old_slots.len();
let mut excess_slot_count = 0;
let mut unflushable_unrooted_slot_count = 0;
let max_flushed_root = self.accounts_cache.fetch_max_flush_root();
let old_slot_flush_stats: Vec<_> = old_slots
.into_iter()
.filter_map(|old_slot| {
if self.should_aggressively_flush_cache() {
let old_slots = self.accounts_cache.cached_frozen_slots();
excess_slot_count = old_slots.len();
let mut flush_stats = FlushStats::default();
old_slots.into_iter().for_each(|old_slot| {
// Don't flush slots that are known to be unrooted
if old_slot > max_flushed_root {
Some(self.flush_slot_cache(old_slot, None::<&mut fn(&_, &_) -> bool>))
if self.should_aggressively_flush_cache() {
if let Some(stats) =
self.flush_slot_cache(old_slot, None::<&mut fn(&_, &_) -> bool>)
{
flush_stats.num_flushed += stats.num_flushed;
flush_stats.num_purged += stats.num_purged;
flush_stats.total_size += stats.total_size;
}
}
} else {
unflushable_unrooted_slot_count += 1;
None
}
})
.collect();
info!(
"req_flush_root: {:?} old_slot_flushes: {:?}",
requested_flush_root, old_slot_flush_stats
);
});
datapoint_info!(
"accounts_db-flush_accounts_cache_aggressively",
("num_flushed", flush_stats.num_flushed, i64),
("num_purged", flush_stats.num_purged, i64),
("total_flush_size", flush_stats.total_size, i64),
("total_cache_size", self.accounts_cache.size(), i64),
("total_frozen_slots", excess_slot_count, i64),
("total_slots", self.accounts_cache.num_slots(), i64),
);
}
datapoint_info!(
"accounts_db-flush_accounts_cache",
@ -4582,7 +4612,7 @@ impl AccountsDb {
let num_slots_remaining = self.accounts_cache.num_slots();
if force_flush && num_slots_remaining >= FLUSH_CACHE_RANDOM_THRESHOLD {
// Don't flush slots that are known to be unrooted
let mut frozen_slots = self.accounts_cache.find_older_frozen_slots(0);
let mut frozen_slots = self.accounts_cache.cached_frozen_slots();
frozen_slots.retain(|s| *s > max_flushed_root);
// Remove a random index 0 <= i < `frozen_slots.len()`
let rand_slot = frozen_slots.choose(&mut thread_rng());
@ -11286,25 +11316,32 @@ pub mod tests {
);
}
fn max_cache_slots() -> usize {
// this used to be the limiting factor - used here to facilitate tests.
200
}
#[test]
fn test_flush_accounts_cache_if_needed() {
run_test_flush_accounts_cache_if_needed(0, 2 * MAX_CACHE_SLOTS);
run_test_flush_accounts_cache_if_needed(2 * MAX_CACHE_SLOTS, 0);
run_test_flush_accounts_cache_if_needed(MAX_CACHE_SLOTS - 1, 0);
run_test_flush_accounts_cache_if_needed(0, MAX_CACHE_SLOTS - 1);
run_test_flush_accounts_cache_if_needed(MAX_CACHE_SLOTS, 0);
run_test_flush_accounts_cache_if_needed(0, MAX_CACHE_SLOTS);
run_test_flush_accounts_cache_if_needed(2 * MAX_CACHE_SLOTS, 2 * MAX_CACHE_SLOTS);
run_test_flush_accounts_cache_if_needed(MAX_CACHE_SLOTS - 1, MAX_CACHE_SLOTS - 1);
run_test_flush_accounts_cache_if_needed(MAX_CACHE_SLOTS, MAX_CACHE_SLOTS);
run_test_flush_accounts_cache_if_needed(0, 2 * max_cache_slots());
run_test_flush_accounts_cache_if_needed(2 * max_cache_slots(), 0);
run_test_flush_accounts_cache_if_needed(max_cache_slots() - 1, 0);
run_test_flush_accounts_cache_if_needed(0, max_cache_slots() - 1);
run_test_flush_accounts_cache_if_needed(max_cache_slots(), 0);
run_test_flush_accounts_cache_if_needed(0, max_cache_slots());
run_test_flush_accounts_cache_if_needed(2 * max_cache_slots(), 2 * max_cache_slots());
run_test_flush_accounts_cache_if_needed(max_cache_slots() - 1, max_cache_slots() - 1);
run_test_flush_accounts_cache_if_needed(max_cache_slots(), max_cache_slots());
}
fn run_test_flush_accounts_cache_if_needed(num_roots: usize, num_unrooted: usize) {
let mut db = AccountsDb::new(Vec::new(), &ClusterType::Development);
db.write_cache_limit_bytes = Some(max_cache_slots() as u64);
db.caching_enabled = true;
let account0 = AccountSharedData::new(1, 0, &Pubkey::default());
let space = 1; // # data bytes per account. write cache counts data len
let account0 = AccountSharedData::new(1, space, &Pubkey::default());
let mut keys = vec![];
let num_slots = 2 * MAX_CACHE_SLOTS;
let num_slots = 2 * max_cache_slots();
for i in 0..num_roots + num_unrooted {
let key = Pubkey::new_unique();
db.store_cached(i as Slot, &[(&key, &account0)]);
@ -11319,18 +11356,24 @@ pub mod tests {
let total_slots = num_roots + num_unrooted;
// If there's <= the max size, then nothing will be flushed from the slot
if total_slots <= MAX_CACHE_SLOTS {
if total_slots <= max_cache_slots() {
assert_eq!(db.accounts_cache.num_slots(), total_slots);
} else {
// Otherwise, all the roots are flushed, and only at most MAX_CACHE_SLOTS
// Otherwise, all the roots are flushed, and only at most max_cache_slots()
// of the unrooted slots are kept in the cache
let expected_size = std::cmp::min(num_unrooted, MAX_CACHE_SLOTS);
let expected_size = std::cmp::min(num_unrooted, max_cache_slots());
if expected_size > 0 {
for unrooted_slot in total_slots - expected_size..total_slots {
assert!(db
.accounts_cache
.slot_cache(unrooted_slot as Slot)
.is_some());
// +1: slot is 1-based. slot 1 has 1 byte of data
for unrooted_slot in (total_slots - expected_size + 1)..total_slots {
assert!(
db.accounts_cache
.slot_cache(unrooted_slot as Slot)
.is_some(),
"unrooted_slot: {}, total_slots: {}, expected_size: {}",
unrooted_slot,
total_slots,
expected_size
);
}
}
}
@ -11751,15 +11794,19 @@ pub mod tests {
fn setup_accounts_db_cache_clean(
num_slots: usize,
scan_slot: Option<Slot>,
write_cache_limit_bytes: Option<u64>,
) -> (Arc<AccountsDb>, Vec<Pubkey>, Vec<Slot>, Option<ScanTracker>) {
let caching_enabled = true;
let accounts_db = Arc::new(AccountsDb::new_with_config_for_tests(
let mut accounts_db = AccountsDb::new_with_config_for_tests(
Vec::new(),
&ClusterType::Development,
AccountSecondaryIndexes::default(),
caching_enabled,
AccountShrinkThreshold::default(),
));
);
accounts_db.write_cache_limit_bytes = write_cache_limit_bytes;
let accounts_db = Arc::new(accounts_db);
let slots: Vec<_> = (0..num_slots as Slot).into_iter().collect();
let stall_slot = num_slots as Slot;
let scan_stall_key = Pubkey::new_unique();
@ -11781,9 +11828,10 @@ pub mod tests {
let mut scan_tracker = None;
for slot in &slots {
for key in &keys[*slot as usize..] {
let space = 1; // 1 byte allows us to track by size
accounts_db.store_cached(
*slot,
&[(key, &AccountSharedData::new(1, 0, &Pubkey::default()))],
&[(key, &AccountSharedData::new(1, space, &Pubkey::default()))],
);
}
accounts_db.add_root(*slot as Slot);
@ -11805,8 +11853,8 @@ pub mod tests {
accounts_db.accounts_cache.remove_slot(stall_slot);
// If there's <= MAX_CACHE_SLOTS, no slots should be flushed
if accounts_db.accounts_cache.num_slots() <= MAX_CACHE_SLOTS {
// If there's <= max_cache_slots(), no slots should be flushed
if accounts_db.accounts_cache.num_slots() <= max_cache_slots() {
accounts_db.flush_accounts_cache(false, None);
assert_eq!(accounts_db.accounts_cache.num_slots(), num_slots);
}
@ -11817,7 +11865,8 @@ pub mod tests {
#[test]
fn test_accounts_db_cache_clean_dead_slots() {
let num_slots = 10;
let (accounts_db, keys, mut slots, _) = setup_accounts_db_cache_clean(num_slots, None);
let (accounts_db, keys, mut slots, _) =
setup_accounts_db_cache_clean(num_slots, None, None);
let last_dead_slot = (num_slots - 1) as Slot;
assert_eq!(*slots.last().unwrap(), last_dead_slot);
let alive_slot = last_dead_slot as Slot + 1;
@ -11894,7 +11943,7 @@ pub mod tests {
#[test]
fn test_accounts_db_cache_clean() {
let (accounts_db, keys, slots, _) = setup_accounts_db_cache_clean(10, None);
let (accounts_db, keys, slots, _) = setup_accounts_db_cache_clean(10, None, None);
// If no `max_clean_root` is specified, cleaning should purge all flushed slots
accounts_db.flush_accounts_cache(true, None);
@ -11935,8 +11984,8 @@ pub mod tests {
) {
assert!(requested_flush_root < (num_slots as Slot));
let (accounts_db, keys, slots, scan_tracker) =
setup_accounts_db_cache_clean(num_slots, scan_root);
let is_cache_at_limit = num_slots - requested_flush_root as usize - 1 > MAX_CACHE_SLOTS;
setup_accounts_db_cache_clean(num_slots, scan_root, Some(max_cache_slots() as u64));
let is_cache_at_limit = num_slots - requested_flush_root as usize - 1 > max_cache_slots();
// If:
// 1) `requested_flush_root` is specified,
@ -12068,10 +12117,10 @@ pub mod tests {
#[test]
fn test_accounts_db_cache_clean_max_root_with_cache_limit_hit() {
let requested_flush_root = 5;
// Test that if there are > MAX_CACHE_SLOTS in the cache after flush, then more roots
// Test that if there are > max_cache_slots() in the cache after flush, then more roots
// will be flushed
run_test_accounts_db_cache_clean_max_root(
MAX_CACHE_SLOTS + requested_flush_root as usize + 2,
max_cache_slots() + requested_flush_root as usize + 2,
requested_flush_root,
None,
);
@ -12080,15 +12129,15 @@ pub mod tests {
#[test]
fn test_accounts_db_cache_clean_max_root_with_cache_limit_hit_and_scan() {
let requested_flush_root = 5;
// Test that if there are > MAX_CACHE_SLOTS in the cache after flush, then more roots
// Test that if there are > max_cache_slots() in the cache after flush, then more roots
// will be flushed
run_test_accounts_db_cache_clean_max_root(
MAX_CACHE_SLOTS + requested_flush_root as usize + 2,
max_cache_slots() + requested_flush_root as usize + 2,
requested_flush_root,
Some(requested_flush_root - 1),
);
run_test_accounts_db_cache_clean_max_root(
MAX_CACHE_SLOTS + requested_flush_root as usize + 2,
max_cache_slots() + requested_flush_root as usize + 2,
requested_flush_root,
Some(requested_flush_root + 1),
);
@ -12096,7 +12145,7 @@ pub mod tests {
fn run_flush_rooted_accounts_cache(should_clean: bool) {
let num_slots = 10;
let (accounts_db, keys, slots, _) = setup_accounts_db_cache_clean(num_slots, None);
let (accounts_db, keys, slots, _) = setup_accounts_db_cache_clean(num_slots, None, None);
let mut cleaned_bytes = 0;
let mut cleaned_accounts = 0;
let should_clean_tracker = if should_clean {

View File

@ -1497,6 +1497,14 @@ pub fn main() {
.help("Enables faster starting of validators by skipping shrink. \
This option is for use during testing."),
)
.arg(
Arg::with_name("accounts_db_cache_limit_mb")
.long("accounts-db-cache-limit-mb")
.value_name("MEGABYTES")
.validator(is_parsable::<u64>)
.takes_value(true)
.help("How large the write cache for account data can become. If this is exceeded, the cache is flushed more aggressively."),
)
.arg(
Arg::with_name("accounts_index_scan_results_limit_mb")
.long("accounts-index-scan-results-limit-mb")
@ -2125,6 +2133,9 @@ pub fn main() {
index: Some(accounts_index_config),
accounts_hash_cache_path: Some(ledger_path.clone()),
filler_account_count,
write_cache_limit_bytes: value_t!(matches, "accounts_db_cache_limit_mb", u64)
.ok()
.map(|mb| mb * MB as u64),
..AccountsDbConfig::default()
};