diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index eac0d0c2f..794858a44 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -22,6 +22,7 @@ use crate::{ accounts_cache::{AccountsCache, CachedAccount, SlotCache}, accounts_index::{ AccountIndex, AccountsIndex, Ancestors, IndexKey, IsCached, SlotList, SlotSlice, + ZeroLamport, }, append_vec::{AppendVec, StoredAccountMeta, StoredMeta}, contains::Contains, @@ -144,6 +145,12 @@ impl IsCached for AccountInfo { } } +impl ZeroLamport for AccountInfo { + fn is_zero_lamport(&self) -> bool { + self.lamports == 0 + } +} + /// An offset into the AccountsDB::storage vector pub type AppendVecId = usize; pub type SnapshotStorage = Vec>; @@ -320,6 +327,16 @@ pub enum BankHashVerificationError { MismatchedTotalLamports(u64, u64), } +#[derive(Default)] +struct CleanKeyTimings { + collect_delta_keys_us: u64, + delta_insert_us: u64, + hashset_to_vec_us: u64, + zero_lamport_key_clone_us: u64, + delta_key_count: u64, + zero_lamport_count: u64, +} + /// Persistent storage structure holding the accounts #[derive(Debug)] pub struct AccountStorageEntry { @@ -628,6 +645,15 @@ pub struct AccountsDB { pub account_indexes: HashSet, pub caching_enabled: bool, + + /// Set of unique keys per slot which is used + /// to drive clean_accounts + /// Generated by get_accounts_delta_hash + uncleaned_pubkeys: DashMap>, + + /// Squashed down uncleaned_pubkeys and the highest slot it + /// is uncleaned at + squashed_uncleaned_pubkeys: RwLock<(HashSet, Slot)>, } #[derive(Debug, Default)] @@ -650,6 +676,7 @@ struct AccountsStats { store_get_slot_store: AtomicU64, store_find_existing: AtomicU64, dropped_stores: AtomicU64, + store_uncleaned_update: AtomicU64, } fn make_min_priority_thread_pool() -> ThreadPool { @@ -688,6 +715,8 @@ impl Default for AccountsDB { storage: AccountStorage::default(), accounts_cache: AccountsCache::default(), recycle_stores: RwLock::new(Vec::new()), + uncleaned_pubkeys: DashMap::new(), + squashed_uncleaned_pubkeys: RwLock::new((HashSet::new(), 0)), next_id: AtomicUsize::new(0), shrink_candidate_slots_v1: Mutex::new(Vec::new()), shrink_candidate_slots: Mutex::new(HashMap::new()), @@ -931,6 +960,100 @@ impl AccountsDB { reclaims } + fn squash_uncleaned_pubkeys(&self, max_slot: Slot) { + let (keys, max_slot_in_uncleaned_pubkeys) = + self.collect_uncleaned_pubkeys_to_slot(max_slot); + + let mut squashed_keys = self.squashed_uncleaned_pubkeys.write().unwrap(); + squashed_keys.0.extend(keys.into_iter().flatten()); + squashed_keys.1 = max_slot_in_uncleaned_pubkeys; + } + + fn collect_uncleaned_pubkeys_to_slot(&self, max_slot: Slot) -> (Vec>, Slot) { + let mut max_slot_in_uncleaned_pubkeys = 0; + let slots: Vec = self + .uncleaned_pubkeys + .iter() + .filter_map(|entry| { + let slot = entry.key(); + max_slot_in_uncleaned_pubkeys = max_slot_in_uncleaned_pubkeys.max(*slot); + if *slot <= max_slot { + Some(*slot) + } else { + None + } + }) + .collect(); + ( + slots + .into_iter() + .filter_map(|slot| { + let maybe_slot_keys = self.uncleaned_pubkeys.remove(&slot); + if self.accounts_index.is_root(slot) { + // Safe to unwrap on rooted slots since this is called from clean_accounts + // and only clean_accounts operates on rooted slots. purge_slots only + // operates on uncleaned_pubkeys + let (_slot, keys) = maybe_slot_keys.expect("Root slot should exist"); + Some(keys) + } else { + None + } + }) + .collect(), + max_slot_in_uncleaned_pubkeys, + ) + } + + // Construct a vec of pubkeys for cleaning from: + // uncleaned_pubkeys - the delta set of updated pubkeys in rooted slots from the last clean + // zero_lamport_pubkeys - set of all alive pubkeys containing 0-lamport updates + fn construct_candidate_clean_keys( + &self, + max_clean_root: Option, + timings: &mut CleanKeyTimings, + ) -> Vec { + let mut zero_lamport_key_clone = Measure::start("zero_lamport_key"); + let mut pubkeys = self.accounts_index.zero_lamport_pubkeys().clone(); + timings.zero_lamport_count = pubkeys.len() as u64; + zero_lamport_key_clone.stop(); + timings.zero_lamport_key_clone_us += zero_lamport_key_clone.as_us(); + + let mut collect_delta_keys = Measure::start("key_create"); + let max_slot = max_clean_root.unwrap_or_else(|| self.accounts_index.max_root()); + let (delta_keys, _max_slot) = self.collect_uncleaned_pubkeys_to_slot(max_slot); + collect_delta_keys.stop(); + timings.collect_delta_keys_us += collect_delta_keys.as_us(); + + let mut delta_insert = Measure::start("delta_insert"); + self.thread_pool_clean.install(|| { + delta_keys.par_iter().for_each(|keys| { + for key in keys { + pubkeys.insert(*key); + } + }); + }); + + { + let mut squashed_uncleaned_pubkeys = self.squashed_uncleaned_pubkeys.write().unwrap(); + if squashed_uncleaned_pubkeys.1 <= max_slot { + let squashed_pubkeys = std::mem::take(&mut squashed_uncleaned_pubkeys.0); + drop(squashed_uncleaned_pubkeys); + pubkeys.extend(squashed_pubkeys); + } + } + delta_insert.stop(); + timings.delta_insert_us += delta_insert.as_us(); + + timings.delta_key_count = pubkeys.len() as u64; + + let mut hashset_to_vec = Measure::start("flat_map"); + let pubkeys: Vec = pubkeys.into_iter().collect(); + hashset_to_vec.stop(); + timings.hashset_to_vec_us += hashset_to_vec.as_us(); + + pubkeys + } + // Purge zero lamport accounts and older rooted account states as garbage // collection // Only remove those accounts where the entire rooted history of the account @@ -951,15 +1074,11 @@ impl AccountsDB { let mut candidates_v1 = self.shrink_candidate_slots_v1.lock().unwrap(); self.report_store_stats(); + let mut key_timings = CleanKeyTimings::default(); + let pubkeys = self.construct_candidate_clean_keys(max_clean_root, &mut key_timings); + + let total_keys_count = pubkeys.len(); let mut accounts_scan = Measure::start("accounts_scan"); - let pubkeys: Vec = self - .accounts_index - .account_maps - .read() - .unwrap() - .keys() - .cloned() - .collect(); // parallel scan the index. let (mut purges, purges_in_root) = { self.thread_pool_clean.install(|| { @@ -972,7 +1091,8 @@ impl AccountsDB { if let Some((locked_entry, index)) = self.accounts_index.get(pubkey, None, max_clean_root) { - let (slot, account_info) = &locked_entry.slot_list()[index]; + let slot_list = locked_entry.slot_list(); + let (slot, account_info) = &slot_list[index]; if account_info.lamports == 0 { purges.insert( *pubkey, @@ -981,6 +1101,16 @@ impl AccountsDB { ); } + // prune zero_lamport_pubkey set which should contain all 0-lamport + // keys whether rooted or not. A 0-lamport update may become rooted + // in the future. + let has_zero_lamport_accounts = slot_list + .iter() + .any(|(_slot, account_info)| account_info.lamports == 0); + if !has_zero_lamport_accounts { + self.accounts_index.remove_zero_lamport_key(pubkey); + } + // Release the lock let slot = *slot; drop(locked_entry); @@ -993,6 +1123,12 @@ impl AccountsDB { } purges_in_root.push(*pubkey); } + } else { + let r_accounts_index = + self.accounts_index.account_maps.read().unwrap(); + if !r_accounts_index.contains_key(pubkey) { + self.accounts_index.remove_zero_lamport_key(pubkey); + } } } (purges, purges_in_root) @@ -1110,12 +1246,25 @@ impl AccountsDB { reclaims_time.stop(); datapoint_info!( "clean_accounts", + ( + "collect_delta_keys_us", + key_timings.collect_delta_keys_us, + i64 + ), + ( + "zero_lamport_key_clone_us", + key_timings.zero_lamport_key_clone_us, + i64 + ), ("accounts_scan", accounts_scan.as_us() as i64, i64), ("clean_old_rooted", clean_old_rooted.as_us() as i64, i64), ("store_counts", store_counts_time.as_us() as i64, i64), ("purge_filter", purge_filter.as_us() as i64, i64), ("calc_deps", calc_deps_time.as_us() as i64, i64), ("reclaims", reclaims_time.as_us() as i64, i64), + ("delta_key_count", key_timings.delta_key_count, i64), + ("zero_lamport_count", key_timings.zero_lamport_count, i64), + ("total_keys_count", total_keys_count, i64), ); } @@ -2466,6 +2615,9 @@ impl AccountsDB { // It should not be possible that a slot is neither in the cache or storage. Even in // a slot with all ticks, `Bank::new_from_parent()` immediately stores some sysvars // on bank creation. + + // Remove any delta pubkey set if existing. + self.uncleaned_pubkeys.remove(remove_slot); } remove_storages_elapsed.stop(); @@ -2862,6 +3014,8 @@ impl AccountsDB { self.accounts_cache.set_max_flush_root(*root); } + self.squash_uncleaned_pubkeys(self.accounts_cache.fetch_max_flush_root()); + // Only add to the uncleaned roots set *after* we've flushed the previous roots, // so that clean will actually be able to clean the slots. self.accounts_index.add_uncleaned_roots(cached_roots); @@ -3410,8 +3564,19 @@ impl AccountsDB { .map(|(pubkey, (_latest_write_version, hash))| (pubkey, hash, 0)) .collect(), }; + let dirty_keys = hashes + .iter() + .map(|(pubkey, _hash, _lamports)| *pubkey) + .collect(); let ret = Self::accumulate_account_hashes(hashes, slot, false); accumulate.stop(); + let mut uncleaned_time = Measure::start("uncleaned_index"); + self.uncleaned_pubkeys.insert(slot, dirty_keys); + uncleaned_time.stop(); + self.stats + .store_uncleaned_update + .fetch_add(uncleaned_time.as_us(), Ordering::Relaxed); + self.stats .delta_hash_scan_time_total_us .fetch_add(scan.as_us(), Ordering::Relaxed); @@ -3994,6 +4159,7 @@ impl AccountsDB { // Need to add these last, otherwise older updates will be cleaned for slot in slots { + self.get_accounts_delta_hash(slot); self.accounts_index.add_root(slot, false); } @@ -4287,6 +4453,7 @@ pub mod tests { } // adding root doesn't change anything + db.get_accounts_delta_hash(1); db.add_root(1); { let slot_0_stores = &db.storage.get_slot_stores(0).unwrap(); @@ -4668,6 +4835,7 @@ pub mod tests { .unwrap(); lock.slot_list()[idx].1.store_id }; + accounts.get_accounts_delta_hash(0); accounts.add_root(1); //slot is still there, since gc is lazy @@ -4683,6 +4851,9 @@ pub mod tests { //store causes clean accounts.store_uncached(1, &[(&pubkey, &account)]); + // generate delta state for slot 1, so clean operates on it. + accounts.get_accounts_delta_hash(1); + //slot is gone accounts.print_accounts_stats("pre-clean"); accounts.clean_accounts(None); @@ -4825,7 +4996,9 @@ pub mod tests { accounts.store_uncached(1, &[(&pubkey, &account)]); // simulate slots are rooted after while + accounts.get_accounts_delta_hash(0); accounts.add_root(0); + accounts.get_accounts_delta_hash(1); accounts.add_root(1); //even if rooted, old state isn't cleaned up @@ -4855,13 +5028,17 @@ pub mod tests { accounts.store_uncached(1, &[(&pubkey2, &normal_account)]); //simulate slots are rooted after while + accounts.get_accounts_delta_hash(0); accounts.add_root(0); + accounts.get_accounts_delta_hash(1); accounts.add_root(1); //even if rooted, old state isn't cleaned up assert_eq!(accounts.alive_account_count_in_slot(0), 2); assert_eq!(accounts.alive_account_count_in_slot(1), 2); + accounts.print_accounts_stats(""); + accounts.clean_accounts(None); //Old state behind zero-lamport account is cleaned up @@ -4903,8 +5080,11 @@ pub mod tests { accounts.store_uncached(2, &[(&pubkey2, &normal_account)]); //simulate slots are rooted after while + accounts.get_accounts_delta_hash(0); accounts.add_root(0); + accounts.get_accounts_delta_hash(1); accounts.add_root(1); + accounts.get_accounts_delta_hash(2); accounts.add_root(2); //even if rooted, old state isn't cleaned up @@ -5038,6 +5218,7 @@ pub mod tests { modify_accounts(&accounts, &pubkeys, 0, 100, 2); assert_eq!(check_storage(&accounts, 0, 100), true); check_accounts(&accounts, &pubkeys, 0, 100, 2); + accounts.get_accounts_delta_hash(0); accounts.add_root(0); let mut pubkeys1: Vec = vec![]; @@ -5056,6 +5237,7 @@ pub mod tests { // accounts create_account(&accounts, &mut pubkeys1, latest_slot, 10, 0, 0); + accounts.get_accounts_delta_hash(latest_slot); accounts.add_root(latest_slot); assert!(check_storage(&accounts, 1, 21)); @@ -5075,6 +5257,7 @@ pub mod tests { // 21 + 10 = 31 accounts create_account(&accounts, &mut pubkeys2, latest_slot, 10, 0, 0); + accounts.get_accounts_delta_hash(latest_slot); accounts.add_root(latest_slot); assert!(check_storage(&accounts, 2, 31)); @@ -6111,6 +6294,7 @@ pub mod tests { current_slot += 1; accounts.store_uncached(current_slot, &[(&pubkey1, &account)]); accounts.store_uncached(current_slot, &[(&pubkey2, &account)]); + accounts.get_accounts_delta_hash(current_slot); accounts.add_root(current_slot); // B: Test multiple updates to pubkey1 in a single slot/storage @@ -6123,6 +6307,7 @@ pub mod tests { // Stores to same pubkey, same slot only count once towards the // ref count assert_eq!(2, accounts.ref_count_for_pubkey(&pubkey1)); + accounts.get_accounts_delta_hash(current_slot); accounts.add_root(current_slot); // C: Yet more update to trigger lazy clean of step A @@ -6130,6 +6315,7 @@ pub mod tests { assert_eq!(2, accounts.ref_count_for_pubkey(&pubkey1)); accounts.store_uncached(current_slot, &[(&pubkey1, &account3)]); assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1)); + accounts.get_accounts_delta_hash(current_slot); accounts.add_root(current_slot); // D: Make pubkey1 0-lamport; also triggers clean of step B @@ -6144,11 +6330,13 @@ pub mod tests { 3, /* == 3 - 1 + 1 */ accounts.ref_count_for_pubkey(&pubkey1) ); + accounts.get_accounts_delta_hash(current_slot); accounts.add_root(current_slot); // E: Avoid missing bank hash error current_slot += 1; accounts.store_uncached(current_slot, &[(&dummy_pubkey, &dummy_account)]); + accounts.get_accounts_delta_hash(current_slot); accounts.add_root(current_slot); assert_load_account(&accounts, current_slot, pubkey1, zero_lamport); @@ -6163,6 +6351,8 @@ pub mod tests { let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot); accounts.clean_accounts(None); + info!("pubkey: {}", pubkey1); + accounts.print_accounts_stats("pre_clean"); assert_load_account(&accounts, current_slot, pubkey1, zero_lamport); assert_load_account(&accounts, current_slot, pubkey2, old_lamport); assert_load_account(&accounts, current_slot, dummy_pubkey, dummy_lamport); @@ -6170,6 +6360,7 @@ pub mod tests { // F: Finally, make Step A cleanable current_slot += 1; accounts.store_uncached(current_slot, &[(&pubkey2, &account)]); + accounts.get_accounts_delta_hash(current_slot); accounts.add_root(current_slot); // Do clean @@ -6214,6 +6405,7 @@ pub mod tests { .collect::>() ); + accounts.get_accounts_delta_hash(current_slot); accounts.add_root(current_slot); assert_eq!( @@ -6224,6 +6416,7 @@ pub mod tests { ); current_slot += 1; + accounts.get_accounts_delta_hash(current_slot); accounts.add_root(current_slot); let slots = (0..6) @@ -6248,8 +6441,11 @@ pub mod tests { vec![] as Vec ); + accounts.get_accounts_delta_hash(0); accounts.add_root(0); + accounts.get_accounts_delta_hash(1); accounts.add_root(1); + accounts.get_accounts_delta_hash(2); accounts.add_root(2); accounts.reset_uncleaned_roots_v1(); @@ -6293,6 +6489,7 @@ pub mod tests { accounts.store_uncached(current_slot, &[(&pubkey, &account)]); } let shrink_slot = current_slot; + accounts.get_accounts_delta_hash(current_slot); accounts.add_root(current_slot); current_slot += 1; @@ -6302,6 +6499,7 @@ pub mod tests { for pubkey in updated_pubkeys { accounts.store_uncached(current_slot, &[(&pubkey, &account)]); } + accounts.get_accounts_delta_hash(current_slot); accounts.add_root(current_slot); accounts.clean_accounts(None); @@ -6359,6 +6557,7 @@ pub mod tests { accounts.store_uncached(current_slot, &[(&pubkey, &account)]); } let shrink_slot = current_slot; + accounts.get_accounts_delta_hash(current_slot); accounts.add_root(current_slot); current_slot += 1; @@ -6368,6 +6567,7 @@ pub mod tests { for pubkey in updated_pubkeys { accounts.store_uncached(current_slot, &[(&pubkey, &account)]); } + accounts.get_accounts_delta_hash(current_slot); accounts.add_root(current_slot); accounts.clean_accounts(None); @@ -6417,6 +6617,7 @@ pub mod tests { accounts.store_uncached(current_slot, &[(&pubkey, &account)]); } let shrink_slot = current_slot; + accounts.get_accounts_delta_hash(current_slot); accounts.add_root(current_slot); current_slot += 1; @@ -6426,6 +6627,7 @@ pub mod tests { for pubkey in updated_pubkeys { accounts.store_uncached(current_slot, &[(&pubkey, &account)]); } + accounts.get_accounts_delta_hash(current_slot); accounts.add_root(current_slot); accounts.clean_accounts(None); @@ -6783,7 +6985,9 @@ pub mod tests { // Store zero lamport account into slots 0 and 1, root both slots db.store_uncached(0, &[(&account_key, &zero_lamport_account)]); db.store_uncached(1, &[(&account_key, &zero_lamport_account)]); + db.get_accounts_delta_hash(0); db.add_root(0); + db.get_accounts_delta_hash(1); db.add_root(1); // Only clean zero lamport accounts up to slot 0 @@ -6959,6 +7163,14 @@ pub mod tests { } } + fn slot_stores(db: &AccountsDB, slot: Slot) -> Vec> { + if let Some(x) = db.storage.get_slot_stores(slot) { + x.read().unwrap().values().cloned().collect() + } else { + vec![] + } + } + #[test] fn test_flush_cache_clean() { let caching_enabled = true; @@ -7145,4 +7357,68 @@ pub mod tests { assert_eq!(before_size, after_size + account.stored_size); } } + + #[test] + fn test_partial_clean() { + solana_logger::setup(); + let db = AccountsDB::new(Vec::new(), &ClusterType::Development); + let account_key1 = Pubkey::new_unique(); + let account_key2 = Pubkey::new_unique(); + let account1 = Account::new(1, 0, &Account::default().owner); + let account2 = Account::new(2, 0, &Account::default().owner); + let account3 = Account::new(3, 0, &Account::default().owner); + let account4 = Account::new(4, 0, &Account::default().owner); + + // Store accounts into slots 0 and 1 + db.store_uncached(0, &[(&account_key1, &account1)]); + db.store_uncached(0, &[(&account_key2, &account1)]); + db.store_uncached(1, &[(&account_key1, &account2)]); + db.get_accounts_delta_hash(0); + db.get_accounts_delta_hash(1); + + db.print_accounts_stats("pre-clean1"); + + // clean accounts - no accounts should be cleaned, since no rooted slots + // + // Checking that the uncleaned_pubkeys are not pre-maturely removed + // such that when the slots are rooted, and can actually be cleaned, then the + // delta keys are still there. + db.clean_accounts(None); + + db.print_accounts_stats("post-clean1"); + // Check stores > 0 + assert!(!slot_stores(&db, 0).is_empty()); + assert!(!slot_stores(&db, 1).is_empty()); + + // root slot 0 + db.add_root(0); + + // store into slot 2 + db.store_uncached(2, &[(&account_key2, &account3)]); + db.store_uncached(2, &[(&account_key1, &account3)]); + db.get_accounts_delta_hash(2); + + db.clean_accounts(None); + db.print_accounts_stats("post-clean2"); + + // root slots 1 + db.add_root(1); + db.clean_accounts(None); + + db.print_accounts_stats("post-clean3"); + + db.store_uncached(3, &[(&account_key2, &account4)]); + db.get_accounts_delta_hash(3); + db.add_root(3); + + // Check that we can clean where max_root=3 and slot=2 is not rooted + db.clean_accounts(None); + + assert!(db.uncleaned_pubkeys.is_empty()); + + db.print_accounts_stats("post-clean4"); + + assert!(slot_stores(&db, 0).is_empty()); + assert!(!slot_stores(&db, 1).is_empty()); + } } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 8ecfe75c0..d8da88a95 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -3,6 +3,7 @@ use crate::{ inline_spl_token_v2_0::{self, SPL_TOKEN_ACCOUNT_MINT_OFFSET, SPL_TOKEN_ACCOUNT_OWNER_OFFSET}, secondary_index::*, }; +use dashmap::DashSet; use ouroboros::self_referencing; use solana_sdk::{ clock::Slot, @@ -227,6 +228,10 @@ impl<'a, T: 'static + Clone> Iterator for AccountsIndexIterator<'a, T> { } } +pub trait ZeroLamport { + fn is_zero_lamport(&self) -> bool; +} + #[derive(Debug, Default)] pub struct AccountsIndex { pub account_maps: RwLock>>, @@ -235,9 +240,10 @@ pub struct AccountsIndex { spl_token_owner_index: SecondaryIndex, roots_tracker: RwLock, ongoing_scan_roots: RwLock>, + zero_lamport_pubkeys: DashSet, } -impl AccountsIndex { +impl AccountsIndex { fn iter(&self, range: Option) -> AccountsIndexIterator where R: RangeBounds, @@ -790,6 +796,9 @@ impl AccountsIndex { // - The secondary index is never consulted as primary source of truth for gets/stores. // So, what the accounts_index sees alone is sufficient as a source of truth for other non-scan // account operations. + if account_info.is_zero_lamport() { + self.zero_lamport_pubkeys.insert(*pubkey); + } w_account_entry.update(slot, account_info, reclaims); is_newly_inserted }; @@ -797,6 +806,14 @@ impl AccountsIndex { is_newly_inserted } + pub fn remove_zero_lamport_key(&self, pubkey: &Pubkey) { + self.zero_lamport_pubkeys.remove(pubkey); + } + + pub fn zero_lamport_pubkeys(&self) -> &DashSet { + &self.zero_lamport_pubkeys + } + pub fn unref_from_storage(&self, pubkey: &Pubkey) { if let Some(locked_entry) = self.get_account_read_entry(pubkey) { locked_entry.ref_count().fetch_sub(1, Ordering::Relaxed); @@ -908,7 +925,7 @@ impl AccountsIndex { w_roots_tracker.uncleaned_roots.extend(roots); } - fn max_root(&self) -> Slot { + pub fn max_root(&self) -> Slot { self.roots_tracker.read().unwrap().max_root } @@ -2082,4 +2099,16 @@ pub mod tests { &account_index, ); } + + impl ZeroLamport for bool { + fn is_zero_lamport(&self) -> bool { + false + } + } + + impl ZeroLamport for u64 { + fn is_zero_lamport(&self) -> bool { + false + } + } }