Move clean accounts to background service (#10898)

This commit is contained in:
sakridge 2020-07-02 22:25:17 -07:00 committed by GitHub
parent f1699721ef
commit 832d47317e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 74 additions and 39 deletions

View File

@ -18,25 +18,32 @@ const INTERVAL_MS: u64 = 100;
const SHRUNKEN_ACCOUNT_PER_SEC: usize = 250; const SHRUNKEN_ACCOUNT_PER_SEC: usize = 250;
const SHRUNKEN_ACCOUNT_PER_INTERVAL: usize = const SHRUNKEN_ACCOUNT_PER_INTERVAL: usize =
SHRUNKEN_ACCOUNT_PER_SEC / (1000 / INTERVAL_MS as usize); SHRUNKEN_ACCOUNT_PER_SEC / (1000 / INTERVAL_MS as usize);
const CLEAN_INTERVAL_SLOTS: u64 = 100;
impl AccountsBackgroundService { impl AccountsBackgroundService {
pub fn new(bank_forks: Arc<RwLock<BankForks>>, exit: &Arc<AtomicBool>) -> Self { pub fn new(bank_forks: Arc<RwLock<BankForks>>, exit: &Arc<AtomicBool>) -> Self {
info!("AccountsBackgroundService active"); info!("AccountsBackgroundService active");
let exit = exit.clone(); let exit = exit.clone();
let mut consumed_budget = 0; let mut consumed_budget = 0;
let mut last_cleaned_slot = 0;
let t_background = Builder::new() let t_background = Builder::new()
.name("solana-accounts-background".to_string()) .name("solana-accounts-background".to_string())
.spawn(move || loop { .spawn(move || loop {
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
break; break;
} }
let bank = bank_forks.read().unwrap().working_bank(); let bank = bank_forks.read().unwrap().root_bank().clone();
bank.process_dead_slots(); bank.process_dead_slots();
consumed_budget = bank consumed_budget = bank
.process_stale_slot_with_budget(consumed_budget, SHRUNKEN_ACCOUNT_PER_INTERVAL); .process_stale_slot_with_budget(consumed_budget, SHRUNKEN_ACCOUNT_PER_INTERVAL);
if bank.block_height() - last_cleaned_slot > CLEAN_INTERVAL_SLOTS {
bank.clean_accounts();
last_cleaned_slot = bank.block_height();
}
sleep(Duration::from_millis(INTERVAL_MS)); sleep(Duration::from_millis(INTERVAL_MS));
}) })
.unwrap(); .unwrap();

View File

@ -551,7 +551,7 @@ impl AccountsDB {
fn inc_store_counts( fn inc_store_counts(
no_delete_id: AppendVecId, no_delete_id: AppendVecId,
purges: &HashMap<Pubkey, Vec<(Slot, AccountInfo)>>, purges: &HashMap<Pubkey, (SlotList<AccountInfo>, u64)>,
store_counts: &mut HashMap<AppendVecId, usize>, store_counts: &mut HashMap<AppendVecId, usize>,
already_counted: &mut HashSet<AppendVecId>, already_counted: &mut HashSet<AppendVecId>,
) { ) {
@ -561,7 +561,7 @@ impl AccountsDB {
*store_counts.get_mut(&no_delete_id).unwrap() += 1; *store_counts.get_mut(&no_delete_id).unwrap() += 1;
already_counted.insert(no_delete_id); already_counted.insert(no_delete_id);
let mut affected_pubkeys = HashSet::new(); let mut affected_pubkeys = HashSet::new();
for (key, account_infos) in purges { for (key, (account_infos, _ref_count)) in purges {
for (_slot, account_info) in account_infos { for (_slot, account_info) in account_infos {
if account_info.store_id == no_delete_id { if account_info.store_id == no_delete_id {
affected_pubkeys.insert(key); affected_pubkeys.insert(key);
@ -570,7 +570,7 @@ impl AccountsDB {
} }
} }
for key in affected_pubkeys { for key in affected_pubkeys {
for (_slot, account_info) in purges.get(&key).unwrap() { for (_slot, account_info) in &purges.get(&key).unwrap().0 {
Self::inc_store_counts( Self::inc_store_counts(
account_info.store_id, account_info.store_id,
purges, purges,
@ -582,28 +582,26 @@ impl AccountsDB {
} }
fn calc_delete_dependencies( fn calc_delete_dependencies(
accounts_index: &AccountsIndex<AccountInfo>, purges: &HashMap<Pubkey, (SlotList<AccountInfo>, u64)>,
purges: &HashMap<Pubkey, Vec<(Slot, AccountInfo)>>,
store_counts: &mut HashMap<AppendVecId, usize>, store_counts: &mut HashMap<AppendVecId, usize>,
) { ) {
// Another pass to check if there are some filtered accounts which // Another pass to check if there are some filtered accounts which
// do not match the criteria of deleting all appendvecs which contain them // do not match the criteria of deleting all appendvecs which contain them
// then increment their storage count. // then increment their storage count.
let mut already_counted = HashSet::new(); let mut already_counted = HashSet::new();
for (pubkey, account_infos) in purges.iter() { for (_pubkey, (account_infos, ref_count_from_storage)) in purges.iter() {
let no_delete = let no_delete = if account_infos.len() as u64 != *ref_count_from_storage {
if account_infos.len() as u64 != accounts_index.ref_count_from_storage(&pubkey) { true
true } else {
} else { let mut no_delete = false;
let mut no_delete = false; for (_slot, account_info) in account_infos {
for (_slot, account_info) in account_infos { if *store_counts.get(&account_info.store_id).unwrap() != 0 {
if *store_counts.get(&account_info.store_id).unwrap() != 0 { no_delete = true;
no_delete = true; break;
break;
}
} }
no_delete }
}; no_delete
};
if no_delete { if no_delete {
for (_slot_id, account_info) in account_infos { for (_slot_id, account_info) in account_infos {
Self::inc_store_counts( Self::inc_store_counts(
@ -622,6 +620,7 @@ impl AccountsDB {
// Only remove those accounts where the entire rooted history of the account // Only remove those accounts where the entire rooted history of the account
// can be purged because there are no live append vecs in the ancestors // can be purged because there are no live append vecs in the ancestors
pub fn clean_accounts(&self) { pub fn clean_accounts(&self) {
use std::iter::FromIterator;
self.report_store_stats(); self.report_store_stats();
let mut accounts_scan = Measure::start("accounts_scan"); let mut accounts_scan = Measure::start("accounts_scan");
@ -666,13 +665,12 @@ impl AccountsDB {
clean_old_rooted.stop(); clean_old_rooted.stop();
let mut store_counts_time = Measure::start("store_counts"); let mut store_counts_time = Measure::start("store_counts");
let accounts_index = self.accounts_index.read().unwrap();
// Calculate store counts as if everything was purged // Calculate store counts as if everything was purged
// Then purge if we can // Then purge if we can
let mut store_counts: HashMap<AppendVecId, usize> = HashMap::new(); let mut store_counts: HashMap<AppendVecId, usize> = HashMap::new();
let storage = self.storage.read().unwrap(); let storage = self.storage.read().unwrap();
for account_infos in purges.values() { for (account_infos, _ref_count) in purges.values() {
for (slot, account_info) in account_infos { for (slot, account_info) in account_infos {
let slot_storage = storage.0.get(&slot).unwrap(); let slot_storage = storage.0.get(&slot).unwrap();
let store = slot_storage.get(&account_info.store_id).unwrap(); let store = slot_storage.get(&account_info.store_id).unwrap();
@ -686,15 +684,17 @@ impl AccountsDB {
} }
} }
} }
Self::calc_delete_dependencies(&accounts_index, &purges, &mut store_counts);
store_counts_time.stop(); store_counts_time.stop();
drop(storage);
let mut calc_deps_time = Measure::start("calc_deps");
Self::calc_delete_dependencies(&purges, &mut store_counts);
calc_deps_time.stop();
// Only keep purges where the entire history of the account in the root set // Only keep purges where the entire history of the account in the root set
// can be purged. All AppendVecs for those updates are dead. // can be purged. All AppendVecs for those updates are dead.
let mut purge_filter = Measure::start("purge_filter"); let mut purge_filter = Measure::start("purge_filter");
purges.retain(|_pubkey, account_infos| { purges.retain(|_pubkey, (account_infos, _ref_count)| {
for (_slot, account_info) in account_infos.iter() { for (_slot, account_info) in account_infos.iter() {
if *store_counts.get(&account_info.store_id).unwrap() != 0 { if *store_counts.get(&account_info.store_id).unwrap() != 0 {
return false; return false;
@ -706,17 +706,26 @@ impl AccountsDB {
let mut reclaims_time = Measure::start("reclaims"); let mut reclaims_time = Measure::start("reclaims");
// Recalculate reclaims with new purge set // Recalculate reclaims with new purge set
let purges_key_to_slot_set: Vec<_> = purges
.into_iter()
.map(|(key, (slots_list, _ref_count))| {
(
key,
HashSet::from_iter(slots_list.into_iter().map(|(slot, _)| slot)),
)
})
.collect();
let accounts_index = self.accounts_index.read().unwrap();
let mut reclaims = Vec::new(); let mut reclaims = Vec::new();
let mut dead_keys = Vec::new(); let mut dead_keys = Vec::new();
for pubkey in purges.keys() { for (pubkey, slots_set) in purges_key_to_slot_set {
let (new_reclaims, is_empty) = accounts_index.purge(&pubkey); let (new_reclaims, is_empty) = accounts_index.purge_exact(&pubkey, slots_set);
if is_empty { if is_empty {
dead_keys.push(*pubkey); dead_keys.push(pubkey);
} }
reclaims.extend(new_reclaims); reclaims.extend(new_reclaims);
} }
drop(storage);
drop(accounts_index); drop(accounts_index);
if !dead_keys.is_empty() { if !dead_keys.is_empty() {
@ -728,9 +737,13 @@ impl AccountsDB {
self.handle_reclaims_maybe_cleanup(&reclaims); self.handle_reclaims_maybe_cleanup(&reclaims);
reclaims_time.stop(); reclaims_time.stop();
debug!( datapoint_info!(
"clean_accounts: {} {} {} {}", "clean_accounts",
accounts_scan, store_counts_time, purge_filter, reclaims_time ("accounts_scan", accounts_scan.as_us() as i64, i64),
("store_counts", store_counts_time.as_us() as i64, i64),
("purge_filter", purge_filter.as_us() as i64, i64),
("calc_deps", calc_deps_time.as_us() as i64, i64),
("reclaims", reclaims_time.as_us() as i64, i64),
); );
} }
@ -4060,8 +4073,8 @@ pub mod tests {
purges.insert(key0, accounts_index.would_purge(&key0)); purges.insert(key0, accounts_index.would_purge(&key0));
purges.insert(key1, accounts_index.would_purge(&key1)); purges.insert(key1, accounts_index.would_purge(&key1));
purges.insert(key2, accounts_index.would_purge(&key2)); purges.insert(key2, accounts_index.would_purge(&key2));
for (key, list) in &purges { for (key, (list, ref_count)) in &purges {
info!(" purge {} =>", key); info!(" purge {} ref_count {} =>", key, ref_count);
for x in list { for x in list {
info!(" {:?}", x); info!(" {:?}", x);
} }
@ -4072,7 +4085,7 @@ pub mod tests {
store_counts.insert(1, 0); store_counts.insert(1, 0);
store_counts.insert(2, 0); store_counts.insert(2, 0);
store_counts.insert(3, 1); store_counts.insert(3, 1);
AccountsDB::calc_delete_dependencies(&accounts_index, &purges, &mut store_counts); AccountsDB::calc_delete_dependencies(&purges, &mut store_counts);
let mut stores: Vec<_> = store_counts.keys().cloned().collect(); let mut stores: Vec<_> = store_counts.keys().cloned().collect();
stores.sort(); stores.sort();
for store in &stores { for store in &stores {

View File

@ -62,9 +62,14 @@ impl<'a, T: 'a + Clone> AccountsIndex<T> {
.collect() .collect()
} }
pub fn would_purge(&self, pubkey: &Pubkey) -> SlotList<T> { // returns the rooted entries and the storage ref count
let list = &self.account_maps.get(&pubkey).unwrap().1.read().unwrap(); pub fn would_purge(&self, pubkey: &Pubkey) -> (SlotList<T>, RefCount) {
self.get_rooted_entries(&list) let (ref_count, slots_list) = self.account_maps.get(&pubkey).unwrap();
let slots_list_r = &slots_list.read().unwrap();
(
self.get_rooted_entries(&slots_list_r),
ref_count.load(Ordering::Relaxed),
)
} }
// filter any rooted entries and return them along with a bool that indicates // filter any rooted entries and return them along with a bool that indicates
@ -76,6 +81,17 @@ impl<'a, T: 'a + Clone> AccountsIndex<T> {
(reclaims, list.is_empty()) (reclaims, list.is_empty())
} }
pub fn purge_exact(&self, pubkey: &Pubkey, slots: HashSet<Slot>) -> (SlotList<T>, bool) {
let list = &mut self.account_maps.get(&pubkey).unwrap().1.write().unwrap();
let reclaims = list
.iter()
.filter(|(slot, _)| slots.contains(&slot))
.cloned()
.collect();
list.retain(|(slot, _)| !slots.contains(slot));
(reclaims, list.is_empty())
}
// find the latest slot and T in a slice for a given ancestor // find the latest slot and T in a slice for a given ancestor
// returns index into 'slice' if found, None if not. // returns index into 'slice' if found, None if not.
fn latest_slot(&self, ancestors: Option<&Ancestors>, slice: SlotSlice<T>) -> Option<usize> { fn latest_slot(&self, ancestors: Option<&Ancestors>, slice: SlotSlice<T>) -> Option<usize> {

View File

@ -229,7 +229,6 @@ impl BankForks {
bank.squash(); bank.squash();
is_root_bank_squashed = bank_slot == root; is_root_bank_squashed = bank_slot == root;
bank.clean_accounts();
bank.update_accounts_hash(); bank.update_accounts_hash();
if self.snapshot_config.is_some() && accounts_package_sender.is_some() { if self.snapshot_config.is_some() && accounts_package_sender.is_some() {