diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 66149f3194..a8a708450c 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -37,10 +37,11 @@ use { ZeroLamportAccounts, }, accounts_index::{ - AccountIndexGetResult, AccountSecondaryIndexes, AccountsIndex, AccountsIndexConfig, - AccountsIndexRootsStats, AccountsIndexScanResult, DiskIndexValue, IndexKey, IndexValue, - IsCached, RefCount, ScanConfig, ScanResult, SlotList, UpsertReclaim, ZeroLamport, - ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS, ACCOUNTS_INDEX_CONFIG_FOR_TESTING, + AccountIndexGetResult, AccountMapEntry, AccountSecondaryIndexes, AccountsIndex, + AccountsIndexConfig, AccountsIndexRootsStats, AccountsIndexScanResult, DiskIndexValue, + IndexKey, IndexValue, IsCached, RefCount, ScanConfig, ScanResult, SlotList, + UpsertReclaim, ZeroLamport, ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS, + ACCOUNTS_INDEX_CONFIG_FOR_TESTING, }, accounts_index_storage::Startup, accounts_update_notifier_interface::AccountsUpdateNotifier, @@ -465,6 +466,9 @@ pub(crate) struct ShrinkCollect<'a, T: ShrinkCollectRefs<'a>> { pub(crate) total_starting_accounts: usize, /// true if all alive accounts are zero lamports pub(crate) all_are_zero_lamports: bool, + /// index entries that need to be held in memory while shrink is in progress + /// These aren't read - they are just held so that entries cannot be flushed. + pub(crate) _index_entries_being_shrunk: Vec>, } pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig { @@ -499,6 +503,8 @@ struct LoadAccountsIndexForShrink<'a, T: ShrinkCollectRefs<'a>> { unrefed_pubkeys: Vec<&'a Pubkey>, /// true if all alive accounts are zero lamport accounts all_are_zero_lamports: bool, + /// index entries we need to hold onto to keep them from getting flushed + index_entries_being_shrunk: Vec>, } pub struct GetUniqueAccountsResult<'a> { @@ -3189,7 +3195,7 @@ impl AccountsDb { let mut useful = 0; self.accounts_index.scan( pubkeys.iter(), - |pubkey, slots_refs| { + |pubkey, slots_refs, _entry| { let mut useless = true; if let Some((slot_list, ref_count)) = slots_refs { let index_in_slot_list = self.accounts_index.latest_slot( @@ -3255,6 +3261,7 @@ impl AccountsDb { } }, None, + false, ); found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed); not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed); @@ -3741,9 +3748,10 @@ impl AccountsDb { let mut dead = 0; let mut index = 0; let mut all_are_zero_lamports = true; + let mut index_entries_being_shrunk = Vec::with_capacity(accounts.len()); self.accounts_index.scan( accounts.iter().map(|account| account.pubkey()), - |pubkey, slots_refs| { + |pubkey, slots_refs, entry| { let mut result = AccountsIndexScanResult::None; if let Some((slot_list, ref_count)) = slots_refs { let stored_account = &accounts[index]; @@ -3760,6 +3768,9 @@ impl AccountsDb { result = AccountsIndexScanResult::Unref; dead += 1; } else { + // Hold onto the index entry arc so that it cannot be flushed. + // Since we are shrinking these entries, we need to disambiguate append_vec_ids during this period and those only exist in the in-memory accounts index. + index_entries_being_shrunk.push(Arc::clone(entry.unwrap())); all_are_zero_lamports &= stored_account.lamports() == 0; alive_accounts.add(ref_count, stored_account); alive += 1; @@ -3769,6 +3780,7 @@ impl AccountsDb { result }, None, + true, ); assert_eq!(index, std::cmp::min(accounts.len(), count)); stats.alive_accounts.fetch_add(alive, Ordering::Relaxed); @@ -3778,6 +3790,7 @@ impl AccountsDb { alive_accounts, unrefed_pubkeys, all_are_zero_lamports, + index_entries_being_shrunk, } } @@ -3840,6 +3853,7 @@ impl AccountsDb { .accounts_loaded .fetch_add(len as u64, Ordering::Relaxed); let all_are_zero_lamports_collect = Mutex::new(true); + let index_entries_being_shrunk_outer = Mutex::new(Vec::default()); self.thread_pool_clean.install(|| { stored_accounts .par_chunks(SHRINK_COLLECT_CHUNK_SIZE) @@ -3848,6 +3862,7 @@ impl AccountsDb { alive_accounts, mut unrefed_pubkeys, all_are_zero_lamports, + mut index_entries_being_shrunk, } = self.load_accounts_index_for_shrink(stored_accounts, stats, slot); // collect @@ -3859,6 +3874,10 @@ impl AccountsDb { .lock() .unwrap() .append(&mut unrefed_pubkeys); + index_entries_being_shrunk_outer + .lock() + .unwrap() + .append(&mut index_entries_being_shrunk); if !all_are_zero_lamports { *all_are_zero_lamports_collect.lock().unwrap() = false; } @@ -3897,6 +3916,7 @@ impl AccountsDb { alive_total_bytes, total_starting_accounts: len, all_are_zero_lamports: all_are_zero_lamports_collect.into_inner().unwrap(), + _index_entries_being_shrunk: index_entries_being_shrunk_outer.into_inner().unwrap(), } } @@ -8012,11 +8032,12 @@ impl AccountsDb { pubkeys_removed_from_accounts_index.contains(pubkey); !already_removed }), - |_pubkey, _slots_refs| { + |_pubkey, _slots_refs, _entry| { /* unused */ AccountsIndexScanResult::Unref }, Some(AccountsIndexScanResult::Unref), + false, ) }); }); diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 0e150a3849..6c6d6f3014 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -1356,11 +1356,13 @@ impl + Into> AccountsIndex { /// For each pubkey, find the slot list in the accounts index /// apply 'avoid_callback_result' if specified. /// otherwise, call `callback` + /// if 'provide_entry_in_callback' is true, populate callback with the Arc of the entry itself. pub(crate) fn scan<'a, F, I>( &self, pubkeys: I, mut callback: F, avoid_callback_result: Option, + provide_entry_in_callback: bool, ) where // params: // pubkey looked up @@ -1368,9 +1370,14 @@ impl + Into> AccountsIndex { // None if 'pubkey' is not in accounts index. // slot_list: comes from accounts index for 'pubkey' // ref_count: refcount of entry in index + // entry, if 'provide_entry_in_callback' is true // if 'avoid_callback_result' is Some(_), then callback is NOT called // and _ is returned as if callback were called. - F: FnMut(&'a Pubkey, Option<(&SlotList, RefCount)>) -> AccountsIndexScanResult, + F: FnMut( + &'a Pubkey, + Option<(&SlotList, RefCount)>, + Option<&AccountMapEntry>, + ) -> AccountsIndexScanResult, I: Iterator, { let mut lock = None; @@ -1390,7 +1397,11 @@ impl + Into> AccountsIndex { *result } else { let slot_list = &locked_entry.slot_list.read().unwrap(); - callback(pubkey, Some((slot_list, locked_entry.ref_count()))) + callback( + pubkey, + Some((slot_list, locked_entry.ref_count())), + provide_entry_in_callback.then_some(locked_entry), + ) }; cache = match result { AccountsIndexScanResult::Unref => { @@ -1404,7 +1415,7 @@ impl + Into> AccountsIndex { }; } None => { - avoid_callback_result.unwrap_or_else(|| callback(pubkey, None)); + avoid_callback_result.unwrap_or_else(|| callback(pubkey, None, None)); } } (cache, ())