diff --git a/accounts-db/src/accounts_hash.rs b/accounts-db/src/accounts_hash.rs index 74d1cfa8a1..f5c0a78ffa 100644 --- a/accounts-db/src/accounts_hash.rs +++ b/accounts-db/src/accounts_hash.rs @@ -435,6 +435,24 @@ pub struct AccountsHasher<'a> { pub(crate) active_stats: &'a ActiveStats, } +/// Pointer to a specific item in chunked accounts hash slices. +#[derive(Debug, Clone, Copy)] +struct SlotGroupPointer { + /// slot group index + slot_group_index: usize, + /// offset within a slot group + offset: usize, +} + +/// A struct for the location of an account hash item inside chunked accounts hash slices. +#[derive(Debug)] +struct ItemLocation<'a> { + /// account's pubkey + key: &'a Pubkey, + /// pointer to the item in slot group slices + pointer: SlotGroupPointer, +} + impl<'a> AccountsHasher<'a> { /// true if it is possible that there are filler accounts present pub fn filler_accounts_enabled(&self) -> bool { @@ -813,59 +831,45 @@ impl<'a> AccountsHasher<'a> { (hashes, lamports_total) } - /// returns the item referenced by `min_index` - /// updates `indexes` to skip over the pubkey and its duplicates - /// updates `first_items` to point to the next pubkey - /// or removes the entire pubkey division entries (for `min_index`) if the referenced pubkey is the last entry in the same `bin` - /// removed from: `first_items`, `indexes`, and `first_item_pubkey_division` + /// Given the item location, return the item in the `CalculatedHashIntermediate` slices and the next item location in the same bin. + /// If the end of the `CalculatedHashIntermediate` slice is reached or all the accounts in current bin have been exhausted, return `None` for next item location. fn get_item<'b>( - min_index: usize, - bin: usize, - first_items: &mut Vec, sorted_data_by_pubkey: &[&'b [CalculateHashIntermediate]], - indexes: &mut Vec, - first_item_to_pubkey_division: &mut Vec, + bin: usize, binner: &PubkeyBinCalculator24, - ) -> &'b CalculateHashIntermediate { - let first_item = first_items[min_index]; - let key = &first_item; - let division_index = first_item_to_pubkey_division[min_index]; - let division_data = &sorted_data_by_pubkey[division_index]; - let mut index = indexes[min_index]; + item_loc: &ItemLocation<'b>, + ) -> (&'b CalculateHashIntermediate, Option>) { + let division_data = &sorted_data_by_pubkey[item_loc.pointer.slot_group_index]; + let mut index = item_loc.pointer.offset; index += 1; - let mut end; - loop { - end = index >= division_data.len(); - if end { - break; - } + let mut next = None; + + while index < division_data.len() { // still more items where we found the previous key, so just increment the index for that slot group, skipping all pubkeys that are equal let next_key = &division_data[index].pubkey; - if next_key == key { + if next_key == item_loc.key { index += 1; continue; // duplicate entries of same pubkey, so keep skipping } if binner.bin_from_pubkey(next_key) > bin { // the next pubkey is not in our bin - end = true; break; } // point to the next pubkey > key - first_items[min_index] = *next_key; - indexes[min_index] = index; + next = Some(ItemLocation { + key: next_key, + pointer: SlotGroupPointer { + slot_group_index: item_loc.pointer.slot_group_index, + offset: index, + }, + }); break; } - if end { - // stop looking in this vector - we exhausted it - first_items.remove(min_index); - first_item_to_pubkey_division.remove(min_index); - indexes.remove(min_index); - } // this is the previous first item that was requested - &division_data[index - 1] + (&division_data[index - 1], next) } /// `hash_data` must be sorted by `binner.bin_from_pubkey()` @@ -944,6 +948,144 @@ impl<'a> AccountsHasher<'a> { result } + /// Return the working_set and max number of pubkeys for hash dedup. + /// `working_set` holds SlotGroupPointer {slot_group_index, offset} for items in account's pubkey descending order. + fn initialize_dedup_working_set( + sorted_data_by_pubkey: &[&[CalculateHashIntermediate]], + pubkey_bin: usize, + bins: usize, + binner: &PubkeyBinCalculator24, + stats: &HashStats, + ) -> ( + Vec, /* working_set */ + usize, /* max_inclusive_num_pubkeys */ + ) { + // working_set holds the lowest items for each slot_group sorted by pubkey descending (min_key is the last) + let mut working_set: Vec = Vec::default(); + + // Initialize 'working_set', which holds the current lowest item in each slot group. + // `working_set` should be initialized in reverse order of slot_groups. Later slot_groups are + // processed first. For each slot_group, if the lowest item for current slot group is + // already in working_set (i.e. inserted by a later slot group), the next lowest item + // in this slot group is searched and checked, until either one that is `not` in the + // working_set is found, which will then be inserted, or no next lowest item is found. + // Iterating in reverse order of slot_group will guarantee that each slot group will be + // scanned only once and scanned continuously. Therefore, it can achieve better data + // locality during the scan. + let max_inclusive_num_pubkeys = sorted_data_by_pubkey + .iter() + .enumerate() + .rev() + .map(|(i, hash_data)| { + let first_pubkey_in_bin = + Self::find_first_pubkey_in_bin(hash_data, pubkey_bin, bins, binner, stats); + + if let Some(first_pubkey_in_bin) = first_pubkey_in_bin { + let mut next = Some(ItemLocation { + key: &hash_data[first_pubkey_in_bin].pubkey, + pointer: SlotGroupPointer { + slot_group_index: i, + offset: first_pubkey_in_bin, + }, + }); + + Self::add_next_item( + &mut next, + &mut working_set, + sorted_data_by_pubkey, + pubkey_bin, + binner, + ); + + let mut first_pubkey_in_next_bin = first_pubkey_in_bin + 1; + while first_pubkey_in_next_bin < hash_data.len() { + if binner.bin_from_pubkey(&hash_data[first_pubkey_in_next_bin].pubkey) + != pubkey_bin + { + break; + } + first_pubkey_in_next_bin += 1; + } + first_pubkey_in_next_bin - first_pubkey_in_bin + } else { + 0 + } + }) + .sum::(); + + (working_set, max_inclusive_num_pubkeys) + } + + /// Add next item into hash dedup working set + fn add_next_item<'b>( + next: &mut Option>, + working_set: &mut Vec, + sorted_data_by_pubkey: &[&'b [CalculateHashIntermediate]], + pubkey_bin: usize, + binner: &PubkeyBinCalculator24, + ) { + // looping to add next item to working set + while let Some(ItemLocation { key, pointer }) = std::mem::take(next) { + // if `new key` is less than the min key in the working set, skip binary search and + // insert item to the end vec directly + if let Some(SlotGroupPointer { + slot_group_index: current_min_slot_group_index, + offset: current_min_offset, + }) = working_set.last() + { + let current_min_key = &sorted_data_by_pubkey[*current_min_slot_group_index] + [*current_min_offset] + .pubkey; + if key < current_min_key { + working_set.push(pointer); + break; + } + } + + let found = working_set.binary_search_by(|pointer| { + let prob = &sorted_data_by_pubkey[pointer.slot_group_index][pointer.offset].pubkey; + (*key).cmp(prob) + }); + + match found { + Err(index) => { + // found a new new key, insert into the working_set. This is O(n/2) on + // average. Theoretically, this operation could be expensive and may be further + // optimized in future. + working_set.insert(index, pointer); + break; + } + Ok(index) => { + let found = &mut working_set[index]; + if found.slot_group_index > pointer.slot_group_index { + // There is already a later slot group that contains this key in the working_set, + // look up again. + let (_item, new_next) = Self::get_item( + sorted_data_by_pubkey, + pubkey_bin, + binner, + &ItemLocation { key, pointer }, + ); + *next = new_next; + } else { + // A previous slot contains this key, replace it, and look for next item in the previous slot group. + let (_item, new_next) = Self::get_item( + sorted_data_by_pubkey, + pubkey_bin, + binner, + &ItemLocation { + key, + pointer: *found, + }, + ); + *found = pointer; + *next = new_next; + } + } + } + } + } + // go through: [..][pubkey_bin][..] and return hashes and lamport sum // slot groups^ ^accounts found in a slot group, sorted by pubkey, higher slot, write_version // 1. handle zero lamport accounts @@ -960,40 +1102,15 @@ impl<'a> AccountsHasher<'a> { ) -> (AccountHashesFile, u64) { let binner = PubkeyBinCalculator24::new(bins); - let len = sorted_data_by_pubkey.len(); - let mut indexes = Vec::with_capacity(len); - let mut first_items = Vec::with_capacity(len); - // map from index of an item in first_items[] to index of the corresponding item in sorted_data_by_pubkey[] - // this will change as items in sorted_data_by_pubkey[] are exhausted - let mut first_item_to_pubkey_division = Vec::with_capacity(len); + // working_set hold the lowest items for each slot_group sorted by pubkey descending (min_key is the last) + let (mut working_set, max_inclusive_num_pubkeys) = Self::initialize_dedup_working_set( + sorted_data_by_pubkey, + pubkey_bin, + bins, + &binner, + stats, + ); - // initialize 'first_items', which holds the current lowest item in each slot group - let max_inclusive_num_pubkeys = sorted_data_by_pubkey - .iter() - .enumerate() - .map(|(i, hash_data)| { - let first_pubkey_in_bin = - Self::find_first_pubkey_in_bin(hash_data, pubkey_bin, bins, &binner, stats); - if let Some(first_pubkey_in_bin) = first_pubkey_in_bin { - let k = hash_data[first_pubkey_in_bin].pubkey; - first_items.push(k); - first_item_to_pubkey_division.push(i); - indexes.push(first_pubkey_in_bin); - let mut first_pubkey_in_next_bin = first_pubkey_in_bin + 1; - while first_pubkey_in_next_bin < hash_data.len() { - if binner.bin_from_pubkey(&hash_data[first_pubkey_in_next_bin].pubkey) - != pubkey_bin - { - break; - } - first_pubkey_in_next_bin += 1; - } - first_pubkey_in_next_bin - first_pubkey_in_bin - } else { - 0 - } - }) - .sum::(); let mut hashes = AccountHashesFile { writer: None, dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(), @@ -1001,47 +1118,17 @@ impl<'a> AccountsHasher<'a> { }; let mut overall_sum = 0; - let mut duplicate_pubkey_indexes = Vec::with_capacity(len); let filler_accounts_enabled = self.filler_accounts_enabled(); - // this loop runs once per unique pubkey contained in any slot group - while !first_items.is_empty() { - let loop_stop = { first_items.len() - 1 }; // we increment at the beginning of the loop - let mut min_index = 0; - let mut min_pubkey = first_items[min_index]; - let mut first_item_index = 0; // we will start iterating at item 1. +=1 is first instruction in loop + while let Some(pointer) = working_set.pop() { + let key = &sorted_data_by_pubkey[pointer.slot_group_index][pointer.offset].pubkey; - // this loop iterates over each slot group to find the minimum pubkey at the maximum slot - // it also identifies duplicate pubkey entries at lower slots and remembers those to skip them after - while first_item_index < loop_stop { - first_item_index += 1; - let key = &first_items[first_item_index]; - let cmp = min_pubkey.cmp(key); - match cmp { - std::cmp::Ordering::Less => { - continue; // we still have the min item - } - std::cmp::Ordering::Equal => { - // we found the same pubkey in a later slot, so remember the lower slot as a duplicate - duplicate_pubkey_indexes.push(min_index); - } - std::cmp::Ordering::Greater => { - // this is the new min pubkey - min_pubkey = *key; - } - } - // this is the new index of the min entry - min_index = first_item_index; - } // get the min item, add lamports, get hash - let item = Self::get_item( - min_index, - pubkey_bin, - &mut first_items, + let (item, mut next) = Self::get_item( sorted_data_by_pubkey, - &mut indexes, - &mut first_item_to_pubkey_division, + pubkey_bin, &binner, + &ItemLocation { key, pointer }, ); // add lamports and get hash @@ -1064,23 +1151,13 @@ impl<'a> AccountsHasher<'a> { } } - if !duplicate_pubkey_indexes.is_empty() { - // skip past duplicate keys in earlier slots - // reverse this list because get_item can remove first_items[*i] when *i is exhausted - // and that would mess up subsequent *i values - duplicate_pubkey_indexes.iter().rev().for_each(|i| { - Self::get_item( - *i, - pubkey_bin, - &mut first_items, - sorted_data_by_pubkey, - &mut indexes, - &mut first_item_to_pubkey_division, - &binner, - ); - }); - duplicate_pubkey_indexes.clear(); - } + Self::add_next_item( + &mut next, + &mut working_set, + sorted_data_by_pubkey, + pubkey_bin, + &binner, + ); } (hashes, overall_sum)