diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 4351c039f..f2d1ec921 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -70,7 +70,7 @@ pub struct ErrorCounters {
 #[derive(Deserialize, Serialize, Default, Debug, PartialEq, Clone)]
 pub struct AccountInfo {
     /// index identifying the append storage
-    id: AppendVecId,
+    store_id: AppendVecId,
     /// offset into the storage
     offset: usize,
@@ -560,25 +560,77 @@ impl AccountsDB {
         false
     }
 
+    // Purge zero lamport accounts for garbage collection purposes
+    // Only remove those accounts where the entire rooted history of the account
+    // can be purged because there are no live append vecs in the ancestors
     pub fn purge_zero_lamport_accounts(&self, ancestors: &HashMap<Slot, usize>) {
         self.report_store_stats();
+        let mut purges = HashMap::new();
         let accounts_index = self.accounts_index.read().unwrap();
-        let mut purges = Vec::new();
         accounts_index.scan_accounts(ancestors, |pubkey, (account_info, slot)| {
             if account_info.lamports == 0 && accounts_index.is_root(slot) {
-                purges.push(*pubkey);
+                purges.insert(*pubkey, accounts_index.would_purge(pubkey));
             }
         });
-        drop(accounts_index);
-        let mut reclaims = Vec::new();
-        let mut accounts_index = self.accounts_index.write().unwrap();
-        for purge in &purges {
-            reclaims.extend(accounts_index.purge(purge));
+
+        // Calculate store counts as if everything was purged
+        // Then purge if we can
+        let mut store_counts: HashMap<AppendVecId, usize> = HashMap::new();
+        let storage = self.storage.read().unwrap();
+        for account_infos in purges.values() {
+            for (slot_id, account_info) in account_infos {
+                let slot_storage = storage.0.get(&slot_id).unwrap();
+                let store = slot_storage.get(&account_info.store_id).unwrap();
+                if let Some(store_count) = store_counts.get_mut(&account_info.store_id) {
+                    *store_count -= 1;
+                } else {
+                    store_counts.insert(
+                        account_info.store_id,
+                        store.count_and_status.read().unwrap().0 - 1,
+                    );
+                }
+            }
         }
+
+        // Only keep purges where the entire history of the account in the root set
+        // can be purged. All AppendVecs for those updates are dead.
+        purges.retain(|_pubkey, account_infos| {
+            for (_slot_id, account_info) in account_infos {
+                if *store_counts.get(&account_info.store_id).unwrap() != 0 {
+                    return false;
+                }
+            }
+            true
+        });
+
+        // Recalculate reclaims with new purge set
+        let mut reclaims = Vec::new();
+        for pubkey in purges.keys() {
+            reclaims.extend(accounts_index.purge(&pubkey));
+        }
+        let last_root = accounts_index.last_root;
+        drop(accounts_index);
+        drop(storage);
+
+        self.handle_reclaims(&reclaims, last_root);
+    }
+
+    fn handle_reclaims(&self, reclaims: &[(Slot, AccountInfo)], last_root: Slot) {
+        let mut dead_accounts = Measure::start("reclaims::remove_dead_accounts");
         let mut dead_slots = self.remove_dead_accounts(reclaims);
+        dead_accounts.stop();
+
+        let mut cleanup_dead_slots = Measure::start("reclaims::purge_slots");
         self.cleanup_dead_slots(&mut dead_slots, last_root);
+        cleanup_dead_slots.stop();
+
+        let mut purge_slots = Measure::start("reclaims::purge_slots");
+        for slot in dead_slots {
+            self.purge_slot(slot);
+        }
+        purge_slots.stop();
     }
 
     pub fn scan_accounts<F, A>(&self, ancestors: &HashMap<Slot, usize>, scan_func: F) -> A
@@ -595,7 +647,7 @@ impl AccountsDB {
                 storage
                     .0
                     .get(&slot)
-                    .and_then(|storage_map| storage_map.get(&account_info.id))
+                    .and_then(|storage_map| storage_map.get(&account_info.store_id))
                     .and_then(|store| {
                         Some(
                             store
@@ -663,7 +715,7 @@ impl AccountsDB {
         if let Some(slot_storage) = storage.0.get(&slot) {
             let info = &lock[index].1;
             slot_storage
-                .get(&info.id)
+                .get(&info.store_id)
                 .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account()))
                 .map(|account| (account, slot))
         } else {
@@ -831,7 +883,7 @@ impl AccountsDB {
         for (offset, (_, account)) in rvs.iter().zip(&with_meta[infos.len()..]) {
             storage.add_account();
             infos.push(AccountInfo {
-                id: storage.id,
+                store_id: storage.id,
                 offset: *offset,
                 lamports: account.lamports,
             });
@@ -938,19 +990,19 @@ impl AccountsDB {
         (reclaims, last_root)
     }
 
-    fn remove_dead_accounts(&self, reclaims: Vec<(Slot, AccountInfo)>) -> HashSet<Slot> {
+    fn remove_dead_accounts(&self, reclaims: &[(Slot, AccountInfo)]) -> HashSet<Slot> {
         let storage = self.storage.read().unwrap();
         let mut dead_slots = HashSet::new();
        for (slot_id, account_info) in reclaims {
-            if let Some(slot_storage) = storage.0.get(&slot_id) {
-                if let Some(store) = slot_storage.get(&account_info.id) {
+            if let Some(slot_storage) = storage.0.get(slot_id) {
+                if let Some(store) = slot_storage.get(&account_info.store_id) {
                     assert_eq!(
-                        slot_id, store.slot_id,
+                        *slot_id, store.slot_id,
                        "AccountDB::accounts_index corrupted. Storage should only point to one slot"
                     );
                     let count = store.remove_account();
                     if count == 0 {
-                        dead_slots.insert(slot_id);
+                        dead_slots.insert(*slot_id);
                     }
                 }
             }
@@ -1036,21 +1088,7 @@ impl AccountsDB {
         update_index.stop();
         trace!("reclaim: {}", reclaims.len());
-        let mut remove_dead_accounts = Measure::start("store::remove_dead");
-        let mut dead_slots = self.remove_dead_accounts(reclaims);
-        remove_dead_accounts.stop();
-        trace!("dead_slots: {}", dead_slots.len());
-
-        let mut cleanup_dead_slots = Measure::start("store::cleanup_dead_slots");
-        self.cleanup_dead_slots(&mut dead_slots, last_root);
-        cleanup_dead_slots.stop();
-        trace!("purge_slots: {}", dead_slots.len());
-
-        let mut purge_slots = Measure::start("store::purge_slots");
-        for slot in dead_slots {
-            self.purge_slot(slot);
-        }
-        purge_slots.stop();
+        self.handle_reclaims(&reclaims, last_root);
     }
 
     pub fn add_root(&self, slot: Slot) {
@@ -1095,10 +1133,10 @@ impl AccountsDB {
                 .scan_account_storage(
                     *slot_id,
                     |stored_account: &StoredAccount,
-                     id: AppendVecId,
+                     store_id: AppendVecId,
                      accum: &mut HashMap<Pubkey, (u64, AccountInfo)>| {
                         let account_info = AccountInfo {
-                            id,
+                            store_id,
                             offset: stored_account.offset,
                             lamports: stored_account.account_meta.lamports,
                         };
@@ -1125,7 +1163,7 @@ impl AccountsDB {
         let mut counts = HashMap::new();
         for slot_list in accounts_index.account_maps.values() {
             for (_slot, account_entry) in slot_list.read().unwrap().iter() {
-                *counts.entry(account_entry.id).or_insert(0) += 1;
+                *counts.entry(account_entry.store_id).or_insert(0) += 1;
             }
         }
         for slot_stores in storage.0.values() {
@@ -1603,7 +1641,7 @@ pub mod tests {
         let id = {
             let index = accounts.accounts_index.read().unwrap();
             let (list, idx) = index.get(&pubkey, &ancestors).unwrap();
-            list[idx].1.id
+            list[idx].1.store_id
         };
         //slot 0 is behind root, but it is not root, therefore it is purged
         accounts.add_root(1);
@@ -1623,12 +1661,38 @@ pub mod tests {
         assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some((account, 1)));
     }
 
+    fn print_accounts(label: &'static str, accounts: &AccountsDB) {
+        print_index(label, accounts);
+        print_count_and_status(label, accounts);
+    }
+
+    fn print_index(label: &'static str, accounts: &AccountsDB) {
+        info!(
+            "{}: accounts.accounts_index roots: {:?}",
+            label,
+            accounts.accounts_index.read().unwrap().roots
+        );
+        for (pubkey, list) in &accounts.accounts_index.read().unwrap().account_maps {
+            info!(" key: {}", pubkey);
+            info!(" slots: {:?}", *list.read().unwrap());
+        }
+    }
+
     fn print_count_and_status(label: &'static str, accounts: &AccountsDB) {
-        for (_slot, slot_stores) in &accounts.storage.read().unwrap().0 {
-            for (id, entry) in slot_stores {
+        let storage = accounts.storage.read().unwrap();
+        let mut slots: Vec<_> = storage.0.keys().cloned().collect();
+        slots.sort();
+        info!("{}: count_and status for {} slots:", label, slots.len());
+        for slot in &slots {
+            let slot_stores = storage.0.get(slot).unwrap();
+
+            let mut ids: Vec<_> = slot_stores.keys().cloned().collect();
+            ids.sort();
+            for id in &ids {
+                let entry = slot_stores.get(id).unwrap();
                 info!(
-                    "{}: {} count_and_status: {:?}",
-                    label,
+                    " slot: {} id: {} count_and_status: {:?}",
+                    slot,
                     id,
                     *entry.count_and_status.read().unwrap()
                 );
@@ -1680,33 +1744,12 @@ pub mod tests {
         accounts.store(latest_slot, &[(&pubkeys[31], &account)]);
         accounts.add_root(latest_slot);
 
-        let mut writer = Cursor::new(vec![]);
-        serialize_into(
-            &mut writer,
-            &AccountsDBSerialize::new(&accounts, latest_slot),
-        )
-        .unwrap();
         assert!(check_storage(&accounts, 0, 90));
         assert!(check_storage(&accounts, 1, 21));
         assert!(check_storage(&accounts, 2, 31));
 
-        let buf = writer.into_inner();
-        let mut reader = BufReader::new(&buf[..]);
-        let daccounts = AccountsDB::new(None);
+        let daccounts = reconstruct_accounts_db_via_serialization(&accounts, latest_slot);
 
-        print_count_and_status("accounts", &accounts);
-
-        let local_paths = {
-            let paths = daccounts.paths.read().unwrap();
-            AccountsDB::format_paths(paths.to_vec())
-        };
-
-        let copied_accounts = TempDir::new().unwrap();
-        // Simulate obtaining a copy of the AppendVecs from a tarball
-        copy_append_vecs(&accounts, copied_accounts.path()).unwrap();
-        daccounts
-            .accounts_from_stream(&mut reader, local_paths, copied_accounts.path())
-            .unwrap();
         assert_eq!(
             daccounts.write_version.load(Ordering::Relaxed),
             accounts.write_version.load(Ordering::Relaxed)
         );
@@ -1735,6 +1778,226 @@ pub mod tests {
         assert!(check_storage(&daccounts, 2, 31));
     }
 
+    fn assert_load_account(
+        accounts: &AccountsDB,
+        slot: Slot,
+        pubkey: Pubkey,
+        expected_lamports: u64,
+    ) {
+        let ancestors = vec![(slot, 0)].into_iter().collect();
+        let (account, slot) = accounts.load_slow(&ancestors, &pubkey).unwrap();
+        assert_eq!((account.lamports, slot), (expected_lamports, slot));
+    }
+
+    fn reconstruct_accounts_db_via_serialization(accounts: &AccountsDB, slot: Slot) -> AccountsDB {
+        let mut writer = Cursor::new(vec![]);
+        serialize_into(&mut writer, &AccountsDBSerialize::new(&accounts, slot)).unwrap();
+
+        let buf = writer.into_inner();
+        let mut reader = BufReader::new(&buf[..]);
+        let daccounts = AccountsDB::new(None);
+
+        let local_paths = {
+            let paths = daccounts.paths.read().unwrap();
+            AccountsDB::format_paths(paths.to_vec())
+        };
+
+        let copied_accounts = TempDir::new().unwrap();
+        // Simulate obtaining a copy of the AppendVecs from a tarball
+        copy_append_vecs(&accounts, copied_accounts.path()).unwrap();
+        daccounts
+            .accounts_from_stream(&mut reader, local_paths, copied_accounts.path())
+            .unwrap();
+
+        print_count_and_status("daccounts", &daccounts);
+
+        daccounts
+    }
+
+    fn purge_zero_lamport_accounts(accounts: &AccountsDB, slot: Slot) {
+        let ancestors = vec![(slot as Slot, 0)].into_iter().collect();
+        info!("ancestors: {:?}", ancestors);
+        accounts.purge_zero_lamport_accounts(&ancestors);
+    }
+
+    fn assert_no_stores(accounts: &AccountsDB, slot: Slot) {
+        let stores = accounts.storage.read().unwrap();
+        info!("{:?}", stores.0.get(&slot));
+        assert!(stores.0.get(&slot).is_none() || stores.0.get(&slot).unwrap().len() == 0);
+    }
+
+    #[test]
+    fn test_accounts_db_purge_keep_live() {
+        solana_logger::setup();
+        let some_lamport = 223;
+        let zero_lamport = 0;
+        let no_data = 0;
+        let owner = Account::default().owner;
+
+        let account = Account::new(some_lamport, no_data, &owner);
+        let pubkey = Pubkey::new_rand();
+
+        let account2 = Account::new(some_lamport, no_data, &owner);
+        let pubkey2 = Pubkey::new_rand();
+
+        let zero_lamport_account = Account::new(zero_lamport, no_data, &owner);
+
+        let accounts = AccountsDB::new_single();
+        accounts.add_root(0);
+
+        let mut current_slot = 1;
+        accounts.store(current_slot, &[(&pubkey, &account)]);
+
+        // Store another live account to slot 1 which will prevent any purge
+        // since the store count will not be zero
+        accounts.store(current_slot, &[(&pubkey2, &account2)]);
+        accounts.add_root(current_slot);
+
+        current_slot += 1;
+        accounts.store(current_slot, &[(&pubkey, &zero_lamport_account)]);
+        accounts.add_root(current_slot);
+
+        assert_load_account(&accounts, current_slot, pubkey, zero_lamport);
+
+        current_slot += 1;
+        accounts.add_root(current_slot);
+
+        purge_zero_lamport_accounts(&accounts, current_slot);
+
+        print_accounts("post_purge", &accounts);
+
+        // Make sure the index is for pubkey cleared
+        assert_eq!(
+            accounts
+                .accounts_index
+                .read()
+                .unwrap()
+                .account_maps
+                .get(&pubkey)
+                .unwrap()
+                .read()
+                .unwrap()
+                .len(),
+            2
+        );
+
+        // slot 1 & 2 should have stores
+        check_storage(&accounts, 1, 2);
+        check_storage(&accounts, 2, 1);
+    }
+
+    #[test]
+    fn test_accounts_db_purge() {
+        solana_logger::setup();
+        let some_lamport = 223;
+        let zero_lamport = 0;
+        let no_data = 0;
+        let owner = Account::default().owner;
+
+        let account = Account::new(some_lamport, no_data, &owner);
+        let pubkey = Pubkey::new_rand();
+
+        let zero_lamport_account = Account::new(zero_lamport, no_data, &owner);
+
+        let accounts = AccountsDB::new_single();
+        accounts.add_root(0);
+
+        let mut current_slot = 1;
+        accounts.store(current_slot, &[(&pubkey, &account)]);
+        accounts.add_root(current_slot);
+
+        current_slot += 1;
+        accounts.store(current_slot, &[(&pubkey, &zero_lamport_account)]);
+        accounts.add_root(current_slot);
+
+        assert_load_account(&accounts, current_slot, pubkey, zero_lamport);
+
+        // Otherwise slot 2 will not be removed
+        current_slot += 1;
+        accounts.add_root(current_slot);
+
+        purge_zero_lamport_accounts(&accounts, current_slot);
+
+        print_accounts("post_purge", &accounts);
+
+        // Make sure the index is for pubkey cleared
+        assert_eq!(
+            accounts
+                .accounts_index
+                .read()
+                .unwrap()
+                .account_maps
+                .get(&pubkey)
+                .unwrap()
+                .read()
+                .unwrap()
+                .len(),
+            0
+        );
+
+        // slot 1 & 2 should not have any stores
+        assert_no_stores(&accounts, 1);
+        assert_no_stores(&accounts, 2);
+    }
+
+    #[test]
+    fn test_accounts_db_serialize_zero_and_free() {
+        solana_logger::setup();
+
+        let some_lamport = 223;
+        let zero_lamport = 0;
+        let no_data = 0;
+        let owner = Account::default().owner;
+
+        let account = Account::new(some_lamport, no_data, &owner);
+        let pubkey = Pubkey::new_rand();
+        let zero_lamport_account = Account::new(zero_lamport, no_data, &owner);
+
+        let account2 = Account::new(some_lamport + 1, no_data, &owner);
+        let pubkey2 = Pubkey::new_rand();
+
+        let filler_account = Account::new(some_lamport, no_data, &owner);
+        let filler_account_pubkey = Pubkey::new_rand();
+
+        let accounts = AccountsDB::new_single();
+
+        let mut current_slot = 1;
+        accounts.store(current_slot, &[(&pubkey, &account)]);
+        accounts.add_root(current_slot);
+
+        current_slot += 1;
+        accounts.store(current_slot, &[(&pubkey, &zero_lamport_account)]);
+        accounts.store(current_slot, &[(&pubkey2, &account2)]);
+
+        // Store enough accounts such that an additional store for slot 2 is created.
+        while accounts
+            .storage
+            .read()
+            .unwrap()
+            .0
+            .get(&current_slot)
+            .unwrap()
+            .len()
+            < 2
+        {
+            accounts.store(current_slot, &[(&filler_account_pubkey, &filler_account)]);
+        }
+        accounts.add_root(current_slot);
+
+        assert_load_account(&accounts, current_slot, pubkey, zero_lamport);
+
+        print_accounts("accounts", &accounts);
+
+        purge_zero_lamport_accounts(&accounts, current_slot);
+
+        print_accounts("accounts_post_purge", &accounts);
+        let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot);
+
+        print_accounts("reconstructed", &accounts);
+
+        assert_load_account(&accounts, current_slot, pubkey, zero_lamport);
+    }
+
     #[test]
     #[ignore]
     fn test_store_account_stress() {
diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs
index 0939f3581..0254fe10f 100644
--- a/runtime/src/accounts_index.rs
+++ b/runtime/src/accounts_index.rs
@@ -29,13 +29,21 @@ impl<T: Clone> AccountsIndex<T> {
         }
     }
 
-    pub fn purge(&mut self, pubkey: &Pubkey) -> Vec<(Slot, T)> {
-        let mut list = self.account_maps.get(&pubkey).unwrap().write().unwrap();
-        let reclaims = list
-            .iter()
+    fn get_rooted_entries(&self, list: &[(Slot, T)]) -> Vec<(Slot, T)> {
+        list.iter()
             .filter(|(slot, _)| self.is_root(*slot))
             .cloned()
-            .collect();
+            .collect()
+    }
+
+    pub fn would_purge(&self, pubkey: &Pubkey) -> Vec<(Slot, T)> {
+        let list = self.account_maps.get(&pubkey).unwrap().read().unwrap();
+        self.get_rooted_entries(&list)
+    }
+
+    pub fn purge(&self, pubkey: &Pubkey) -> Vec<(Slot, T)> {
+        let mut list = self.account_maps.get(&pubkey).unwrap().write().unwrap();
+        let reclaims = self.get_rooted_entries(&list);
         list.retain(|(slot, _)| !self.is_root(*slot));
         reclaims
     }
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index 19036dd6d..7a527b3af 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -1446,20 +1446,6 @@ impl Bank {
         self.rc
             .accounts
             .verify_hash_internal_state(self.slot(), &self.ancestors)
-            && !self.has_accounts_with_zero_lamports()
-    }
-
-    fn has_accounts_with_zero_lamports(&self) -> bool {
-        self.rc.accounts.accounts_db.scan_accounts(
-            &self.ancestors,
-            |collector: &mut bool, option| {
-                if let Some((_, account, _)) = option {
-                    if account.lamports == 0 {
-                        *collector = true;
-                    }
-                }
-            },
-        )
     }
 
     /// Return the number of hashes per tick
@@ -2316,10 +2302,6 @@ mod tests {
         );
     }
 
-    fn assert_no_zero_balance_accounts(bank: &Arc<Bank>) {
-        assert!(!bank.has_accounts_with_zero_lamports());
-    }
-
     // Test that purging 0 lamports accounts works.
     #[test]
     fn test_purge_empty_accounts() {
@@ -2338,8 +2320,6 @@ mod tests {
         bank.purge_zero_lamport_accounts();
 
-        assert_no_zero_balance_accounts(&bank);
-
         let bank0 = Arc::new(new_from_parent(&bank));
         let blockhash = bank.last_blockhash();
         let keypair = Keypair::new();
@@ -2378,7 +2358,6 @@ mod tests {
 
         bank1.purge_zero_lamport_accounts();
         assert!(bank1.verify_hash_internal_state());
-        assert_no_zero_balance_accounts(&bank1);
     }
 
     #[test]