From 25c27d452cc389ef4cf43a10e47b71c3000283df Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Thu, 28 Sep 2023 08:58:08 -0500 Subject: [PATCH] hash dedup vec (#33246) * hash_dedup vec algo * reviews * reviews * more reviews * simplify working_set init with add_next_item * refactor to remove special case "new" from add_item. The new change is that, even the new item is the new min, it will still be added to working_set. This change will make init working_set code simpler and the loop loop check simpler. Since the item is inserted in at the end of the vector, the cost of push into and pop from the working will be O(1), shouldn't affect performance much. * comments * refactor unnamed tuple in working set to SlotGroupPointer type * use SlotGroupPointer in ItemLocation * Add Copy traits to avoid explicty call of clone on SlotGroupPointer * consume next in add_next_item fn (credit to jeff). note that the old code is still correct, since before call to add_next_item, we will have already overwritten `next` to correct value. --------- Co-authored-by: HaoranYi --- accounts-db/src/accounts_hash.rs | 313 +++++++++++++++++++------------ 1 file changed, 195 insertions(+), 118 deletions(-) diff --git a/accounts-db/src/accounts_hash.rs b/accounts-db/src/accounts_hash.rs index 74d1cfa8a1..f5c0a78ffa 100644 --- a/accounts-db/src/accounts_hash.rs +++ b/accounts-db/src/accounts_hash.rs @@ -435,6 +435,24 @@ pub struct AccountsHasher<'a> { pub(crate) active_stats: &'a ActiveStats, } +/// Pointer to a specific item in chunked accounts hash slices. +#[derive(Debug, Clone, Copy)] +struct SlotGroupPointer { + /// slot group index + slot_group_index: usize, + /// offset within a slot group + offset: usize, +} + +/// A struct for the location of an account hash item inside chunked accounts hash slices. +#[derive(Debug)] +struct ItemLocation<'a> { + /// account's pubkey + key: &'a Pubkey, + /// pointer to the item in slot group slices + pointer: SlotGroupPointer, +} + impl<'a> AccountsHasher<'a> { /// true if it is possible that there are filler accounts present pub fn filler_accounts_enabled(&self) -> bool { @@ -813,59 +831,45 @@ impl<'a> AccountsHasher<'a> { (hashes, lamports_total) } - /// returns the item referenced by `min_index` - /// updates `indexes` to skip over the pubkey and its duplicates - /// updates `first_items` to point to the next pubkey - /// or removes the entire pubkey division entries (for `min_index`) if the referenced pubkey is the last entry in the same `bin` - /// removed from: `first_items`, `indexes`, and `first_item_pubkey_division` + /// Given the item location, return the item in the `CalculatedHashIntermediate` slices and the next item location in the same bin. + /// If the end of the `CalculatedHashIntermediate` slice is reached or all the accounts in current bin have been exhausted, return `None` for next item location. fn get_item<'b>( - min_index: usize, - bin: usize, - first_items: &mut Vec, sorted_data_by_pubkey: &[&'b [CalculateHashIntermediate]], - indexes: &mut Vec, - first_item_to_pubkey_division: &mut Vec, + bin: usize, binner: &PubkeyBinCalculator24, - ) -> &'b CalculateHashIntermediate { - let first_item = first_items[min_index]; - let key = &first_item; - let division_index = first_item_to_pubkey_division[min_index]; - let division_data = &sorted_data_by_pubkey[division_index]; - let mut index = indexes[min_index]; + item_loc: &ItemLocation<'b>, + ) -> (&'b CalculateHashIntermediate, Option>) { + let division_data = &sorted_data_by_pubkey[item_loc.pointer.slot_group_index]; + let mut index = item_loc.pointer.offset; index += 1; - let mut end; - loop { - end = index >= division_data.len(); - if end { - break; - } + let mut next = None; + + while index < division_data.len() { // still more items where we found the previous key, so just increment the index for that slot group, skipping all pubkeys that are equal let next_key = &division_data[index].pubkey; - if next_key == key { + if next_key == item_loc.key { index += 1; continue; // duplicate entries of same pubkey, so keep skipping } if binner.bin_from_pubkey(next_key) > bin { // the next pubkey is not in our bin - end = true; break; } // point to the next pubkey > key - first_items[min_index] = *next_key; - indexes[min_index] = index; + next = Some(ItemLocation { + key: next_key, + pointer: SlotGroupPointer { + slot_group_index: item_loc.pointer.slot_group_index, + offset: index, + }, + }); break; } - if end { - // stop looking in this vector - we exhausted it - first_items.remove(min_index); - first_item_to_pubkey_division.remove(min_index); - indexes.remove(min_index); - } // this is the previous first item that was requested - &division_data[index - 1] + (&division_data[index - 1], next) } /// `hash_data` must be sorted by `binner.bin_from_pubkey()` @@ -944,6 +948,144 @@ impl<'a> AccountsHasher<'a> { result } + /// Return the working_set and max number of pubkeys for hash dedup. + /// `working_set` holds SlotGroupPointer {slot_group_index, offset} for items in account's pubkey descending order. + fn initialize_dedup_working_set( + sorted_data_by_pubkey: &[&[CalculateHashIntermediate]], + pubkey_bin: usize, + bins: usize, + binner: &PubkeyBinCalculator24, + stats: &HashStats, + ) -> ( + Vec, /* working_set */ + usize, /* max_inclusive_num_pubkeys */ + ) { + // working_set holds the lowest items for each slot_group sorted by pubkey descending (min_key is the last) + let mut working_set: Vec = Vec::default(); + + // Initialize 'working_set', which holds the current lowest item in each slot group. + // `working_set` should be initialized in reverse order of slot_groups. Later slot_groups are + // processed first. For each slot_group, if the lowest item for current slot group is + // already in working_set (i.e. inserted by a later slot group), the next lowest item + // in this slot group is searched and checked, until either one that is `not` in the + // working_set is found, which will then be inserted, or no next lowest item is found. + // Iterating in reverse order of slot_group will guarantee that each slot group will be + // scanned only once and scanned continuously. Therefore, it can achieve better data + // locality during the scan. + let max_inclusive_num_pubkeys = sorted_data_by_pubkey + .iter() + .enumerate() + .rev() + .map(|(i, hash_data)| { + let first_pubkey_in_bin = + Self::find_first_pubkey_in_bin(hash_data, pubkey_bin, bins, binner, stats); + + if let Some(first_pubkey_in_bin) = first_pubkey_in_bin { + let mut next = Some(ItemLocation { + key: &hash_data[first_pubkey_in_bin].pubkey, + pointer: SlotGroupPointer { + slot_group_index: i, + offset: first_pubkey_in_bin, + }, + }); + + Self::add_next_item( + &mut next, + &mut working_set, + sorted_data_by_pubkey, + pubkey_bin, + binner, + ); + + let mut first_pubkey_in_next_bin = first_pubkey_in_bin + 1; + while first_pubkey_in_next_bin < hash_data.len() { + if binner.bin_from_pubkey(&hash_data[first_pubkey_in_next_bin].pubkey) + != pubkey_bin + { + break; + } + first_pubkey_in_next_bin += 1; + } + first_pubkey_in_next_bin - first_pubkey_in_bin + } else { + 0 + } + }) + .sum::(); + + (working_set, max_inclusive_num_pubkeys) + } + + /// Add next item into hash dedup working set + fn add_next_item<'b>( + next: &mut Option>, + working_set: &mut Vec, + sorted_data_by_pubkey: &[&'b [CalculateHashIntermediate]], + pubkey_bin: usize, + binner: &PubkeyBinCalculator24, + ) { + // looping to add next item to working set + while let Some(ItemLocation { key, pointer }) = std::mem::take(next) { + // if `new key` is less than the min key in the working set, skip binary search and + // insert item to the end vec directly + if let Some(SlotGroupPointer { + slot_group_index: current_min_slot_group_index, + offset: current_min_offset, + }) = working_set.last() + { + let current_min_key = &sorted_data_by_pubkey[*current_min_slot_group_index] + [*current_min_offset] + .pubkey; + if key < current_min_key { + working_set.push(pointer); + break; + } + } + + let found = working_set.binary_search_by(|pointer| { + let prob = &sorted_data_by_pubkey[pointer.slot_group_index][pointer.offset].pubkey; + (*key).cmp(prob) + }); + + match found { + Err(index) => { + // found a new new key, insert into the working_set. This is O(n/2) on + // average. Theoretically, this operation could be expensive and may be further + // optimized in future. + working_set.insert(index, pointer); + break; + } + Ok(index) => { + let found = &mut working_set[index]; + if found.slot_group_index > pointer.slot_group_index { + // There is already a later slot group that contains this key in the working_set, + // look up again. + let (_item, new_next) = Self::get_item( + sorted_data_by_pubkey, + pubkey_bin, + binner, + &ItemLocation { key, pointer }, + ); + *next = new_next; + } else { + // A previous slot contains this key, replace it, and look for next item in the previous slot group. + let (_item, new_next) = Self::get_item( + sorted_data_by_pubkey, + pubkey_bin, + binner, + &ItemLocation { + key, + pointer: *found, + }, + ); + *found = pointer; + *next = new_next; + } + } + } + } + } + // go through: [..][pubkey_bin][..] and return hashes and lamport sum // slot groups^ ^accounts found in a slot group, sorted by pubkey, higher slot, write_version // 1. handle zero lamport accounts @@ -960,40 +1102,15 @@ impl<'a> AccountsHasher<'a> { ) -> (AccountHashesFile, u64) { let binner = PubkeyBinCalculator24::new(bins); - let len = sorted_data_by_pubkey.len(); - let mut indexes = Vec::with_capacity(len); - let mut first_items = Vec::with_capacity(len); - // map from index of an item in first_items[] to index of the corresponding item in sorted_data_by_pubkey[] - // this will change as items in sorted_data_by_pubkey[] are exhausted - let mut first_item_to_pubkey_division = Vec::with_capacity(len); + // working_set hold the lowest items for each slot_group sorted by pubkey descending (min_key is the last) + let (mut working_set, max_inclusive_num_pubkeys) = Self::initialize_dedup_working_set( + sorted_data_by_pubkey, + pubkey_bin, + bins, + &binner, + stats, + ); - // initialize 'first_items', which holds the current lowest item in each slot group - let max_inclusive_num_pubkeys = sorted_data_by_pubkey - .iter() - .enumerate() - .map(|(i, hash_data)| { - let first_pubkey_in_bin = - Self::find_first_pubkey_in_bin(hash_data, pubkey_bin, bins, &binner, stats); - if let Some(first_pubkey_in_bin) = first_pubkey_in_bin { - let k = hash_data[first_pubkey_in_bin].pubkey; - first_items.push(k); - first_item_to_pubkey_division.push(i); - indexes.push(first_pubkey_in_bin); - let mut first_pubkey_in_next_bin = first_pubkey_in_bin + 1; - while first_pubkey_in_next_bin < hash_data.len() { - if binner.bin_from_pubkey(&hash_data[first_pubkey_in_next_bin].pubkey) - != pubkey_bin - { - break; - } - first_pubkey_in_next_bin += 1; - } - first_pubkey_in_next_bin - first_pubkey_in_bin - } else { - 0 - } - }) - .sum::(); let mut hashes = AccountHashesFile { writer: None, dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(), @@ -1001,47 +1118,17 @@ impl<'a> AccountsHasher<'a> { }; let mut overall_sum = 0; - let mut duplicate_pubkey_indexes = Vec::with_capacity(len); let filler_accounts_enabled = self.filler_accounts_enabled(); - // this loop runs once per unique pubkey contained in any slot group - while !first_items.is_empty() { - let loop_stop = { first_items.len() - 1 }; // we increment at the beginning of the loop - let mut min_index = 0; - let mut min_pubkey = first_items[min_index]; - let mut first_item_index = 0; // we will start iterating at item 1. +=1 is first instruction in loop + while let Some(pointer) = working_set.pop() { + let key = &sorted_data_by_pubkey[pointer.slot_group_index][pointer.offset].pubkey; - // this loop iterates over each slot group to find the minimum pubkey at the maximum slot - // it also identifies duplicate pubkey entries at lower slots and remembers those to skip them after - while first_item_index < loop_stop { - first_item_index += 1; - let key = &first_items[first_item_index]; - let cmp = min_pubkey.cmp(key); - match cmp { - std::cmp::Ordering::Less => { - continue; // we still have the min item - } - std::cmp::Ordering::Equal => { - // we found the same pubkey in a later slot, so remember the lower slot as a duplicate - duplicate_pubkey_indexes.push(min_index); - } - std::cmp::Ordering::Greater => { - // this is the new min pubkey - min_pubkey = *key; - } - } - // this is the new index of the min entry - min_index = first_item_index; - } // get the min item, add lamports, get hash - let item = Self::get_item( - min_index, - pubkey_bin, - &mut first_items, + let (item, mut next) = Self::get_item( sorted_data_by_pubkey, - &mut indexes, - &mut first_item_to_pubkey_division, + pubkey_bin, &binner, + &ItemLocation { key, pointer }, ); // add lamports and get hash @@ -1064,23 +1151,13 @@ impl<'a> AccountsHasher<'a> { } } - if !duplicate_pubkey_indexes.is_empty() { - // skip past duplicate keys in earlier slots - // reverse this list because get_item can remove first_items[*i] when *i is exhausted - // and that would mess up subsequent *i values - duplicate_pubkey_indexes.iter().rev().for_each(|i| { - Self::get_item( - *i, - pubkey_bin, - &mut first_items, - sorted_data_by_pubkey, - &mut indexes, - &mut first_item_to_pubkey_division, - &binner, - ); - }); - duplicate_pubkey_indexes.clear(); - } + Self::add_next_item( + &mut next, + &mut working_set, + sorted_data_by_pubkey, + pubkey_bin, + &binner, + ); } (hashes, overall_sum)