From a2df1eb5025264c1c89cc90cc69fde9ccb5119a4 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Thu, 9 Dec 2021 11:54:14 -0600 Subject: [PATCH] AcctIdx: disk generate index and filler accounts use more threads (#21566) --- runtime/src/accounts_db.rs | 91 ++++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 43 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index df9078bd64..ae59f396f4 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -6798,51 +6798,49 @@ impl AccountsDb { .skip(pass * per_pass) .take(per_pass) .collect::>(); - self.thread_pool.install(|| { - roots_in_this_pass.into_par_iter().for_each(|slot| { - let storage_maps: Vec> = self - .storage - .get_slot_storage_entries(*slot) - .unwrap_or_default(); - if storage_maps.is_empty() { - return; - } + roots_in_this_pass.into_par_iter().for_each(|slot| { + let storage_maps: Vec> = self + .storage + .get_slot_storage_entries(*slot) + .unwrap_or_default(); + if storage_maps.is_empty() { + return; + } - let partition = crate::bank::Bank::variable_cycle_partition_from_previous_slot( - epoch_schedule, - *slot, - ); - let subrange = crate::bank::Bank::pubkey_range_from_partition(partition); + let partition = crate::bank::Bank::variable_cycle_partition_from_previous_slot( + epoch_schedule, + *slot, + ); + let subrange = crate::bank::Bank::pubkey_range_from_partition(partition); - let idx = overall_index.fetch_add(1, Ordering::Relaxed); - let filler_entries = (idx + 1) * self.filler_account_count / root_count - - idx * self.filler_account_count / root_count; - let accounts = (0..filler_entries) - .map(|_| { - let my_id = added.fetch_add(1, Ordering::Relaxed); - let my_id_bytes = u32::to_be_bytes(my_id as u32); + let idx = overall_index.fetch_add(1, Ordering::Relaxed); + let filler_entries = (idx + 1) * self.filler_account_count / root_count + - idx * self.filler_account_count / root_count; + let accounts = (0..filler_entries) + .map(|_| { + let my_id = added.fetch_add(1, Ordering::Relaxed); + let my_id_bytes = u32::to_be_bytes(my_id as u32); - // pubkey begins life as entire filler 'suffix' pubkey - let mut key = self.filler_account_suffix.unwrap(); - let rent_prefix_bytes = Self::filler_rent_partition_prefix_bytes(); - // first bytes are replaced with rent partition range: filler_rent_partition_prefix_bytes - key.as_mut()[0..rent_prefix_bytes] - .copy_from_slice(&subrange.start().as_ref()[0..rent_prefix_bytes]); - // next bytes are replaced with my_id: filler_unique_id_bytes - key.as_mut()[rent_prefix_bytes - ..(rent_prefix_bytes + Self::filler_unique_id_bytes())] - .copy_from_slice(&my_id_bytes); - assert!(subrange.contains(&key)); - key - }) - .collect::>(); - let add = accounts - .iter() - .map(|key| (key, &account)) - .collect::>(); - let hashes = (0..filler_entries).map(|_| hash).collect::>(); - self.store_accounts_frozen(*slot, &add[..], Some(&hashes[..]), None, None); - }) + // pubkey begins life as entire filler 'suffix' pubkey + let mut key = self.filler_account_suffix.unwrap(); + let rent_prefix_bytes = Self::filler_rent_partition_prefix_bytes(); + // first bytes are replaced with rent partition range: filler_rent_partition_prefix_bytes + key.as_mut()[0..rent_prefix_bytes] + .copy_from_slice(&subrange.start().as_ref()[0..rent_prefix_bytes]); + // next bytes are replaced with my_id: filler_unique_id_bytes + key.as_mut()[rent_prefix_bytes + ..(rent_prefix_bytes + Self::filler_unique_id_bytes())] + .copy_from_slice(&my_id_bytes); + assert!(subrange.contains(&key)); + key + }) + .collect::>(); + let add = accounts + .iter() + .map(|key| (key, &account)) + .collect::>(); + let hashes = (0..filler_entries).map(|_| hash).collect::>(); + self.store_accounts_frozen(*slot, &add[..], Some(&hashes[..]), None, None); }); self.accounts_index.set_startup(false); } @@ -6882,7 +6880,14 @@ impl AccountsDb { let storage_info = StorageSizeAndCountMap::default(); let total_processed_slots_across_all_threads = AtomicU64::new(0); let outer_slots_len = slots.len(); - let chunk_size = (outer_slots_len / 7) + 1; // approximately 400k slots in a snapshot + let threads = if self.accounts_index.is_disk_index_enabled() { + // these write directly to disk, so the more threads, the better + num_cpus::get() + } else { + // seems to be a good hueristic given varying # cpus for in-mem disk index + 8 + }; + let chunk_size = (outer_slots_len / (std::cmp::max(1, threads.saturating_sub(1)))) + 1; // approximately 400k slots in a snapshot let mut index_time = Measure::start("index"); let insertion_time_us = AtomicU64::new(0); let rent_exempt = AtomicU64::new(0);