trait ShrinkCollectRefs (#29830)

This commit is contained in:
Jeff Washington (jwash) 2023-01-24 10:43:50 -06:00 committed by GitHub
parent 8d4603edfb
commit cac52b0819
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 110 additions and 32 deletions

View File

@ -151,6 +151,87 @@ pub enum IncludeSlotInHash {
IrrelevantAssertOnUse,
}
#[derive(Default)]
/// hold alive accounts and bytes used by those accounts
/// alive means in the accounts index
struct AliveAccounts<'a> {
accounts: Vec<&'a StoredAccountMeta<'a>>,
bytes: usize,
}
/// separate pubkeys into those with a single refcount and those with > 1 refcount
struct ShrinkCollectAliveSeparatedByRefs<'a> {
one_ref: AliveAccounts<'a>,
many_refs: AliveAccounts<'a>,
}
trait ShrinkCollectRefs<'a>: Sync + Send {
fn with_capacity(capacity: usize) -> Self;
fn collect(&mut self, other: Self);
fn add(&mut self, ref_count: u64, account: &'a StoredAccountMeta<'a>);
fn len(&self) -> usize;
fn alive_bytes(&self) -> usize;
fn alive_accounts(&self) -> &Vec<&'a StoredAccountMeta<'a>>;
}
impl<'a> ShrinkCollectRefs<'a> for AliveAccounts<'a> {
fn collect(&mut self, mut other: Self) {
self.bytes = self.bytes.saturating_add(other.bytes);
self.accounts.append(&mut other.accounts);
}
fn with_capacity(capacity: usize) -> Self {
Self {
accounts: Vec::with_capacity(capacity),
bytes: 0,
}
}
fn add(&mut self, _ref_count: u64, account: &'a StoredAccountMeta<'a>) {
self.accounts.push(account);
self.bytes = self.bytes.saturating_add(account.stored_size);
}
fn len(&self) -> usize {
self.accounts.len()
}
fn alive_bytes(&self) -> usize {
self.bytes
}
fn alive_accounts(&self) -> &Vec<&'a StoredAccountMeta<'a>> {
&self.accounts
}
}
impl<'a> ShrinkCollectRefs<'a> for ShrinkCollectAliveSeparatedByRefs<'a> {
fn collect(&mut self, other: Self) {
self.one_ref.collect(other.one_ref);
self.many_refs.collect(other.many_refs);
}
fn with_capacity(capacity: usize) -> Self {
Self {
one_ref: AliveAccounts::with_capacity(capacity),
many_refs: AliveAccounts::with_capacity(capacity),
}
}
fn add(&mut self, ref_count: u64, account: &'a StoredAccountMeta<'a>) {
let other = if ref_count == 1 {
&mut self.one_ref
} else {
&mut self.many_refs
};
other.add(ref_count, account);
}
fn len(&self) -> usize {
self.one_ref.len().saturating_add(self.many_refs.len())
}
fn alive_bytes(&self) -> usize {
self.one_ref
.alive_bytes()
.saturating_add(self.many_refs.alive_bytes())
}
fn alive_accounts(&self) -> &Vec<&'a StoredAccountMeta<'a>> {
unimplemented!("illegal use");
}
}
/// used by tests for 'include_slot_in_hash' parameter
/// Tests just need to be self-consistent, so any value should work here.
pub const INCLUDE_SLOT_IN_HASH_TESTS: IncludeSlotInHash = IncludeSlotInHash::IncludeSlot;
@ -322,11 +403,11 @@ impl AncientSlotPubkeys {
}
}
struct ShrinkCollect<'a> {
struct ShrinkCollect<'a, T: ShrinkCollectRefs<'a>> {
original_bytes: u64,
aligned_total_bytes: u64,
unrefed_pubkeys: Vec<&'a Pubkey>,
alive_accounts: Vec<&'a StoredAccountMeta<'a>>,
alive_accounts: T,
/// total size in storage of all alive accounts
alive_total_bytes: usize,
total_starting_accounts: usize,
@ -355,11 +436,9 @@ pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig
pub type BinnedHashData = Vec<Vec<CalculateHashIntermediate>>;
struct LoadAccountsIndexForShrink<'a> {
/// total stored bytes for all alive accounts
alive_total_bytes: usize,
/// the specific alive accounts
alive_accounts: Vec<&'a StoredAccountMeta<'a>>,
struct LoadAccountsIndexForShrink<'a, T: ShrinkCollectRefs<'a>> {
/// all alive accounts
alive_accounts: T,
/// pubkeys that were unref'd in the accounts index because they were dead
unrefed_pubkeys: Vec<&'a Pubkey>,
/// true if all alive accounts are zero lamport accounts
@ -3539,18 +3618,16 @@ impl AccountsDb {
/// store a reference to all alive accounts in `alive_accounts`
/// unref and optionally store a reference to all pubkeys that are in the index, but dead in `unrefed_pubkeys`
/// return sum of account size for all alive accounts
fn load_accounts_index_for_shrink<'a>(
fn load_accounts_index_for_shrink<'a, T: ShrinkCollectRefs<'a>>(
&'a self,
accounts: &'a [StoredAccountMeta<'a>],
stats: &ShrinkStats,
slot_to_shrink: Slot,
) -> LoadAccountsIndexForShrink<'a> {
) -> LoadAccountsIndexForShrink<'a, T> {
let count = accounts.len();
let mut alive_accounts = Vec::with_capacity(count);
let mut alive_accounts = T::with_capacity(count);
let mut unrefed_pubkeys = Vec::with_capacity(count);
let mut alive_total_bytes = 0;
let mut alive = 0;
let mut dead = 0;
let mut index = 0;
@ -3559,7 +3636,7 @@ impl AccountsDb {
accounts.iter().map(|account| account.pubkey()),
|pubkey, slots_refs| {
let mut result = AccountsIndexScanResult::None;
if let Some((slot_list, _ref_count)) = slots_refs {
if let Some((slot_list, ref_count)) = slots_refs {
let stored_account = &accounts[index];
let is_alive = slot_list.iter().any(|(slot, _acct_info)| {
// if the accounts index contains an entry at this slot, then the append vec we're asking about contains this item and thus, it is alive at this slot
@ -3575,8 +3652,7 @@ impl AccountsDb {
dead += 1;
} else {
all_are_zero_lamports &= stored_account.lamports() == 0;
alive_accounts.push(stored_account);
alive_total_bytes += stored_account.stored_size;
alive_accounts.add(ref_count, stored_account);
alive += 1;
}
}
@ -3590,7 +3666,6 @@ impl AccountsDb {
stats.dead_accounts.fetch_add(dead, Ordering::Relaxed);
LoadAccountsIndexForShrink {
alive_total_bytes,
alive_accounts,
unrefed_pubkeys,
all_are_zero_lamports,
@ -3621,12 +3696,12 @@ impl AccountsDb {
/// shared code for shrinking normal slots and combining into ancient append vecs
/// note 'stored_accounts' is passed by ref so we can return references to data within it, avoiding self-references
fn shrink_collect<'a: 'b, 'b>(
fn shrink_collect<'a: 'b, 'b, T: ShrinkCollectRefs<'b>>(
&'a self,
store: &'a Arc<AccountStorageEntry>,
stored_accounts: &'b mut Vec<StoredAccountMeta<'b>>,
stats: &ShrinkStats,
) -> ShrinkCollect<'b> {
) -> ShrinkCollect<'b, T> {
let (
GetUniqueAccountsResult {
stored_accounts: stored_accounts_temp,
@ -3641,10 +3716,9 @@ impl AccountsDb {
*stored_accounts = stored_accounts_temp;
let mut index_read_elapsed = Measure::start("index_read_elapsed");
let alive_total_bytes_collect = AtomicUsize::new(0);
let len = stored_accounts.len();
let alive_accounts_collect = Mutex::new(Vec::with_capacity(len));
let alive_accounts_collect = Mutex::new(T::with_capacity(len));
let unrefed_pubkeys_collect = Mutex::new(Vec::with_capacity(len));
stats
.accounts_loaded
@ -3655,8 +3729,7 @@ impl AccountsDb {
.par_chunks(SHRINK_COLLECT_CHUNK_SIZE)
.for_each(|stored_accounts| {
let LoadAccountsIndexForShrink {
alive_total_bytes,
mut alive_accounts,
alive_accounts,
mut unrefed_pubkeys,
all_are_zero_lamports,
} = self.load_accounts_index_for_shrink(stored_accounts, stats, slot);
@ -3665,12 +3738,11 @@ impl AccountsDb {
alive_accounts_collect
.lock()
.unwrap()
.append(&mut alive_accounts);
.collect(alive_accounts);
unrefed_pubkeys_collect
.lock()
.unwrap()
.append(&mut unrefed_pubkeys);
alive_total_bytes_collect.fetch_add(alive_total_bytes, Ordering::Relaxed);
if !all_are_zero_lamports {
*all_are_zero_lamports_collect.lock().unwrap() = false;
}
@ -3679,13 +3751,14 @@ impl AccountsDb {
let alive_accounts = alive_accounts_collect.into_inner().unwrap();
let unrefed_pubkeys = unrefed_pubkeys_collect.into_inner().unwrap();
let alive_total_bytes = alive_total_bytes_collect.load(Ordering::Relaxed);
index_read_elapsed.stop();
stats
.index_read_elapsed
.fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed);
let alive_total_bytes = alive_accounts.alive_bytes();
let aligned_total_bytes: u64 = Self::page_align(alive_total_bytes as u64);
stats
@ -3712,9 +3785,9 @@ impl AccountsDb {
/// common code from shrink and combine_ancient_slots
/// get rid of all original store_ids in the slot
fn remove_old_stores_shrink(
fn remove_old_stores_shrink<'a, T: ShrinkCollectRefs<'a>>(
&self,
shrink_collect: &ShrinkCollect,
shrink_collect: &ShrinkCollect<'a, T>,
slot: Slot,
stats: &ShrinkStats,
shrink_in_progress: Option<ShrinkInProgress>,
@ -3755,7 +3828,11 @@ impl AccountsDb {
}
let mut stored_accounts = Vec::default();
debug!("do_shrink_slot_store: slot: {}", slot);
let shrink_collect = self.shrink_collect(store, &mut stored_accounts, &self.shrink_stats);
let shrink_collect = self.shrink_collect::<AliveAccounts<'_>>(
store,
&mut stored_accounts,
&self.shrink_stats,
);
// This shouldn't happen if alive_bytes/approx_stored_count are accurate
if Self::should_not_shrink(
@ -3799,7 +3876,7 @@ impl AccountsDb {
store_accounts_timing = self.store_accounts_frozen(
(
slot,
&shrink_collect.alive_accounts[..],
&shrink_collect.alive_accounts.alive_accounts()[..],
INCLUDE_SLOT_IN_HASH_IRRELEVANT_APPEND_VEC_OPERATION,
),
None::<Vec<&Hash>>,
@ -4324,7 +4401,7 @@ impl AccountsDb {
dropped_roots: &mut Vec<Slot>,
) {
let mut stored_accounts = Vec::default();
let shrink_collect = self.shrink_collect(
let shrink_collect = self.shrink_collect::<AliveAccounts<'_>>(
old_storage,
&mut stored_accounts,
&self.shrink_ancient_stats.shrink_stats,
@ -4344,7 +4421,7 @@ impl AccountsDb {
// 'Overflow', which will have to go into a new ancient append vec at 'slot'
let (to_store, find_alive_elapsed) = measure!(AccountsToStore::new(
available_bytes,
&shrink_collect.alive_accounts,
shrink_collect.alive_accounts.alive_accounts(),
shrink_collect.alive_total_bytes,
slot
));
@ -16664,7 +16741,7 @@ pub mod tests {
let storage = db.get_storage_for_slot(slot5).unwrap();
let mut stored_accounts = Vec::default();
let shrink_collect = db.shrink_collect(
let shrink_collect = db.shrink_collect::<AliveAccounts<'_>>(
&storage,
&mut stored_accounts,
&ShrinkStats::default(),
@ -16700,6 +16777,7 @@ pub mod tests {
assert_eq!(
shrink_collect
.alive_accounts
.accounts
.iter()
.map(|account| *account.pubkey())
.sorted()