hash dedup vec (#33246)

* hash_dedup vec algo

* reviews

* reviews

* more reviews

* simplify working_set init with add_next_item

* refactor to remove special case "new" from add_item.

The new change is that, even if the new item is the new min, it will still be added to the working_set.
This change makes the working_set init code simpler and the loop check simpler.
Since the item is inserted at the end of the vector, the cost of pushing into and popping from the working set will be O(1), so it shouldn't affect performance much.

* comments

* refactor unnamed tuple in working set to SlotGroupPointer type

* use SlotGroupPointer in ItemLocation

* Add Copy trait to avoid explicit calls to clone on SlotGroupPointer

* consume next in add_next_item fn (credit to jeff).

note that the old code is still correct, since before call to
add_next_item, we will have already overwritten `next` to correct value.

---------

Co-authored-by: HaoranYi <haoran.yi@solana.com>
This commit is contained in:
HaoranYi 2023-09-28 08:58:08 -05:00 committed by GitHub
parent 5b9a167c51
commit 25c27d452c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 195 additions and 118 deletions

View File

@ -435,6 +435,24 @@ pub struct AccountsHasher<'a> {
pub(crate) active_stats: &'a ActiveStats,
}
/// Pointer to a specific item in chunked accounts hash slices.
///
/// Identifies an item by which slot-group slice it lives in and by its
/// position within that slice; `Copy` so it can be moved in and out of the
/// dedup working set without explicit clones.
#[derive(Debug, Clone, Copy)]
struct SlotGroupPointer {
    /// index of the slot group slice within `sorted_data_by_pubkey`
    slot_group_index: usize,
    /// offset of the item within that slot group slice
    offset: usize,
}
/// A struct for the location of an account hash item inside chunked accounts hash slices.
///
/// `key` borrows the pubkey of the very item that `pointer` addresses, so the
/// key can be compared without re-indexing the slices.
#[derive(Debug)]
struct ItemLocation<'a> {
    /// account's pubkey
    key: &'a Pubkey,
    /// pointer to the item in slot group slices
    pointer: SlotGroupPointer,
}
impl<'a> AccountsHasher<'a> {
/// true if it is possible that there are filler accounts present
pub fn filler_accounts_enabled(&self) -> bool {
@ -813,59 +831,45 @@ impl<'a> AccountsHasher<'a> {
(hashes, lamports_total)
}
/// returns the item referenced by `min_index`
/// updates `indexes` to skip over the pubkey and its duplicates
/// updates `first_items` to point to the next pubkey
/// or removes the entire pubkey division entries (for `min_index`) if the referenced pubkey is the last entry in the same `bin`
/// removed from: `first_items`, `indexes`, and `first_item_pubkey_division`
/// Given the item location, return the item in the `CalculatedHashIntermediate` slices and the next item location in the same bin.
/// If the end of the `CalculatedHashIntermediate` slice is reached or all the accounts in current bin have been exhausted, return `None` for next item location.
fn get_item<'b>(
min_index: usize,
bin: usize,
first_items: &mut Vec<Pubkey>,
sorted_data_by_pubkey: &[&'b [CalculateHashIntermediate]],
indexes: &mut Vec<usize>,
first_item_to_pubkey_division: &mut Vec<usize>,
bin: usize,
binner: &PubkeyBinCalculator24,
) -> &'b CalculateHashIntermediate {
let first_item = first_items[min_index];
let key = &first_item;
let division_index = first_item_to_pubkey_division[min_index];
let division_data = &sorted_data_by_pubkey[division_index];
let mut index = indexes[min_index];
item_loc: &ItemLocation<'b>,
) -> (&'b CalculateHashIntermediate, Option<ItemLocation<'b>>) {
let division_data = &sorted_data_by_pubkey[item_loc.pointer.slot_group_index];
let mut index = item_loc.pointer.offset;
index += 1;
let mut end;
loop {
end = index >= division_data.len();
if end {
break;
}
let mut next = None;
while index < division_data.len() {
// still more items where we found the previous key, so just increment the index for that slot group, skipping all pubkeys that are equal
let next_key = &division_data[index].pubkey;
if next_key == key {
if next_key == item_loc.key {
index += 1;
continue; // duplicate entries of same pubkey, so keep skipping
}
if binner.bin_from_pubkey(next_key) > bin {
// the next pubkey is not in our bin
end = true;
break;
}
// point to the next pubkey > key
first_items[min_index] = *next_key;
indexes[min_index] = index;
next = Some(ItemLocation {
key: next_key,
pointer: SlotGroupPointer {
slot_group_index: item_loc.pointer.slot_group_index,
offset: index,
},
});
break;
}
if end {
// stop looking in this vector - we exhausted it
first_items.remove(min_index);
first_item_to_pubkey_division.remove(min_index);
indexes.remove(min_index);
}
// this is the previous first item that was requested
&division_data[index - 1]
(&division_data[index - 1], next)
}
/// `hash_data` must be sorted by `binner.bin_from_pubkey()`
@ -944,6 +948,144 @@ impl<'a> AccountsHasher<'a> {
result
}
/// Return the working_set and max number of pubkeys for hash dedup.
/// `working_set` holds SlotGroupPointer {slot_group_index, offset} for items in account's pubkey descending order.
///
/// `max_inclusive_num_pubkeys` is the sum, over all slot groups, of the number of
/// entries that fall in `pubkey_bin` — duplicates of the same pubkey are counted,
/// hence "inclusive"; it is an upper bound on the number of unique pubkeys.
fn initialize_dedup_working_set(
    sorted_data_by_pubkey: &[&[CalculateHashIntermediate]],
    pubkey_bin: usize,
    bins: usize,
    binner: &PubkeyBinCalculator24,
    stats: &HashStats,
) -> (
    Vec<SlotGroupPointer>, /* working_set */
    usize,                 /* max_inclusive_num_pubkeys */
) {
    // working_set holds the lowest items for each slot_group sorted by pubkey descending (min_key is the last)
    let mut working_set: Vec<SlotGroupPointer> = Vec::default();

    // Initialize 'working_set', which holds the current lowest item in each slot group.
    // `working_set` should be initialized in reverse order of slot_groups. Later slot_groups are
    // processed first. For each slot_group, if the lowest item for current slot group is
    // already in working_set (i.e. inserted by a later slot group), the next lowest item
    // in this slot group is searched and checked, until either one that is `not` in the
    // working_set is found, which will then be inserted, or no next lowest item is found.
    // Iterating in reverse order of slot_group will guarantee that each slot group will be
    // scanned only once and scanned continuously. Therefore, it can achieve better data
    // locality during the scan.
    let max_inclusive_num_pubkeys = sorted_data_by_pubkey
        .iter()
        .enumerate()
        .rev()
        .map(|(i, hash_data)| {
            let first_pubkey_in_bin =
                Self::find_first_pubkey_in_bin(hash_data, pubkey_bin, bins, binner, stats);

            if let Some(first_pubkey_in_bin) = first_pubkey_in_bin {
                // Offer this slot group's lowest in-bin item; `add_next_item` walks
                // forward past any keys already present in `working_set`.
                let mut next = Some(ItemLocation {
                    key: &hash_data[first_pubkey_in_bin].pubkey,
                    pointer: SlotGroupPointer {
                        slot_group_index: i,
                        offset: first_pubkey_in_bin,
                    },
                });
                Self::add_next_item(
                    &mut next,
                    &mut working_set,
                    sorted_data_by_pubkey,
                    pubkey_bin,
                    binner,
                );

                // count the consecutive entries of this slot group that fall in `pubkey_bin`
                let mut first_pubkey_in_next_bin = first_pubkey_in_bin + 1;
                while first_pubkey_in_next_bin < hash_data.len() {
                    if binner.bin_from_pubkey(&hash_data[first_pubkey_in_next_bin].pubkey)
                        != pubkey_bin
                    {
                        break;
                    }
                    first_pubkey_in_next_bin += 1;
                }
                first_pubkey_in_next_bin - first_pubkey_in_bin
            } else {
                0
            }
        })
        .sum::<usize>();

    (working_set, max_inclusive_num_pubkeys)
}
/// Add next item into hash dedup working set
///
/// Repeatedly tries to insert `next` (which is consumed) into `working_set`,
/// which is kept sorted by pubkey in descending order (the minimum key is last):
///   - if the key is below the current minimum, push at the end (O(1));
///   - if the key is absent, binary-search and insert at the right position;
///   - if the key is already present, the entry from the later (higher-index)
///     slot group is kept, and the losing slot group advances to its next
///     in-bin item via `get_item`, which becomes the new `next` to insert.
fn add_next_item<'b>(
    next: &mut Option<ItemLocation<'b>>,
    working_set: &mut Vec<SlotGroupPointer>,
    sorted_data_by_pubkey: &[&'b [CalculateHashIntermediate]],
    pubkey_bin: usize,
    binner: &PubkeyBinCalculator24,
) {
    // looping to add next item to working set
    while let Some(ItemLocation { key, pointer }) = std::mem::take(next) {
        // if `new key` is less than the min key in the working set, skip binary search and
        // insert item to the end vec directly
        if let Some(SlotGroupPointer {
            slot_group_index: current_min_slot_group_index,
            offset: current_min_offset,
        }) = working_set.last()
        {
            let current_min_key = &sorted_data_by_pubkey[*current_min_slot_group_index]
                [*current_min_offset]
                .pubkey;
            if key < current_min_key {
                working_set.push(pointer);
                break;
            }
        }

        // note the comparator is reversed (`key` vs element), matching the
        // descending order of `working_set`
        let found = working_set.binary_search_by(|pointer| {
            let prob = &sorted_data_by_pubkey[pointer.slot_group_index][pointer.offset].pubkey;
            (*key).cmp(prob)
        });

        match found {
            Err(index) => {
                // found a new key, insert into the working_set. This is O(n/2) on
                // average. Theoretically, this operation could be expensive and may be further
                // optimized in future.
                working_set.insert(index, pointer);
                break;
            }
            Ok(index) => {
                let found = &mut working_set[index];
                if found.slot_group_index > pointer.slot_group_index {
                    // There is already a later slot group that contains this key in the working_set,
                    // look up again.
                    let (_item, new_next) = Self::get_item(
                        sorted_data_by_pubkey,
                        pubkey_bin,
                        binner,
                        &ItemLocation { key, pointer },
                    );
                    *next = new_next;
                } else {
                    // A previous slot contains this key, replace it, and look for next item in the previous slot group.
                    let (_item, new_next) = Self::get_item(
                        sorted_data_by_pubkey,
                        pubkey_bin,
                        binner,
                        &ItemLocation {
                            key,
                            pointer: *found,
                        },
                    );
                    *found = pointer;
                    *next = new_next;
                }
            }
        }
    }
}
// go through: [..][pubkey_bin][..] and return hashes and lamport sum
// slot groups^ ^accounts found in a slot group, sorted by pubkey, higher slot, write_version
// 1. handle zero lamport accounts
@ -960,40 +1102,15 @@ impl<'a> AccountsHasher<'a> {
) -> (AccountHashesFile, u64) {
let binner = PubkeyBinCalculator24::new(bins);
let len = sorted_data_by_pubkey.len();
let mut indexes = Vec::with_capacity(len);
let mut first_items = Vec::with_capacity(len);
// map from index of an item in first_items[] to index of the corresponding item in sorted_data_by_pubkey[]
// this will change as items in sorted_data_by_pubkey[] are exhausted
let mut first_item_to_pubkey_division = Vec::with_capacity(len);
// working_set hold the lowest items for each slot_group sorted by pubkey descending (min_key is the last)
let (mut working_set, max_inclusive_num_pubkeys) = Self::initialize_dedup_working_set(
sorted_data_by_pubkey,
pubkey_bin,
bins,
&binner,
stats,
);
// initialize 'first_items', which holds the current lowest item in each slot group
let max_inclusive_num_pubkeys = sorted_data_by_pubkey
.iter()
.enumerate()
.map(|(i, hash_data)| {
let first_pubkey_in_bin =
Self::find_first_pubkey_in_bin(hash_data, pubkey_bin, bins, &binner, stats);
if let Some(first_pubkey_in_bin) = first_pubkey_in_bin {
let k = hash_data[first_pubkey_in_bin].pubkey;
first_items.push(k);
first_item_to_pubkey_division.push(i);
indexes.push(first_pubkey_in_bin);
let mut first_pubkey_in_next_bin = first_pubkey_in_bin + 1;
while first_pubkey_in_next_bin < hash_data.len() {
if binner.bin_from_pubkey(&hash_data[first_pubkey_in_next_bin].pubkey)
!= pubkey_bin
{
break;
}
first_pubkey_in_next_bin += 1;
}
first_pubkey_in_next_bin - first_pubkey_in_bin
} else {
0
}
})
.sum::<usize>();
let mut hashes = AccountHashesFile {
writer: None,
dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(),
@ -1001,47 +1118,17 @@ impl<'a> AccountsHasher<'a> {
};
let mut overall_sum = 0;
let mut duplicate_pubkey_indexes = Vec::with_capacity(len);
let filler_accounts_enabled = self.filler_accounts_enabled();
// this loop runs once per unique pubkey contained in any slot group
while !first_items.is_empty() {
let loop_stop = { first_items.len() - 1 }; // we increment at the beginning of the loop
let mut min_index = 0;
let mut min_pubkey = first_items[min_index];
let mut first_item_index = 0; // we will start iterating at item 1. +=1 is first instruction in loop
while let Some(pointer) = working_set.pop() {
let key = &sorted_data_by_pubkey[pointer.slot_group_index][pointer.offset].pubkey;
// this loop iterates over each slot group to find the minimum pubkey at the maximum slot
// it also identifies duplicate pubkey entries at lower slots and remembers those to skip them after
while first_item_index < loop_stop {
first_item_index += 1;
let key = &first_items[first_item_index];
let cmp = min_pubkey.cmp(key);
match cmp {
std::cmp::Ordering::Less => {
continue; // we still have the min item
}
std::cmp::Ordering::Equal => {
// we found the same pubkey in a later slot, so remember the lower slot as a duplicate
duplicate_pubkey_indexes.push(min_index);
}
std::cmp::Ordering::Greater => {
// this is the new min pubkey
min_pubkey = *key;
}
}
// this is the new index of the min entry
min_index = first_item_index;
}
// get the min item, add lamports, get hash
let item = Self::get_item(
min_index,
pubkey_bin,
&mut first_items,
let (item, mut next) = Self::get_item(
sorted_data_by_pubkey,
&mut indexes,
&mut first_item_to_pubkey_division,
pubkey_bin,
&binner,
&ItemLocation { key, pointer },
);
// add lamports and get hash
@ -1064,23 +1151,13 @@ impl<'a> AccountsHasher<'a> {
}
}
if !duplicate_pubkey_indexes.is_empty() {
// skip past duplicate keys in earlier slots
// reverse this list because get_item can remove first_items[*i] when *i is exhausted
// and that would mess up subsequent *i values
duplicate_pubkey_indexes.iter().rev().for_each(|i| {
Self::get_item(
*i,
pubkey_bin,
&mut first_items,
sorted_data_by_pubkey,
&mut indexes,
&mut first_item_to_pubkey_division,
&binner,
);
});
duplicate_pubkey_indexes.clear();
}
Self::add_next_item(
&mut next,
&mut working_set,
sorted_data_by_pubkey,
pubkey_bin,
&binner,
);
}
(hashes, overall_sum)