diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 07f9cc3b27..94cd9a3dfe 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -3526,7 +3526,11 @@ impl AccountsDb { let mut hashes = Vec::with_capacity(total_accounts_after_shrink); let mut write_versions = Vec::with_capacity(total_accounts_after_shrink); + let mut all_are_zero_lamports = true; for (pubkey, alive_account) in alive_accounts { + if all_are_zero_lamports && alive_account.account.lamports() != 0 { + all_are_zero_lamports = false; + } accounts.push((pubkey, &alive_account.account)); hashes.push(alive_account.account.hash); write_versions.push(alive_account.account.meta.write_version); @@ -3557,9 +3561,19 @@ impl AccountsDb { // Purge old, overwritten storage entries let mut start = Measure::start("write_storage_elapsed"); - let remaining_stores = self.mark_dirty_dead_stores(slot, &mut dead_storages, |store| { - !store_ids.contains(&store.append_vec_id()) - }); + let remaining_stores = self.mark_dirty_dead_stores( + slot, + &mut dead_storages, + |store| !store_ids.contains(&store.append_vec_id()), + // If all accounts are zero lamports, then we want to mark the entire OLD append vec as dirty. + // otherwise, we'll call 'add_uncleaned_pubkeys_after_shrink' just on the unref'd keys below. + all_are_zero_lamports, + ); + + if !all_are_zero_lamports { + self.add_uncleaned_pubkeys_after_shrink(slot, unrefed_pubkeys.into_iter().cloned()); + } + if remaining_stores > 1 { inc_new_counter_info!("accounts_db_shrink_extra_stores", 1); info!( @@ -3623,20 +3637,23 @@ impl AccountsDb { /// get stores for 'slot' /// retain only the stores where 'should_retain(store)' == true - /// for stores not retained, insert in 'dirty_stores' and 'dead_storages' + /// for stores not retained, insert in 'dead_storages' and optionally 'dirty_stores' /// returns # of remaining stores for this slot pub(crate) fn mark_dirty_dead_stores( &self, slot: Slot, dead_storages: &mut Vec>, should_retain: impl Fn(&AccountStorageEntry) -> bool, + add_dirty_stores: bool, ) -> usize { if let Some(slot_stores) = self.storage.get_slot_stores(slot) { let mut list = slot_stores.write().unwrap(); list.retain(|_key, store| { if !should_retain(store) { - self.dirty_stores - .insert((slot, store.append_vec_id()), store.clone()); + if add_dirty_stores { + self.dirty_stores + .insert((slot, store.append_vec_id()), store.clone()); + } dead_storages.push(store.clone()); false } else { @@ -4064,6 +4081,7 @@ impl AccountsDb { let len = stored_accounts.len(); let alive_accounts_collect = Mutex::new(Vec::with_capacity(len)); + let unrefed_pubkeys_collect = Mutex::new(Vec::with_capacity(len)); self.shrink_ancient_stats .shrink_stats .accounts_loaded @@ -4076,11 +4094,12 @@ impl AccountsDb { let skip = chunk * chunk_size; let mut alive_accounts = Vec::with_capacity(chunk_size); + let mut unrefed_pubkeys = Vec::with_capacity(chunk_size); let alive_total = self.load_accounts_index_for_shrink( &stored_accounts[skip..], chunk_size, &mut alive_accounts, - None, + Some(&mut unrefed_pubkeys), ); // collect @@ -4088,6 +4107,10 @@ impl AccountsDb { .lock() .unwrap() .append(&mut alive_accounts); + unrefed_pubkeys_collect + .lock() + .unwrap() + .push(unrefed_pubkeys); alive_total_collect.fetch_add(alive_total, Ordering::Relaxed); }); }); @@ -4095,6 +4118,7 @@ impl AccountsDb { let mut create_and_insert_store_elapsed = 0; let alive_accounts = alive_accounts_collect.into_inner().unwrap(); + let unrefed_pubkeys = unrefed_pubkeys_collect.into_inner().unwrap(); let alive_total = alive_total_collect.load(Ordering::Relaxed); index_read_elapsed.stop(); let aligned_total: u64 = Self::page_align(alive_total as u64); @@ -4116,6 +4140,10 @@ impl AccountsDb { start.stop(); let find_alive_elapsed = start.as_us(); + let all_are_zero_lamports = !alive_accounts + .iter() + .any(|(_key, found)| !found.account.is_zero_lamport()); + let mut ids = vec![ancient_store.append_vec_id()]; // if this slot is not the ancient slot we're writing to, then this root will be dropped let mut drop_root = slot != ancient_slot; @@ -4177,13 +4205,27 @@ impl AccountsDb { } rewrite_elapsed.stop(); - let mut start = Measure::start("write_storage_elapsed"); + let mut start = Measure::start("mark_dirty_dead_stores"); // Purge old, overwritten storage entries let mut dead_storages = vec![]; - self.mark_dirty_dead_stores(slot, &mut dead_storages, |store| { - ids.contains(&store.append_vec_id()) - }); + self.mark_dirty_dead_stores( + slot, + &mut dead_storages, + |store| ids.contains(&store.append_vec_id()), + // If all accounts are zero lamports, then we want to mark the entire OLD append vec as dirty. + // otherwise, we'll call 'add_uncleaned_pubkeys_after_shrink' just on the unref'd keys below. + all_are_zero_lamports, + ); + + if !all_are_zero_lamports { + self.add_uncleaned_pubkeys_after_shrink( + slot, + unrefed_pubkeys.into_iter().flatten().cloned(), + ); + } + start.stop(); + let write_storage_elapsed = start.as_us(); self.drop_or_recycle_stores(dead_storages); @@ -4281,6 +4323,39 @@ impl AccountsDb { self.shrink_ancient_stats.report(); } + /// add all 'pubkeys' into the set of pubkeys that are 'uncleaned', associated with 'slot' + /// clean will visit these pubkeys next time it runs + fn add_uncleaned_pubkeys_after_shrink( + &self, + slot: Slot, + pubkeys: impl Iterator, + ) { + /* + This is only called during 'shrink'-type operations. + Original accounts were separated into 'accounts' and 'unrefed_pubkeys'. + These sets correspond to 'alive' and 'dead'. + 'alive' means this account in this slot is in the accounts index. + 'dead' means this account in this slot is NOT in the accounts index. + If dead, nobody will care if this version of this account is not written into the newly shrunk append vec for this slot. + For all dead accounts, they were already unrefed and are now absent in the new append vec. + This means that another version of this pubkey could possibly now be cleaned since this one is now gone. + For example, a zero lamport account in a later slot can be removed if we just removed the only non-zero lamport account for that pubkey in this slot. + So, for all unrefed accounts, send them to clean to be revisited next time clean runs. + If an account is alive, then its status has not changed. It was previously alive in this slot. It is still alive in this slot. + Clean doesn't care about alive accounts that remain alive. + Except... A slightly different case is if ALL the alive accounts in this slot are zero lamport accounts, then it is possible that + this slot can be marked dead. So, if all alive accounts are zero lamports, we send the entire OLD/pre-shrunk append vec + to clean so that all the pubkeys are visited. + It is a performance optimization to not send the ENTIRE old/pre-shrunk append vec to clean in the normal case. + */ + + let mut uncleaned_pubkeys = self + .uncleaned_pubkeys + .entry(slot) + .or_insert_with(Vec::default); + uncleaned_pubkeys.extend(pubkeys); + } + pub fn shrink_candidate_slots(&self) -> usize { if !self.shrink_candidate_slots.lock().unwrap().is_empty() { // this can affect 'shrink_candidate_slots', so don't 'take' it until after this completes @@ -16033,4 +16108,67 @@ pub mod tests { ); assert_eq!(db.accounts_index.ref_count_from_storage(&pk1), 0); } + + #[test] + fn test_mark_dirty_dead_stores() { + let db = AccountsDb::new_single_for_tests(); + let slot = 0; + let called = AtomicUsize::default(); + let add_dirty_stores = false; + let mut dead_storages = Vec::default(); + let remaining_stores = db.mark_dirty_dead_stores( + slot, + &mut dead_storages, + |_store| { + called.fetch_add(1, Ordering::Relaxed); + false + }, + add_dirty_stores, + ); + assert_eq!(0, called.load(Ordering::Relaxed)); + assert_eq!(0, remaining_stores); + + let size = 1; + let inserted_store = db.create_and_insert_store(slot, size, "test"); + let remaining_stores = db.mark_dirty_dead_stores( + slot, + &mut dead_storages, + |store| { + assert_eq!(store.append_vec_id(), inserted_store.append_vec_id()); + called.fetch_add(1, Ordering::Relaxed); + true // retain + }, + add_dirty_stores, + ); + assert_eq!(1, called.load(Ordering::Relaxed)); + assert_eq!(1, remaining_stores); + + let called = AtomicUsize::default(); + let remaining_stores = db.mark_dirty_dead_stores( + slot, + &mut dead_storages, + |store| { + assert_eq!(store.append_vec_id(), inserted_store.append_vec_id()); + called.fetch_add(1, Ordering::Relaxed); + false // don't retain + }, + add_dirty_stores, + ); + assert_eq!(1, called.load(Ordering::Relaxed)); + assert!(db + .get_storages_for_slot(slot) + .unwrap_or_default() + .is_empty()); + assert_eq!(0, remaining_stores); + assert!(db.dirty_stores.is_empty()); + } + + #[test] + fn test_add_uncleaned_pubkeys_after_shrink() { + let db = AccountsDb::new_single_for_tests(); + let slot = 0; + let pubkey = Pubkey::new(&[1; 32]); + db.add_uncleaned_pubkeys_after_shrink(slot, vec![pubkey].into_iter()); + assert_eq!(&*db.uncleaned_pubkeys.get(&slot).unwrap(), &vec![pubkey]); + } } diff --git a/runtime/src/snapshot_minimizer.rs b/runtime/src/snapshot_minimizer.rs index 82c184aef3..6e68bdc4d0 100644 --- a/runtime/src/snapshot_minimizer.rs +++ b/runtime/src/snapshot_minimizer.rs @@ -388,6 +388,7 @@ impl<'a> SnapshotMinimizer<'a> { slot, &mut dead_storages.lock().unwrap(), |store| !append_vec_set.contains(&store.append_vec_id()), + true, // add_dirty_stores ); }