More conservative purge_zero_lamport_accounts purge logic (#7157)

This commit is contained in:
sakridge 2019-12-02 09:51:05 -08:00 committed by GitHub
parent 1eaf71b5b4
commit 887bff572a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 337 additions and 87 deletions

View File

@ -70,7 +70,7 @@ pub struct ErrorCounters {
#[derive(Deserialize, Serialize, Default, Debug, PartialEq, Clone)] #[derive(Deserialize, Serialize, Default, Debug, PartialEq, Clone)]
pub struct AccountInfo { pub struct AccountInfo {
/// index identifying the append storage /// index identifying the append storage
id: AppendVecId, store_id: AppendVecId,
/// offset into the storage /// offset into the storage
offset: usize, offset: usize,
@ -560,25 +560,77 @@ impl AccountsDB {
false false
} }
// Purge zero lamport accounts for garbage collection purposes
// Only remove those accounts where the entire rooted history of the account
// can be purged because there are no live append vecs in the ancestors
pub fn purge_zero_lamport_accounts(&self, ancestors: &HashMap<u64, usize>) { pub fn purge_zero_lamport_accounts(&self, ancestors: &HashMap<u64, usize>) {
self.report_store_stats(); self.report_store_stats();
let mut purges = HashMap::new();
let accounts_index = self.accounts_index.read().unwrap(); let accounts_index = self.accounts_index.read().unwrap();
let mut purges = Vec::new();
accounts_index.scan_accounts(ancestors, |pubkey, (account_info, slot)| { accounts_index.scan_accounts(ancestors, |pubkey, (account_info, slot)| {
if account_info.lamports == 0 && accounts_index.is_root(slot) { if account_info.lamports == 0 && accounts_index.is_root(slot) {
purges.push(*pubkey); purges.insert(*pubkey, accounts_index.would_purge(pubkey));
} }
}); });
drop(accounts_index);
let mut reclaims = Vec::new(); // Calculate store counts as if everything was purged
let mut accounts_index = self.accounts_index.write().unwrap(); // Then purge if we can
for purge in &purges { let mut store_counts: HashMap<AppendVecId, usize> = HashMap::new();
reclaims.extend(accounts_index.purge(purge)); let storage = self.storage.read().unwrap();
for account_infos in purges.values() {
for (slot_id, account_info) in account_infos {
let slot_storage = storage.0.get(&slot_id).unwrap();
let store = slot_storage.get(&account_info.store_id).unwrap();
if let Some(store_count) = store_counts.get_mut(&account_info.store_id) {
*store_count -= 1;
} else {
store_counts.insert(
account_info.store_id,
store.count_and_status.read().unwrap().0 - 1,
);
} }
}
}
// Only keep purges where the entire history of the account in the root set
// can be purged. All AppendVecs for those updates are dead.
purges.retain(|_pubkey, account_infos| {
for (_slot_id, account_info) in account_infos {
if *store_counts.get(&account_info.store_id).unwrap() != 0 {
return false;
}
}
true
});
// Recalculate reclaims with new purge set
let mut reclaims = Vec::new();
for pubkey in purges.keys() {
reclaims.extend(accounts_index.purge(&pubkey));
}
let last_root = accounts_index.last_root; let last_root = accounts_index.last_root;
drop(accounts_index); drop(accounts_index);
drop(storage);
self.handle_reclaims(&reclaims, last_root);
}
fn handle_reclaims(&self, reclaims: &[(Slot, AccountInfo)], last_root: Slot) {
let mut dead_accounts = Measure::start("reclaims::remove_dead_accounts");
let mut dead_slots = self.remove_dead_accounts(reclaims); let mut dead_slots = self.remove_dead_accounts(reclaims);
dead_accounts.stop();
let mut cleanup_dead_slots = Measure::start("reclaims::purge_slots");
self.cleanup_dead_slots(&mut dead_slots, last_root); self.cleanup_dead_slots(&mut dead_slots, last_root);
cleanup_dead_slots.stop();
let mut purge_slots = Measure::start("reclaims::purge_slots");
for slot in dead_slots {
self.purge_slot(slot);
}
purge_slots.stop();
} }
pub fn scan_accounts<F, A>(&self, ancestors: &HashMap<Slot, usize>, scan_func: F) -> A pub fn scan_accounts<F, A>(&self, ancestors: &HashMap<Slot, usize>, scan_func: F) -> A
@ -595,7 +647,7 @@ impl AccountsDB {
storage storage
.0 .0
.get(&slot) .get(&slot)
.and_then(|storage_map| storage_map.get(&account_info.id)) .and_then(|storage_map| storage_map.get(&account_info.store_id))
.and_then(|store| { .and_then(|store| {
Some( Some(
store store
@ -663,7 +715,7 @@ impl AccountsDB {
if let Some(slot_storage) = storage.0.get(&slot) { if let Some(slot_storage) = storage.0.get(&slot) {
let info = &lock[index].1; let info = &lock[index].1;
slot_storage slot_storage
.get(&info.id) .get(&info.store_id)
.and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account())) .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account()))
.map(|account| (account, slot)) .map(|account| (account, slot))
} else { } else {
@ -831,7 +883,7 @@ impl AccountsDB {
for (offset, (_, account)) in rvs.iter().zip(&with_meta[infos.len()..]) { for (offset, (_, account)) in rvs.iter().zip(&with_meta[infos.len()..]) {
storage.add_account(); storage.add_account();
infos.push(AccountInfo { infos.push(AccountInfo {
id: storage.id, store_id: storage.id,
offset: *offset, offset: *offset,
lamports: account.lamports, lamports: account.lamports,
}); });
@ -938,19 +990,19 @@ impl AccountsDB {
(reclaims, last_root) (reclaims, last_root)
} }
fn remove_dead_accounts(&self, reclaims: Vec<(Slot, AccountInfo)>) -> HashSet<Slot> { fn remove_dead_accounts(&self, reclaims: &[(Slot, AccountInfo)]) -> HashSet<Slot> {
let storage = self.storage.read().unwrap(); let storage = self.storage.read().unwrap();
let mut dead_slots = HashSet::new(); let mut dead_slots = HashSet::new();
for (slot_id, account_info) in reclaims { for (slot_id, account_info) in reclaims {
if let Some(slot_storage) = storage.0.get(&slot_id) { if let Some(slot_storage) = storage.0.get(slot_id) {
if let Some(store) = slot_storage.get(&account_info.id) { if let Some(store) = slot_storage.get(&account_info.store_id) {
assert_eq!( assert_eq!(
slot_id, store.slot_id, *slot_id, store.slot_id,
"AccountDB::accounts_index corrupted. Storage should only point to one slot" "AccountDB::accounts_index corrupted. Storage should only point to one slot"
); );
let count = store.remove_account(); let count = store.remove_account();
if count == 0 { if count == 0 {
dead_slots.insert(slot_id); dead_slots.insert(*slot_id);
} }
} }
} }
@ -1036,21 +1088,7 @@ impl AccountsDB {
update_index.stop(); update_index.stop();
trace!("reclaim: {}", reclaims.len()); trace!("reclaim: {}", reclaims.len());
let mut remove_dead_accounts = Measure::start("store::remove_dead"); self.handle_reclaims(&reclaims, last_root);
let mut dead_slots = self.remove_dead_accounts(reclaims);
remove_dead_accounts.stop();
trace!("dead_slots: {}", dead_slots.len());
let mut cleanup_dead_slots = Measure::start("store::cleanup_dead_slots");
self.cleanup_dead_slots(&mut dead_slots, last_root);
cleanup_dead_slots.stop();
trace!("purge_slots: {}", dead_slots.len());
let mut purge_slots = Measure::start("store::purge_slots");
for slot in dead_slots {
self.purge_slot(slot);
}
purge_slots.stop();
} }
pub fn add_root(&self, slot: Slot) { pub fn add_root(&self, slot: Slot) {
@ -1095,10 +1133,10 @@ impl AccountsDB {
.scan_account_storage( .scan_account_storage(
*slot_id, *slot_id,
|stored_account: &StoredAccount, |stored_account: &StoredAccount,
id: AppendVecId, store_id: AppendVecId,
accum: &mut HashMap<Pubkey, (u64, AccountInfo)>| { accum: &mut HashMap<Pubkey, (u64, AccountInfo)>| {
let account_info = AccountInfo { let account_info = AccountInfo {
id, store_id,
offset: stored_account.offset, offset: stored_account.offset,
lamports: stored_account.account_meta.lamports, lamports: stored_account.account_meta.lamports,
}; };
@ -1125,7 +1163,7 @@ impl AccountsDB {
let mut counts = HashMap::new(); let mut counts = HashMap::new();
for slot_list in accounts_index.account_maps.values() { for slot_list in accounts_index.account_maps.values() {
for (_slot, account_entry) in slot_list.read().unwrap().iter() { for (_slot, account_entry) in slot_list.read().unwrap().iter() {
*counts.entry(account_entry.id).or_insert(0) += 1; *counts.entry(account_entry.store_id).or_insert(0) += 1;
} }
} }
for slot_stores in storage.0.values() { for slot_stores in storage.0.values() {
@ -1603,7 +1641,7 @@ pub mod tests {
let id = { let id = {
let index = accounts.accounts_index.read().unwrap(); let index = accounts.accounts_index.read().unwrap();
let (list, idx) = index.get(&pubkey, &ancestors).unwrap(); let (list, idx) = index.get(&pubkey, &ancestors).unwrap();
list[idx].1.id list[idx].1.store_id
}; };
//slot 0 is behind root, but it is not root, therefore it is purged //slot 0 is behind root, but it is not root, therefore it is purged
accounts.add_root(1); accounts.add_root(1);
@ -1623,12 +1661,38 @@ pub mod tests {
assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some((account, 1))); assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some((account, 1)));
} }
fn print_count_and_status(label: &'static str, accounts: &AccountsDB) { fn print_accounts(label: &'static str, accounts: &AccountsDB) {
for (_slot, slot_stores) in &accounts.storage.read().unwrap().0 { print_index(label, accounts);
for (id, entry) in slot_stores { print_count_and_status(label, accounts);
}
fn print_index(label: &'static str, accounts: &AccountsDB) {
info!( info!(
"{}: {} count_and_status: {:?}", "{}: accounts.accounts_index roots: {:?}",
label, label,
accounts.accounts_index.read().unwrap().roots
);
for (pubkey, list) in &accounts.accounts_index.read().unwrap().account_maps {
info!(" key: {}", pubkey);
info!(" slots: {:?}", *list.read().unwrap());
}
}
fn print_count_and_status(label: &'static str, accounts: &AccountsDB) {
let storage = accounts.storage.read().unwrap();
let mut slots: Vec<_> = storage.0.keys().cloned().collect();
slots.sort();
info!("{}: count_and status for {} slots:", label, slots.len());
for slot in &slots {
let slot_stores = storage.0.get(slot).unwrap();
let mut ids: Vec<_> = slot_stores.keys().cloned().collect();
ids.sort();
for id in &ids {
let entry = slot_stores.get(id).unwrap();
info!(
" slot: {} id: {} count_and_status: {:?}",
slot,
id, id,
*entry.count_and_status.read().unwrap() *entry.count_and_status.read().unwrap()
); );
@ -1680,33 +1744,12 @@ pub mod tests {
accounts.store(latest_slot, &[(&pubkeys[31], &account)]); accounts.store(latest_slot, &[(&pubkeys[31], &account)]);
accounts.add_root(latest_slot); accounts.add_root(latest_slot);
let mut writer = Cursor::new(vec![]);
serialize_into(
&mut writer,
&AccountsDBSerialize::new(&accounts, latest_slot),
)
.unwrap();
assert!(check_storage(&accounts, 0, 90)); assert!(check_storage(&accounts, 0, 90));
assert!(check_storage(&accounts, 1, 21)); assert!(check_storage(&accounts, 1, 21));
assert!(check_storage(&accounts, 2, 31)); assert!(check_storage(&accounts, 2, 31));
let buf = writer.into_inner(); let daccounts = reconstruct_accounts_db_via_serialization(&accounts, latest_slot);
let mut reader = BufReader::new(&buf[..]);
let daccounts = AccountsDB::new(None);
print_count_and_status("accounts", &accounts);
let local_paths = {
let paths = daccounts.paths.read().unwrap();
AccountsDB::format_paths(paths.to_vec())
};
let copied_accounts = TempDir::new().unwrap();
// Simulate obtaining a copy of the AppendVecs from a tarball
copy_append_vecs(&accounts, copied_accounts.path()).unwrap();
daccounts
.accounts_from_stream(&mut reader, local_paths, copied_accounts.path())
.unwrap();
assert_eq!( assert_eq!(
daccounts.write_version.load(Ordering::Relaxed), daccounts.write_version.load(Ordering::Relaxed),
accounts.write_version.load(Ordering::Relaxed) accounts.write_version.load(Ordering::Relaxed)
@ -1735,6 +1778,226 @@ pub mod tests {
assert!(check_storage(&daccounts, 2, 31)); assert!(check_storage(&daccounts, 2, 31));
} }
/// Loads `pubkey` as of `slot` and asserts that both the lamport balance and
/// the slot the account was found in match the expected values.
///
/// Bug fix: the original shadowed `slot` with the `load_slow` result and then
/// asserted `(account.lamports, slot) == (expected_lamports, slot)` — the slot
/// half compared the loaded slot against itself and could never fail. We now
/// compare the loaded slot against the slot the caller asked for, which holds
/// for every call site in this file (each stores at `slot` before asserting).
fn assert_load_account(
    accounts: &AccountsDB,
    slot: Slot,
    pubkey: Pubkey,
    expected_lamports: u64,
) {
    let ancestors = vec![(slot, 0)].into_iter().collect();
    // `load_slow` returns the slot the entry was actually found in.
    let (account, loaded_slot) = accounts.load_slow(&ancestors, &pubkey).unwrap();
    assert_eq!((account.lamports, loaded_slot), (expected_lamports, slot));
}
/// Round-trips an `AccountsDB` through its serialized form: serializes the db
/// state (as of `slot`) into an in-memory buffer, copies the append vecs into
/// a scratch directory (as restoring from a tarball would), and deserializes
/// a brand-new `AccountsDB` from both.
fn reconstruct_accounts_db_via_serialization(accounts: &AccountsDB, slot: Slot) -> AccountsDB {
    let mut writer = Cursor::new(vec![]);
    serialize_into(&mut writer, &AccountsDBSerialize::new(&accounts, slot)).unwrap();

    let serialized = writer.into_inner();
    let mut reader = BufReader::new(&serialized[..]);

    let daccounts = AccountsDB::new(None);
    let local_paths = AccountsDB::format_paths(daccounts.paths.read().unwrap().to_vec());

    // Simulate obtaining a copy of the AppendVecs from a tarball
    let copied_accounts = TempDir::new().unwrap();
    copy_append_vecs(&accounts, copied_accounts.path()).unwrap();

    daccounts
        .accounts_from_stream(&mut reader, local_paths, copied_accounts.path())
        .unwrap();
    print_count_and_status("daccounts", &daccounts);
    daccounts
}
/// Test helper: runs zero-lamport purging with `slot` as the only ancestor.
fn purge_zero_lamport_accounts(accounts: &AccountsDB, slot: Slot) {
    // `slot` is already a `Slot`; the original `slot as Slot` cast was a no-op.
    let ancestors = vec![(slot, 0)].into_iter().collect();
    info!("ancestors: {:?}", ancestors);
    accounts.purge_zero_lamport_accounts(&ancestors);
}
/// Asserts that `slot` has no account storage entries left: either the slot is
/// absent from the storage map entirely, or its store map is empty.
fn assert_no_stores(accounts: &AccountsDB, slot: Slot) {
    let stores = accounts.storage.read().unwrap();
    // Single lookup instead of the original's three `get(&slot)` calls.
    let slot_stores = stores.0.get(&slot);
    info!("{:?}", slot_stores);
    assert!(slot_stores.map_or(true, |s| s.is_empty()));
}
/// A zero-lamport update must NOT be purged while any store in its rooted
/// history still holds another live account: here `pubkey2` keeps slot 1's
/// store alive, so `pubkey`'s whole history (slots 1 and 2) must be retained.
#[test]
fn test_accounts_db_purge_keep_live() {
    solana_logger::setup();
    let some_lamport = 223;
    let zero_lamport = 0;
    let no_data = 0;
    let owner = Account::default().owner;

    let account = Account::new(some_lamport, no_data, &owner);
    let pubkey = Pubkey::new_rand();

    let account2 = Account::new(some_lamport, no_data, &owner);
    let pubkey2 = Pubkey::new_rand();

    let zero_lamport_account = Account::new(zero_lamport, no_data, &owner);

    let accounts = AccountsDB::new_single();
    accounts.add_root(0);

    let mut current_slot = 1;
    accounts.store(current_slot, &[(&pubkey, &account)]);

    // Store another live account to slot 1 which will prevent any purge
    // since the store count will not be zero
    accounts.store(current_slot, &[(&pubkey2, &account2)]);
    accounts.add_root(current_slot);

    current_slot += 1;
    accounts.store(current_slot, &[(&pubkey, &zero_lamport_account)]);
    accounts.add_root(current_slot);

    assert_load_account(&accounts, current_slot, pubkey, zero_lamport);

    // Root one more (empty) slot so the zero-lamport update is behind the
    // last root before attempting the purge.
    current_slot += 1;
    accounts.add_root(current_slot);

    purge_zero_lamport_accounts(&accounts, current_slot);

    print_accounts("post_purge", &accounts);

    // The index entry for `pubkey` is NOT cleared: both rooted updates (slots
    // 1 and 2) survive because slot 1's store still counts `pubkey2` as live.
    // (The original comment claimed the index was cleared, which contradicts
    // the `len() == 2` assertion below.)
    assert_eq!(
        accounts
            .accounts_index
            .read()
            .unwrap()
            .account_maps
            .get(&pubkey)
            .unwrap()
            .read()
            .unwrap()
            .len(),
        2
    );

    // slot 1 & 2 should have stores
    check_storage(&accounts, 1, 2);
    check_storage(&accounts, 2, 1);
}
/// When the entire rooted history of a zero-lamport account lives in stores
/// holding no other live accounts, purging must clear the account's index
/// entry and leave slots 1 and 2 with no stores at all.
#[test]
fn test_accounts_db_purge() {
    solana_logger::setup();
    let some_lamport = 223;
    let zero_lamport = 0;
    let no_data = 0;
    let owner = Account::default().owner;

    let account = Account::new(some_lamport, no_data, &owner);
    let pubkey = Pubkey::new_rand();
    let zero_lamport_account = Account::new(zero_lamport, no_data, &owner);

    let accounts = AccountsDB::new_single();
    accounts.add_root(0);

    let mut current_slot = 1;
    accounts.store(current_slot, &[(&pubkey, &account)]);
    accounts.add_root(current_slot);

    current_slot += 1;
    accounts.store(current_slot, &[(&pubkey, &zero_lamport_account)]);
    accounts.add_root(current_slot);

    assert_load_account(&accounts, current_slot, pubkey, zero_lamport);

    // Otherwise slot 2 will not be removed
    current_slot += 1;
    accounts.add_root(current_slot);

    purge_zero_lamport_accounts(&accounts, current_slot);

    print_accounts("post_purge", &accounts);

    // Make sure the index entry for pubkey is cleared
    assert_eq!(
        accounts
            .accounts_index
            .read()
            .unwrap()
            .account_maps
            .get(&pubkey)
            .unwrap()
            .read()
            .unwrap()
            .len(),
        0
    );

    // slot 1 & 2 should not have any stores
    assert_no_stores(&accounts, 1);
    assert_no_stores(&accounts, 2);
}
/// Serializing after a purge must round-trip cleanly even when a slot has
/// multiple stores and the zero-lamport account's history cannot be fully
/// purged (other live accounts — `pubkey2` and the fillers — share its
/// stores, so the zero-lamport entry must survive reconstruction).
#[test]
fn test_accounts_db_serialize_zero_and_free() {
    solana_logger::setup();
    let some_lamport = 223;
    let zero_lamport = 0;
    let no_data = 0;
    let owner = Account::default().owner;

    let account = Account::new(some_lamport, no_data, &owner);
    let pubkey = Pubkey::new_rand();
    let zero_lamport_account = Account::new(zero_lamport, no_data, &owner);

    let account2 = Account::new(some_lamport + 1, no_data, &owner);
    let pubkey2 = Pubkey::new_rand();

    let filler_account = Account::new(some_lamport, no_data, &owner);
    let filler_account_pubkey = Pubkey::new_rand();

    let accounts = AccountsDB::new_single();

    let mut current_slot = 1;
    accounts.store(current_slot, &[(&pubkey, &account)]);
    accounts.add_root(current_slot);

    current_slot += 1;
    accounts.store(current_slot, &[(&pubkey, &zero_lamport_account)]);
    accounts.store(current_slot, &[(&pubkey2, &account2)]);

    // Store enough accounts such that an additional store for slot 2 is created.
    while accounts
        .storage
        .read()
        .unwrap()
        .0
        .get(&current_slot)
        .unwrap()
        .len()
        < 2
    {
        accounts.store(current_slot, &[(&filler_account_pubkey, &filler_account)]);
    }
    accounts.add_root(current_slot);

    assert_load_account(&accounts, current_slot, pubkey, zero_lamport);

    print_accounts("accounts", &accounts);

    purge_zero_lamport_accounts(&accounts, current_slot);

    print_accounts("accounts_post_purge", &accounts);

    let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot);

    print_accounts("reconstructed", &accounts);

    // The zero-lamport entry must still be loadable after the round trip.
    assert_load_account(&accounts, current_slot, pubkey, zero_lamport);
}
#[test] #[test]
#[ignore] #[ignore]
fn test_store_account_stress() { fn test_store_account_stress() {

View File

@ -29,13 +29,21 @@ impl<T: Clone> AccountsIndex<T> {
} }
} }
pub fn purge(&mut self, pubkey: &Pubkey) -> Vec<(Slot, T)> { fn get_rooted_entries(&self, list: &[(Slot, T)]) -> Vec<(Slot, T)> {
let mut list = self.account_maps.get(&pubkey).unwrap().write().unwrap(); list.iter()
let reclaims = list
.iter()
.filter(|(slot, _)| self.is_root(*slot)) .filter(|(slot, _)| self.is_root(*slot))
.cloned() .cloned()
.collect(); .collect()
}
/// Returns the rooted `(slot, value)` entries that `purge` would remove for
/// `pubkey`, without mutating the index.
///
/// Panics if `pubkey` has no entry in `account_maps` — callers are expected
/// to pass only keys they just observed via a scan.
pub fn would_purge(&self, pubkey: &Pubkey) -> Vec<(Slot, T)> {
    // `pubkey` is already a reference; the original `get(&pubkey)` relied on
    // an implicit `&&Pubkey -> &Pubkey` deref coercion.
    let list = self.account_maps.get(pubkey).unwrap().read().unwrap();
    self.get_rooted_entries(&list)
}
pub fn purge(&self, pubkey: &Pubkey) -> Vec<(Slot, T)> {
let mut list = self.account_maps.get(&pubkey).unwrap().write().unwrap();
let reclaims = self.get_rooted_entries(&list);
list.retain(|(slot, _)| !self.is_root(*slot)); list.retain(|(slot, _)| !self.is_root(*slot));
reclaims reclaims
} }

View File

@ -1446,20 +1446,6 @@ impl Bank {
self.rc self.rc
.accounts .accounts
.verify_hash_internal_state(self.slot(), &self.ancestors) .verify_hash_internal_state(self.slot(), &self.ancestors)
&& !self.has_accounts_with_zero_lamports()
}
fn has_accounts_with_zero_lamports(&self) -> bool {
self.rc.accounts.accounts_db.scan_accounts(
&self.ancestors,
|collector: &mut bool, option| {
if let Some((_, account, _)) = option {
if account.lamports == 0 {
*collector = true;
}
}
},
)
} }
/// Return the number of hashes per tick /// Return the number of hashes per tick
@ -2316,10 +2302,6 @@ mod tests {
); );
} }
// Asserts that a scan of the bank finds no account still carrying a
// zero-lamport balance (i.e. purging removed every zero-lamport entry).
fn assert_no_zero_balance_accounts(bank: &Arc<Bank>) {
    assert!(!bank.has_accounts_with_zero_lamports());
}
// Test that purging 0 lamports accounts works. // Test that purging 0 lamports accounts works.
#[test] #[test]
fn test_purge_empty_accounts() { fn test_purge_empty_accounts() {
@ -2338,8 +2320,6 @@ mod tests {
bank.purge_zero_lamport_accounts(); bank.purge_zero_lamport_accounts();
assert_no_zero_balance_accounts(&bank);
let bank0 = Arc::new(new_from_parent(&bank)); let bank0 = Arc::new(new_from_parent(&bank));
let blockhash = bank.last_blockhash(); let blockhash = bank.last_blockhash();
let keypair = Keypair::new(); let keypair = Keypair::new();
@ -2378,7 +2358,6 @@ mod tests {
bank1.purge_zero_lamport_accounts(); bank1.purge_zero_lamport_accounts();
assert!(bank1.verify_hash_internal_state()); assert!(bank1.verify_hash_internal_state());
assert_no_zero_balance_accounts(&bank1);
} }
#[test] #[test]