Do periodic inbound cleaning for rooted slots (#8436)

* Do periodic inbound compaction for rooted slots

* Add comment

* nits

* Consider not_compacted_roots in cleanup_dead_slot

* Renames in AccountsIndex

* Rename to reflect expansion of removed accounts

* Fix a comment

* rename

* Parallelize clean over AccountsIndex

* Some niceties

* Reduce locks and real chunked parallelism

* Measure each step for sampling opportunities

* Just noticed par iter is maybe lazy

* Replace storage scan with optimized index scan

* Various clean-ups

* Clear uncleared_roots even if no updates
This commit is contained in:
Ryo Onodera 2020-03-03 14:57:25 +09:00 committed by GitHub
parent 1265afebbb
commit d86103383a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 262 additions and 50 deletions

View File

@ -337,7 +337,7 @@ pub fn add_snapshot<P: AsRef<Path>>(
bank: &Bank, bank: &Bank,
snapshot_storages: &[SnapshotStorage], snapshot_storages: &[SnapshotStorage],
) -> Result<SlotSnapshotPaths> { ) -> Result<SlotSnapshotPaths> {
bank.purge_zero_lamport_accounts(); bank.clean_accounts();
bank.update_accounts_hash(); bank.update_accounts_hash();
let slot = bank.slot(); let slot = bank.slot();
// snapshot_path/slot // snapshot_path/slot

View File

@ -638,18 +638,67 @@ impl AccountsDB {
false false
} }
// Purge zero lamport accounts for garbage collection purposes // Reclaim older states of rooted non-zero lamport accounts as a general
// AccountsDB bloat mitigation and preprocess for better zero-lamport purging.
fn clean_old_rooted_accounts(&self, purges_in_root: Vec<Pubkey>) {
// This number isn't carefully chosen; just guessed randomly such that
// the hot loop will be the order of ~Xms.
const INDEX_CLEAN_BULK_COUNT: usize = 4096;
let mut measure = Measure::start("clean_old_root-ms");
let reclaim_vecs =
purges_in_root
.par_chunks(INDEX_CLEAN_BULK_COUNT)
.map(|pubkeys: &[Pubkey]| {
let mut reclaims = Vec::new();
let accounts_index = self.accounts_index.read().unwrap();
for pubkey in pubkeys {
accounts_index.clean_rooted_entries(&pubkey, &mut reclaims);
}
reclaims
});
let reclaims: Vec<_> = reclaim_vecs.flatten().collect();
measure.stop();
inc_new_counter_info!("clean-old-root-par-clean-ms", measure.as_ms() as usize);
let mut measure = Measure::start("clean_old_root-ms");
self.handle_reclaims(&reclaims);
measure.stop();
inc_new_counter_info!("clean-old-root-reclaim-ms", measure.as_ms() as usize);
}
fn clear_uncleaned_roots(&self) {
let mut accounts_index = self.accounts_index.write().unwrap();
accounts_index.uncleaned_roots.clear();
drop(accounts_index);
}
// Purge zero lamport accounts and older rooted account states as garbage
// collection
// Only remove those accounts where the entire rooted history of the account // Only remove those accounts where the entire rooted history of the account
// can be purged because there are no live append vecs in the ancestors // can be purged because there are no live append vecs in the ancestors
pub fn purge_zero_lamport_accounts(&self, ancestors: &HashMap<u64, usize>) { pub fn clean_accounts(&self) {
self.report_store_stats(); self.report_store_stats();
let mut purges = HashMap::new(); let mut purges = HashMap::new();
let mut purges_in_root = Vec::new();
let no_ancestors = HashMap::new();
let accounts_index = self.accounts_index.read().unwrap(); let accounts_index = self.accounts_index.read().unwrap();
accounts_index.scan_accounts(ancestors, |pubkey, (account_info, slot)| {
if account_info.lamports == 0 && accounts_index.is_root(slot) { accounts_index.scan_accounts(&no_ancestors, |pubkey, (account_info, slot)| {
if account_info.lamports == 0 {
purges.insert(*pubkey, accounts_index.would_purge(pubkey)); purges.insert(*pubkey, accounts_index.would_purge(pubkey));
} else if accounts_index.uncleaned_roots.contains(&slot) {
purges_in_root.push(*pubkey);
} }
}); });
drop(accounts_index);
if !purges_in_root.is_empty() {
self.clean_old_rooted_accounts(purges_in_root);
}
self.clear_uncleaned_roots();
let accounts_index = self.accounts_index.read().unwrap();
// Calculate store counts as if everything was purged // Calculate store counts as if everything was purged
// Then purge if we can // Then purge if we can
@ -725,9 +774,9 @@ impl AccountsDB {
let mut dead_slots = self.remove_dead_accounts(reclaims); let mut dead_slots = self.remove_dead_accounts(reclaims);
dead_accounts.stop(); dead_accounts.stop();
let mut cleanup_dead_slots = Measure::start("reclaims::purge_slots"); let mut clean_dead_slots = Measure::start("reclaims::purge_slots");
self.cleanup_dead_slots(&mut dead_slots); self.clean_dead_slots(&mut dead_slots);
cleanup_dead_slots.stop(); clean_dead_slots.stop();
let mut purge_slots = Measure::start("reclaims::purge_slots"); let mut purge_slots = Measure::start("reclaims::purge_slots");
for slot in dead_slots { for slot in dead_slots {
@ -1298,7 +1347,7 @@ impl AccountsDB {
dead_slots dead_slots
} }
fn cleanup_dead_slots(&self, dead_slots: &mut HashSet<Slot>) { fn clean_dead_slots(&self, dead_slots: &mut HashSet<Slot>) {
if self.dont_cleanup_dead_slots.load(Ordering::Relaxed) { if self.dont_cleanup_dead_slots.load(Ordering::Relaxed) {
return; return;
} }
@ -1307,7 +1356,7 @@ impl AccountsDB {
{ {
let mut index = self.accounts_index.write().unwrap(); let mut index = self.accounts_index.write().unwrap();
for slot in dead_slots.iter() { for slot in dead_slots.iter() {
index.cleanup_dead_slot(*slot); index.clean_dead_slot(*slot);
} }
} }
{ {
@ -1928,7 +1977,7 @@ pub mod tests {
//slot is still there, since gc is lazy //slot is still there, since gc is lazy
assert!(accounts.storage.read().unwrap().0[&0].get(&id).is_some()); assert!(accounts.storage.read().unwrap().0[&0].get(&id).is_some());
//store causes cleanup //store causes clean
accounts.store(1, &[(&pubkey, &account)]); accounts.store(1, &[(&pubkey, &account)]);
//slot is gone //slot is gone
@ -1939,6 +1988,149 @@ pub mod tests {
assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some((account, 1))); assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some((account, 1)));
} }
impl AccountsDB {
fn store_count_for_slot(&self, slot: Slot) -> usize {
let storage = self.storage.read().unwrap();
let slot_storage = storage.0.get(&slot);
if let Some(slot_storage) = slot_storage {
slot_storage.values().nth(0).unwrap().count()
} else {
0
}
}
fn uncleaned_root_count(&self) -> usize {
self.accounts_index.read().unwrap().uncleaned_roots.len()
}
}
#[test]
fn test_clean_old_with_normal_account() {
solana_logger::setup();
let accounts = AccountsDB::new(Vec::new());
let pubkey = Pubkey::new_rand();
let account = Account::new(1, 0, &Account::default().owner);
//store an account
accounts.store(0, &[(&pubkey, &account)]);
accounts.store(1, &[(&pubkey, &account)]);
// simulate slots are rooted after while
accounts.add_root(0);
accounts.add_root(1);
//even if rooted, old state isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 1);
assert_eq!(accounts.store_count_for_slot(1), 1);
accounts.clean_accounts();
//now old state is cleaned up
assert_eq!(accounts.store_count_for_slot(0), 0);
assert_eq!(accounts.store_count_for_slot(1), 1);
}
#[test]
fn test_clean_old_with_zero_lamport_account() {
solana_logger::setup();
let accounts = AccountsDB::new(Vec::new());
let pubkey1 = Pubkey::new_rand();
let pubkey2 = Pubkey::new_rand();
let normal_account = Account::new(1, 0, &Account::default().owner);
let zero_account = Account::new(0, 0, &Account::default().owner);
//store an account
accounts.store(0, &[(&pubkey1, &normal_account)]);
accounts.store(1, &[(&pubkey1, &zero_account)]);
accounts.store(0, &[(&pubkey2, &normal_account)]);
accounts.store(1, &[(&pubkey2, &normal_account)]);
//simulate slots are rooted after while
accounts.add_root(0);
accounts.add_root(1);
//even if rooted, old state isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 2);
assert_eq!(accounts.store_count_for_slot(1), 2);
accounts.clean_accounts();
//still old state behind zero-lamport account isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 1);
assert_eq!(accounts.store_count_for_slot(1), 2);
}
#[test]
fn test_clean_old_with_both_normal_and_zero_lamport_accounts() {
solana_logger::setup();
let accounts = AccountsDB::new(Vec::new());
let pubkey1 = Pubkey::new_rand();
let pubkey2 = Pubkey::new_rand();
let normal_account = Account::new(1, 0, &Account::default().owner);
let zero_account = Account::new(0, 0, &Account::default().owner);
//store an account
accounts.store(0, &[(&pubkey1, &normal_account)]);
accounts.store(1, &[(&pubkey1, &zero_account)]);
accounts.store(0, &[(&pubkey2, &normal_account)]);
accounts.store(2, &[(&pubkey2, &normal_account)]);
//simulate slots are rooted after while
accounts.add_root(0);
accounts.add_root(1);
accounts.add_root(2);
//even if rooted, old state isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 2);
assert_eq!(accounts.store_count_for_slot(1), 1);
assert_eq!(accounts.store_count_for_slot(2), 1);
accounts.clean_accounts();
//both zero lamport and normal accounts are cleaned up
assert_eq!(accounts.store_count_for_slot(0), 0);
assert_eq!(accounts.store_count_for_slot(1), 0);
assert_eq!(accounts.store_count_for_slot(2), 1);
}
#[test]
fn test_uncleaned_roots_with_account() {
solana_logger::setup();
let accounts = AccountsDB::new(Vec::new());
let pubkey = Pubkey::new_rand();
let account = Account::new(1, 0, &Account::default().owner);
//store an account
accounts.store(0, &[(&pubkey, &account)]);
assert_eq!(accounts.uncleaned_root_count(), 0);
// simulate slots are rooted after while
accounts.add_root(0);
assert_eq!(accounts.uncleaned_root_count(), 1);
//now uncleaned roots are cleaned up
accounts.clean_accounts();
assert_eq!(accounts.uncleaned_root_count(), 0);
}
#[test]
fn test_uncleaned_roots_with_no_account() {
solana_logger::setup();
let accounts = AccountsDB::new(Vec::new());
assert_eq!(accounts.uncleaned_root_count(), 0);
// simulate slots are rooted after while
accounts.add_root(0);
assert_eq!(accounts.uncleaned_root_count(), 1);
//now uncleaned roots are cleaned up
accounts.clean_accounts();
assert_eq!(accounts.uncleaned_root_count(), 0);
}
fn print_accounts(label: &'static str, accounts: &AccountsDB) { fn print_accounts(label: &'static str, accounts: &AccountsDB) {
print_index(label, accounts); print_index(label, accounts);
print_count_and_status(label, accounts); print_count_and_status(label, accounts);
@ -2103,12 +2295,6 @@ pub mod tests {
daccounts daccounts
} }
fn purge_zero_lamport_accounts(accounts: &AccountsDB, slot: Slot) {
let ancestors = vec![(slot as Slot, 0)].into_iter().collect();
info!("ancestors: {:?}", ancestors);
accounts.purge_zero_lamport_accounts(&ancestors);
}
fn assert_no_stores(accounts: &AccountsDB, slot: Slot) { fn assert_no_stores(accounts: &AccountsDB, slot: Slot) {
let stores = accounts.storage.read().unwrap(); let stores = accounts.storage.read().unwrap();
info!("{:?}", stores.0.get(&slot)); info!("{:?}", stores.0.get(&slot));
@ -2153,7 +2339,7 @@ pub mod tests {
print_accounts("pre_purge", &accounts); print_accounts("pre_purge", &accounts);
purge_zero_lamport_accounts(&accounts, current_slot); accounts.clean_accounts();
print_accounts("post_purge", &accounts); print_accounts("post_purge", &accounts);
@ -2216,7 +2402,7 @@ pub mod tests {
info!("ancestors: {:?}", ancestors); info!("ancestors: {:?}", ancestors);
let hash = accounts.update_accounts_hash(current_slot, &ancestors); let hash = accounts.update_accounts_hash(current_slot, &ancestors);
purge_zero_lamport_accounts(&accounts, current_slot); accounts.clean_accounts();
assert_eq!( assert_eq!(
accounts.update_accounts_hash(current_slot, &ancestors), accounts.update_accounts_hash(current_slot, &ancestors),
@ -2287,7 +2473,7 @@ pub mod tests {
print_accounts("accounts", &accounts); print_accounts("accounts", &accounts);
purge_zero_lamport_accounts(&accounts, current_slot); accounts.clean_accounts();
print_accounts("accounts_post_purge", &accounts); print_accounts("accounts_post_purge", &accounts);
let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot); let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot);
@ -2356,7 +2542,7 @@ pub mod tests {
fn test_accounts_purge_chained_purge_before_snapshot_restore() { fn test_accounts_purge_chained_purge_before_snapshot_restore() {
solana_logger::setup(); solana_logger::setup();
with_chained_zero_lamport_accounts(|accounts, current_slot| { with_chained_zero_lamport_accounts(|accounts, current_slot| {
purge_zero_lamport_accounts(&accounts, current_slot); accounts.clean_accounts();
let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot); let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot);
(accounts, false) (accounts, false)
}); });
@ -2370,7 +2556,7 @@ pub mod tests {
accounts accounts
.dont_cleanup_dead_slots .dont_cleanup_dead_slots
.store(true, Ordering::Relaxed); .store(true, Ordering::Relaxed);
purge_zero_lamport_accounts(&accounts, current_slot); accounts.clean_accounts();
(accounts, true) (accounts, true)
}); });
} }

View File

@ -12,6 +12,7 @@ pub struct AccountsIndex<T> {
pub account_maps: HashMap<Pubkey, RwLock<SlotList<T>>>, pub account_maps: HashMap<Pubkey, RwLock<SlotList<T>>>,
pub roots: HashSet<Slot>, pub roots: HashSet<Slot>,
pub uncleaned_roots: HashSet<Slot>,
} }
impl<T: Clone> AccountsIndex<T> { impl<T: Clone> AccountsIndex<T> {
@ -55,7 +56,7 @@ impl<T: Clone> AccountsIndex<T> {
let mut max = 0; let mut max = 0;
let mut rv = None; let mut rv = None;
for (i, (slot, _t)) in list.iter().rev().enumerate() { for (i, (slot, _t)) in list.iter().rev().enumerate() {
if *slot >= max && (ancestors.get(slot).is_some() || self.is_root(*slot)) { if *slot >= max && (ancestors.contains_key(slot) || self.is_root(*slot)) {
rv = Some((list.len() - 1) - i); rv = Some((list.len() - 1) - i);
max = *slot; max = *slot;
} }
@ -112,15 +113,28 @@ impl<T: Clone> AccountsIndex<T> {
account_info: T, account_info: T,
reclaims: &mut Vec<(Slot, T)>, reclaims: &mut Vec<(Slot, T)>,
) -> Option<T> { ) -> Option<T> {
let roots = &self.roots;
if let Some(lock) = self.account_maps.get(pubkey) { if let Some(lock) = self.account_maps.get(pubkey) {
let mut slot_vec = lock.write().unwrap(); let mut slot_vec = lock.write().unwrap();
// filter out old entries // filter out other dirty entries
reclaims.extend(slot_vec.iter().filter(|(f, _)| *f == slot).cloned()); reclaims.extend(slot_vec.iter().filter(|(f, _)| *f == slot).cloned());
slot_vec.retain(|(f, _)| *f != slot); slot_vec.retain(|(f, _)| *f != slot);
// add the new entry
slot_vec.push((slot, account_info)); slot_vec.push((slot, account_info));
// now, do lazy clean
self.purge_older_root_entries(&mut slot_vec, reclaims);
None
} else {
Some(account_info)
}
}
fn purge_older_root_entries(
&self,
slot_vec: &mut Vec<(Slot, T)>,
reclaims: &mut Vec<(Slot, T)>,
) {
let roots = &self.roots;
let max_root = Self::get_max_root(roots, &slot_vec); let max_root = Self::get_max_root(roots, &slot_vec);
@ -131,9 +145,12 @@ impl<T: Clone> AccountsIndex<T> {
.cloned(), .cloned(),
); );
slot_vec.retain(|(slot, _)| !Self::can_purge(max_root, *slot)); slot_vec.retain(|(slot, _)| !Self::can_purge(max_root, *slot));
None }
} else {
Some(account_info) pub fn clean_rooted_entries(&self, pubkey: &Pubkey, reclaims: &mut Vec<(Slot, T)>) {
if let Some(lock) = self.account_maps.get(pubkey) {
let mut slot_vec = lock.write().unwrap();
self.purge_older_root_entries(&mut slot_vec, reclaims);
} }
} }
@ -155,11 +172,13 @@ impl<T: Clone> AccountsIndex<T> {
pub fn add_root(&mut self, slot: Slot) { pub fn add_root(&mut self, slot: Slot) {
self.roots.insert(slot); self.roots.insert(slot);
self.uncleaned_roots.insert(slot);
} }
/// Remove the slot when the storage for the slot is freed /// Remove the slot when the storage for the slot is freed
/// Accounts no longer reference this slot. /// Accounts no longer reference this slot.
pub fn cleanup_dead_slot(&mut self, slot: Slot) { pub fn clean_dead_slot(&mut self, slot: Slot) {
self.roots.remove(&slot); self.roots.remove(&slot);
self.uncleaned_roots.remove(&slot);
} }
} }
@ -259,26 +278,36 @@ mod tests {
} }
#[test] #[test]
fn test_cleanup_first() { fn test_clean_first() {
let mut index = AccountsIndex::<bool>::default(); let mut index = AccountsIndex::<bool>::default();
index.add_root(0); index.add_root(0);
index.add_root(1); index.add_root(1);
index.cleanup_dead_slot(0); index.clean_dead_slot(0);
assert!(index.is_root(1)); assert!(index.is_root(1));
assert!(!index.is_root(0)); assert!(!index.is_root(0));
} }
#[test] #[test]
fn test_cleanup_last() { fn test_clean_last() {
//this behavior might be undefined, clean up should only occur on older slots //this behavior might be undefined, clean up should only occur on older slots
let mut index = AccountsIndex::<bool>::default(); let mut index = AccountsIndex::<bool>::default();
index.add_root(0); index.add_root(0);
index.add_root(1); index.add_root(1);
index.cleanup_dead_slot(1); index.clean_dead_slot(1);
assert!(!index.is_root(1)); assert!(!index.is_root(1));
assert!(index.is_root(0)); assert!(index.is_root(0));
} }
#[test]
fn test_clean_and_unclean_slot() {
let mut index = AccountsIndex::<bool>::default();
assert_eq!(0, index.uncleaned_roots.len());
index.add_root(1);
assert_eq!(1, index.uncleaned_roots.len());
index.clean_dead_slot(1);
assert_eq!(0, index.uncleaned_roots.len());
}
#[test] #[test]
fn test_update_last_wins() { fn test_update_last_wins() {
let key = Keypair::new(); let key = Keypair::new();

View File

@ -1924,7 +1924,7 @@ impl Bank {
/// A snapshot bank should be purged of 0 lamport accounts which are not part of the hash /// A snapshot bank should be purged of 0 lamport accounts which are not part of the hash
/// calculation and could shield other real accounts. /// calculation and could shield other real accounts.
pub fn verify_snapshot_bank(&self) -> bool { pub fn verify_snapshot_bank(&self) -> bool {
self.purge_zero_lamport_accounts(); self.clean_accounts();
// Order and short-circuiting is significant; verify_hash requires a valid bank hash // Order and short-circuiting is significant; verify_hash requires a valid bank hash
self.verify_bank_hash() && self.verify_hash() self.verify_bank_hash() && self.verify_hash()
} }
@ -2117,11 +2117,8 @@ impl Bank {
); );
} }
pub fn purge_zero_lamport_accounts(&self) { pub fn clean_accounts(&self) {
self.rc self.rc.accounts.accounts_db.clean_accounts();
.accounts
.accounts_db
.purge_zero_lamport_accounts(&self.ancestors);
} }
} }
@ -3170,7 +3167,7 @@ mod tests {
} }
let hash = bank.update_accounts_hash(); let hash = bank.update_accounts_hash();
bank.purge_zero_lamport_accounts(); bank.clean_accounts();
assert_eq!(bank.update_accounts_hash(), hash); assert_eq!(bank.update_accounts_hash(), hash);
let bank0 = Arc::new(new_from_parent(&bank)); let bank0 = Arc::new(new_from_parent(&bank));
@ -3190,14 +3187,14 @@ mod tests {
info!("bank0 purge"); info!("bank0 purge");
let hash = bank0.update_accounts_hash(); let hash = bank0.update_accounts_hash();
bank0.purge_zero_lamport_accounts(); bank0.clean_accounts();
assert_eq!(bank0.update_accounts_hash(), hash); assert_eq!(bank0.update_accounts_hash(), hash);
assert_eq!(bank0.get_account(&keypair.pubkey()).unwrap().lamports, 10); assert_eq!(bank0.get_account(&keypair.pubkey()).unwrap().lamports, 10);
assert_eq!(bank1.get_account(&keypair.pubkey()), None); assert_eq!(bank1.get_account(&keypair.pubkey()), None);
info!("bank1 purge"); info!("bank1 purge");
bank1.purge_zero_lamport_accounts(); bank1.clean_accounts();
assert_eq!(bank0.get_account(&keypair.pubkey()).unwrap().lamports, 10); assert_eq!(bank0.get_account(&keypair.pubkey()).unwrap().lamports, 10);
assert_eq!(bank1.get_account(&keypair.pubkey()), None); assert_eq!(bank1.get_account(&keypair.pubkey()), None);
@ -3215,7 +3212,7 @@ mod tests {
// keypair should have 0 tokens on both forks // keypair should have 0 tokens on both forks
assert_eq!(bank0.get_account(&keypair.pubkey()), None); assert_eq!(bank0.get_account(&keypair.pubkey()), None);
assert_eq!(bank1.get_account(&keypair.pubkey()), None); assert_eq!(bank1.get_account(&keypair.pubkey()), None);
bank1.purge_zero_lamport_accounts(); bank1.clean_accounts();
assert!(bank1.verify_bank_hash()); assert!(bank1.verify_bank_hash());
} }