clean race condition with extra unref (#27682)
* clean race condition with extra unref * clarify comment * fix test * cleanup test * clippy
This commit is contained in:
parent
abd01553d5
commit
1811d684b5
|
@ -424,6 +424,7 @@ pub(crate) type SlotStores = Arc<RwLock<HashMap<AppendVecId, Arc<AccountStorageE
|
|||
type AccountSlots = HashMap<Pubkey, HashSet<Slot>>;
|
||||
type AppendVecOffsets = HashMap<AppendVecId, HashSet<usize>>;
|
||||
type ReclaimResult = (AccountSlots, AppendVecOffsets);
|
||||
type PubkeysRemovedFromAccountsIndex = HashSet<Pubkey>;
|
||||
type ShrinkCandidates = HashMap<Slot, HashMap<AppendVecId, Arc<AccountStorageEntry>>>;
|
||||
|
||||
trait Versioned {
|
||||
|
@ -2170,21 +2171,28 @@ impl AccountsDb {
|
|||
.expect("Cluster type must be set at initialization")
|
||||
}
|
||||
|
||||
/// Reclaim older states of accounts older than max_clean_root_inclusive for AccountsDb bloat mitigation
|
||||
/// Reclaim older states of accounts older than max_clean_root_inclusive for AccountsDb bloat mitigation.
|
||||
/// Any accounts which are removed from the accounts index are returned in PubkeysRemovedFromAccountsIndex.
|
||||
/// These should NOT be unref'd later from the accounts index.
|
||||
fn clean_accounts_older_than_root(
|
||||
&self,
|
||||
purges: Vec<Pubkey>,
|
||||
max_clean_root_inclusive: Option<Slot>,
|
||||
ancient_account_cleans: &AtomicU64,
|
||||
) -> ReclaimResult {
|
||||
) -> (ReclaimResult, PubkeysRemovedFromAccountsIndex) {
|
||||
let pubkeys_removed_from_accounts_index = HashSet::default();
|
||||
if purges.is_empty() {
|
||||
return ReclaimResult::default();
|
||||
return (
|
||||
ReclaimResult::default(),
|
||||
pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
}
|
||||
// This number isn't carefully chosen; just guessed randomly such that
|
||||
// the hot loop will be the order of ~Xms.
|
||||
const INDEX_CLEAN_BULK_COUNT: usize = 4096;
|
||||
|
||||
let one_epoch_old = self.get_accounts_hash_complete_one_epoch_old();
|
||||
let pubkeys_removed_from_accounts_index = Mutex::new(pubkeys_removed_from_accounts_index);
|
||||
|
||||
let mut clean_rooted = Measure::start("clean_old_root-ms");
|
||||
let reclaim_vecs = purges
|
||||
|
@ -2192,12 +2200,19 @@ impl AccountsDb {
|
|||
.filter_map(|pubkeys: &[Pubkey]| {
|
||||
let mut reclaims = Vec::new();
|
||||
for pubkey in pubkeys {
|
||||
self.accounts_index.clean_rooted_entries(
|
||||
let removed_from_index = self.accounts_index.clean_rooted_entries(
|
||||
pubkey,
|
||||
&mut reclaims,
|
||||
max_clean_root_inclusive,
|
||||
);
|
||||
if removed_from_index {
|
||||
pubkeys_removed_from_accounts_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(*pubkey);
|
||||
}
|
||||
}
|
||||
|
||||
(!reclaims.is_empty()).then(|| {
|
||||
// figure out how many ancient accounts have been reclaimed
|
||||
let old_reclaims = reclaims
|
||||
|
@ -2210,6 +2225,8 @@ impl AccountsDb {
|
|||
})
|
||||
.collect::<Vec<_>>();
|
||||
clean_rooted.stop();
|
||||
let pubkeys_removed_from_accounts_index =
|
||||
pubkeys_removed_from_accounts_index.into_inner().unwrap();
|
||||
self.clean_accounts_stats
|
||||
.clean_old_root_us
|
||||
.fetch_add(clean_rooted.as_us(), Ordering::Relaxed);
|
||||
|
@ -2226,13 +2243,14 @@ impl AccountsDb {
|
|||
None,
|
||||
Some((&self.clean_accounts_stats.purge_stats, &mut reclaim_result)),
|
||||
reset_accounts,
|
||||
&pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
measure.stop();
|
||||
debug!("{} {}", clean_rooted, measure);
|
||||
self.clean_accounts_stats
|
||||
.clean_old_root_reclaim_us
|
||||
.fetch_add(measure.as_us(), Ordering::Relaxed);
|
||||
reclaim_result
|
||||
(reclaim_result, pubkeys_removed_from_accounts_index)
|
||||
}
|
||||
|
||||
fn do_reset_uncleaned_roots(&self, max_clean_root: Option<Slot>) {
|
||||
|
@ -2368,10 +2386,11 @@ impl AccountsDb {
|
|||
self.sender_bg_hasher = Some(sender);
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub(crate) fn purge_keys_exact<'a, C: 'a>(
|
||||
&'a self,
|
||||
pubkey_to_slot_set: impl Iterator<Item = &'a (Pubkey, C)>,
|
||||
) -> Vec<(Slot, AccountInfo)>
|
||||
) -> (Vec<(Slot, AccountInfo)>, PubkeysRemovedFromAccountsIndex)
|
||||
where
|
||||
C: Contains<'a, Slot>,
|
||||
{
|
||||
|
@ -2387,9 +2406,10 @@ impl AccountsDb {
|
|||
}
|
||||
}
|
||||
|
||||
self.accounts_index
|
||||
let pubkeys_removed_from_accounts_index = self
|
||||
.accounts_index
|
||||
.handle_dead_keys(&dead_keys, &self.account_indexes);
|
||||
reclaims
|
||||
(reclaims, pubkeys_removed_from_accounts_index)
|
||||
}
|
||||
|
||||
fn max_clean_root(&self, proposed_clean_root: Option<Slot>) -> Option<Slot> {
|
||||
|
@ -2823,11 +2843,12 @@ impl AccountsDb {
|
|||
accounts_scan.stop();
|
||||
|
||||
let mut clean_old_rooted = Measure::start("clean_old_roots");
|
||||
let (purged_account_slots, removed_accounts) = self.clean_accounts_older_than_root(
|
||||
purges_old_accounts,
|
||||
max_clean_root_inclusive,
|
||||
&ancient_account_cleans,
|
||||
);
|
||||
let ((purged_account_slots, removed_accounts), mut pubkeys_removed_from_accounts_index) =
|
||||
self.clean_accounts_older_than_root(
|
||||
purges_old_accounts,
|
||||
max_clean_root_inclusive,
|
||||
&ancient_account_cleans,
|
||||
);
|
||||
|
||||
if self.caching_enabled {
|
||||
self.do_reset_uncleaned_roots(max_clean_root_inclusive);
|
||||
|
@ -2924,7 +2945,10 @@ impl AccountsDb {
|
|||
})
|
||||
.collect();
|
||||
|
||||
let reclaims = self.purge_keys_exact(pubkey_to_slot_set.iter());
|
||||
let (reclaims, pubkeys_removed_from_accounts_index2) =
|
||||
self.purge_keys_exact(pubkey_to_slot_set.iter());
|
||||
pubkeys_removed_from_accounts_index
|
||||
.extend(pubkeys_removed_from_accounts_index2.into_iter());
|
||||
|
||||
// Don't reset from clean, since the pubkeys in those stores may need to be unref'ed
|
||||
// and those stores may be used for background hashing.
|
||||
|
@ -2935,6 +2959,7 @@ impl AccountsDb {
|
|||
None,
|
||||
Some((&self.clean_accounts_stats.purge_stats, &mut reclaim_result)),
|
||||
reset_accounts,
|
||||
&pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
|
||||
reclaims_time.stop();
|
||||
|
@ -2950,6 +2975,11 @@ impl AccountsDb {
|
|||
i64
|
||||
),
|
||||
("oldest_dirty_slot", key_timings.oldest_dirty_slot, i64),
|
||||
(
|
||||
"pubkeys_removed_from_accounts_index",
|
||||
pubkeys_removed_from_accounts_index.len(),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"dirty_store_processing_us",
|
||||
key_timings.dirty_store_processing_us,
|
||||
|
@ -3082,12 +3112,15 @@ impl AccountsDb {
|
|||
/// * `reset_accounts` - Reset the append_vec store when the store is dead (count==0)
|
||||
/// From the clean and shrink paths it should be false since there may be an in-progress
|
||||
/// hash operation and the stores may hold accounts that need to be unref'ed.
|
||||
/// * `pubkeys_removed_from_accounts_index` - These keys have already been removed from the accounts index
|
||||
/// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
||||
fn handle_reclaims<'a, I>(
|
||||
&'a self,
|
||||
reclaims: Option<I>,
|
||||
expected_single_dead_slot: Option<Slot>,
|
||||
purge_stats_and_reclaim_result: Option<(&PurgeStats, &mut ReclaimResult)>,
|
||||
reset_accounts: bool,
|
||||
pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
|
||||
) where
|
||||
I: Iterator<Item = &'a (Slot, AccountInfo)>,
|
||||
{
|
||||
|
@ -3122,7 +3155,12 @@ impl AccountsDb {
|
|||
}
|
||||
}
|
||||
|
||||
self.process_dead_slots(&dead_slots, purged_account_slots, purge_stats);
|
||||
self.process_dead_slots(
|
||||
&dead_slots,
|
||||
purged_account_slots,
|
||||
purge_stats,
|
||||
pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
} else {
|
||||
assert!(dead_slots.is_empty());
|
||||
}
|
||||
|
@ -3207,17 +3245,24 @@ impl AccountsDb {
|
|||
|
||||
// Must be kept private!, does sensitive cleanup that should only be called from
|
||||
// supported pipelines in AccountsDb
|
||||
// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index
|
||||
// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
||||
fn process_dead_slots(
|
||||
&self,
|
||||
dead_slots: &HashSet<Slot>,
|
||||
purged_account_slots: Option<&mut AccountSlots>,
|
||||
purge_stats: &PurgeStats,
|
||||
pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
|
||||
) {
|
||||
if dead_slots.is_empty() {
|
||||
return;
|
||||
}
|
||||
let mut clean_dead_slots = Measure::start("reclaims::clean_dead_slots");
|
||||
self.clean_stored_dead_slots(dead_slots, purged_account_slots);
|
||||
self.clean_stored_dead_slots(
|
||||
dead_slots,
|
||||
purged_account_slots,
|
||||
pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
clean_dead_slots.stop();
|
||||
|
||||
let mut purge_removed_slots = Measure::start("reclaims::purge_removed_slots");
|
||||
|
@ -5396,7 +5441,13 @@ impl AccountsDb {
|
|||
(*account.key(), purged_slot)
|
||||
})
|
||||
.collect();
|
||||
self.purge_slot_cache_pubkeys(purged_slot, purged_slot_pubkeys, pubkey_to_slot_set, true);
|
||||
self.purge_slot_cache_pubkeys(
|
||||
purged_slot,
|
||||
purged_slot_pubkeys,
|
||||
pubkey_to_slot_set,
|
||||
true,
|
||||
&HashSet::default(),
|
||||
);
|
||||
}
|
||||
|
||||
fn purge_slot_cache_pubkeys(
|
||||
|
@ -5405,17 +5456,19 @@ impl AccountsDb {
|
|||
purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
|
||||
pubkey_to_slot_set: Vec<(Pubkey, Slot)>,
|
||||
is_dead: bool,
|
||||
pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
|
||||
) {
|
||||
// Slot purged from cache should not exist in the backing store
|
||||
assert!(self.storage.get_slot_stores(purged_slot).is_none());
|
||||
let num_purged_keys = pubkey_to_slot_set.len();
|
||||
let reclaims = self.purge_keys_exact(pubkey_to_slot_set.iter());
|
||||
let (reclaims, _) = self.purge_keys_exact(pubkey_to_slot_set.iter());
|
||||
assert_eq!(reclaims.len(), num_purged_keys);
|
||||
if is_dead {
|
||||
self.remove_dead_slots_metadata(
|
||||
std::iter::once(&purged_slot),
|
||||
purged_slot_pubkeys,
|
||||
None,
|
||||
pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -5445,7 +5498,7 @@ impl AccountsDb {
|
|||
.fetch_add(scan_storages_elasped.as_us(), Ordering::Relaxed);
|
||||
|
||||
let mut purge_accounts_index_elapsed = Measure::start("purge_accounts_index_elapsed");
|
||||
let reclaims = match scan_result {
|
||||
let (reclaims, pubkeys_removed_from_accounts_index) = match scan_result {
|
||||
ScanStorageResult::Cached(_) => {
|
||||
panic!("Should not see cached keys in this `else` branch, since we checked this slot did not exist in the cache above");
|
||||
}
|
||||
|
@ -5469,6 +5522,7 @@ impl AccountsDb {
|
|||
expected_dead_slot,
|
||||
Some((purge_stats, &mut ReclaimResult::default())),
|
||||
false,
|
||||
&pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
handle_reclaims_elapsed.stop();
|
||||
purge_stats
|
||||
|
@ -6022,7 +6076,13 @@ impl AccountsDb {
|
|||
let is_dead_slot = accounts.is_empty();
|
||||
// Remove the account index entries from earlier roots that are outdated by later roots.
|
||||
// Safe because queries to the index will be reading updates from later roots.
|
||||
self.purge_slot_cache_pubkeys(slot, purged_slot_pubkeys, pubkey_to_slot_set, is_dead_slot);
|
||||
self.purge_slot_cache_pubkeys(
|
||||
slot,
|
||||
purged_slot_pubkeys,
|
||||
pubkey_to_slot_set,
|
||||
is_dead_slot,
|
||||
&HashSet::default(),
|
||||
);
|
||||
|
||||
if !is_dead_slot {
|
||||
let aligned_total_size = Self::page_align(total_size);
|
||||
|
@ -7710,18 +7770,22 @@ impl AccountsDb {
|
|||
dead_slots
|
||||
}
|
||||
|
||||
// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index
|
||||
// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
||||
fn remove_dead_slots_metadata<'a>(
|
||||
&'a self,
|
||||
dead_slots_iter: impl Iterator<Item = &'a Slot> + Clone,
|
||||
purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
|
||||
// Should only be `Some` for non-cached slots
|
||||
purged_stored_account_slots: Option<&mut AccountSlots>,
|
||||
pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
|
||||
) {
|
||||
let mut measure = Measure::start("remove_dead_slots_metadata-ms");
|
||||
self.clean_dead_slots_from_accounts_index(
|
||||
dead_slots_iter.clone(),
|
||||
purged_slot_pubkeys,
|
||||
purged_stored_account_slots,
|
||||
pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
{
|
||||
let mut bank_hashes = self.bank_hashes.write().unwrap();
|
||||
|
@ -7735,10 +7799,13 @@ impl AccountsDb {
|
|||
|
||||
/// lookup each pubkey in 'purged_slot_pubkeys' and unref it in the accounts index
|
||||
/// populate 'purged_stored_account_slots' by grouping 'purged_slot_pubkeys' by pubkey
|
||||
// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index
|
||||
// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
||||
fn unref_accounts(
|
||||
&self,
|
||||
purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
|
||||
purged_stored_account_slots: &mut AccountSlots,
|
||||
pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
|
||||
) {
|
||||
let len = purged_slot_pubkeys.len();
|
||||
let batches = 1 + (len / UNREF_ACCOUNTS_BATCH_SIZE);
|
||||
|
@ -7750,11 +7817,19 @@ impl AccountsDb {
|
|||
.iter()
|
||||
.skip(skip)
|
||||
.take(UNREF_ACCOUNTS_BATCH_SIZE)
|
||||
.map(|(_slot, pubkey)| pubkey),
|
||||
|_pubkey, _slots_refs| /* unused */AccountsIndexScanResult::Unref,
|
||||
.filter_map(|(_slot, pubkey)| {
|
||||
// filter out pubkeys that have already been removed from the accounts index in a previous step
|
||||
let already_removed =
|
||||
pubkeys_removed_from_accounts_index.contains(pubkey);
|
||||
(!already_removed).then_some(pubkey)
|
||||
}),
|
||||
|_pubkey, _slots_refs| {
|
||||
/* unused */
|
||||
AccountsIndexScanResult::Unref
|
||||
},
|
||||
Some(AccountsIndexScanResult::Unref),
|
||||
)
|
||||
})
|
||||
});
|
||||
});
|
||||
for (slot, pubkey) in purged_slot_pubkeys {
|
||||
purged_stored_account_slots
|
||||
|
@ -7764,17 +7839,24 @@ impl AccountsDb {
|
|||
}
|
||||
}
|
||||
|
||||
// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index
|
||||
// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
||||
fn clean_dead_slots_from_accounts_index<'a>(
|
||||
&'a self,
|
||||
dead_slots_iter: impl Iterator<Item = &'a Slot> + Clone,
|
||||
purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
|
||||
// Should only be `Some` for non-cached slots
|
||||
purged_stored_account_slots: Option<&mut AccountSlots>,
|
||||
pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
|
||||
) {
|
||||
let mut accounts_index_root_stats = AccountsIndexRootsStats::default();
|
||||
let mut measure = Measure::start("unref_from_storage");
|
||||
if let Some(purged_stored_account_slots) = purged_stored_account_slots {
|
||||
self.unref_accounts(purged_slot_pubkeys, purged_stored_account_slots);
|
||||
self.unref_accounts(
|
||||
purged_slot_pubkeys,
|
||||
purged_stored_account_slots,
|
||||
pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
}
|
||||
measure.stop();
|
||||
accounts_index_root_stats.clean_unref_from_storage_us += measure.as_us();
|
||||
|
@ -7809,10 +7891,13 @@ impl AccountsDb {
|
|||
.update(&accounts_index_root_stats);
|
||||
}
|
||||
|
||||
// pubkeys_removed_from_accounts_index - These keys have already been removed from the accounts index
|
||||
// and should not be unref'd. If they exist in the accounts index, they are NEW.
|
||||
fn clean_stored_dead_slots(
|
||||
&self,
|
||||
dead_slots: &HashSet<Slot>,
|
||||
purged_account_slots: Option<&mut AccountSlots>,
|
||||
pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
|
||||
) {
|
||||
let mut measure = Measure::start("clean_stored_dead_slots-ms");
|
||||
let mut stores: Vec<Arc<AccountStorageEntry>> = vec![];
|
||||
|
@ -7845,6 +7930,7 @@ impl AccountsDb {
|
|||
dead_slots.iter(),
|
||||
purged_slot_pubkeys,
|
||||
purged_account_slots,
|
||||
pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
measure.stop();
|
||||
inc_new_counter_info!("clean_stored_dead_slots-ms", measure.as_ms() as usize);
|
||||
|
@ -8183,6 +8269,7 @@ impl AccountsDb {
|
|||
expected_single_dead_slot,
|
||||
None,
|
||||
reset_accounts,
|
||||
&HashSet::default(),
|
||||
);
|
||||
handle_reclaims_time.stop();
|
||||
self.stats
|
||||
|
@ -11708,7 +11795,7 @@ pub mod tests {
|
|||
|
||||
let slots: HashSet<Slot> = vec![1].into_iter().collect();
|
||||
let purge_keys = vec![(key1, slots)];
|
||||
db.purge_keys_exact(purge_keys.iter());
|
||||
let _ = db.purge_keys_exact(purge_keys.iter());
|
||||
|
||||
let account2 = AccountSharedData::new(3, 0, &key);
|
||||
db.store_uncached(2, &[(&key1, &account2)]);
|
||||
|
@ -12616,7 +12703,7 @@ pub mod tests {
|
|||
let accounts = AccountsDb::new_single_for_tests();
|
||||
let mut dead_slots = HashSet::new();
|
||||
dead_slots.insert(10);
|
||||
accounts.clean_stored_dead_slots(&dead_slots, None);
|
||||
accounts.clean_stored_dead_slots(&dead_slots, None, &HashSet::default());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -15713,22 +15800,72 @@ pub mod tests {
|
|||
call_unref: bool,
|
||||
purged_slot_pubkeys: HashSet<(Slot, Pubkey)>,
|
||||
purged_stored_account_slots: &mut AccountSlots,
|
||||
pubkeys_removed_from_accounts_index: &PubkeysRemovedFromAccountsIndex,
|
||||
) {
|
||||
if call_unref {
|
||||
self.unref_accounts(purged_slot_pubkeys, purged_stored_account_slots);
|
||||
self.unref_accounts(
|
||||
purged_slot_pubkeys,
|
||||
purged_stored_account_slots,
|
||||
pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
} else {
|
||||
let empty_vec = Vec::default();
|
||||
self.clean_dead_slots_from_accounts_index(
|
||||
empty_vec.iter(),
|
||||
purged_slot_pubkeys,
|
||||
Some(purged_stored_account_slots),
|
||||
pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
/// test 'unref' parameter 'pubkeys_removed_from_accounts_index'
|
||||
fn test_unref_pubkeys_removed_from_accounts_index() {
|
||||
let slot1 = 1;
|
||||
let pk1 = Pubkey::new(&[1; 32]);
|
||||
for already_removed in [false, true] {
|
||||
let mut pubkeys_removed_from_accounts_index =
|
||||
PubkeysRemovedFromAccountsIndex::default();
|
||||
if already_removed {
|
||||
pubkeys_removed_from_accounts_index.insert(pk1);
|
||||
}
|
||||
// pk1 in slot1, purge it
|
||||
let db = AccountsDb::new_single_for_tests();
|
||||
let mut purged_slot_pubkeys = HashSet::default();
|
||||
purged_slot_pubkeys.insert((slot1, pk1));
|
||||
let mut reclaims = SlotList::default();
|
||||
db.accounts_index.upsert(
|
||||
slot1,
|
||||
slot1,
|
||||
&pk1,
|
||||
&AccountSharedData::default(),
|
||||
&AccountSecondaryIndexes::default(),
|
||||
AccountInfo::default(),
|
||||
&mut reclaims,
|
||||
UpsertReclaim::IgnoreReclaims,
|
||||
);
|
||||
|
||||
let mut purged_stored_account_slots = AccountSlots::default();
|
||||
db.test_unref(
|
||||
true,
|
||||
purged_slot_pubkeys,
|
||||
&mut purged_stored_account_slots,
|
||||
&pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
assert_eq!(
|
||||
vec![(pk1, vec![slot1].into_iter().collect::<HashSet<_>>())],
|
||||
purged_stored_account_slots.into_iter().collect::<Vec<_>>()
|
||||
);
|
||||
let expected = if already_removed { 1 } else { 0 };
|
||||
assert_eq!(db.accounts_index.ref_count_from_storage(&pk1), expected);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_unref_accounts() {
|
||||
let pubkeys_removed_from_accounts_index = PubkeysRemovedFromAccountsIndex::default();
|
||||
for call_unref in [false, true] {
|
||||
{
|
||||
let db = AccountsDb::new_single_for_tests();
|
||||
|
@ -15738,6 +15875,7 @@ pub mod tests {
|
|||
call_unref,
|
||||
HashSet::default(),
|
||||
&mut purged_stored_account_slots,
|
||||
&pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
assert!(purged_stored_account_slots.is_empty());
|
||||
}
|
||||
|
@ -15768,6 +15906,7 @@ pub mod tests {
|
|||
call_unref,
|
||||
purged_slot_pubkeys,
|
||||
&mut purged_stored_account_slots,
|
||||
&pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
assert_eq!(
|
||||
vec![(pk1, vec![slot1].into_iter().collect::<HashSet<_>>())],
|
||||
|
@ -15804,6 +15943,7 @@ pub mod tests {
|
|||
call_unref,
|
||||
purged_slot_pubkeys,
|
||||
&mut purged_stored_account_slots,
|
||||
&pubkeys_removed_from_accounts_index,
|
||||
);
|
||||
for (pk, slots) in vec![(pk1, vec![slot1, slot2]), (pk2, vec![slot1])] {
|
||||
let result = purged_stored_account_slots.remove(&pk).unwrap();
|
||||
|
@ -15843,7 +15983,11 @@ pub mod tests {
|
|||
|
||||
assert_eq!(db.accounts_index.ref_count_from_storage(&pk1), n);
|
||||
// unref all 'n' slots
|
||||
db.unref_accounts(purged_slot_pubkeys, &mut purged_stored_account_slots);
|
||||
db.unref_accounts(
|
||||
purged_slot_pubkeys,
|
||||
&mut purged_stored_account_slots,
|
||||
&HashSet::default(),
|
||||
);
|
||||
assert_eq!(db.accounts_index.ref_count_from_storage(&pk1), 0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1137,15 +1137,20 @@ impl<T: IndexValue> AccountsIndex<T> {
|
|||
read_lock.slot_list_mut(pubkey, user)
|
||||
}
|
||||
|
||||
/// Remove keys from the account index if the key's slot list is empty.
|
||||
/// Returns the keys that were removed from the index. These keys should not be accessed again in the current code path.
|
||||
#[must_use]
|
||||
pub fn handle_dead_keys(
|
||||
&self,
|
||||
dead_keys: &[&Pubkey],
|
||||
account_indexes: &AccountSecondaryIndexes,
|
||||
) {
|
||||
) -> HashSet<Pubkey> {
|
||||
let mut pubkeys_removed_from_accounts_index = HashSet::default();
|
||||
if !dead_keys.is_empty() {
|
||||
for key in dead_keys.iter() {
|
||||
let w_index = self.get_bin(key);
|
||||
if w_index.remove_if_slot_list_empty(**key) {
|
||||
pubkeys_removed_from_accounts_index.insert(**key);
|
||||
// Note it's only safe to remove all the entries for this key
|
||||
// because we have the lock for this key's entry in the AccountsIndex,
|
||||
// so no other thread is also updating the index
|
||||
|
@ -1153,6 +1158,7 @@ impl<T: IndexValue> AccountsIndex<T> {
|
|||
}
|
||||
}
|
||||
}
|
||||
pubkeys_removed_from_accounts_index
|
||||
}
|
||||
|
||||
/// call func with every pubkey and index visible from a given set of ancestors
|
||||
|
@ -1738,26 +1744,34 @@ impl<T: IndexValue> AccountsIndex<T> {
|
|||
});
|
||||
}
|
||||
|
||||
/// return true if pubkey was removed from the accounts index
|
||||
/// or does not exist in the accounts index
|
||||
/// This means it should NOT be unref'd later.
|
||||
#[must_use]
|
||||
pub fn clean_rooted_entries(
|
||||
&self,
|
||||
pubkey: &Pubkey,
|
||||
reclaims: &mut SlotList<T>,
|
||||
max_clean_root_inclusive: Option<Slot>,
|
||||
) {
|
||||
) -> bool {
|
||||
let mut is_slot_list_empty = false;
|
||||
self.slot_list_mut(pubkey, |slot_list| {
|
||||
self.purge_older_root_entries(slot_list, reclaims, max_clean_root_inclusive);
|
||||
is_slot_list_empty = slot_list.is_empty();
|
||||
});
|
||||
let missing_in_accounts_index = self
|
||||
.slot_list_mut(pubkey, |slot_list| {
|
||||
self.purge_older_root_entries(slot_list, reclaims, max_clean_root_inclusive);
|
||||
is_slot_list_empty = slot_list.is_empty();
|
||||
})
|
||||
.is_none();
|
||||
|
||||
let mut removed = false;
|
||||
// If the slot list is empty, remove the pubkey from `account_maps`. Make sure to grab the
|
||||
// lock and double check the slot list is still empty, because another writer could have
|
||||
// locked and inserted the pubkey in-between when `is_slot_list_empty=true` and the call to
|
||||
// remove() below.
|
||||
if is_slot_list_empty {
|
||||
let w_maps = self.get_bin(pubkey);
|
||||
w_maps.remove_if_slot_list_empty(*pubkey);
|
||||
removed = w_maps.remove_if_slot_list_empty(*pubkey);
|
||||
}
|
||||
removed || missing_in_accounts_index
|
||||
}
|
||||
|
||||
/// When can an entry be purged?
|
||||
|
@ -3510,7 +3524,7 @@ pub mod tests {
|
|||
&mut vec![],
|
||||
);
|
||||
|
||||
index.handle_dead_keys(&[&account_key], secondary_indexes);
|
||||
let _ = index.handle_dead_keys(&[&account_key], secondary_indexes);
|
||||
assert!(secondary_index.index.is_empty());
|
||||
assert!(secondary_index.reverse_index.is_empty());
|
||||
}
|
||||
|
@ -3728,7 +3742,7 @@ pub mod tests {
|
|||
index.slot_list_mut(&account_key, |slot_list| slot_list.clear());
|
||||
|
||||
// Everything should be deleted
|
||||
index.handle_dead_keys(&[&account_key], &secondary_indexes);
|
||||
let _ = index.handle_dead_keys(&[&account_key], &secondary_indexes);
|
||||
assert!(secondary_index.index.is_empty());
|
||||
assert!(secondary_index.reverse_index.is_empty());
|
||||
}
|
||||
|
@ -3849,7 +3863,7 @@ pub mod tests {
|
|||
// pubkey as dead and finally remove all the secondary indexes
|
||||
let mut reclaims = vec![];
|
||||
index.purge_exact(&account_key, &later_slot, &mut reclaims);
|
||||
index.handle_dead_keys(&[&account_key], secondary_indexes);
|
||||
let _ = index.handle_dead_keys(&[&account_key], secondary_indexes);
|
||||
assert!(secondary_index.index.is_empty());
|
||||
assert!(secondary_index.reverse_index.is_empty());
|
||||
}
|
||||
|
@ -3959,6 +3973,209 @@ pub mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_newest_root_in_slot_list() {
|
||||
let index = AccountsIndex::<bool>::default_for_tests();
|
||||
let return_0 = 0;
|
||||
let slot1 = 1;
|
||||
let slot2 = 2;
|
||||
let slot99 = 99;
|
||||
|
||||
// no roots, so always 0
|
||||
{
|
||||
let roots_tracker = &index.roots_tracker.read().unwrap();
|
||||
let slot_list = Vec::<(Slot, bool)>::default();
|
||||
assert_eq!(
|
||||
return_0,
|
||||
AccountsIndex::get_newest_root_in_slot_list(
|
||||
&roots_tracker.alive_roots,
|
||||
&slot_list,
|
||||
Some(slot1),
|
||||
)
|
||||
);
|
||||
assert_eq!(
|
||||
return_0,
|
||||
AccountsIndex::get_newest_root_in_slot_list(
|
||||
&roots_tracker.alive_roots,
|
||||
&slot_list,
|
||||
Some(slot2),
|
||||
)
|
||||
);
|
||||
assert_eq!(
|
||||
return_0,
|
||||
AccountsIndex::get_newest_root_in_slot_list(
|
||||
&roots_tracker.alive_roots,
|
||||
&slot_list,
|
||||
Some(slot99),
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
index.add_root(slot2, true);
|
||||
|
||||
{
|
||||
let roots_tracker = &index.roots_tracker.read().unwrap();
|
||||
let slot_list = vec![(slot2, true)];
|
||||
assert_eq!(
|
||||
slot2,
|
||||
AccountsIndex::get_newest_root_in_slot_list(
|
||||
&roots_tracker.alive_roots,
|
||||
&slot_list,
|
||||
Some(slot2),
|
||||
)
|
||||
);
|
||||
// no newest root
|
||||
assert_eq!(
|
||||
return_0,
|
||||
AccountsIndex::get_newest_root_in_slot_list(
|
||||
&roots_tracker.alive_roots,
|
||||
&slot_list,
|
||||
Some(slot1),
|
||||
)
|
||||
);
|
||||
assert_eq!(
|
||||
slot2,
|
||||
AccountsIndex::get_newest_root_in_slot_list(
|
||||
&roots_tracker.alive_roots,
|
||||
&slot_list,
|
||||
Some(slot99),
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: IndexValue> AccountsIndex<T> {
|
||||
fn upsert_simple_test(&self, key: &Pubkey, slot: Slot, value: T) {
|
||||
let mut gc = Vec::new();
|
||||
self.upsert(
|
||||
slot,
|
||||
slot,
|
||||
key,
|
||||
&AccountSharedData::default(),
|
||||
&AccountSecondaryIndexes::default(),
|
||||
value,
|
||||
&mut gc,
|
||||
UPSERT_PREVIOUS_SLOT_ENTRY_WAS_CACHED_FALSE,
|
||||
);
|
||||
assert!(gc.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_clean_rooted_entries_return() {
|
||||
solana_logger::setup();
|
||||
let value = true;
|
||||
let key = solana_sdk::pubkey::new_rand();
|
||||
let key_unknown = solana_sdk::pubkey::new_rand();
|
||||
let index = AccountsIndex::<bool>::default_for_tests();
|
||||
let slot1 = 1;
|
||||
|
||||
let mut gc = Vec::new();
|
||||
// return true if we don't know anything about 'key_unknown'
|
||||
// the item did not exist in the accounts index at all, so index is up to date
|
||||
assert!(index.clean_rooted_entries(&key_unknown, &mut gc, None));
|
||||
|
||||
index.upsert_simple_test(&key, slot1, value);
|
||||
|
||||
let slot2 = 2;
|
||||
// none for max root because we don't want to delete the entry yet
|
||||
assert!(!index.clean_rooted_entries(&key, &mut gc, None));
|
||||
// this is because of inclusive vs exclusive in the call to can_purge_older_entries
|
||||
assert!(!index.clean_rooted_entries(&key, &mut gc, Some(slot1)));
|
||||
// this will delete the entry because it is <= max_root_inclusive and NOT a root
|
||||
// note this has to be slot2 because of inclusive vs exclusive in the call to can_purge_older_entries
|
||||
{
|
||||
let mut gc = Vec::new();
|
||||
assert!(index.clean_rooted_entries(&key, &mut gc, Some(slot2)));
|
||||
assert_eq!(gc, vec![(slot1, value)]);
|
||||
}
|
||||
|
||||
// re-add it
|
||||
index.upsert_simple_test(&key, slot1, value);
|
||||
|
||||
index.add_root(slot1, value);
|
||||
assert!(!index.clean_rooted_entries(&key, &mut gc, Some(slot2)));
|
||||
index.upsert_simple_test(&key, slot2, value);
|
||||
|
||||
assert_eq!(
|
||||
2,
|
||||
index
|
||||
.get_account_read_entry(&key)
|
||||
.unwrap()
|
||||
.slot_list()
|
||||
.len()
|
||||
);
|
||||
assert_eq!(
|
||||
&vec![(slot1, value), (slot2, value)],
|
||||
index.get_account_read_entry(&key).unwrap().slot_list()
|
||||
);
|
||||
assert!(!index.clean_rooted_entries(&key, &mut gc, Some(slot2)));
|
||||
assert_eq!(
|
||||
2,
|
||||
index
|
||||
.get_account_read_entry(&key)
|
||||
.unwrap()
|
||||
.slot_list()
|
||||
.len()
|
||||
);
|
||||
assert!(gc.is_empty());
|
||||
{
|
||||
{
|
||||
let roots_tracker = &index.roots_tracker.read().unwrap();
|
||||
let slot_list = vec![(slot2, value)];
|
||||
assert_eq!(
|
||||
0,
|
||||
AccountsIndex::get_newest_root_in_slot_list(
|
||||
&roots_tracker.alive_roots,
|
||||
&slot_list,
|
||||
None,
|
||||
)
|
||||
);
|
||||
}
|
||||
index.add_root(slot2, true);
|
||||
{
|
||||
let roots_tracker = &index.roots_tracker.read().unwrap();
|
||||
let slot_list = vec![(slot2, value)];
|
||||
assert_eq!(
|
||||
slot2,
|
||||
AccountsIndex::get_newest_root_in_slot_list(
|
||||
&roots_tracker.alive_roots,
|
||||
&slot_list,
|
||||
None,
|
||||
)
|
||||
);
|
||||
assert_eq!(
|
||||
0,
|
||||
AccountsIndex::get_newest_root_in_slot_list(
|
||||
&roots_tracker.alive_roots,
|
||||
&slot_list,
|
||||
Some(0),
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
assert!(gc.is_empty());
|
||||
assert!(!index.clean_rooted_entries(&key, &mut gc, Some(slot2)));
|
||||
assert_eq!(gc, vec![(slot1, value)]);
|
||||
gc.clear();
|
||||
index.clean_dead_slot(slot2, &mut AccountsIndexRootsStats::default());
|
||||
let slot3 = 3;
|
||||
assert!(index.clean_rooted_entries(&key, &mut gc, Some(slot3)));
|
||||
assert_eq!(gc, vec![(slot2, value)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_handle_dead_keys_return() {
|
||||
let key = solana_sdk::pubkey::new_rand();
|
||||
let index = AccountsIndex::<bool>::default_for_tests();
|
||||
|
||||
assert_eq!(
|
||||
index.handle_dead_keys(&[&key], &AccountSecondaryIndexes::default()),
|
||||
vec![key].into_iter().collect::<HashSet<_>>()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_start_end_bin() {
|
||||
let index = AccountsIndex::<bool>::default_for_tests();
|
||||
|
|
|
@ -363,6 +363,8 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// return false if the entry is in the index (disk or memory) and has a slot list len > 0
|
||||
/// return true in all other cases, including if the entry is NOT in the index at all
|
||||
fn remove_if_slot_list_empty_entry(&self, entry: Entry<K, AccountMapEntry<T>>) -> bool {
|
||||
match entry {
|
||||
Entry::Occupied(occupied) => {
|
||||
|
@ -396,7 +398,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||
false
|
||||
}
|
||||
}
|
||||
None => false, // not in cache or on disk
|
||||
None => true, // not in cache or on disk, but slot list is 'empty' and entry is not in index, so return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1895,6 +1897,54 @@ mod tests {
|
|||
assert!(!flushing_active.load(Ordering::Acquire));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_remove_if_slot_list_empty_entry() {
|
||||
let key = solana_sdk::pubkey::new_rand();
|
||||
let unknown_key = solana_sdk::pubkey::new_rand();
|
||||
|
||||
let test = new_for_test::<u64>();
|
||||
|
||||
let mut map = test.map_internal.write().unwrap();
|
||||
|
||||
{
|
||||
// item is NOT in index at all, still return true from remove_if_slot_list_empty_entry
|
||||
// make sure not initially in index
|
||||
let entry = map.entry(unknown_key);
|
||||
assert!(matches!(entry, Entry::Vacant(_)));
|
||||
let entry = map.entry(unknown_key);
|
||||
assert!(test.remove_if_slot_list_empty_entry(entry));
|
||||
// make sure still not in index
|
||||
let entry = map.entry(unknown_key);
|
||||
assert!(matches!(entry, Entry::Vacant(_)));
|
||||
}
|
||||
|
||||
{
|
||||
// add an entry with an empty slot list
|
||||
let val = Arc::new(AccountMapEntryInner::<u64>::default());
|
||||
map.insert(key, val);
|
||||
let entry = map.entry(key);
|
||||
assert!(matches!(entry, Entry::Occupied(_)));
|
||||
// should have removed it since it had an empty slot list
|
||||
assert!(test.remove_if_slot_list_empty_entry(entry));
|
||||
let entry = map.entry(key);
|
||||
assert!(matches!(entry, Entry::Vacant(_)));
|
||||
// return true - item is not in index at all now
|
||||
assert!(test.remove_if_slot_list_empty_entry(entry));
|
||||
}
|
||||
|
||||
{
|
||||
// add an entry with a NON empty slot list - it will NOT get removed
|
||||
let val = Arc::new(AccountMapEntryInner::<u64>::default());
|
||||
val.slot_list.write().unwrap().push((1, 1));
|
||||
map.insert(key, val);
|
||||
// does NOT remove it since it has a non-empty slot list
|
||||
let entry = map.entry(key);
|
||||
assert!(!test.remove_if_slot_list_empty_entry(entry));
|
||||
let entry = map.entry(key);
|
||||
assert!(matches!(entry, Entry::Occupied(_)));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_lock_and_update_slot_list() {
|
||||
let test = AccountMapEntryInner::<u64>::default();
|
||||
|
|
|
@ -353,7 +353,7 @@ impl<'a> SnapshotMinimizer<'a> {
|
|||
.into_iter()
|
||||
.map(|pubkey| (*pubkey, slot))
|
||||
.collect();
|
||||
self.accounts_db().purge_keys_exact(purge_pubkeys.iter());
|
||||
let _ = self.accounts_db().purge_keys_exact(purge_pubkeys.iter());
|
||||
|
||||
let aligned_total: u64 = AccountsDb::page_align(total_bytes as u64);
|
||||
if aligned_total > 0 {
|
||||
|
|
Loading…
Reference in New Issue