calculate store info while generating index (#19107)

* calculate store info while generating index

* update store info during index generation pass
This commit is contained in:
Jeff Washington (jwash) 2021-08-10 07:39:59 -05:00 committed by GitHub
parent 47e0d9aa95
commit c18bd08021
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 81 additions and 88 deletions

View File

@@ -180,6 +180,7 @@ struct StorageSizeAndCount {
pub stored_size: usize,
pub count: usize,
}
type StorageSizeAndCountMap = DashMap<AppendVecId, StorageSizeAndCount>;
impl GenerateIndexTimings {
pub fn report(&self) {
@@ -5968,11 +5969,13 @@ impl AccountsDb {
// verify checks that all the expected items are in the accounts index and measures how long it takes to look them all up
let passes = if verify { 2 } else { 1 };
for pass in 0..passes {
let storage_info = StorageSizeAndCountMap::default();
let total_processed_slots_across_all_threads = AtomicU64::new(0);
let outer_slots_len = slots.len();
let chunk_size = (outer_slots_len / 7) + 1; // approximately 400k slots in a snapshot
let mut index_time = Measure::start("index");
let insertion_time_us = AtomicU64::new(0);
let storage_info_timings = Mutex::new(GenerateIndexTimings::default());
let scan_time: u64 = slots
.par_chunks(chunk_size)
.map(|slots| {
@@ -5992,6 +5995,11 @@ impl AccountsDb {
let accounts_map = Self::process_storage_slot(&storage_maps);
scan_time.stop();
scan_time_sum += scan_time.as_us();
Self::update_storage_info(
&storage_info,
&accounts_map,
&storage_info_timings,
);
let insert_us = if pass == 0 {
// generate index
@@ -6047,6 +6055,8 @@ impl AccountsDb {
})
.sum();
let storage_info_timings = storage_info_timings.into_inner().unwrap();
let mut timings = GenerateIndexTimings {
scan_time,
index_time: index_time.as_us(),
@@ -6054,6 +6064,9 @@ impl AccountsDb {
min_bin_size,
max_bin_size,
total_items,
storage_size_accounts_map_us: storage_info_timings.storage_size_accounts_map_us,
storage_size_accounts_map_flatten_us: storage_info_timings
.storage_size_accounts_map_flatten_us,
..GenerateIndexTimings::default()
};
@@ -6063,66 +6076,49 @@ impl AccountsDb {
self.accounts_index.add_root(*slot, false);
}
self.initialize_storage_count_and_alive_bytes(&mut timings);
self.set_storage_count_and_alive_bytes(storage_info, &mut timings);
}
timings.report();
}
}
fn calculate_storage_count_and_alive_bytes(
&self,
timings: &mut GenerateIndexTimings,
) -> HashMap<AppendVecId, StorageSizeAndCount> {
// look at every account in the account index and calculate for each storage: stored_size and count
fn update_storage_info(
storage_info: &StorageSizeAndCountMap,
accounts_map: &GenerateIndexAccountsMap<'_>,
timings: &Mutex<GenerateIndexTimings>,
) {
let mut storage_size_accounts_map_time = Measure::start("storage_size_accounts_map");
let mut maps = self
.accounts_index
.account_maps
.par_iter()
.map(|bin_map| {
let mut stored_sizes_and_counts = HashMap::new();
bin_map.read().unwrap().values().for_each(|entry| {
entry
.slot_list
.read()
.unwrap()
.iter()
.for_each(|(_slot, account_entry)| {
let storage_entry_meta = stored_sizes_and_counts
.entry(account_entry.store_id)
.or_insert_with(StorageSizeAndCount::default);
storage_entry_meta.stored_size += account_entry.stored_size;
storage_entry_meta.count += 1;
})
});
stored_sizes_and_counts
})
.collect::<Vec<_>>();
storage_size_accounts_map_time.stop();
timings.storage_size_accounts_map_us = storage_size_accounts_map_time.as_us();
// flatten/merge the HashMaps from the parallel iteration above
let mut storage_info_local = HashMap::<AppendVecId, StorageSizeAndCount>::default();
// first collect into a local HashMap with no lock contention
for (_, v) in accounts_map.iter() {
let mut info = storage_info_local
.entry(v.store_id)
.or_insert_with(StorageSizeAndCount::default);
info.stored_size += v.stored_account.stored_size;
info.count += 1;
}
storage_size_accounts_map_time.stop();
// second, collect into the shared DashMap once we've figured out all the info per store_id
let mut storage_size_accounts_map_flatten_time =
Measure::start("storage_size_accounts_map_flatten_time");
let mut stored_sizes_and_counts = maps.pop().unwrap_or_default();
for map in maps {
for (store_id, meta) in map.into_iter() {
let storage_entry_meta = stored_sizes_and_counts
.entry(store_id)
.or_insert_with(StorageSizeAndCount::default);
storage_entry_meta.stored_size += meta.stored_size;
storage_entry_meta.count += meta.count;
}
for (store_id, v) in storage_info_local.into_iter() {
let mut info = storage_info
.entry(store_id)
.or_insert_with(StorageSizeAndCount::default);
info.stored_size += v.stored_size;
info.count += v.count;
}
storage_size_accounts_map_flatten_time.stop();
timings.storage_size_accounts_map_flatten_us =
storage_size_accounts_map_flatten_time.as_us();
stored_sizes_and_counts
}
let mut timings = timings.lock().unwrap();
timings.storage_size_accounts_map_us += storage_size_accounts_map_time.as_us();
timings.storage_size_accounts_map_flatten_us +=
storage_size_accounts_map_flatten_time.as_us();
}
fn set_storage_count_and_alive_bytes(
&self,
stored_sizes_and_counts: HashMap<usize, StorageSizeAndCount>,
stored_sizes_and_counts: StorageSizeAndCountMap,
timings: &mut GenerateIndexTimings,
) {
// store count and size for each storage
@@ -6131,12 +6127,15 @@ impl AccountsDb {
for (id, store) in slot_stores.value().read().unwrap().iter() {
// Should be default at this point
assert_eq!(store.alive_bytes(), 0);
if let Some(StorageSizeAndCount { stored_size, count }) =
stored_sizes_and_counts.get(id)
{
trace!("id: {} setting count: {} cur: {}", id, count, store.count(),);
store.count_and_status.write().unwrap().0 = *count;
store.alive_bytes.store(*stored_size, Ordering::SeqCst);
if let Some(entry) = stored_sizes_and_counts.get(id) {
trace!(
"id: {} setting count: {} cur: {}",
id,
entry.count,
store.count(),
);
store.count_and_status.write().unwrap().0 = entry.count;
store.alive_bytes.store(entry.stored_size, Ordering::SeqCst);
} else {
trace!("id: {} clearing count", id);
store.count_and_status.write().unwrap().0 = 0;
@@ -6147,11 +6146,6 @@ impl AccountsDb {
timings.storage_size_storages_us = storage_size_storages_time.as_us();
}
fn initialize_storage_count_and_alive_bytes(&self, timings: &mut GenerateIndexTimings) {
let stored_sizes_and_counts = self.calculate_storage_count_and_alive_bytes(timings);
self.set_storage_count_and_alive_bytes(stored_sizes_and_counts, timings);
}
pub(crate) fn print_accounts_stats(&self, label: &str) {
self.print_index(label);
self.print_count_and_status(label);
@@ -11979,29 +11973,29 @@ pub mod tests {
let slot0 = 0;
accounts.store_uncached(slot0, &[(&shared_key, &account)]);
let result =
accounts.calculate_storage_count_and_alive_bytes(&mut GenerateIndexTimings::default());
assert_eq!(result.len(), 1);
for (k, v) in result.iter() {
let storage_maps = accounts
.storage
.get_slot_storage_entries(slot0)
.unwrap_or_default();
let storage_info = StorageSizeAndCountMap::default();
let accounts_map = AccountsDb::process_storage_slot(&storage_maps[..]);
AccountsDb::update_storage_info(&storage_info, &accounts_map, &Mutex::default());
assert_eq!(storage_info.len(), 1);
for entry in storage_info.iter() {
assert_eq!(
(k, v),
(
&0,
&StorageSizeAndCount {
stored_size: 144,
count: 1
}
)
(entry.key(), entry.value().count, entry.value().stored_size),
(&0, 1, 144)
);
}
}
#[test]
fn test_calculate_storage_count_and_alive_bytes_0_accounts() {
let accounts = AccountsDb::new_single_for_tests();
let result =
accounts.calculate_storage_count_and_alive_bytes(&mut GenerateIndexTimings::default());
assert!(result.is_empty());
let storage_maps = vec![];
let storage_info = StorageSizeAndCountMap::default();
let accounts_map = AccountsDb::process_storage_slot(&storage_maps[..]);
AccountsDb::update_storage_info(&storage_info, &accounts_map, &Mutex::default());
assert!(storage_info.is_empty());
}
#[test]
@@ -12028,19 +12022,18 @@ pub mod tests {
accounts.store_uncached(slot0, &[(&keys[0], &account)]);
accounts.store_uncached(slot0, &[(&keys[1], &account_big)]);
let result =
accounts.calculate_storage_count_and_alive_bytes(&mut GenerateIndexTimings::default());
assert_eq!(result.len(), 1);
for (k, v) in result.iter() {
let storage_maps = accounts
.storage
.get_slot_storage_entries(slot0)
.unwrap_or_default();
let storage_info = StorageSizeAndCountMap::default();
let accounts_map = AccountsDb::process_storage_slot(&storage_maps[..]);
AccountsDb::update_storage_info(&storage_info, &accounts_map, &Mutex::default());
assert_eq!(storage_info.len(), 1);
for entry in storage_info.iter() {
assert_eq!(
(k, v),
(
&0,
&StorageSizeAndCount {
stored_size: 1280,
count: 2
}
)
(entry.key(), entry.value().count, entry.value().stored_size),
(&0, 2, 1280)
);
}
}
@@ -12063,15 +12056,15 @@ pub mod tests {
}
// populate based on made up hash data
let mut hashmap = HashMap::default();
hashmap.insert(
let dashmap = DashMap::default();
dashmap.insert(
0,
StorageSizeAndCount {
stored_size: 2,
count: 3,
},
);
accounts.set_storage_count_and_alive_bytes(hashmap, &mut GenerateIndexTimings::default());
accounts.set_storage_count_and_alive_bytes(dashmap, &mut GenerateIndexTimings::default());
assert_eq!(accounts.storage.0.len(), 1);
for slot_stores in accounts.storage.0.iter() {
for (id, store) in slot_stores.value().read().unwrap().iter() {