From 832d47317e8b64d879e144c043f5db8a4ab0173a Mon Sep 17 00:00:00 2001 From: sakridge Date: Thu, 2 Jul 2020 22:25:17 -0700 Subject: [PATCH] Move clean accounts to background service (#10898) --- core/src/accounts_background_service.rs | 9 ++- runtime/src/accounts_db.rs | 81 ++++++++++++++----------- runtime/src/accounts_index.rs | 22 ++++++- runtime/src/bank_forks.rs | 1 - 4 files changed, 74 insertions(+), 39 deletions(-) diff --git a/core/src/accounts_background_service.rs b/core/src/accounts_background_service.rs index 9d26925ed6..030bdb3ff3 100644 --- a/core/src/accounts_background_service.rs +++ b/core/src/accounts_background_service.rs @@ -18,25 +18,32 @@ const INTERVAL_MS: u64 = 100; const SHRUNKEN_ACCOUNT_PER_SEC: usize = 250; const SHRUNKEN_ACCOUNT_PER_INTERVAL: usize = SHRUNKEN_ACCOUNT_PER_SEC / (1000 / INTERVAL_MS as usize); +const CLEAN_INTERVAL_SLOTS: u64 = 100; impl AccountsBackgroundService { pub fn new(bank_forks: Arc>, exit: &Arc) -> Self { info!("AccountsBackgroundService active"); let exit = exit.clone(); let mut consumed_budget = 0; + let mut last_cleaned_slot = 0; let t_background = Builder::new() .name("solana-accounts-background".to_string()) .spawn(move || loop { if exit.load(Ordering::Relaxed) { break; } - let bank = bank_forks.read().unwrap().working_bank(); + let bank = bank_forks.read().unwrap().root_bank().clone(); bank.process_dead_slots(); consumed_budget = bank .process_stale_slot_with_budget(consumed_budget, SHRUNKEN_ACCOUNT_PER_INTERVAL); + if bank.block_height() - last_cleaned_slot > CLEAN_INTERVAL_SLOTS { + bank.clean_accounts(); + last_cleaned_slot = bank.block_height(); + } + sleep(Duration::from_millis(INTERVAL_MS)); }) .unwrap(); diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 49e7ccba0a..f99b0ecb95 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -551,7 +551,7 @@ impl AccountsDB { fn inc_store_counts( no_delete_id: AppendVecId, - purges: &HashMap>, + purges: &HashMap, u64)>, store_counts: &mut HashMap, already_counted: &mut HashSet, ) { @@ -561,7 +561,7 @@ impl AccountsDB { *store_counts.get_mut(&no_delete_id).unwrap() += 1; already_counted.insert(no_delete_id); let mut affected_pubkeys = HashSet::new(); - for (key, account_infos) in purges { + for (key, (account_infos, _ref_count)) in purges { for (_slot, account_info) in account_infos { if account_info.store_id == no_delete_id { affected_pubkeys.insert(key); @@ -570,7 +570,7 @@ impl AccountsDB { } } for key in affected_pubkeys { - for (_slot, account_info) in purges.get(&key).unwrap() { + for (_slot, account_info) in &purges.get(&key).unwrap().0 { Self::inc_store_counts( account_info.store_id, purges, @@ -582,28 +582,26 @@ impl AccountsDB { } fn calc_delete_dependencies( - accounts_index: &AccountsIndex, - purges: &HashMap>, + purges: &HashMap, u64)>, store_counts: &mut HashMap, ) { // Another pass to check if there are some filtered accounts which // do not match the criteria of deleting all appendvecs which contain them // then increment their storage count. let mut already_counted = HashSet::new(); - for (pubkey, account_infos) in purges.iter() { - let no_delete = - if account_infos.len() as u64 != accounts_index.ref_count_from_storage(&pubkey) { - true - } else { - let mut no_delete = false; - for (_slot, account_info) in account_infos { - if *store_counts.get(&account_info.store_id).unwrap() != 0 { - no_delete = true; - break; - } + for (_pubkey, (account_infos, ref_count_from_storage)) in purges.iter() { + let no_delete = if account_infos.len() as u64 != *ref_count_from_storage { + true + } else { + let mut no_delete = false; + for (_slot, account_info) in account_infos { + if *store_counts.get(&account_info.store_id).unwrap() != 0 { + no_delete = true; + break; } - no_delete - }; + } + no_delete + }; if no_delete { for (_slot_id, account_info) in account_infos { Self::inc_store_counts( @@ -622,6 +620,7 @@ impl AccountsDB { // Only remove those accounts where the entire rooted history of the account // can be purged because there are no live append vecs in the ancestors pub fn clean_accounts(&self) { + use std::iter::FromIterator; self.report_store_stats(); let mut accounts_scan = Measure::start("accounts_scan"); @@ -666,13 +665,12 @@ impl AccountsDB { clean_old_rooted.stop(); let mut store_counts_time = Measure::start("store_counts"); - let accounts_index = self.accounts_index.read().unwrap(); // Calculate store counts as if everything was purged // Then purge if we can let mut store_counts: HashMap = HashMap::new(); let storage = self.storage.read().unwrap(); - for account_infos in purges.values() { + for (account_infos, _ref_count) in purges.values() { for (slot, account_info) in account_infos { let slot_storage = storage.0.get(&slot).unwrap(); let store = slot_storage.get(&account_info.store_id).unwrap(); @@ -686,15 +684,17 @@ impl AccountsDB { } } } - - Self::calc_delete_dependencies(&accounts_index, &purges, &mut store_counts); - store_counts_time.stop(); + drop(storage); + + let mut calc_deps_time = Measure::start("calc_deps"); + Self::calc_delete_dependencies(&purges, &mut store_counts); + calc_deps_time.stop(); // Only keep purges where the entire history of the account in the root set // can be purged. All AppendVecs for those updates are dead. let mut purge_filter = Measure::start("purge_filter"); - purges.retain(|_pubkey, account_infos| { + purges.retain(|_pubkey, (account_infos, _ref_count)| { for (_slot, account_info) in account_infos.iter() { if *store_counts.get(&account_info.store_id).unwrap() != 0 { return false; @@ -706,17 +706,26 @@ impl AccountsDB { let mut reclaims_time = Measure::start("reclaims"); // Recalculate reclaims with new purge set + let purges_key_to_slot_set: Vec<_> = purges + .into_iter() + .map(|(key, (slots_list, _ref_count))| { + ( + key, + HashSet::from_iter(slots_list.into_iter().map(|(slot, _)| slot)), + ) + }) + .collect(); + let accounts_index = self.accounts_index.read().unwrap(); let mut reclaims = Vec::new(); let mut dead_keys = Vec::new(); - for pubkey in purges.keys() { - let (new_reclaims, is_empty) = accounts_index.purge(&pubkey); + for (pubkey, slots_set) in purges_key_to_slot_set { + let (new_reclaims, is_empty) = accounts_index.purge_exact(&pubkey, slots_set); if is_empty { - dead_keys.push(*pubkey); + dead_keys.push(pubkey); } reclaims.extend(new_reclaims); } - drop(storage); drop(accounts_index); if !dead_keys.is_empty() { @@ -728,9 +737,13 @@ impl AccountsDB { self.handle_reclaims_maybe_cleanup(&reclaims); reclaims_time.stop(); - debug!( - "clean_accounts: {} {} {} {}", - accounts_scan, store_counts_time, purge_filter, reclaims_time + datapoint_info!( + "clean_accounts", + ("accounts_scan", accounts_scan.as_us() as i64, i64), + ("store_counts", store_counts_time.as_us() as i64, i64), + ("purge_filter", purge_filter.as_us() as i64, i64), + ("calc_deps", calc_deps_time.as_us() as i64, i64), + ("reclaims", reclaims_time.as_us() as i64, i64), ); } @@ -4060,8 +4073,8 @@ pub mod tests { purges.insert(key0, accounts_index.would_purge(&key0)); purges.insert(key1, accounts_index.would_purge(&key1)); purges.insert(key2, accounts_index.would_purge(&key2)); - for (key, list) in &purges { - info!(" purge {} =>", key); + for (key, (list, ref_count)) in &purges { + info!(" purge {} ref_count {} =>", key, ref_count); for x in list { info!(" {:?}", x); } @@ -4072,7 +4085,7 @@ pub mod tests { store_counts.insert(1, 0); store_counts.insert(2, 0); store_counts.insert(3, 1); - AccountsDB::calc_delete_dependencies(&accounts_index, &purges, &mut store_counts); + AccountsDB::calc_delete_dependencies(&purges, &mut store_counts); let mut stores: Vec<_> = store_counts.keys().cloned().collect(); stores.sort(); for store in &stores { diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 5553094317..cd43d7095e 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -62,9 +62,14 @@ impl<'a, T: 'a + Clone> AccountsIndex { .collect() } - pub fn would_purge(&self, pubkey: &Pubkey) -> SlotList { - let list = &self.account_maps.get(&pubkey).unwrap().1.read().unwrap(); - self.get_rooted_entries(&list) + // returns the rooted entries and the storage ref count + pub fn would_purge(&self, pubkey: &Pubkey) -> (SlotList, RefCount) { + let (ref_count, slots_list) = self.account_maps.get(&pubkey).unwrap(); + let slots_list_r = &slots_list.read().unwrap(); + ( + self.get_rooted_entries(&slots_list_r), + ref_count.load(Ordering::Relaxed), + ) } // filter any rooted entries and return them along with a bool that indicates @@ -76,6 +81,17 @@ impl<'a, T: 'a + Clone> AccountsIndex { (reclaims, list.is_empty()) } + pub fn purge_exact(&self, pubkey: &Pubkey, slots: HashSet) -> (SlotList, bool) { + let list = &mut self.account_maps.get(&pubkey).unwrap().1.write().unwrap(); + let reclaims = list + .iter() + .filter(|(slot, _)| slots.contains(&slot)) + .cloned() + .collect(); + list.retain(|(slot, _)| !slots.contains(slot)); + (reclaims, list.is_empty()) + } + // find the latest slot and T in a slice for a given ancestor // returns index into 'slice' if found, None if not. fn latest_slot(&self, ancestors: Option<&Ancestors>, slice: SlotSlice) -> Option { diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index dd723004cf..fadfab928d 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -229,7 +229,6 @@ impl BankForks { bank.squash(); is_root_bank_squashed = bank_slot == root; - bank.clean_accounts(); bank.update_accounts_hash(); if self.snapshot_config.is_some() && accounts_package_sender.is_some() {