Avoid early clean and bad snapshot by ref-counting (#8724)

* Avoid early clean and bad snapshot by ref-counting

* Add measure

* Clean ups

* clean ups
This commit is contained in:
Ryo Onodera 2020-03-13 14:14:37 +09:00 committed by GitHub
parent 9a79be5ca0
commit 952cd38b7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 174 additions and 31 deletions

View File

@ -736,13 +736,17 @@ impl AccountsDB {
// Only keep purges where the entire history of the account in the root set
// can be purged. All AppendVecs for those updates are dead.
purges.retain(|_pubkey, account_infos| {
purges.retain(|pubkey, account_infos| {
let mut would_unref_count = 0;
for (_slot_id, account_info) in account_infos {
if *store_counts.get(&account_info.store_id).unwrap() != 0 {
if *store_counts.get(&account_info.store_id).unwrap() == 0 {
would_unref_count += 1;
} else {
return false;
}
}
true
would_unref_count == accounts_index.ref_count_from_storage(&pubkey)
});
// Recalculate reclaims with new purge set
@ -872,10 +876,10 @@ impl AccountsDB {
pubkey: &Pubkey,
) -> Option<(Account, Slot)> {
let (lock, index) = accounts_index.get(pubkey, ancestors)?;
let slot = lock[index].0;
let slot = lock.1[index].0;
//TODO: thread this as a ref
if let Some(slot_storage) = storage.0.get(&slot) {
let info = &lock[index].1;
let info = &lock.1[index].1;
slot_storage
.get(&info.store_id)
.and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account()))
@ -1178,7 +1182,7 @@ impl AccountsDB {
.par_iter()
.filter_map(|pubkey| {
if let Some((list, index)) = accounts_index.get(pubkey, ancestors) {
let (slot, account_info) = &list[index];
let (slot, account_info) = &list.1[index];
if account_info.lamports != 0 {
storage
.0
@ -1353,6 +1357,20 @@ impl AccountsDB {
fn clean_dead_slots(&self, dead_slots: &mut HashSet<Slot>) {
if !dead_slots.is_empty() {
{
let mut measure = Measure::start("clean_dead_slots-ms");
let index = self.accounts_index.read().unwrap();
let storage = self.storage.read().unwrap();
for slot in dead_slots.iter() {
for store in storage.0.get(slot).unwrap().values() {
for account in store.accounts.accounts(0) {
index.unref_from_storage(&account.meta.pubkey);
}
}
}
drop(index);
measure.stop();
inc_new_counter_info!("clean_dead_slots-unref-ms", measure.as_ms() as usize);
let mut index = self.accounts_index.write().unwrap();
for slot in dead_slots.iter() {
index.clean_dead_slot(*slot);
@ -1499,7 +1517,7 @@ impl AccountsDB {
let mut counts = HashMap::new();
for slot_list in accounts_index.account_maps.values() {
for (_slot, account_entry) in slot_list.read().unwrap().iter() {
for (_slot, account_entry) in slot_list.read().unwrap().1.iter() {
*counts.entry(account_entry.store_id).or_insert(0) += 1;
}
}
@ -1526,6 +1544,7 @@ impl AccountsDB {
pub mod tests {
// TODO: all the bank tests are bank specific, issue: 2194
use super::*;
use crate::accounts_index::RefCount;
use crate::append_vec::AccountMeta;
use assert_matches::assert_matches;
use bincode::serialize_into;
@ -1988,7 +2007,7 @@ pub mod tests {
let id = {
let index = accounts.accounts_index.read().unwrap();
let (list, idx) = index.get(&pubkey, &ancestors).unwrap();
list[idx].1.store_id
list.1[idx].1.store_id
};
accounts.add_root(1);
@ -2018,6 +2037,13 @@ pub mod tests {
}
}
fn ref_count_for_pubkey(&self, pubkey: &Pubkey) -> RefCount {
self.accounts_index
.read()
.unwrap()
.ref_count_from_storage(&pubkey)
}
fn uncleaned_root_count(&self) -> usize {
self.accounts_index.read().unwrap().uncleaned_roots.len()
}
@ -2288,6 +2314,11 @@ pub mod tests {
assert_eq!((account.lamports, slot), (expected_lamports, slot));
}
fn assert_not_load_account(accounts: &AccountsDB, slot: Slot, pubkey: Pubkey) {
let ancestors = vec![(slot, 0)].into_iter().collect();
assert!(accounts.load_slow(&ancestors, &pubkey).is_none());
}
fn reconstruct_accounts_db_via_serialization(accounts: &AccountsDB, slot: Slot) -> AccountsDB {
let mut writer = Cursor::new(vec![]);
let snapshot_storages = accounts.get_snapshot_storages(slot);
@ -2372,6 +2403,7 @@ pub mod tests {
.unwrap()
.read()
.unwrap()
.1
.len(),
2
);
@ -3026,4 +3058,94 @@ pub mod tests {
assert_load_account(&accounts, current_slot, purged_pubkey1, 0);
assert_load_account(&accounts, current_slot, purged_pubkey2, 0);
}
#[test]
fn test_accounts_clean_after_snapshot_restore_then_old_revives() {
solana_logger::setup();
let old_lamport = 223;
let zero_lamport = 0;
let no_data = 0;
let dummy_lamport = 999999;
let owner = Account::default().owner;
let account = Account::new(old_lamport, no_data, &owner);
let account2 = Account::new(old_lamport + 100_001, no_data, &owner);
let account3 = Account::new(old_lamport + 100_002, no_data, &owner);
let dummy_account = Account::new(dummy_lamport, no_data, &owner);
let zero_lamport_account = Account::new(zero_lamport, no_data, &owner);
let pubkey1 = Pubkey::new_rand();
let pubkey2 = Pubkey::new_rand();
let dummy_pubkey = Pubkey::new_rand();
let mut current_slot = 0;
let accounts = AccountsDB::new_single();
// A: Initialize AccountsDB with pubkey1 and pubkey2
current_slot += 1;
accounts.store(current_slot, &[(&pubkey1, &account)]);
accounts.store(current_slot, &[(&pubkey2, &account)]);
accounts.add_root(current_slot);
// B: Test multiple updates to pubkey1 in a single slot/storage
current_slot += 1;
assert_eq!(0, accounts.store_count_for_slot(current_slot));
assert_eq!(1, accounts.ref_count_for_pubkey(&pubkey1));
accounts.store(current_slot, &[(&pubkey1, &account2)]);
accounts.store(current_slot, &[(&pubkey1, &account2)]);
assert_eq!(1, accounts.store_count_for_slot(current_slot));
assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1));
accounts.add_root(current_slot);
// C: Yet more update to trigger lazy clean of step A
current_slot += 1;
assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1));
accounts.store(current_slot, &[(&pubkey1, &account3)]);
assert_eq!(4, accounts.ref_count_for_pubkey(&pubkey1));
accounts.add_root(current_slot);
// D: Make pubkey1 0-lamport; also triggers clean of step B
current_slot += 1;
assert_eq!(4, accounts.ref_count_for_pubkey(&pubkey1));
accounts.store(current_slot, &[(&pubkey1, &zero_lamport_account)]);
assert_eq!(
3, /* == 4 - 2 + 1 */
accounts.ref_count_for_pubkey(&pubkey1)
);
accounts.add_root(current_slot);
// E: Avoid missing bank hash error
current_slot += 1;
accounts.store(current_slot, &[(&dummy_pubkey, &dummy_account)]);
accounts.add_root(current_slot);
assert_load_account(&accounts, current_slot, pubkey1, zero_lamport);
assert_load_account(&accounts, current_slot, pubkey2, old_lamport);
assert_load_account(&accounts, current_slot, dummy_pubkey, dummy_lamport);
// At this point, there is no index entries for A and B
// If step C and step D should be purged, snapshot restore would cause
// pubkey1 to be revived as the state of step A.
// So, prevent that from happening by introducing refcount
accounts.clean_accounts();
let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot);
accounts.clean_accounts();
assert_load_account(&accounts, current_slot, pubkey1, zero_lamport);
assert_load_account(&accounts, current_slot, pubkey2, old_lamport);
assert_load_account(&accounts, current_slot, dummy_pubkey, dummy_lamport);
// F: Finally, make Step A cleanable
current_slot += 1;
accounts.store(current_slot, &[(&pubkey2, &account)]);
accounts.add_root(current_slot);
// Do clean
accounts.clean_accounts();
// Ensure pubkey2 is cleaned from the index finally
assert_not_load_account(&accounts, current_slot, pubkey1);
assert_load_account(&accounts, current_slot, pubkey2, old_lamport);
assert_load_account(&accounts, current_slot, dummy_pubkey, dummy_lamport);
}
}

View File

@ -6,10 +6,12 @@ use std::{
pub type Slot = u64;
type SlotList<T> = Vec<(Slot, T)>;
pub type RefCount = u64;
type AccountMapEntry<T> = (RefCount, SlotList<T>);
#[derive(Debug, Default)]
pub struct AccountsIndex<T> {
pub account_maps: HashMap<Pubkey, RwLock<SlotList<T>>>,
pub account_maps: HashMap<Pubkey, RwLock<AccountMapEntry<T>>>,
pub roots: HashSet<Slot>,
pub uncleaned_roots: HashSet<Slot>,
@ -22,7 +24,7 @@ impl<T: Clone> AccountsIndex<T> {
F: FnMut(&Pubkey, (&T, Slot)) -> (),
{
for (pubkey, list) in self.account_maps.iter() {
let list_r = list.read().unwrap();
let list_r = &list.read().unwrap().1;
if let Some(index) = self.latest_slot(ancestors, &list_r) {
func(pubkey, (&list_r[index].1, list_r[index].0));
}
@ -37,14 +39,14 @@ impl<T: Clone> AccountsIndex<T> {
}
pub fn would_purge(&self, pubkey: &Pubkey) -> Vec<(Slot, T)> {
let list = self.account_maps.get(&pubkey).unwrap().read().unwrap();
let list = &self.account_maps.get(&pubkey).unwrap().read().unwrap().1;
self.get_rooted_entries(&list)
}
// filter any rooted entries and return them along with a bool that indicates
// if this account has no more entries.
pub fn purge(&self, pubkey: &Pubkey) -> (Vec<(Slot, T)>, bool) {
let mut list = self.account_maps.get(&pubkey).unwrap().write().unwrap();
let list = &mut self.account_maps.get(&pubkey).unwrap().write().unwrap().1;
let reclaims = self.get_rooted_entries(&list);
list.retain(|(slot, _)| !self.is_root(*slot));
(reclaims, list.is_empty())
@ -70,11 +72,12 @@ impl<T: Clone> AccountsIndex<T> {
&self,
pubkey: &Pubkey,
ancestors: &HashMap<Slot, usize>,
) -> Option<(RwLockReadGuard<SlotList<T>>, usize)> {
) -> Option<(RwLockReadGuard<AccountMapEntry<T>>, usize)> {
self.account_maps.get(pubkey).and_then(|list| {
let lock = list.read().unwrap();
let list_r = list.read().unwrap();
let lock = &list_r.1;
let found_index = self.latest_slot(ancestors, &lock)?;
Some((lock, found_index))
Some((list_r, found_index))
})
}
@ -98,7 +101,7 @@ impl<T: Clone> AccountsIndex<T> {
let _slot_vec = self
.account_maps
.entry(*pubkey)
.or_insert_with(|| RwLock::new(Vec::with_capacity(32)));
.or_insert_with(|| RwLock::new((0 as RefCount, Vec::with_capacity(32))));
self.update(slot, pubkey, account_info, reclaims);
}
@ -114,14 +117,15 @@ impl<T: Clone> AccountsIndex<T> {
reclaims: &mut Vec<(Slot, T)>,
) -> Option<T> {
if let Some(lock) = self.account_maps.get(pubkey) {
let mut slot_vec = lock.write().unwrap();
let slot_vec = &mut lock.write().unwrap();
// filter out other dirty entries
reclaims.extend(slot_vec.iter().filter(|(f, _)| *f == slot).cloned());
slot_vec.retain(|(f, _)| *f != slot);
reclaims.extend(slot_vec.1.iter().filter(|(f, _)| *f == slot).cloned());
slot_vec.1.retain(|(f, _)| *f != slot);
slot_vec.push((slot, account_info));
slot_vec.0 += 1 as RefCount;
slot_vec.1.push((slot, account_info));
// now, do lazy clean
self.purge_older_root_entries(&mut slot_vec, reclaims);
self.purge_older_root_entries(&mut slot_vec.1, reclaims);
None
} else {
@ -129,6 +133,23 @@ impl<T: Clone> AccountsIndex<T> {
}
}
pub fn unref_from_storage(&self, pubkey: &Pubkey) {
let locked_slot_vec = self.account_maps.get(pubkey);
if let Some(slot_vec) = locked_slot_vec {
let mut slot_vec = slot_vec.write().unwrap();
slot_vec.0 -= 1 as RefCount;
}
}
pub fn ref_count_from_storage(&self, pubkey: &Pubkey) -> RefCount {
let locked_slot_vec = self.account_maps.get(pubkey);
if let Some(slot_vec) = locked_slot_vec {
slot_vec.read().unwrap().0
} else {
0
}
}
fn purge_older_root_entries(
&self,
slot_vec: &mut Vec<(Slot, T)>,
@ -150,7 +171,7 @@ impl<T: Clone> AccountsIndex<T> {
pub fn clean_rooted_entries(&self, pubkey: &Pubkey, reclaims: &mut Vec<(Slot, T)>) {
if let Some(lock) = self.account_maps.get(pubkey) {
let mut slot_vec = lock.write().unwrap();
self.purge_older_root_entries(&mut slot_vec, reclaims);
self.purge_older_root_entries(&mut slot_vec.1, reclaims);
}
}
@ -158,8 +179,8 @@ impl<T: Clone> AccountsIndex<T> {
let entry = self
.account_maps
.entry(*pubkey)
.or_insert_with(|| RwLock::new(vec![]));
entry.write().unwrap().push((slot, account_info));
.or_insert_with(|| RwLock::new((1 as RefCount, vec![])));
entry.write().unwrap().1.push((slot, account_info));
}
pub fn can_purge(max_root: Slot, slot: Slot) -> bool {
@ -241,7 +262,7 @@ mod tests {
let ancestors = vec![(0, 0)].into_iter().collect();
let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap();
assert_eq!(list[idx], (0, true));
assert_eq!(list.1[idx], (0, true));
let mut num = 0;
let mut found_key = false;
@ -274,7 +295,7 @@ mod tests {
let ancestors = vec![].into_iter().collect();
index.add_root(0);
let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap();
assert_eq!(list[idx], (0, true));
assert_eq!(list.1[idx], (0, true));
}
#[test]
@ -317,14 +338,14 @@ mod tests {
index.insert(0, &key.pubkey(), true, &mut gc);
assert!(gc.is_empty());
let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap();
assert_eq!(list[idx], (0, true));
assert_eq!(list.1[idx], (0, true));
drop(list);
let mut gc = Vec::new();
index.insert(0, &key.pubkey(), false, &mut gc);
assert_eq!(gc, vec![(0, true)]);
let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap();
assert_eq!(list[idx], (0, false));
assert_eq!(list.1[idx], (0, false));
}
#[test]
@ -339,10 +360,10 @@ mod tests {
index.insert(1, &key.pubkey(), false, &mut gc);
assert!(gc.is_empty());
let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap();
assert_eq!(list[idx], (0, true));
assert_eq!(list.1[idx], (0, true));
let ancestors = vec![(1, 0)].into_iter().collect();
let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap();
assert_eq!(list[idx], (1, false));
assert_eq!(list.1[idx], (1, false));
}
#[test]
@ -362,7 +383,7 @@ mod tests {
assert_eq!(gc, vec![(0, true), (1, false), (2, true)]);
let ancestors = vec![].into_iter().collect();
let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap();
assert_eq!(list[idx], (3, true));
assert_eq!(list.1[idx], (3, true));
let mut num = 0;
let mut found_key = false;