diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index f943831f50..637511370b 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -151,6 +151,87 @@ pub enum IncludeSlotInHash { IrrelevantAssertOnUse, } +#[derive(Default)] +/// hold alive accounts and bytes used by those accounts +/// alive means in the accounts index +struct AliveAccounts<'a> { + accounts: Vec<&'a StoredAccountMeta<'a>>, + bytes: usize, +} + +/// separate pubkeys into those with a single refcount and those with > 1 refcount +struct ShrinkCollectAliveSeparatedByRefs<'a> { + one_ref: AliveAccounts<'a>, + many_refs: AliveAccounts<'a>, +} + +trait ShrinkCollectRefs<'a>: Sync + Send { + fn with_capacity(capacity: usize) -> Self; + fn collect(&mut self, other: Self); + fn add(&mut self, ref_count: u64, account: &'a StoredAccountMeta<'a>); + fn len(&self) -> usize; + fn alive_bytes(&self) -> usize; + fn alive_accounts(&self) -> &Vec<&'a StoredAccountMeta<'a>>; +} + +impl<'a> ShrinkCollectRefs<'a> for AliveAccounts<'a> { + fn collect(&mut self, mut other: Self) { + self.bytes = self.bytes.saturating_add(other.bytes); + self.accounts.append(&mut other.accounts); + } + fn with_capacity(capacity: usize) -> Self { + Self { + accounts: Vec::with_capacity(capacity), + bytes: 0, + } + } + fn add(&mut self, _ref_count: u64, account: &'a StoredAccountMeta<'a>) { + self.accounts.push(account); + self.bytes = self.bytes.saturating_add(account.stored_size); + } + fn len(&self) -> usize { + self.accounts.len() + } + fn alive_bytes(&self) -> usize { + self.bytes + } + fn alive_accounts(&self) -> &Vec<&'a StoredAccountMeta<'a>> { + &self.accounts + } +} + +impl<'a> ShrinkCollectRefs<'a> for ShrinkCollectAliveSeparatedByRefs<'a> { + fn collect(&mut self, other: Self) { + self.one_ref.collect(other.one_ref); + self.many_refs.collect(other.many_refs); + } + fn with_capacity(capacity: usize) -> Self { + Self { + one_ref: AliveAccounts::with_capacity(capacity), + many_refs: AliveAccounts::with_capacity(capacity), + } + } + fn add(&mut self, ref_count: u64, account: &'a StoredAccountMeta<'a>) { + let other = if ref_count == 1 { + &mut self.one_ref + } else { + &mut self.many_refs + }; + other.add(ref_count, account); + } + fn len(&self) -> usize { + self.one_ref.len().saturating_add(self.many_refs.len()) + } + fn alive_bytes(&self) -> usize { + self.one_ref + .alive_bytes() + .saturating_add(self.many_refs.alive_bytes()) + } + fn alive_accounts(&self) -> &Vec<&'a StoredAccountMeta<'a>> { + unimplemented!("illegal use"); + } +} + /// used by tests for 'include_slot_in_hash' parameter /// Tests just need to be self-consistent, so any value should work here. pub const INCLUDE_SLOT_IN_HASH_TESTS: IncludeSlotInHash = IncludeSlotInHash::IncludeSlot; @@ -322,11 +403,11 @@ impl AncientSlotPubkeys { } } -struct ShrinkCollect<'a> { +struct ShrinkCollect<'a, T: ShrinkCollectRefs<'a>> { original_bytes: u64, aligned_total_bytes: u64, unrefed_pubkeys: Vec<&'a Pubkey>, - alive_accounts: Vec<&'a StoredAccountMeta<'a>>, + alive_accounts: T, /// total size in storage of all alive accounts alive_total_bytes: usize, total_starting_accounts: usize, @@ -355,11 +436,9 @@ pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig pub type BinnedHashData = Vec>; -struct LoadAccountsIndexForShrink<'a> { - /// total stored bytes for all alive accounts - alive_total_bytes: usize, - /// the specific alive accounts - alive_accounts: Vec<&'a StoredAccountMeta<'a>>, +struct LoadAccountsIndexForShrink<'a, T: ShrinkCollectRefs<'a>> { + /// all alive accounts + alive_accounts: T, /// pubkeys that were unref'd in the accounts index because they were dead unrefed_pubkeys: Vec<&'a Pubkey>, /// true if all alive accounts are zero lamport accounts @@ -3539,18 +3618,16 @@ impl AccountsDb { /// store a reference to all alive accounts in `alive_accounts` /// unref and optionally store a reference to all pubkeys that are in the index, but dead in `unrefed_pubkeys` /// return sum of account size for all alive accounts - fn load_accounts_index_for_shrink<'a>( + fn load_accounts_index_for_shrink<'a, T: ShrinkCollectRefs<'a>>( &'a self, accounts: &'a [StoredAccountMeta<'a>], stats: &ShrinkStats, slot_to_shrink: Slot, - ) -> LoadAccountsIndexForShrink<'a> { + ) -> LoadAccountsIndexForShrink<'a, T> { let count = accounts.len(); - let mut alive_accounts = Vec::with_capacity(count); + let mut alive_accounts = T::with_capacity(count); let mut unrefed_pubkeys = Vec::with_capacity(count); - let mut alive_total_bytes = 0; - let mut alive = 0; let mut dead = 0; let mut index = 0; @@ -3559,7 +3636,7 @@ impl AccountsDb { accounts.iter().map(|account| account.pubkey()), |pubkey, slots_refs| { let mut result = AccountsIndexScanResult::None; - if let Some((slot_list, _ref_count)) = slots_refs { + if let Some((slot_list, ref_count)) = slots_refs { let stored_account = &accounts[index]; let is_alive = slot_list.iter().any(|(slot, _acct_info)| { // if the accounts index contains an entry at this slot, then the append vec we're asking about contains this item and thus, it is alive at this slot @@ -3575,8 +3652,7 @@ impl AccountsDb { dead += 1; } else { all_are_zero_lamports &= stored_account.lamports() == 0; - alive_accounts.push(stored_account); - alive_total_bytes += stored_account.stored_size; + alive_accounts.add(ref_count, stored_account); alive += 1; } } @@ -3590,7 +3666,6 @@ impl AccountsDb { stats.dead_accounts.fetch_add(dead, Ordering::Relaxed); LoadAccountsIndexForShrink { - alive_total_bytes, alive_accounts, unrefed_pubkeys, all_are_zero_lamports, @@ -3621,12 +3696,12 @@ impl AccountsDb { /// shared code for shrinking normal slots and combining into ancient append vecs /// note 'stored_accounts' is passed by ref so we can return references to data within it, avoiding self-references - fn shrink_collect<'a: 'b, 'b>( + fn shrink_collect<'a: 'b, 'b, T: ShrinkCollectRefs<'b>>( &'a self, store: &'a Arc, stored_accounts: &'b mut Vec>, stats: &ShrinkStats, - ) -> ShrinkCollect<'b> { + ) -> ShrinkCollect<'b, T> { let ( GetUniqueAccountsResult { stored_accounts: stored_accounts_temp, @@ -3641,10 +3716,9 @@ impl AccountsDb { *stored_accounts = stored_accounts_temp; let mut index_read_elapsed = Measure::start("index_read_elapsed"); - let alive_total_bytes_collect = AtomicUsize::new(0); let len = stored_accounts.len(); - let alive_accounts_collect = Mutex::new(Vec::with_capacity(len)); + let alive_accounts_collect = Mutex::new(T::with_capacity(len)); let unrefed_pubkeys_collect = Mutex::new(Vec::with_capacity(len)); stats .accounts_loaded @@ -3655,8 +3729,7 @@ impl AccountsDb { .par_chunks(SHRINK_COLLECT_CHUNK_SIZE) .for_each(|stored_accounts| { let LoadAccountsIndexForShrink { - alive_total_bytes, - mut alive_accounts, + alive_accounts, mut unrefed_pubkeys, all_are_zero_lamports, } = self.load_accounts_index_for_shrink(stored_accounts, stats, slot); @@ -3665,12 +3738,11 @@ impl AccountsDb { alive_accounts_collect .lock() .unwrap() - .append(&mut alive_accounts); + .collect(alive_accounts); unrefed_pubkeys_collect .lock() .unwrap() .append(&mut unrefed_pubkeys); - alive_total_bytes_collect.fetch_add(alive_total_bytes, Ordering::Relaxed); if !all_are_zero_lamports { *all_are_zero_lamports_collect.lock().unwrap() = false; } @@ -3679,13 +3751,14 @@ impl AccountsDb { let alive_accounts = alive_accounts_collect.into_inner().unwrap(); let unrefed_pubkeys = unrefed_pubkeys_collect.into_inner().unwrap(); - let alive_total_bytes = alive_total_bytes_collect.load(Ordering::Relaxed); index_read_elapsed.stop(); stats .index_read_elapsed .fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed); + let alive_total_bytes = alive_accounts.alive_bytes(); + let aligned_total_bytes: u64 = Self::page_align(alive_total_bytes as u64); stats @@ -3712,9 +3785,9 @@ impl AccountsDb { /// common code from shrink and combine_ancient_slots /// get rid of all original store_ids in the slot - fn remove_old_stores_shrink( + fn remove_old_stores_shrink<'a, T: ShrinkCollectRefs<'a>>( &self, - shrink_collect: &ShrinkCollect, + shrink_collect: &ShrinkCollect<'a, T>, slot: Slot, stats: &ShrinkStats, shrink_in_progress: Option, @@ -3755,7 +3828,11 @@ impl AccountsDb { } let mut stored_accounts = Vec::default(); debug!("do_shrink_slot_store: slot: {}", slot); - let shrink_collect = self.shrink_collect(store, &mut stored_accounts, &self.shrink_stats); + let shrink_collect = self.shrink_collect::>( + store, + &mut stored_accounts, + &self.shrink_stats, + ); // This shouldn't happen if alive_bytes/approx_stored_count are accurate if Self::should_not_shrink( @@ -3799,7 +3876,7 @@ impl AccountsDb { store_accounts_timing = self.store_accounts_frozen( ( slot, - &shrink_collect.alive_accounts[..], + &shrink_collect.alive_accounts.alive_accounts()[..], INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION, ), None::>, @@ -4324,7 +4401,7 @@ impl AccountsDb { dropped_roots: &mut Vec, ) { let mut stored_accounts = Vec::default(); - let shrink_collect = self.shrink_collect( + let shrink_collect = self.shrink_collect::>( old_storage, &mut stored_accounts, &self.shrink_ancient_stats.shrink_stats, @@ -4344,7 +4421,7 @@ impl AccountsDb { // 'Overflow', which will have to go into a new ancient append vec at 'slot' let (to_store, find_alive_elapsed) = measure!(AccountsToStore::new( available_bytes, - &shrink_collect.alive_accounts, + shrink_collect.alive_accounts.alive_accounts(), shrink_collect.alive_total_bytes, slot )); @@ -16664,7 +16741,7 @@ pub mod tests { let storage = db.get_storage_for_slot(slot5).unwrap(); let mut stored_accounts = Vec::default(); - let shrink_collect = db.shrink_collect( + let shrink_collect = db.shrink_collect::>( &storage, &mut stored_accounts, &ShrinkStats::default(), @@ -16700,6 +16777,7 @@ pub mod tests { assert_eq!( shrink_collect .alive_accounts + .accounts .iter() .map(|account| *account.pubkey()) .sorted()