stop removing empty bins during index generation (#33242)
This commit is contained in:
parent
886eabd74d
commit
3ad8394047
|
@ -1604,39 +1604,35 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
|
||||||
// this assumes the largest bin contains twice the expected amount of the average size per bin
|
// this assumes the largest bin contains twice the expected amount of the average size per bin
|
||||||
let bins = self.bins();
|
let bins = self.bins();
|
||||||
let expected_items_per_bin = item_len * 2 / bins;
|
let expected_items_per_bin = item_len * 2 / bins;
|
||||||
// offset bin 0 in the 'binned' array by a random amount.
|
|
||||||
// This results in calls to insert_new_entry_if_missing_with_lock from different threads starting at different bins.
|
|
||||||
let random_offset = thread_rng().gen_range(0..bins);
|
|
||||||
let use_disk = self.storage.storage.disk.is_some();
|
let use_disk = self.storage.storage.disk.is_some();
|
||||||
let mut binned = (0..bins)
|
let mut binned = (0..bins)
|
||||||
.map(|mut pubkey_bin| {
|
.map(|_| Vec::with_capacity(expected_items_per_bin))
|
||||||
// opposite of (pubkey_bin + random_offset) % bins
|
|
||||||
pubkey_bin = if pubkey_bin < random_offset {
|
|
||||||
pubkey_bin + bins - random_offset
|
|
||||||
} else {
|
|
||||||
pubkey_bin - random_offset
|
|
||||||
};
|
|
||||||
(pubkey_bin, Vec::with_capacity(expected_items_per_bin))
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
let mut dirty_pubkeys = items
|
let mut dirty_pubkeys = items
|
||||||
.filter_map(|(pubkey, account_info)| {
|
.filter_map(|(pubkey, account_info)| {
|
||||||
let pubkey_bin = self.bin_calculator.bin_from_pubkey(&pubkey);
|
let pubkey_bin = self.bin_calculator.bin_from_pubkey(&pubkey);
|
||||||
let binned_index = (pubkey_bin + random_offset) % bins;
|
|
||||||
// this value is equivalent to what update() below would have created if we inserted a new item
|
// this value is equivalent to what update() below would have created if we inserted a new item
|
||||||
let is_zero_lamport = account_info.is_zero_lamport();
|
let is_zero_lamport = account_info.is_zero_lamport();
|
||||||
let result = if is_zero_lamport { Some(pubkey) } else { None };
|
let result = if is_zero_lamport { Some(pubkey) } else { None };
|
||||||
|
|
||||||
binned[binned_index].1.push((pubkey, (slot, account_info)));
|
binned[pubkey_bin].push((pubkey, (slot, account_info)));
|
||||||
result
|
result
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
binned.retain(|x| !x.1.is_empty());
|
|
||||||
|
|
||||||
let insertion_time = AtomicU64::new(0);
|
let insertion_time = AtomicU64::new(0);
|
||||||
|
|
||||||
binned.into_iter().for_each(|(pubkey_bin, items)| {
|
// offset bin processing in the 'binned' array by a random amount.
|
||||||
|
// This results in calls to insert_new_entry_if_missing_with_lock from different threads starting at different bins to avoid
|
||||||
|
// lock contention.
|
||||||
|
let random_offset = thread_rng().gen_range(0..bins);
|
||||||
|
(0..bins).for_each(|pubkey_bin| {
|
||||||
|
let pubkey_bin = (pubkey_bin + random_offset) % bins;
|
||||||
|
let items = std::mem::take(&mut binned[pubkey_bin]);
|
||||||
|
if items.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
let r_account_maps = &self.account_maps[pubkey_bin];
|
let r_account_maps = &self.account_maps[pubkey_bin];
|
||||||
let mut insert_time = Measure::start("insert_into_primary_index");
|
let mut insert_time = Measure::start("insert_into_primary_index");
|
||||||
count += items.len();
|
count += items.len();
|
||||||
|
|
Loading…
Reference in New Issue