in gen index, stop using big hashmap (#33252)

* in gen index, stop using big hashmap

* update accounts_data_len

* remove approx_stored_count
This commit is contained in:
Jeff Washington (jwash) 2023-09-18 11:13:18 -07:00 committed by GitHub
parent 402981e3c1
commit 17c3930bc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 178 additions and 59 deletions

View File

@ -2322,8 +2322,6 @@ impl<'a> ZeroLamport for StoredAccountMeta<'a> {
}
}
type GenerateIndexAccountsMap<'a> = HashMap<Pubkey, StoredAccountMeta<'a>>;
/// called on a struct while scanning append vecs
trait AppendVecScan: Send + Sync + Clone {
/// return true if this pubkey should be included
@ -8914,20 +8912,6 @@ impl AccountsDb {
(result, slots)
}
/// Scan every account stored in `storage` and collect them into a map keyed by pubkey.
///
/// The map is pre-sized from `storage.approx_stored_count()`. If the same pubkey is
/// yielded more than once by the storage iterator, the later account replaces the
/// earlier one in the map.
///
/// # Panics
/// Panics if any stored pubkey is reported by `is_filler_account` as a filler account.
fn process_storage_slot<'a>(
    &self,
    storage: &'a Arc<AccountStorageEntry>,
) -> GenerateIndexAccountsMap<'a> {
    let mut accounts_map =
        GenerateIndexAccountsMap::with_capacity(storage.approx_stored_count());
    for stored_account in storage.accounts.account_iter() {
        // copy the key out first so `stored_account` can be moved into the map
        let pubkey = *stored_account.pubkey();
        assert!(!self.is_filler_account(&pubkey));
        accounts_map.insert(pubkey, stored_account);
    }
    accounts_map
}
/// return Some(lamports_to_top_off) if 'account' would collect rent
fn stats_for_rent_payers<T: ReadableAccount>(
pubkey: &Pubkey,
@ -8948,30 +8932,32 @@ impl AccountsDb {
fn generate_index_for_slot(
&self,
accounts_map: GenerateIndexAccountsMap<'_>,
storage: &Arc<AccountStorageEntry>,
slot: Slot,
store_id: AppendVecId,
rent_collector: &RentCollector,
storage_info: &StorageSizeAndCountMap,
) -> SlotIndexGenerationInfo {
if accounts_map.is_empty() {
let mut accounts = storage.accounts.account_iter();
if accounts.next().is_none() {
return SlotIndexGenerationInfo::default();
}
let accounts = storage.accounts.account_iter();
let secondary = !self.account_indexes.is_empty();
let mut rent_paying_accounts_by_partition = Vec::default();
let mut accounts_data_len = 0;
let mut num_accounts_rent_paying = 0;
let num_accounts = accounts_map.len();
let mut amount_to_top_off_rent = 0;
let mut stored_size_alive = 0;
let items = accounts_map.into_iter().map(|(pubkey, stored_account)| {
let items = accounts.map(|stored_account| {
stored_size_alive += stored_account.stored_size();
let pubkey = stored_account.pubkey();
if secondary {
self.accounts_index.update_secondary_indexes(
&pubkey,
pubkey,
&stored_account,
&self.account_indexes,
);
@ -8981,16 +8967,16 @@ impl AccountsDb {
}
if let Some(amount_to_top_off_rent_this_account) =
Self::stats_for_rent_payers(&pubkey, &stored_account, rent_collector)
Self::stats_for_rent_payers(pubkey, &stored_account, rent_collector)
{
amount_to_top_off_rent += amount_to_top_off_rent_this_account;
num_accounts_rent_paying += 1;
// remember this rent-paying account pubkey
rent_paying_accounts_by_partition.push(pubkey);
rent_paying_accounts_by_partition.push(*pubkey);
}
(
pubkey,
*pubkey,
AccountInfo::new(
StorageLocation::AppendVec(store_id, stored_account.offset()), // will never be cached
stored_account.lamports(),
@ -8998,15 +8984,31 @@ impl AccountsDb {
)
});
let (dirty_pubkeys, insert_time_us, generate_index_count) = self
let (dirty_pubkeys, insert_time_us, mut generate_index_results) = self
.accounts_index
.insert_new_if_missing_into_primary_index(slot, num_accounts, items);
.insert_new_if_missing_into_primary_index(slot, storage.approx_stored_count(), items);
if let Some(duplicates_this_slot) = std::mem::take(&mut generate_index_results.duplicates) {
// there were duplicate pubkeys in this same slot
// Some were not inserted. This means some info like stored data is off.
duplicates_this_slot
.into_iter()
.for_each(|(pubkey, (_slot, info))| {
let duplicate = storage.accounts.get_account(info.offset()).unwrap().0;
assert_eq!(&pubkey, duplicate.pubkey());
stored_size_alive = stored_size_alive.saturating_sub(duplicate.stored_size());
if !duplicate.is_zero_lamport() {
accounts_data_len =
accounts_data_len.saturating_sub(duplicate.data().len() as u64);
}
});
}
{
// second, collect into the shared DashMap once we've figured out all the info per store_id
let mut info = storage_info.entry(store_id).or_default();
info.stored_size += stored_size_alive;
info.count += generate_index_count.count;
info.count += generate_index_results.count;
}
// dirty_pubkeys will contain a pubkey if an item has multiple rooted entries for
@ -9017,7 +9019,7 @@ impl AccountsDb {
}
SlotIndexGenerationInfo {
insert_time_us,
num_accounts: num_accounts as u64,
num_accounts: generate_index_results.count as u64,
num_accounts_rent_paying,
accounts_data_len,
amount_to_top_off_rent,
@ -9176,7 +9178,6 @@ impl AccountsDb {
// no storage at this slot, no information to pull out
continue;
};
let accounts_map = self.process_storage_slot(&storage);
let store_id = storage.append_vec_id();
scan_time.stop();
@ -9194,12 +9195,13 @@ impl AccountsDb {
rent_paying_accounts_by_partition:
rent_paying_accounts_by_partition_this_slot,
} = self.generate_index_for_slot(
accounts_map,
&storage,
*slot,
store_id,
&rent_collector,
&storage_info,
);
rent_paying.fetch_add(rent_paying_this_slot, Ordering::Relaxed);
amount_to_top_off_rent
.fetch_add(amount_to_top_off_rent_this_slot, Ordering::Relaxed);
@ -9220,10 +9222,10 @@ impl AccountsDb {
// verify index matches expected and measure the time to get all items
assert!(verify);
let mut lookup_time = Measure::start("lookup_time");
for account in accounts_map.into_iter() {
let (key, account_info) = account;
let lock = self.accounts_index.get_bin(&key);
let x = lock.get(&key).unwrap();
for account_info in storage.accounts.account_iter() {
let key = account_info.pubkey();
let lock = self.accounts_index.get_bin(key);
let x = lock.get(key).unwrap();
let sl = x.slot_list.read().unwrap();
let mut count = 0;
for (slot2, account_info2) in sl.iter() {
@ -15818,9 +15820,8 @@ pub mod tests {
let storage = accounts.storage.get_slot_storage_entry(slot0).unwrap();
let storage_info = StorageSizeAndCountMap::default();
let accounts_map = accounts.process_storage_slot(&storage);
accounts.generate_index_for_slot(
accounts_map,
&storage,
slot0,
0,
&RentCollector::default(),
@ -15842,14 +15843,7 @@ pub mod tests {
// empty store
let storage = accounts.create_and_insert_store(0, 1, "test");
let storage_info = StorageSizeAndCountMap::default();
let accounts_map = accounts.process_storage_slot(&storage);
accounts.generate_index_for_slot(
accounts_map,
0,
0,
&RentCollector::default(),
&storage_info,
);
accounts.generate_index_for_slot(&storage, 0, 0, &RentCollector::default(), &storage_info);
assert!(storage_info.is_empty());
}
@ -15890,14 +15884,7 @@ pub mod tests {
);
let storage_info = StorageSizeAndCountMap::default();
let accounts_map = accounts.process_storage_slot(&storage);
accounts.generate_index_for_slot(
accounts_map,
0,
0,
&RentCollector::default(),
&storage_info,
);
accounts.generate_index_for_slot(&storage, 0, 0, &RentCollector::default(), &storage_info);
assert_eq!(storage_info.len(), 1);
for entry in storage_info.iter() {
assert_eq!(

View File

@ -72,9 +72,11 @@ pub type RefCount = u64;
pub type AccountMap<T, U> = Arc<InMemAccountsIndex<T, U>>;
#[derive(Default, Debug, PartialEq, Eq)]
pub(crate) struct GenerateIndexCount {
pub(crate) struct GenerateIndexResult<T: IndexValue> {
/// number of accounts inserted in the index
pub count: usize,
/// pubkeys which were present multiple times in the insertion request.
pub duplicates: Option<Vec<(Pubkey, (Slot, T))>>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -1586,24 +1588,55 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
self.account_maps.len()
}
/// Remove the earlier instances of each pubkey when the pubkey exists later in the `Vec`.
///
/// As a side effect `items` is stable-sorted by pubkey (skipped when it holds fewer than
/// two entries). For every run of entries sharing a pubkey, only the last one - the entry
/// that appeared latest in the caller's original order - is kept in `items`.
///
/// Returns the removed (older) entries, ordered by pubkey and, within a single pubkey, in
/// their original relative order. Returns `None` when nothing was removed.
fn remove_older_duplicate_pubkeys(
    items: &mut Vec<(Pubkey, (Slot, T))>,
) -> Option<Vec<(Pubkey, (Slot, T))>> {
    if items.len() < 2 {
        return None;
    }
    // stable sort by pubkey.
    // Entries with equal pubkeys keep their relative order, so the last entry of each run
    // is the one that overrides the earlier ones.
    items.sort_by(|a, b| a.0.cmp(&b.0));

    // Common case: no pubkey appears twice. Detect that without allocating.
    if !items.windows(2).any(|pair| pair[0].0 == pair[1].0) {
        return None;
    }

    // At least one duplicate exists. Partition in a single pass instead of calling
    // `Vec::remove` per duplicate, which would shift the whole tail each time.
    let capacity = items.len();
    let sorted = std::mem::replace(items, Vec::with_capacity(capacity));
    let mut duplicates = Vec::new();
    let mut sorted = sorted.into_iter().peekable();
    while let Some(entry) = sorted.next() {
        // an entry is superseded when the very next entry has the same pubkey
        if matches!(sorted.peek(), Some(next) if next.0 == entry.0) {
            duplicates.push(entry);
        } else {
            items.push(entry);
        }
    }
    // non-empty: the `windows` check above found at least one duplicate
    Some(duplicates)
}
// Same functionally to upsert, but:
// 1. operates on a batch of items
// 2. holds the write lock for the duration of adding the items
// Can save time when inserting lots of new keys.
// But, does NOT update secondary index
// This is designed to be called at startup time.
// returns (dirty_pubkeys, insertion_time_us, GenerateIndexCount)
// returns (dirty_pubkeys, insertion_time_us, GenerateIndexResult)
#[allow(clippy::needless_collect)]
pub(crate) fn insert_new_if_missing_into_primary_index(
&self,
slot: Slot,
item_len: usize,
approx_items_len: usize,
items: impl Iterator<Item = (Pubkey, T)>,
) -> (Vec<Pubkey>, u64, GenerateIndexCount) {
) -> (Vec<Pubkey>, u64, GenerateIndexResult<T>) {
// big enough so not likely to re-allocate, small enough to not over-allocate by too much
// this assumes the largest bin contains twice the expected amount of the average size per bin
let bins = self.bins();
let expected_items_per_bin = item_len * 2 / bins;
let expected_items_per_bin = approx_items_len * 2 / bins;
let use_disk = self.storage.storage.disk.is_some();
let mut binned = (0..bins)
.map(|_| Vec::with_capacity(expected_items_per_bin))
@ -1627,14 +1660,22 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
// This results in calls to insert_new_entry_if_missing_with_lock from different threads starting at different bins to avoid
// lock contention.
let random_offset = thread_rng().gen_range(0..bins);
let mut duplicates = Vec::default();
(0..bins).for_each(|pubkey_bin| {
let pubkey_bin = (pubkey_bin + random_offset) % bins;
let items = std::mem::take(&mut binned[pubkey_bin]);
let mut items = std::mem::take(&mut binned[pubkey_bin]);
if items.is_empty() {
return;
}
let these_duplicates = Self::remove_older_duplicate_pubkeys(&mut items);
if let Some(mut these_duplicates) = these_duplicates {
duplicates.append(&mut these_duplicates);
}
let r_account_maps = &self.account_maps[pubkey_bin];
let mut insert_time = Measure::start("insert_into_primary_index");
// count only considers non-duplicate accounts
count += items.len();
if use_disk {
r_account_maps.startup_insert_only(items.into_iter());
@ -1668,7 +1709,10 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
(
dirty_pubkeys,
insertion_time.load(Ordering::Relaxed),
GenerateIndexCount { count },
GenerateIndexResult {
count,
duplicates: (!duplicates.is_empty()).then_some(duplicates),
},
)
}
@ -2101,6 +2145,56 @@ pub mod tests {
assert_eq!(num, 0);
}
/// Exercises `remove_older_duplicate_pubkeys` with an empty input, an input without
/// duplicates, and inputs holding 1 to 3 entries for `pk1` combined with an optional
/// `pk2` entry placed at varying positions.
#[test]
fn test_remove_older_duplicate_pubkeys() {
let pk1 = Pubkey::new_from_array([0; 32]);
let pk2 = Pubkey::new_from_array([1; 32]);
let slot0 = 0;
let info2 = 55;
// empty input: nothing to remove
let mut items = vec![];
let removed = AccountsIndex::<u64, u64>::remove_older_duplicate_pubkeys(&mut items);
assert!(items.is_empty());
assert!(removed.is_none());
// two distinct pubkeys: input is left as-is and nothing is reported as removed
let mut items = vec![(pk1, (slot0, 1u64)), (pk2, (slot0, 2))];
let expected = items.clone();
let removed = AccountsIndex::<u64, u64>::remove_older_duplicate_pubkeys(&mut items);
assert_eq!(items, expected);
assert!(removed.is_none());
// `dup`: number of additional `pk1` entries pushed after the first one
for dup in 0..3 {
// `other`: selects where (or whether) the single `pk2` entry is added
for other in 0..dup + 2 {
let first_info = 10u64;
let mut items = vec![(pk1, (slot0, first_info))];
// starts with the first `pk1` entry; only asserted below when `dup != 0`
let mut expected_dups = items.clone();
for i in 0..dup {
let this_dup = (pk1, (slot0, i + 10u64 + 1));
// every `pk1` entry except the last one pushed is expected to be removed
if i < dup.saturating_sub(1) {
expected_dups.push(this_dup);
}
items.push(this_dup);
}
// the last `pk1` entry is the one expected to survive
let mut expected = vec![*items.last().unwrap()];
let other_item = (pk2, (slot0, info2));
if other == dup + 1 {
// don't insert
} else if other == dup {
// append `pk2` after all `pk1` entries
expected.push(other_item);
items.push(other_item);
} else {
// place `pk2` at index `other`, in front of or between `pk1` entries
expected.push(other_item);
items.insert(other as usize, other_item);
}
let result = AccountsIndex::<u64, u64>::remove_older_duplicate_pubkeys(&mut items);
assert_eq!(items, expected);
if dup != 0 {
assert_eq!(result.unwrap(), expected_dups);
} else {
assert!(result.is_none());
}
}
}
}
#[test]
fn test_secondary_index_include_exclude() {
let pk1 = Pubkey::new_unique();
@ -2194,6 +2288,44 @@ pub mod tests {
true
}
}
/// Inserting the same pubkey twice in one startup batch must yield a single index entry
/// for the slot, carrying the info of the later item.
#[test]
fn test_insert_duplicates() {
    let slot = 0;
    let pubkey = solana_sdk::pubkey::new_rand();
    let mut ancestors = Ancestors::default();
    ancestors.insert(slot, 0);

    let older_info = true;
    let newer_info: bool = !older_info;
    let index = AccountsIndex::<bool, bool>::default_for_tests();
    let items = vec![(pubkey, older_info), (pubkey, newer_info)];

    index.set_startup(Startup::Startup);
    let (_, _, result) =
        index.insert_new_if_missing_into_primary_index(slot, items.len(), items.into_iter());
    // the two items share a pubkey, so only one insertion is counted
    assert_eq!(result.count, 1);
    index.set_startup(Startup::Normal);

    match index.get_for_tests(&pubkey, Some(&ancestors), None) {
        AccountIndexGetResult::Found(entry, found_at) => {
            // make sure the one with the correct info is added
            assert_eq!(entry.slot_list()[found_at], (slot, newer_info));
            // make sure it wasn't inserted twice
            let entries_in_slot = entry
                .slot_list()
                .iter()
                .filter(|(entry_slot, _)| entry_slot == &slot)
                .count();
            assert_eq!(entries_in_slot, 1);
        }
        _ => panic!("failed"),
    }
}
#[test]
fn test_insert_new_with_lock_no_ancestors() {
let key = solana_sdk::pubkey::new_rand();