From 952cd38b7b9d3b1089176a8cf331ce0f05587446 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Fri, 13 Mar 2020 14:14:37 +0900 Subject: [PATCH] Avoid early clean and bad snapshot by ref-counting (#8724) * Avoid early clean and bad snapshot by ref-counting * Add measure * Clean ups * clean ups --- runtime/src/accounts_db.rs | 138 ++++++++++++++++++++++++++++++++-- runtime/src/accounts_index.rs | 67 +++++++++++------ 2 files changed, 174 insertions(+), 31 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 933c6d092..1dd9df726 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -736,13 +736,17 @@ impl AccountsDB { // Only keep purges where the entire history of the account in the root set // can be purged. All AppendVecs for those updates are dead. - purges.retain(|_pubkey, account_infos| { + purges.retain(|pubkey, account_infos| { + let mut would_unref_count = 0; for (_slot_id, account_info) in account_infos { - if *store_counts.get(&account_info.store_id).unwrap() != 0 { + if *store_counts.get(&account_info.store_id).unwrap() == 0 { + would_unref_count += 1; + } else { return false; } } - true + + would_unref_count == accounts_index.ref_count_from_storage(&pubkey) }); // Recalculate reclaims with new purge set @@ -872,10 +876,10 @@ impl AccountsDB { pubkey: &Pubkey, ) -> Option<(Account, Slot)> { let (lock, index) = accounts_index.get(pubkey, ancestors)?; - let slot = lock[index].0; + let slot = lock.1[index].0; //TODO: thread this as a ref if let Some(slot_storage) = storage.0.get(&slot) { - let info = &lock[index].1; + let info = &lock.1[index].1; slot_storage .get(&info.store_id) .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account())) @@ -1178,7 +1182,7 @@ impl AccountsDB { .par_iter() .filter_map(|pubkey| { if let Some((list, index)) = accounts_index.get(pubkey, ancestors) { - let (slot, account_info) = &list[index]; + let (slot, account_info) = &list.1[index]; if account_info.lamports != 0 { storage .0 @@ -1353,6 +1357,20 @@ impl AccountsDB { fn clean_dead_slots(&self, dead_slots: &mut HashSet) { if !dead_slots.is_empty() { { + let mut measure = Measure::start("clean_dead_slots-ms"); + let index = self.accounts_index.read().unwrap(); + let storage = self.storage.read().unwrap(); + for slot in dead_slots.iter() { + for store in storage.0.get(slot).unwrap().values() { + for account in store.accounts.accounts(0) { + index.unref_from_storage(&account.meta.pubkey); + } + } + } + drop(index); + measure.stop(); + inc_new_counter_info!("clean_dead_slots-unref-ms", measure.as_ms() as usize); + let mut index = self.accounts_index.write().unwrap(); for slot in dead_slots.iter() { index.clean_dead_slot(*slot); @@ -1499,7 +1517,7 @@ impl AccountsDB { let mut counts = HashMap::new(); for slot_list in accounts_index.account_maps.values() { - for (_slot, account_entry) in slot_list.read().unwrap().iter() { + for (_slot, account_entry) in slot_list.read().unwrap().1.iter() { *counts.entry(account_entry.store_id).or_insert(0) += 1; } } @@ -1526,6 +1544,7 @@ impl AccountsDB { pub mod tests { // TODO: all the bank tests are bank specific, issue: 2194 use super::*; + use crate::accounts_index::RefCount; use crate::append_vec::AccountMeta; use assert_matches::assert_matches; use bincode::serialize_into; @@ -1988,7 +2007,7 @@ pub mod tests { let id = { let index = accounts.accounts_index.read().unwrap(); let (list, idx) = index.get(&pubkey, &ancestors).unwrap(); - list[idx].1.store_id + list.1[idx].1.store_id }; accounts.add_root(1); @@ -2018,6 +2037,13 @@ pub mod tests { } } + fn ref_count_for_pubkey(&self, pubkey: &Pubkey) -> RefCount { + self.accounts_index + .read() + .unwrap() + .ref_count_from_storage(&pubkey) + } + fn uncleaned_root_count(&self) -> usize { self.accounts_index.read().unwrap().uncleaned_roots.len() } @@ -2288,6 +2314,11 @@ pub mod tests { assert_eq!((account.lamports, slot), (expected_lamports, slot)); } + fn assert_not_load_account(accounts: &AccountsDB, slot: Slot, pubkey: Pubkey) { + let ancestors = vec![(slot, 0)].into_iter().collect(); + assert!(accounts.load_slow(&ancestors, &pubkey).is_none()); + } + fn reconstruct_accounts_db_via_serialization(accounts: &AccountsDB, slot: Slot) -> AccountsDB { let mut writer = Cursor::new(vec![]); let snapshot_storages = accounts.get_snapshot_storages(slot); @@ -2372,6 +2403,7 @@ pub mod tests { .unwrap() .read() .unwrap() + .1 .len(), 2 ); @@ -3026,4 +3058,94 @@ pub mod tests { assert_load_account(&accounts, current_slot, purged_pubkey1, 0); assert_load_account(&accounts, current_slot, purged_pubkey2, 0); } + + #[test] + fn test_accounts_clean_after_snapshot_restore_then_old_revives() { + solana_logger::setup(); + let old_lamport = 223; + let zero_lamport = 0; + let no_data = 0; + let dummy_lamport = 999999; + let owner = Account::default().owner; + + let account = Account::new(old_lamport, no_data, &owner); + let account2 = Account::new(old_lamport + 100_001, no_data, &owner); + let account3 = Account::new(old_lamport + 100_002, no_data, &owner); + let dummy_account = Account::new(dummy_lamport, no_data, &owner); + let zero_lamport_account = Account::new(zero_lamport, no_data, &owner); + + let pubkey1 = Pubkey::new_rand(); + let pubkey2 = Pubkey::new_rand(); + let dummy_pubkey = Pubkey::new_rand(); + + let mut current_slot = 0; + let accounts = AccountsDB::new_single(); + + // A: Initialize AccountsDB with pubkey1 and pubkey2 + current_slot += 1; + accounts.store(current_slot, &[(&pubkey1, &account)]); + accounts.store(current_slot, &[(&pubkey2, &account)]); + accounts.add_root(current_slot); + + // B: Test multiple updates to pubkey1 in a single slot/storage + current_slot += 1; + assert_eq!(0, accounts.store_count_for_slot(current_slot)); + assert_eq!(1, accounts.ref_count_for_pubkey(&pubkey1)); + accounts.store(current_slot, &[(&pubkey1, &account2)]); + accounts.store(current_slot, &[(&pubkey1, &account2)]); + assert_eq!(1, accounts.store_count_for_slot(current_slot)); + assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1)); + accounts.add_root(current_slot); + + // C: Yet more update to trigger lazy clean of step A + current_slot += 1; + assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1)); + accounts.store(current_slot, &[(&pubkey1, &account3)]); + assert_eq!(4, accounts.ref_count_for_pubkey(&pubkey1)); + accounts.add_root(current_slot); + + // D: Make pubkey1 0-lamport; also triggers clean of step B + current_slot += 1; + assert_eq!(4, accounts.ref_count_for_pubkey(&pubkey1)); + accounts.store(current_slot, &[(&pubkey1, &zero_lamport_account)]); + assert_eq!( + 3, /* == 4 - 2 + 1 */ + accounts.ref_count_for_pubkey(&pubkey1) + ); + accounts.add_root(current_slot); + + // E: Avoid missing bank hash error + current_slot += 1; + accounts.store(current_slot, &[(&dummy_pubkey, &dummy_account)]); + accounts.add_root(current_slot); + + assert_load_account(&accounts, current_slot, pubkey1, zero_lamport); + assert_load_account(&accounts, current_slot, pubkey2, old_lamport); + assert_load_account(&accounts, current_slot, dummy_pubkey, dummy_lamport); + + // At this point, there is no index entries for A and B + // If step C and step D should be purged, snapshot restore would cause + // pubkey1 to be revived as the state of step A. + // So, prevent that from happening by introducing refcount + accounts.clean_accounts(); + let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot); + accounts.clean_accounts(); + + assert_load_account(&accounts, current_slot, pubkey1, zero_lamport); + assert_load_account(&accounts, current_slot, pubkey2, old_lamport); + assert_load_account(&accounts, current_slot, dummy_pubkey, dummy_lamport); + + // F: Finally, make Step A cleanable + current_slot += 1; + accounts.store(current_slot, &[(&pubkey2, &account)]); + accounts.add_root(current_slot); + + // Do clean + accounts.clean_accounts(); + + // Ensure pubkey2 is cleaned from the index finally + assert_not_load_account(&accounts, current_slot, pubkey1); + assert_load_account(&accounts, current_slot, pubkey2, old_lamport); + assert_load_account(&accounts, current_slot, dummy_pubkey, dummy_lamport); + } } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index ff54880b4..d9d69f032 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -6,10 +6,12 @@ use std::{ pub type Slot = u64; type SlotList = Vec<(Slot, T)>; +pub type RefCount = u64; +type AccountMapEntry = (RefCount, SlotList); #[derive(Debug, Default)] pub struct AccountsIndex { - pub account_maps: HashMap>>, + pub account_maps: HashMap>>, pub roots: HashSet, pub uncleaned_roots: HashSet, @@ -22,7 +24,7 @@ impl AccountsIndex { F: FnMut(&Pubkey, (&T, Slot)) -> (), { for (pubkey, list) in self.account_maps.iter() { - let list_r = list.read().unwrap(); + let list_r = &list.read().unwrap().1; if let Some(index) = self.latest_slot(ancestors, &list_r) { func(pubkey, (&list_r[index].1, list_r[index].0)); } @@ -37,14 +39,14 @@ impl AccountsIndex { } pub fn would_purge(&self, pubkey: &Pubkey) -> Vec<(Slot, T)> { - let list = self.account_maps.get(&pubkey).unwrap().read().unwrap(); + let list = &self.account_maps.get(&pubkey).unwrap().read().unwrap().1; self.get_rooted_entries(&list) } // filter any rooted entries and return them along with a bool that indicates // if this account has no more entries. pub fn purge(&self, pubkey: &Pubkey) -> (Vec<(Slot, T)>, bool) { - let mut list = self.account_maps.get(&pubkey).unwrap().write().unwrap(); + let list = &mut self.account_maps.get(&pubkey).unwrap().write().unwrap().1; let reclaims = self.get_rooted_entries(&list); list.retain(|(slot, _)| !self.is_root(*slot)); (reclaims, list.is_empty()) @@ -70,11 +72,12 @@ impl AccountsIndex { &self, pubkey: &Pubkey, ancestors: &HashMap, - ) -> Option<(RwLockReadGuard>, usize)> { + ) -> Option<(RwLockReadGuard>, usize)> { self.account_maps.get(pubkey).and_then(|list| { - let lock = list.read().unwrap(); + let list_r = list.read().unwrap(); + let lock = &list_r.1; let found_index = self.latest_slot(ancestors, &lock)?; - Some((lock, found_index)) + Some((list_r, found_index)) }) } @@ -98,7 +101,7 @@ impl AccountsIndex { let _slot_vec = self .account_maps .entry(*pubkey) - .or_insert_with(|| RwLock::new(Vec::with_capacity(32))); + .or_insert_with(|| RwLock::new((0 as RefCount, Vec::with_capacity(32)))); self.update(slot, pubkey, account_info, reclaims); } @@ -114,14 +117,15 @@ impl AccountsIndex { reclaims: &mut Vec<(Slot, T)>, ) -> Option { if let Some(lock) = self.account_maps.get(pubkey) { - let mut slot_vec = lock.write().unwrap(); + let slot_vec = &mut lock.write().unwrap(); // filter out other dirty entries - reclaims.extend(slot_vec.iter().filter(|(f, _)| *f == slot).cloned()); - slot_vec.retain(|(f, _)| *f != slot); + reclaims.extend(slot_vec.1.iter().filter(|(f, _)| *f == slot).cloned()); + slot_vec.1.retain(|(f, _)| *f != slot); - slot_vec.push((slot, account_info)); + slot_vec.0 += 1 as RefCount; + slot_vec.1.push((slot, account_info)); // now, do lazy clean - self.purge_older_root_entries(&mut slot_vec, reclaims); + self.purge_older_root_entries(&mut slot_vec.1, reclaims); None } else { @@ -129,6 +133,23 @@ impl AccountsIndex { } } + pub fn unref_from_storage(&self, pubkey: &Pubkey) { + let locked_slot_vec = self.account_maps.get(pubkey); + if let Some(slot_vec) = locked_slot_vec { + let mut slot_vec = slot_vec.write().unwrap(); + slot_vec.0 -= 1 as RefCount; + } + } + + pub fn ref_count_from_storage(&self, pubkey: &Pubkey) -> RefCount { + let locked_slot_vec = self.account_maps.get(pubkey); + if let Some(slot_vec) = locked_slot_vec { + slot_vec.read().unwrap().0 + } else { + 0 + } + } + fn purge_older_root_entries( &self, slot_vec: &mut Vec<(Slot, T)>, @@ -150,7 +171,7 @@ impl AccountsIndex { pub fn clean_rooted_entries(&self, pubkey: &Pubkey, reclaims: &mut Vec<(Slot, T)>) { if let Some(lock) = self.account_maps.get(pubkey) { let mut slot_vec = lock.write().unwrap(); - self.purge_older_root_entries(&mut slot_vec, reclaims); + self.purge_older_root_entries(&mut slot_vec.1, reclaims); } } @@ -158,8 +179,8 @@ impl AccountsIndex { let entry = self .account_maps .entry(*pubkey) - .or_insert_with(|| RwLock::new(vec![])); - entry.write().unwrap().push((slot, account_info)); + .or_insert_with(|| RwLock::new((1 as RefCount, vec![]))); + entry.write().unwrap().1.push((slot, account_info)); } pub fn can_purge(max_root: Slot, slot: Slot) -> bool { @@ -241,7 +262,7 @@ mod tests { let ancestors = vec![(0, 0)].into_iter().collect(); let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap(); - assert_eq!(list[idx], (0, true)); + assert_eq!(list.1[idx], (0, true)); let mut num = 0; let mut found_key = false; @@ -274,7 +295,7 @@ mod tests { let ancestors = vec![].into_iter().collect(); index.add_root(0); let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap(); - assert_eq!(list[idx], (0, true)); + assert_eq!(list.1[idx], (0, true)); } #[test] @@ -317,14 +338,14 @@ mod tests { index.insert(0, &key.pubkey(), true, &mut gc); assert!(gc.is_empty()); let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap(); - assert_eq!(list[idx], (0, true)); + assert_eq!(list.1[idx], (0, true)); drop(list); let mut gc = Vec::new(); index.insert(0, &key.pubkey(), false, &mut gc); assert_eq!(gc, vec![(0, true)]); let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap(); - assert_eq!(list[idx], (0, false)); + assert_eq!(list.1[idx], (0, false)); } #[test] @@ -339,10 +360,10 @@ mod tests { index.insert(1, &key.pubkey(), false, &mut gc); assert!(gc.is_empty()); let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap(); - assert_eq!(list[idx], (0, true)); + assert_eq!(list.1[idx], (0, true)); let ancestors = vec![(1, 0)].into_iter().collect(); let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap(); - assert_eq!(list[idx], (1, false)); + assert_eq!(list.1[idx], (1, false)); } #[test] @@ -362,7 +383,7 @@ mod tests { assert_eq!(gc, vec![(0, true), (1, false), (2, true)]); let ancestors = vec![].into_iter().collect(); let (list, idx) = index.get(&key.pubkey(), &ancestors).unwrap(); - assert_eq!(list[idx], (3, true)); + assert_eq!(list.1[idx], (3, true)); let mut num = 0; let mut found_key = false;