when shrinking, don't send entire old append vec to clean (#27914)
This commit is contained in:
parent
cccade42b3
commit
d74480a125
|
@ -3526,7 +3526,11 @@ impl AccountsDb {
|
|||
let mut hashes = Vec::with_capacity(total_accounts_after_shrink);
|
||||
let mut write_versions = Vec::with_capacity(total_accounts_after_shrink);
|
||||
|
||||
let mut all_are_zero_lamports = true;
|
||||
for (pubkey, alive_account) in alive_accounts {
|
||||
if all_are_zero_lamports && alive_account.account.lamports() != 0 {
|
||||
all_are_zero_lamports = false;
|
||||
}
|
||||
accounts.push((pubkey, &alive_account.account));
|
||||
hashes.push(alive_account.account.hash);
|
||||
write_versions.push(alive_account.account.meta.write_version);
|
||||
|
@ -3557,9 +3561,19 @@ impl AccountsDb {
|
|||
|
||||
// Purge old, overwritten storage entries
|
||||
let mut start = Measure::start("write_storage_elapsed");
|
||||
let remaining_stores = self.mark_dirty_dead_stores(slot, &mut dead_storages, |store| {
|
||||
!store_ids.contains(&store.append_vec_id())
|
||||
});
|
||||
let remaining_stores = self.mark_dirty_dead_stores(
|
||||
slot,
|
||||
&mut dead_storages,
|
||||
|store| !store_ids.contains(&store.append_vec_id()),
|
||||
// If all accounts are zero lamports, then we want to mark the entire OLD append vec as dirty.
|
||||
// otherwise, we'll call 'add_uncleaned_pubkeys_after_shrink' just on the unref'd keys below.
|
||||
all_are_zero_lamports,
|
||||
);
|
||||
|
||||
if !all_are_zero_lamports {
|
||||
self.add_uncleaned_pubkeys_after_shrink(slot, unrefed_pubkeys.into_iter().cloned());
|
||||
}
|
||||
|
||||
if remaining_stores > 1 {
|
||||
inc_new_counter_info!("accounts_db_shrink_extra_stores", 1);
|
||||
info!(
|
||||
|
@ -3623,20 +3637,23 @@ impl AccountsDb {
|
|||
|
||||
/// get stores for 'slot'
|
||||
/// retain only the stores where 'should_retain(store)' == true
|
||||
/// for stores not retained, insert in 'dirty_stores' and 'dead_storages'
|
||||
/// for stores not retained, insert in 'dead_storages' and optionally 'dirty_stores'
|
||||
/// returns # of remaining stores for this slot
|
||||
pub(crate) fn mark_dirty_dead_stores(
|
||||
&self,
|
||||
slot: Slot,
|
||||
dead_storages: &mut Vec<Arc<AccountStorageEntry>>,
|
||||
should_retain: impl Fn(&AccountStorageEntry) -> bool,
|
||||
add_dirty_stores: bool,
|
||||
) -> usize {
|
||||
if let Some(slot_stores) = self.storage.get_slot_stores(slot) {
|
||||
let mut list = slot_stores.write().unwrap();
|
||||
list.retain(|_key, store| {
|
||||
if !should_retain(store) {
|
||||
self.dirty_stores
|
||||
.insert((slot, store.append_vec_id()), store.clone());
|
||||
if add_dirty_stores {
|
||||
self.dirty_stores
|
||||
.insert((slot, store.append_vec_id()), store.clone());
|
||||
}
|
||||
dead_storages.push(store.clone());
|
||||
false
|
||||
} else {
|
||||
|
@ -4064,6 +4081,7 @@ impl AccountsDb {
|
|||
|
||||
let len = stored_accounts.len();
|
||||
let alive_accounts_collect = Mutex::new(Vec::with_capacity(len));
|
||||
let unrefed_pubkeys_collect = Mutex::new(Vec::with_capacity(len));
|
||||
self.shrink_ancient_stats
|
||||
.shrink_stats
|
||||
.accounts_loaded
|
||||
|
@ -4076,11 +4094,12 @@ impl AccountsDb {
|
|||
let skip = chunk * chunk_size;
|
||||
|
||||
let mut alive_accounts = Vec::with_capacity(chunk_size);
|
||||
let mut unrefed_pubkeys = Vec::with_capacity(chunk_size);
|
||||
let alive_total = self.load_accounts_index_for_shrink(
|
||||
&stored_accounts[skip..],
|
||||
chunk_size,
|
||||
&mut alive_accounts,
|
||||
None,
|
||||
Some(&mut unrefed_pubkeys),
|
||||
);
|
||||
|
||||
// collect
|
||||
|
@ -4088,6 +4107,10 @@ impl AccountsDb {
|
|||
.lock()
|
||||
.unwrap()
|
||||
.append(&mut alive_accounts);
|
||||
unrefed_pubkeys_collect
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push(unrefed_pubkeys);
|
||||
alive_total_collect.fetch_add(alive_total, Ordering::Relaxed);
|
||||
});
|
||||
});
|
||||
|
@ -4095,6 +4118,7 @@ impl AccountsDb {
|
|||
let mut create_and_insert_store_elapsed = 0;
|
||||
|
||||
let alive_accounts = alive_accounts_collect.into_inner().unwrap();
|
||||
let unrefed_pubkeys = unrefed_pubkeys_collect.into_inner().unwrap();
|
||||
let alive_total = alive_total_collect.load(Ordering::Relaxed);
|
||||
index_read_elapsed.stop();
|
||||
let aligned_total: u64 = Self::page_align(alive_total as u64);
|
||||
|
@ -4116,6 +4140,10 @@ impl AccountsDb {
|
|||
start.stop();
|
||||
let find_alive_elapsed = start.as_us();
|
||||
|
||||
let all_are_zero_lamports = !alive_accounts
|
||||
.iter()
|
||||
.any(|(_key, found)| !found.account.is_zero_lamport());
|
||||
|
||||
let mut ids = vec![ancient_store.append_vec_id()];
|
||||
// if this slot is not the ancient slot we're writing to, then this root will be dropped
|
||||
let mut drop_root = slot != ancient_slot;
|
||||
|
@ -4177,13 +4205,27 @@ impl AccountsDb {
|
|||
}
|
||||
rewrite_elapsed.stop();
|
||||
|
||||
let mut start = Measure::start("write_storage_elapsed");
|
||||
let mut start = Measure::start("mark_dirty_dead_stores");
|
||||
// Purge old, overwritten storage entries
|
||||
let mut dead_storages = vec![];
|
||||
self.mark_dirty_dead_stores(slot, &mut dead_storages, |store| {
|
||||
ids.contains(&store.append_vec_id())
|
||||
});
|
||||
self.mark_dirty_dead_stores(
|
||||
slot,
|
||||
&mut dead_storages,
|
||||
|store| ids.contains(&store.append_vec_id()),
|
||||
// If all accounts are zero lamports, then we want to mark the entire OLD append vec as dirty.
|
||||
// otherwise, we'll call 'add_uncleaned_pubkeys_after_shrink' just on the unref'd keys below.
|
||||
all_are_zero_lamports,
|
||||
);
|
||||
|
||||
if !all_are_zero_lamports {
|
||||
self.add_uncleaned_pubkeys_after_shrink(
|
||||
slot,
|
||||
unrefed_pubkeys.into_iter().flatten().cloned(),
|
||||
);
|
||||
}
|
||||
|
||||
start.stop();
|
||||
|
||||
let write_storage_elapsed = start.as_us();
|
||||
|
||||
self.drop_or_recycle_stores(dead_storages);
|
||||
|
@ -4281,6 +4323,39 @@ impl AccountsDb {
|
|||
self.shrink_ancient_stats.report();
|
||||
}
|
||||
|
||||
/// add all 'pubkeys' into the set of pubkeys that are 'uncleaned', associated with 'slot'
|
||||
/// clean will visit these pubkeys next time it runs
|
||||
fn add_uncleaned_pubkeys_after_shrink(
|
||||
&self,
|
||||
slot: Slot,
|
||||
pubkeys: impl Iterator<Item = Pubkey>,
|
||||
) {
|
||||
/*
|
||||
This is only called during 'shrink'-type operations.
|
||||
Original accounts were separated into 'accounts' and 'unrefed_pubkeys'.
|
||||
These sets correspond to 'alive' and 'dead'.
|
||||
'alive' means this account in this slot is in the accounts index.
|
||||
'dead' means this account in this slot is NOT in the accounts index.
|
||||
If dead, nobody will care if this version of this account is not written into the newly shrunk append vec for this slot.
|
||||
For all dead accounts, they were already unrefed and are now absent in the new append vec.
|
||||
This means that another version of this pubkey could possibly now be cleaned since this one is now gone.
|
||||
For example, a zero lamport account in a later slot can be removed if we just removed the only non-zero lamport account for that pubkey in this slot.
|
||||
So, for all unrefed accounts, send them to clean to be revisited next time clean runs.
|
||||
If an account is alive, then its status has not changed. It was previously alive in this slot. It is still alive in this slot.
|
||||
Clean doesn't care about alive accounts that remain alive.
|
||||
Except... A slightly different case is if ALL the alive accounts in this slot are zero lamport accounts, then it is possible that
|
||||
this slot can be marked dead. So, if all alive accounts are zero lamports, we send the entire OLD/pre-shrunk append vec
|
||||
to clean so that all the pubkeys are visited.
|
||||
It is a performance optimization to not send the ENTIRE old/pre-shrunk append vec to clean in the normal case.
|
||||
*/
|
||||
|
||||
let mut uncleaned_pubkeys = self
|
||||
.uncleaned_pubkeys
|
||||
.entry(slot)
|
||||
.or_insert_with(Vec::default);
|
||||
uncleaned_pubkeys.extend(pubkeys);
|
||||
}
|
||||
|
||||
pub fn shrink_candidate_slots(&self) -> usize {
|
||||
if !self.shrink_candidate_slots.lock().unwrap().is_empty() {
|
||||
// this can affect 'shrink_candidate_slots', so don't 'take' it until after this completes
|
||||
|
@ -16033,4 +16108,67 @@ pub mod tests {
|
|||
);
|
||||
assert_eq!(db.accounts_index.ref_count_from_storage(&pk1), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mark_dirty_dead_stores() {
|
||||
let db = AccountsDb::new_single_for_tests();
|
||||
let slot = 0;
|
||||
let called = AtomicUsize::default();
|
||||
let add_dirty_stores = false;
|
||||
let mut dead_storages = Vec::default();
|
||||
let remaining_stores = db.mark_dirty_dead_stores(
|
||||
slot,
|
||||
&mut dead_storages,
|
||||
|_store| {
|
||||
called.fetch_add(1, Ordering::Relaxed);
|
||||
false
|
||||
},
|
||||
add_dirty_stores,
|
||||
);
|
||||
assert_eq!(0, called.load(Ordering::Relaxed));
|
||||
assert_eq!(0, remaining_stores);
|
||||
|
||||
let size = 1;
|
||||
let inserted_store = db.create_and_insert_store(slot, size, "test");
|
||||
let remaining_stores = db.mark_dirty_dead_stores(
|
||||
slot,
|
||||
&mut dead_storages,
|
||||
|store| {
|
||||
assert_eq!(store.append_vec_id(), inserted_store.append_vec_id());
|
||||
called.fetch_add(1, Ordering::Relaxed);
|
||||
true // retain
|
||||
},
|
||||
add_dirty_stores,
|
||||
);
|
||||
assert_eq!(1, called.load(Ordering::Relaxed));
|
||||
assert_eq!(1, remaining_stores);
|
||||
|
||||
let called = AtomicUsize::default();
|
||||
let remaining_stores = db.mark_dirty_dead_stores(
|
||||
slot,
|
||||
&mut dead_storages,
|
||||
|store| {
|
||||
assert_eq!(store.append_vec_id(), inserted_store.append_vec_id());
|
||||
called.fetch_add(1, Ordering::Relaxed);
|
||||
false // don't retain
|
||||
},
|
||||
add_dirty_stores,
|
||||
);
|
||||
assert_eq!(1, called.load(Ordering::Relaxed));
|
||||
assert!(db
|
||||
.get_storages_for_slot(slot)
|
||||
.unwrap_or_default()
|
||||
.is_empty());
|
||||
assert_eq!(0, remaining_stores);
|
||||
assert!(db.dirty_stores.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_add_uncleaned_pubkeys_after_shrink() {
|
||||
let db = AccountsDb::new_single_for_tests();
|
||||
let slot = 0;
|
||||
let pubkey = Pubkey::new(&[1; 32]);
|
||||
db.add_uncleaned_pubkeys_after_shrink(slot, vec![pubkey].into_iter());
|
||||
assert_eq!(&*db.uncleaned_pubkeys.get(&slot).unwrap(), &vec![pubkey]);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -388,6 +388,7 @@ impl<'a> SnapshotMinimizer<'a> {
|
|||
slot,
|
||||
&mut dead_storages.lock().unwrap(),
|
||||
|store| !append_vec_set.contains(&store.append_vec_id()),
|
||||
true, // add_dirty_stores
|
||||
);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue