From bf97627021331913ca7c26c5e1465c94799becd8 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Mon, 21 Jun 2021 15:32:03 -0500 Subject: [PATCH] eliminate flatten and sort in hash calculation (#17802) * eliminate flatten and sort in hash calculation * reduce critical section time * remove now no-longer necessary test code * conflict with reset bins to 0 pr --- runtime/src/accounts_db.rs | 25 +- runtime/src/accounts_hash.rs | 755 +++++++++++++---------------------- 2 files changed, 295 insertions(+), 485 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 03b4b5b33a..8a8768b381 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -4613,7 +4613,7 @@ impl AccountsDb { } /// Scan through all the account storage in parallel - fn scan_account_storage_no_bank( + fn scan_account_storage_no_bank( accounts_cache_and_ancestors: Option<( &AccountsCache, &Ancestors, @@ -4621,10 +4621,13 @@ impl AccountsDb { )>, snapshot_storages: &SortedStorages, scan_func: F, - ) -> Vec + after_func: F2, + ) -> Vec where F: Fn(LoadedAccount, &mut B, Slot) + Send + Sync, + F2: Fn(B) -> C + Send + Sync, B: Send + Default, + C: Send + Default, { // Without chunks, we end up with 1 output vec for each outer snapshot storage. // This results in too many vectors to be efficient. @@ -4669,7 +4672,7 @@ impl AccountsDb { } } } - retval + after_func(retval) }) .collect() } @@ -4847,6 +4850,7 @@ impl AccountsDb { } accum[pubkey_to_bin_index].push(source_item); }, + Self::sort_slot_storage_scan, ); if check_hash && mismatch_found.load(Ordering::Relaxed) > 0 { @@ -4863,6 +4867,19 @@ impl AccountsDb { Ok(result) } + fn sort_slot_storage_scan( + accum: Vec>, + ) -> Vec> { + accum + .into_par_iter() + .map(|mut items| { + // sort_by vs unstable because slot and write_version are already in order + items.sort_by(AccountsHash::compare_two_hash_entries); + items + }) + .collect() + } + // modeled after get_accounts_delta_hash // intended to be faster than calculate_accounts_hash pub fn calculate_accounts_hash_without_index( @@ -4915,6 +4932,7 @@ impl AccountsDb { &mut stats, pass == num_scan_passes - 1, previous_pass, + bins_per_pass, ); previous_pass = for_next_pass; final_result = (hash, lamports); @@ -6643,6 +6661,7 @@ pub mod tests { assert_eq!(slot_expected, slot); accum.push(expected); }, + |a| a, ); assert_eq!(calls.load(Ordering::Relaxed), 1); assert_eq!(result, vec![vec![expected]]); diff --git a/runtime/src/accounts_hash.rs b/runtime/src/accounts_hash.rs index 6aad8ea9cf..b0014d3b0c 100644 --- a/runtime/src/accounts_hash.rs +++ b/runtime/src/accounts_hash.rs @@ -5,7 +5,11 @@ use solana_sdk::{ hash::{Hash, Hasher}, pubkey::Pubkey, }; -use std::{convert::TryInto, sync::Mutex}; +use std::{ + convert::TryInto, + sync::atomic::{AtomicUsize, Ordering}, + sync::Mutex, +}; pub const ZERO_RAW_LAMPORTS_SENTINEL: u64 = std::u64::MAX; pub const MERKLE_FANOUT: usize = 16; @@ -488,64 +492,6 @@ impl AccountsHash { hashes.par_sort_unstable_by(|a, b| a.0.cmp(&b.0)); } - fn flatten_hash_intermediate( - mut data_sections_by_pubkey: Vec>>, - stats: &mut HashStats, - ) -> Vec> - where - T: Clone, - { - // flatten this: - // vec: just a level of hierarchy - // vec: 1 vec per PUBKEY_BINS_FOR_CALCULATING_HASHES - // vec: Intermediate data whose pubkey belongs in this division - // into this: - // vec: 1 vec per PUBKEY_BINS_FOR_CALCULATING_HASHES - // vec: Intermediate data whose pubkey belongs in this division - let mut 
flatten_time = Measure::start("flatten"); - let mut data_by_pubkey: Vec> = vec![]; - let mut raw_len = 0; - let mut lens = vec![]; - // pass=0: calculate final lens, then allocate vecs with capacity - // pass=1: copy data into vecs with correct capacity - for pass in 0..2 { - for outer in &mut data_sections_by_pubkey { - let outer_len = outer.len(); - for pubkey_index in 0..outer_len { - let this_len = outer[pubkey_index].len(); - if this_len == 0 { - continue; - } - if pass == 0 { - raw_len += this_len; - if lens.len() <= pubkey_index { - lens.extend(vec![0; pubkey_index - lens.len() + 1]); - } - - lens[pubkey_index] += outer[pubkey_index].len(); - } else { - let mut data = vec![]; - std::mem::swap(&mut data, &mut outer[pubkey_index]); - - data_by_pubkey[pubkey_index].extend(data); - } - } - } - - if pass == 0 { - data_by_pubkey = lens - .iter() - .map(|len| Vec::with_capacity(*len)) - .collect::>(); - lens = vec![]; // we don't need this anymore - } - } - flatten_time.stop(); - stats.flatten_time_total_us += flatten_time.as_us(); - stats.unreduced_entries += raw_len; - data_by_pubkey - } - pub fn compare_two_hash_entries( a: &CalculateHashIntermediate, b: &CalculateHashIntermediate, @@ -554,25 +500,6 @@ impl AccountsHash { a.pubkey.partial_cmp(&b.pubkey).unwrap() } - fn sort_hash_intermediate( - data_by_pubkey: Vec>, - stats: &mut HashStats, - ) -> Vec> { - // sort each PUBKEY_DIVISION vec - let mut sort_time = Measure::start("sort"); - let sorted_data_by_pubkey: Vec> = data_by_pubkey - .into_par_iter() - .map(|mut pk_range| { - // has to be a stable sort because items are in slot order already - pk_range.sort_by(Self::compare_two_hash_entries); - pk_range - }) - .collect(); - sort_time.stop(); - stats.sort_time_total_us += sort_time.as_us(); - sorted_data_by_pubkey - } - pub fn checked_cast_for_capitalization(balance: u128) -> u64 { balance .try_into() @@ -580,162 +507,186 @@ impl AccountsHash { } fn de_dup_and_eliminate_zeros( - sorted_data_by_pubkey: Vec>, + sorted_data_by_pubkey: Vec>>, stats: &mut HashStats, - ) -> (Vec>>, u64) { + max_bin: usize, + ) -> (Vec>, u64) { // 1. eliminate zero lamport accounts // 2. pick the highest slot or (slot = and highest version) of each pubkey // 3. produce this output: - // vec: PUBKEY_BINS_FOR_CALCULATING_HASHES in pubkey order - // vec: sorted sections from parallelism, in pubkey order - // vec: individual hashes in pubkey order + // a. vec: PUBKEY_BINS_FOR_CALCULATING_HASHES in pubkey order + // vec: individual hashes in pubkey order, 1 hash per + // b. 
lamports let mut zeros = Measure::start("eliminate zeros"); let overall_sum = Mutex::new(0u64); - const CHUNKS: usize = 10; - let hashes: Vec>> = sorted_data_by_pubkey + let unreduced_entries = AtomicUsize::new(0); + let hashes: Vec> = (0..max_bin) .into_par_iter() - .map(|pubkey_division| { - let (hashes, sum) = Self::de_dup_accounts_in_parallel(&pubkey_division, CHUNKS); - let mut overall = overall_sum.lock().unwrap(); - *overall = Self::checked_cast_for_capitalization(sum as u128 + *overall as u128); + .map(|bin| { + let (hashes, sum, unreduced_entries_count) = + Self::de_dup_accounts_in_parallel(&sorted_data_by_pubkey, bin); + { + let mut overall = overall_sum.lock().unwrap(); + *overall = + Self::checked_cast_for_capitalization(sum as u128 + *overall as u128); + } + unreduced_entries.fetch_add(unreduced_entries_count, Ordering::Relaxed); hashes }) .collect(); zeros.stop(); stats.zeros_time_total_us += zeros.as_us(); let sum = *overall_sum.lock().unwrap(); + stats.unreduced_entries += unreduced_entries.load(Ordering::Relaxed); (hashes, sum) } + // returns true if this vector was exhausted + fn get_item<'a, 'b>( + min_index: usize, + bin: usize, + first_items: &'a mut Vec<(Pubkey, usize)>, + pubkey_division: &'b [Vec>], + indexes: &'a mut Vec, + ) -> (bool, &'b CalculateHashIntermediate) { + let first_item = first_items[min_index]; + let key = &first_item.0; + let division_index = first_item.1; + let bin = &pubkey_division[division_index][bin]; + let mut index = indexes[division_index]; + index += 1; + while index < bin.len() { + // still more items where we found the previous key, so just increment the index for that slot group, skipping all pubkeys that are equal + if &bin[index].pubkey == key { + index += 1; + continue; // duplicate entries of same pubkey, so keep skipping + } + + // point to the next pubkey > key + first_items[min_index] = (bin[index].pubkey, division_index); + indexes[division_index] = index; + break; + } + + ( + if index >= bin.len() { + first_items.remove(min_index); // stop looking in this vector - we exhausted it + true + } else { + false + }, // this is the last item with this pubkey + &bin[index - 1], + ) + } + + // go through: [..][pubkey_bin][..] and return hashes and lamport sum + // slot groups^ ^accounts found in a slot group, sorted by pubkey, higher slot, write_version // 1. eliminate zero lamport accounts // 2. pick the highest slot or (slot = and highest version) of each pubkey // 3. produce this output: - // vec: sorted sections from parallelism, in pubkey order - // vec: individual hashes in pubkey order + // a. vec: individual hashes in pubkey order + // b. lamport sum + // c. unreduced count (ie. 
including duplicates and zero lamport) fn de_dup_accounts_in_parallel( - pubkey_division: &[CalculateHashIntermediate], - chunk_count: usize, - ) -> (Vec>, u64) { + pubkey_division: &[Vec>], + pubkey_bin: usize, + ) -> (Vec, u64, usize) { let len = pubkey_division.len(); - let max = if len > chunk_count { - std::cmp::max(chunk_count, 1) - } else { - 1 - }; - let chunk_size = len / max; - let overall_sum = Mutex::new(0u64); - let hashes: Vec> = (0..max) - .into_par_iter() - .map(|chunk_index| { - let mut start_index = chunk_index * chunk_size; - let mut end_index = start_index + chunk_size; - let last = chunk_index == max - 1; - if last { - end_index = len; - } + let mut item_len = 0; + let mut indexes = vec![0; len]; + let mut first_items = Vec::with_capacity(len); - let is_first_slice = chunk_index == 0; - if !is_first_slice { - // note that this causes all regions after region 0 to have 1 item that overlaps with the previous region - start_index -= 1; + pubkey_division.iter().enumerate().for_each(|(i, bins)| { + if bins.len() > pubkey_bin { + let sub = &bins[pubkey_bin]; + if !sub.is_empty() { + item_len += bins[pubkey_bin].len(); + first_items.push((bins[pubkey_bin][0].pubkey, i)); } + } + }); + let mut overall_sum = 0; + let mut hashes: Vec = Vec::with_capacity(item_len); - let (result, sum) = Self::de_dup_accounts_from_stores( - last, - &pubkey_division[start_index..end_index], + while !first_items.is_empty() { + let mut loop_stop = { first_items.len() - 1 }; // we increment at the beginning of the loop + let mut min_index = 0; + let mut min_pubkey = first_items[min_index].0; + let mut first_item_index = 0; // we will start iterating at item 1. +=1 is first instruction in loop + + while first_item_index < loop_stop { + first_item_index += 1; + let (key, _) = first_items[first_item_index]; + let cmp = min_pubkey.cmp(&key); + match cmp { + std::cmp::Ordering::Less => { + continue; // we still have the min item + } + std::cmp::Ordering::Equal => { + // we found an item that masks an earlier slot, so skip the earlier item + let (exhausted, _) = Self::get_item( + min_index, + pubkey_bin, + &mut first_items, + pubkey_division, + &mut indexes, + ); + if exhausted { + // this whole vector is exhausted, so we have to back our indices up since our search set has been reduced out from under us + first_item_index -= 1; + loop_stop -= 1; + } + } + std::cmp::Ordering::Greater => (), + } + // this is the new min pubkey + min_index = first_item_index; + min_pubkey = key; + } + + // get the min item, add lamports, get hash + let (_, item) = Self::get_item( + min_index, + pubkey_bin, + &mut first_items, + pubkey_division, + &mut indexes, + ); + if item.lamports != ZERO_RAW_LAMPORTS_SENTINEL { + overall_sum = Self::checked_cast_for_capitalization( + item.lamports as u128 + overall_sum as u128, ); - let mut overall = overall_sum.lock().unwrap(); - *overall = Self::checked_cast_for_capitalization(sum + *overall as u128); - - result - }) - .collect(); - - let sum = *overall_sum.lock().unwrap(); - (hashes, sum) - } - - fn de_dup_accounts_from_stores( - is_last_slice: bool, - slice: &[CalculateHashIntermediate], - ) -> (Vec, u128) { - let len = slice.len(); - let mut result: Vec = Vec::with_capacity(len); - - let mut sum: u128 = 0; - if len > 0 { - let mut i = 0; - let mut insert_item = false; - // look_for_first_key means the first key we find in our slice may be a - // continuation of accounts belonging to a key that started in the last slice. 
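// --- Illustrative sketch (not part of this patch) ---------------------------
// The new de_dup_accounts_in_parallel above replaces the old flatten+sort+dedup
// with a multi-way merge across slot groups for a single pubkey bin. The
// simplified version below shows only the selection rule it implements: for
// each pubkey keep the entry from the latest slot group (and the latest
// duplicate within a group), drop zero-lamport sentinels, and emit hashes in
// pubkey order while summing lamports. It uses a BTreeMap instead of the
// streaming index-based merge, and `Entry`, `ZERO_SENTINEL`, and the integer
// pubkey/hash fields are stand-ins, not the real CalculateHashIntermediate.
use std::collections::BTreeMap;

const ZERO_SENTINEL: u64 = u64::MAX; // plays the role of ZERO_RAW_LAMPORTS_SENTINEL

#[derive(Clone)]
struct Entry {
    pubkey: u32,   // stand-in for Pubkey
    lamports: u64, // ZERO_SENTINEL marks an account with zero lamports
    hash: u64,     // stand-in for Hash
}

/// `groups`: one vector per slot group, ordered low slot -> high slot, each
/// already sorted by pubkey (as sort_slot_storage_scan arranges).
/// Returns (hashes in pubkey order, lamport sum) for this bin.
fn dedup_one_bin(groups: &[Vec<Entry>]) -> (Vec<u64>, u64) {
    let mut latest: BTreeMap<u32, Entry> = BTreeMap::new();
    for group in groups {
        for e in group {
            // later insertions win: higher slot group, then higher write version
            latest.insert(e.pubkey, e.clone());
        }
    }
    let mut sum = 0u64;
    let mut hashes = Vec::with_capacity(latest.len());
    for e in latest.values() {
        if e.lamports != ZERO_SENTINEL {
            sum = sum
                .checked_add(e.lamports)
                .expect("capitalization overflowed u64");
            hashes.push(e.hash);
        }
    }
    (hashes, sum)
}
// ---------------------------------------------------------------------------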
- // so, look_for_first_key=true means we have to find the first key different than - // the first key we encounter in our slice. Note that if this is true, - // our slice begins one index prior to the 'actual' start of our logical range. - 'outer: loop { - // at start of loop, item at 'i' is the first entry for a given pubkey - unless look_for_first - let mut now = &slice[i]; - let mut last = now.pubkey; - if insert_item { - if now.lamports != ZERO_RAW_LAMPORTS_SENTINEL { - // first entry for this key that starts in our slice - result.push(now.hash); - sum += now.lamports as u128; - } - if i + 1 == len { - break; - } - i += 1; - now = &slice[i]; - last = now.pubkey; - } - for (k, now) in slice.iter().enumerate().skip(i + 1) { - if now.pubkey != last { - i = k - 1; - insert_item = true; - continue 'outer; - } - } - - if is_last_slice { - insert_item = true; - i = len - 1; - continue 'outer; - } - - break; // ran out of items in our slice, so our slice is done + hashes.push(item.hash); } } - (result, sum) + (hashes, overall_sum, item_len) } // input: - // vec: unordered, created by parallelism - // vec: [0..bins] - where bins are pubkey ranges - // vec: [..] - items which fit in the containing bin, unordered within this vec - // so, assumption is middle vec is bins sorted by pubkey + // vec: group of slot data, ordered by Slot (low to high) + // vec: [0..bins] - where bins are pubkey ranges (these are ordered by Pubkey range) + // vec: [..] - items which fit in the containing bin. Sorted by: Pubkey, higher Slot, higher Write version (if pubkey =) pub fn rest_of_hash_calculation( data_sections_by_pubkey: Vec>>, mut stats: &mut HashStats, is_last_pass: bool, mut previous_state: PreviousPass, + max_bin: usize, ) -> (Hash, u64, PreviousPass) { - let outer = Self::flatten_hash_intermediate(data_sections_by_pubkey, &mut stats); - - let sorted_data_by_pubkey = Self::sort_hash_intermediate(outer, &mut stats); - let (mut hashes, mut total_lamports) = - Self::de_dup_and_eliminate_zeros(sorted_data_by_pubkey, &mut stats); + Self::de_dup_and_eliminate_zeros(data_sections_by_pubkey, &mut stats, max_bin); total_lamports += previous_state.lamports; if !previous_state.remaining_unhashed.is_empty() { - // these items were not hashed last iteration because they didn't divide evenly - hashes.insert(0, vec![previous_state.remaining_unhashed]); + // These items were not hashed last iteration because they didn't divide evenly. + // These are hashes for pubkeys that are < the pubkeys we are looking at now, so their hashes go first in order. 
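// --- Illustrative sketch (not part of this patch) ---------------------------
// rest_of_hash_calculation runs once per scan pass, and PreviousPass carries
// three things between passes: hashes that have not yet filled a complete
// chunk (remaining_unhashed), the reductions of chunks already hashed
// (reduced_hashes), and the running lamport total. Because each pass covers a
// later pubkey range, the leftovers sort before the current pass's hashes and
// can simply be prepended, as the insert(0, ..) below does. This sketch uses a
// fixed CHUNK and an xor-based stand-in reduction instead of the real
// MERKLE_FANOUT tree; those are assumptions for illustration only.
#[derive(Default)]
struct Carry {
    remaining_unhashed: Vec<u64>, // tail that has not filled a chunk yet
    reduced: Vec<u64>,            // one reduced value per completed chunk
    lamports: u64,
}

const CHUNK: usize = 16;

fn one_pass(
    mut carry: Carry,
    mut pass_hashes: Vec<u64>,
    pass_lamports: u64,
    last: bool,
) -> (Option<u64>, Carry) {
    // leftovers belong to smaller pubkeys, so they go first
    let mut all = std::mem::take(&mut carry.remaining_unhashed);
    all.append(&mut pass_hashes);
    carry.lamports += pass_lamports;

    // reduce only full chunks unless this is the final pass
    let full = if last { all.len() } else { all.len() - all.len() % CHUNK };
    for chunk in all[..full].chunks(CHUNK) {
        carry
            .reduced
            .push(chunk.iter().fold(0u64, |acc, h| acc ^ h.rotate_left(7)));
    }
    carry.remaining_unhashed = all[full..].to_vec();

    if last {
        // final pass: fold the per-chunk reductions into a single root
        let root = carry.reduced.iter().fold(0u64, |acc, h| acc ^ h.rotate_left(7));
        (Some(root), carry)
    } else {
        (None, carry)
    }
}
// ---------------------------------------------------------------------------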
+ hashes.insert(0, previous_state.remaining_unhashed); previous_state.remaining_unhashed = Vec::new(); } let mut next_pass = PreviousPass::default(); - let cumulative = CumulativeOffsets::from_raw_2d(&hashes); + let cumulative = CumulativeOffsets::from_raw(&hashes); let mut hash_total = cumulative.total_count; stats.hash_total += hash_total; next_pass.reduced_hashes = previous_state.reduced_hashes; @@ -785,11 +736,9 @@ impl AccountsHash { if no_progress { // we never made partial progress, so hash everything now hashes.into_iter().for_each(|v| { - v.into_iter().for_each(|v| { - if !v.is_empty() { - next_pass.reduced_hashes.push(v); - } - }); + if !v.is_empty() { + next_pass.reduced_hashes.push(v); + } }); } @@ -846,6 +795,12 @@ pub mod tests { assert_eq!(AccountsHash::div_ceil(10, 0), 0); } + fn for_rest( + original: Vec, + ) -> Vec>> { + vec![vec![original]] + } + #[test] fn test_accountsdb_rest_of_hash_calculation() { solana_logger::setup(); @@ -865,10 +820,11 @@ pub mod tests { account_maps.push(val); let result = AccountsHash::rest_of_hash_calculation( - vec![vec![account_maps.clone()]], + for_rest(account_maps.clone()), &mut HashStats::default(), true, PreviousPass::default(), + one_range(), ); let expected_hash = Hash::from_str("8j9ARGFv4W2GfML7d3sVJK2MePwrikqYnu6yqer28cCa").unwrap(); assert_eq!((result.0, result.1), (expected_hash, 88)); @@ -877,13 +833,14 @@ pub mod tests { let key = Pubkey::new(&[10u8; 32]); let hash = Hash::new(&[2u8; 32]); let val = CalculateHashIntermediate::new_without_slot(hash, 20, key); - account_maps.push(val); + account_maps.insert(0, val); let result = AccountsHash::rest_of_hash_calculation( - vec![vec![account_maps.clone()]], + for_rest(account_maps.clone()), &mut HashStats::default(), true, PreviousPass::default(), + one_range(), ); let expected_hash = Hash::from_str("EHv9C5vX7xQjjMpsJMzudnDTzoTSRwYkqLzY8tVMihGj").unwrap(); assert_eq!((result.0, result.1), (expected_hash, 108)); @@ -892,18 +849,27 @@ pub mod tests { let key = Pubkey::new(&[10u8; 32]); let hash = Hash::new(&[99u8; 32]); let val = CalculateHashIntermediate::new_without_slot(hash, 30, key); - account_maps.push(val); + account_maps.insert(1, val); let result = AccountsHash::rest_of_hash_calculation( - vec![vec![account_maps]], + for_rest(account_maps), &mut HashStats::default(), true, PreviousPass::default(), + one_range(), ); let expected_hash = Hash::from_str("7NNPg5A8Xsg1uv4UFm6KZNwsipyyUnmgCrznP6MBWoBZ").unwrap(); assert_eq!((result.0, result.1), (expected_hash, 118)); } + fn one_range() -> usize { + 1 + } + + fn zero_range() -> usize { + 0 + } + #[test] fn test_accountsdb_multi_pass_rest_of_hash_calculation() { solana_logger::setup(); @@ -936,6 +902,7 @@ pub mod tests { &mut HashStats::default(), false, // not last pass previous_pass, + one_range(), ); assert_eq!(result.0, Hash::default()); assert_eq!(result.1, 0); @@ -946,10 +913,11 @@ pub mod tests { } let result = AccountsHash::rest_of_hash_calculation( - vec![vec![account_maps.clone()]], + for_rest(account_maps.clone()), &mut HashStats::default(), false, // not last pass previous_pass, + one_range(), ); assert_eq!(result.0, Hash::default()); @@ -967,6 +935,7 @@ pub mod tests { &mut HashStats::default(), false, previous_pass, + one_range(), ); previous_pass = result.2; @@ -980,6 +949,7 @@ pub mod tests { &mut HashStats::default(), true, // finally, last pass previous_pass, + one_range(), ); let previous_pass = result.2; @@ -1008,10 +978,11 @@ pub mod tests { account_maps.push(val); let result = 
AccountsHash::rest_of_hash_calculation( - vec![vec![vec![account_maps[0].clone()]]], + for_rest(vec![account_maps[0].clone()]), &mut HashStats::default(), false, // not last pass PreviousPass::default(), + one_range(), ); assert_eq!(result.0, Hash::default()); @@ -1022,10 +993,11 @@ pub mod tests { assert_eq!(previous_pass.lamports, account_maps[0].lamports); let result = AccountsHash::rest_of_hash_calculation( - vec![vec![vec![account_maps[1].clone()]]], + for_rest(vec![account_maps[1].clone()]), &mut HashStats::default(), false, // not last pass previous_pass, + one_range(), ); assert_eq!(result.0, Hash::default()); @@ -1044,6 +1016,7 @@ pub mod tests { &mut HashStats::default(), true, previous_pass, + one_range(), ); let previous_pass = result.2; @@ -1084,16 +1057,17 @@ pub mod tests { account_maps.push(val); } - let chunk = account_maps[0..plus1].to_vec(); - let mut sorted = chunk.clone(); - sorted.sort_by(AccountsHash::compare_two_hash_entries); + let mut chunk = account_maps[0..plus1].to_vec(); + chunk.sort_by(AccountsHash::compare_two_hash_entries); + let sorted = chunk.clone(); // first 4097 hashes (1 left over) let result = AccountsHash::rest_of_hash_calculation( - vec![vec![chunk]], + for_rest(chunk), &mut HashStats::default(), false, // not last pass PreviousPass::default(), + one_range(), ); assert_eq!(result.0, Hash::default()); @@ -1118,9 +1092,9 @@ pub mod tests { .sum::() ); - let chunk = account_maps[plus1..plus1 * 2].to_vec(); - let mut sorted2 = chunk.clone(); - sorted2.sort_by(AccountsHash::compare_two_hash_entries); + let mut chunk = account_maps[plus1..plus1 * 2].to_vec(); + chunk.sort_by(AccountsHash::compare_two_hash_entries); + let sorted2 = chunk.clone(); let mut with_left_over = vec![left_over_1]; with_left_over.extend(sorted2[0..plus1 - 2].to_vec().into_iter().map(|i| i.hash)); @@ -1134,10 +1108,11 @@ pub mod tests { // second 4097 hashes (2 left over) let result = AccountsHash::rest_of_hash_calculation( - vec![vec![chunk]], + for_rest(chunk), &mut HashStats::default(), false, // not last pass previous_pass, + one_range(), ); assert_eq!(result.0, Hash::default()); @@ -1165,6 +1140,7 @@ pub mod tests { &mut HashStats::default(), true, previous_pass, + one_range(), ); let previous_pass = result.2; @@ -1190,9 +1166,11 @@ pub mod tests { #[test] fn test_accountsdb_de_dup_accounts_zero_chunks() { - let (hashes, lamports) = - AccountsHash::de_dup_accounts_in_parallel(&[CalculateHashIntermediate::default()], 0); - assert_eq!(vec![vec![Hash::default()]], hashes); + let (hashes, lamports, _) = AccountsHash::de_dup_accounts_in_parallel( + &[vec![vec![CalculateHashIntermediate::default()]]], + 0, + ); + assert_eq!(vec![Hash::default()], hashes); assert_eq!(lamports, 0); } @@ -1203,31 +1181,29 @@ pub mod tests { let (hashes, lamports) = AccountsHash::de_dup_and_eliminate_zeros( vec![vec![], vec![]], &mut HashStats::default(), + one_range(), ); assert_eq!( - vec![vec![Hash::default(); 0], vec![]], + vec![Hash::default(); 0], hashes.into_iter().flatten().collect::>() ); assert_eq!(lamports, 0); - let (hashes, lamports) = - AccountsHash::de_dup_and_eliminate_zeros(vec![], &mut HashStats::default()); - let empty: Vec>> = Vec::default(); + let (hashes, lamports) = AccountsHash::de_dup_and_eliminate_zeros( + vec![], + &mut HashStats::default(), + zero_range(), + ); + let empty: Vec> = Vec::default(); assert_eq!(empty, hashes); assert_eq!(lamports, 0); - let (hashes, lamports) = AccountsHash::de_dup_accounts_in_parallel(&[], 1); - assert_eq!( - vec![Hash::default(); 0], - 
hashes.into_iter().flatten().collect::>() - ); + let (hashes, lamports, _) = AccountsHash::de_dup_accounts_in_parallel(&[], 1); + assert_eq!(vec![Hash::default(); 0], hashes); assert_eq!(lamports, 0); - let (hashes, lamports) = AccountsHash::de_dup_accounts_in_parallel(&[], 2); - assert_eq!( - vec![Hash::default(); 0], - hashes.into_iter().flatten().collect::>() - ); + let (hashes, lamports, _) = AccountsHash::de_dup_accounts_in_parallel(&[], 2); + assert_eq!(vec![Hash::default(); 0], hashes); assert_eq!(lamports, 0); } @@ -1239,16 +1215,12 @@ pub mod tests { let key_b = Pubkey::new(&[2u8; 32]); let key_c = Pubkey::new(&[3u8; 32]); const COUNT: usize = 6; - let hashes: Vec<_> = (0..COUNT) - .into_iter() - .map(|i| Hash::new(&[i as u8; 32])) - .collect(); + let hashes = (0..COUNT).into_iter().map(|i| Hash::new(&[i as u8; 32])); // create this vector // abbbcc let keys = [key_a, key_b, key_b, key_b, key_c, key_c]; let accounts: Vec<_> = hashes - .into_iter() .zip(keys.iter()) .enumerate() .map(|(i, (hash, key))| { @@ -1263,21 +1235,21 @@ pub mod tests { // result lamports // result hashes) // "a5" = key_a, 5 lamports - ("a1", false, 0, "[]"), - ("a1b2", false, 1, "[11111111111111111111111111111111]"), - ("a1b2b3", false, 1, "[11111111111111111111111111111111]"), - ("a1b2b3b4", false, 1, "[11111111111111111111111111111111]"), - ("a1b2b3b4c5", false, 5, "[11111111111111111111111111111111, CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), - ("b2", false, 0, "[]"), - ("b2b3", false, 0, "[]"), - ("b2b3b4", false, 0, "[]"), - ("b2b3b4c5", false, 4, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), - ("b3", false, 0, "[]"), - ("b3b4", false, 0, "[]"), - ("b3b4c5", false, 4, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), - ("b4", false, 0, "[]"), - ("b4c5", false, 4, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), - ("c5", false, 0, "[]"), + ("a1", false, 1, "[11111111111111111111111111111111]"), + ("a1b2", false, 3, "[11111111111111111111111111111111, 4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi]"), + ("a1b2b3", false, 4, "[11111111111111111111111111111111, 8qbHbw2BbbTHBW1sbeqakYXVKRQM8Ne7pLK7m6CVfeR]"), + ("a1b2b3b4", false, 5, "[11111111111111111111111111111111, CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), + ("a1b2b3b4c5", false, 10, "[11111111111111111111111111111111, CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8, GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), + ("b2", false, 2, "[4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi]"), + ("b2b3", false, 3, "[8qbHbw2BbbTHBW1sbeqakYXVKRQM8Ne7pLK7m6CVfeR]"), + ("b2b3b4", false, 4, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), + ("b2b3b4c5", false, 9, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8, GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), + ("b3", false, 3, "[8qbHbw2BbbTHBW1sbeqakYXVKRQM8Ne7pLK7m6CVfeR]"), + ("b3b4", false, 4, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), + ("b3b4c5", false, 9, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8, GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), + ("b4", false, 4, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8]"), + ("b4c5", false, 9, "[CktRuQ2mttgRGkXJtyksdKHjUdc2C4TgDzyB98oEzy8, GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), + ("c5", false, 5, "[GgBaCs3NCBuZN12kCJgAW63ydqohFkHEdfdEXBPzLHq]"), ("a1", true, 1, "[11111111111111111111111111111111]"), ("a1b2", true, 3, "[11111111111111111111111111111111, 4vJ9JU1bJJE96FWSJKvHsmmFADCg4gpZQff4P3bkLKi]"), ("a1b2b3", true, 4, "[11111111111111111111111111111111, 8qbHbw2BbbTHBW1sbeqakYXVKRQM8Ne7pLK7m6CVfeR]"), @@ -1311,52 +1283,58 @@ pub mod tests { let 
accounts = accounts.clone(); let slice = &accounts[start..end]; - let result = AccountsHash::de_dup_accounts_from_stores(is_last_slice, slice); - let (hashes2, lamports2) = AccountsHash::de_dup_accounts_in_parallel(slice, 1); - let (hashes3, lamports3) = AccountsHash::de_dup_accounts_in_parallel(slice, 2); + let slice2 = vec![vec![slice.to_vec()]]; + let slice = &slice2[..]; + let (hashes2, lamports2, _) = + AccountsHash::de_dup_accounts_in_parallel(slice, 0); + let (hashes3, lamports3, _) = + AccountsHash::de_dup_accounts_in_parallel(slice, 0); let (hashes4, lamports4) = AccountsHash::de_dup_and_eliminate_zeros( - vec![slice.to_vec()], + slice.to_vec(), &mut HashStats::default(), + end - start, ); let (hashes5, lamports5) = AccountsHash::de_dup_and_eliminate_zeros( - vec![slice.to_vec(), slice.to_vec()], + slice.to_vec(), &mut HashStats::default(), + end - start, ); let (hashes6, lamports6) = AccountsHash::de_dup_and_eliminate_zeros( - vec![vec![], slice.to_vec()], + slice.to_vec(), &mut HashStats::default(), + end - start, ); - assert_eq!( - hashes2.iter().flatten().collect::>(), - hashes3.iter().flatten().collect::>() - ); - let expected2 = hashes2.clone().into_iter().flatten().collect::>(); + assert_eq!(hashes2, hashes3); + let expected2 = hashes2.clone(); assert_eq!( expected2, - hashes4 - .into_iter() - .flatten() - .into_iter() - .flatten() - .collect::>() + hashes4.into_iter().flatten().collect::>(), + "last_slice: {}, start: {}, end: {}, slice: {:?}", + last_slice, + start, + end, + slice ); assert_eq!( - vec![expected2.clone(), expected2.clone()], - hashes5.into_iter().flatten().collect::>() + expected2.clone(), + hashes5.iter().flatten().copied().collect::>(), + "last_slice: {}, start: {}, end: {}, slice: {:?}", + last_slice, + start, + end, + slice ); assert_eq!( - vec![vec![], expected2.clone()], - hashes6.into_iter().flatten().collect::>() + expected2.clone(), + hashes6.iter().flatten().copied().collect::>() ); assert_eq!(lamports2, lamports3); assert_eq!(lamports2, lamports4); - assert_eq!(lamports2 * 2, lamports5); + assert_eq!(lamports2, lamports5); assert_eq!(lamports2, lamports6); - let hashes: Vec<_> = hashes2.into_iter().flatten().collect(); - - let human_readable = slice + let human_readable = slice[0][0] .iter() .map(|v| { let mut s = (if v.pubkey == key_a { @@ -1373,77 +1351,22 @@ pub mod tests { }) .collect::(); - let hash_result_as_string = format!("{:?}", result.0); + let hash_result_as_string = format!("{:?}", hashes2); let packaged_result: ExpectedType = ( human_readable, is_last_slice, - result.1 as u64, + lamports2 as u64, hash_result_as_string, ); - - if is_last_slice { - // the parallel version always starts with 'first slice' - assert_eq!( - result.0, hashes, - "description: {:?}, expected index: {}", - packaged_result, expected_index - ); - assert_eq!( - result.1 as u64, lamports2, - "description: {:?}, expected index: {}", - packaged_result, expected_index - ); - } - assert_eq!(expected[expected_index], packaged_result); // for generating expected results - //error!("{:?},", packaged_result); + // error!("{:?},", packaged_result); expected_index += 1; } } } - - for first_slice in 0..2 { - let result = AccountsHash::de_dup_accounts_from_stores(first_slice == 1, &[]); - assert_eq!((vec![Hash::default(); 0], 0), result); - } - } - - #[test] - fn test_sort_hash_intermediate() { - solana_logger::setup(); - let mut stats = HashStats::default(); - let key = Pubkey::new_unique(); - let hash = Hash::new_unique(); - let val = 
CalculateHashIntermediate::new_without_slot(hash, 1, key); - - // slot same, version < - let hash2 = Hash::new_unique(); - let val2 = CalculateHashIntermediate::new_without_slot(hash2, 4, key); - let val3 = CalculateHashIntermediate::new_without_slot(hash2, 4, key); - let val4 = CalculateHashIntermediate::new_without_slot(hash2, 4, key); - - let src = vec![vec![val2.clone()], vec![val.clone()]]; - let result = AccountsHash::sort_hash_intermediate(src.clone(), &mut stats); - assert_eq!(result, src); - - let src = vec![ - vec![val2.clone(), val.clone()], - vec![val3.clone(), val4.clone()], - ]; - let sorted = vec![vec![val2, val], vec![val3, val4]]; - let result = AccountsHash::sort_hash_intermediate(src, &mut stats); - assert_eq!(result, sorted); - - let src = vec![vec![]]; - let result = AccountsHash::sort_hash_intermediate(src.clone(), &mut stats); - assert_eq!(result, src); - - let src = vec![]; - let result = AccountsHash::sort_hash_intermediate(src.clone(), &mut stats); - assert_eq!(result, src); } #[test] @@ -1461,18 +1384,6 @@ pub mod tests { AccountsHash::compare_two_hash_entries(&val, &val2) ); - let list = vec![val.clone(), val2.clone()]; - let mut list_bkup = list.clone(); - list_bkup.sort_by(AccountsHash::compare_two_hash_entries); - let list = AccountsHash::sort_hash_intermediate(vec![list], &mut HashStats::default()); - assert_eq!(list, vec![list_bkup]); - - let list = vec![val2, val.clone()]; // reverse args - let mut list_bkup = list.clone(); - list_bkup.sort_by(AccountsHash::compare_two_hash_entries); - let list = AccountsHash::sort_hash_intermediate(vec![list], &mut HashStats::default()); - assert_eq!(list, vec![list_bkup]); - // slot same, vers = let hash3 = Hash::new_unique(); let val3 = CalculateHashIntermediate::new_without_slot(hash3, 2, key); @@ -1498,6 +1409,12 @@ pub mod tests { ); } + fn test_de_dup_accounts_in_parallel( + account_maps: &[CalculateHashIntermediate], + ) -> (Vec, u64, usize) { + AccountsHash::de_dup_accounts_in_parallel(&vec![vec![account_maps.to_vec()]][..], 0) + } + #[test] fn test_accountsdb_remove_zero_balance_accounts() { solana_logger::setup(); @@ -1508,16 +1425,16 @@ pub mod tests { let val = CalculateHashIntermediate::new_without_slot(hash, 1, key); account_maps.push(val.clone()); - let result = AccountsHash::de_dup_accounts_from_stores(true, &account_maps[..]); - assert_eq!(result, (vec![val.hash], val.lamports as u128)); + let result = test_de_dup_accounts_in_parallel(&account_maps[..]); + assert_eq!(result, (vec![val.hash], val.lamports as u64, 1)); // zero original lamports, higher version let val = CalculateHashIntermediate::new_without_slot(hash, ZERO_RAW_LAMPORTS_SENTINEL, key); account_maps.push(val); // has to be after previous entry since account_maps are in slot order - let result = AccountsHash::de_dup_accounts_from_stores(true, &account_maps[..]); - assert_eq!(result, (vec![], 0)); + let result = test_de_dup_accounts_in_parallel(&account_maps[..]); + assert_eq!(result, (vec![], 0, 2)); } #[test] @@ -1718,136 +1635,6 @@ pub mod tests { } } - #[test] - fn test_accountsdb_flatten_hash_intermediate() { - solana_logger::setup(); - let test = vec![vec![vec![CalculateHashIntermediate::new_without_slot( - Hash::new_unique(), - 2, - Pubkey::new_unique(), - )]]]; - let mut stats = HashStats::default(); - let result = AccountsHash::flatten_hash_intermediate(test.clone(), &mut stats); - assert_eq!(result, test[0]); - assert_eq!(stats.unreduced_entries, 1); - - let mut stats = HashStats::default(); - let result = 
AccountsHash::flatten_hash_intermediate( - vec![vec![vec![CalculateHashIntermediate::default(); 0]]], - &mut stats, - ); - assert_eq!(result.iter().flatten().count(), 0); - assert_eq!(stats.unreduced_entries, 0); - - let test = vec![ - vec![vec![ - CalculateHashIntermediate::new_without_slot( - Hash::new_unique(), - 2, - Pubkey::new_unique(), - ), - CalculateHashIntermediate::new_without_slot( - Hash::new_unique(), - 9, - Pubkey::new_unique(), - ), - ]], - vec![vec![CalculateHashIntermediate::new_without_slot( - Hash::new_unique(), - 5, - Pubkey::new_unique(), - )]], - ]; - let mut stats = HashStats::default(); - let result = AccountsHash::flatten_hash_intermediate(test.clone(), &mut stats); - let expected = test - .into_iter() - .flatten() - .into_iter() - .flatten() - .collect::>(); - assert_eq!(result.into_iter().flatten().collect::>(), expected); - assert_eq!(stats.unreduced_entries, expected.len()); - } - - #[test] - fn test_accountsdb_flatten_hash_intermediate2() { - solana_logger::setup(); - // data is ordered: - // vec: just a level of hierarchy - // vec: 1 vec per PUBKEY_BINS_FOR_CALCULATING_HASHES - // vec: Intermediate data whose pubkey belongs in this division - let binned_data = vec![ - vec![vec![1, 2], vec![3, 4], vec![], vec![5]], - vec![vec![], vec![11, 12]], - ]; - let mut combined: Vec>> = vec![vec![]]; - binned_data.iter().enumerate().for_each(|(bin, v)| { - v.iter() - .enumerate() - .for_each(|(dimension0, v): (usize, &Vec)| { - while combined.len() <= dimension0 { - combined.push(vec![]); - } - let vec: &mut Vec> = &mut combined[dimension0]; - while vec.len() <= bin { - vec.push(vec![]); - } - vec[bin].extend(v.clone()); - }); - }); - - let mut stats = HashStats::default(); - let result = AccountsHash::flatten_hash_intermediate(combined, &mut stats); - assert_eq!( - result, - binned_data - .clone() - .into_iter() - .map(|x| x.into_iter().flatten().collect::>()) - .collect::>() - ); - assert_eq!( - stats.unreduced_entries, - binned_data - .into_iter() - .flatten() - .into_iter() - .flatten() - .count() - ); - - let src = vec![vec![vec![0]]]; - let result = AccountsHash::flatten_hash_intermediate(src, &mut stats); - assert_eq!(result, vec![vec![0]]); - - let src = vec![vec![vec![0], vec![1]]]; - let result = AccountsHash::flatten_hash_intermediate(src, &mut stats); - assert_eq!(result, vec![vec![0], vec![1]]); - - let src = vec![vec![vec![]], vec![vec![], vec![1]]]; - let result = AccountsHash::flatten_hash_intermediate(src, &mut stats); - assert_eq!(result, vec![vec![], vec![1]]); - - let src: Vec>> = vec![vec![vec![], vec![]]]; - let result = AccountsHash::flatten_hash_intermediate(src, &mut stats); - let expected: Vec> = vec![]; - assert_eq!(result, expected); - - let src: Vec>> = vec![vec![vec![], vec![]], vec![vec![], vec![]]]; - let result = AccountsHash::flatten_hash_intermediate(src, &mut stats); - assert_eq!(result, expected); - - let src: Vec>> = vec![vec![vec![], vec![]], vec![vec![]]]; - let result = AccountsHash::flatten_hash_intermediate(src, &mut stats); - assert_eq!(result, expected); - - let src: Vec>> = vec![vec![], vec![vec![]]]; - let result = AccountsHash::flatten_hash_intermediate(src, &mut stats); - let expected: Vec> = vec![]; - assert_eq!(result, expected); - } - fn test_hashing_larger(hashes: Vec<(Pubkey, Hash)>, fanout: usize) -> Hash { let result = AccountsHash::compute_merkle_root(hashes.clone(), fanout); let reduced: Vec<_> = hashes.iter().map(|x| x.1).collect(); @@ -2012,17 +1799,17 @@ pub mod tests { let offset = 2; let input = vec![ 
CalculateHashIntermediate::new_without_slot( - Hash::new_unique(), + Hash::new(&[1u8; 32]), u64::MAX - offset, Pubkey::new_unique(), ), CalculateHashIntermediate::new_without_slot( - Hash::new_unique(), + Hash::new(&[2u8; 32]), offset + 1, Pubkey::new_unique(), ), ]; - AccountsHash::de_dup_accounts_in_parallel(&input, 1); + AccountsHash::de_dup_accounts_in_parallel(&[vec![input]], 0); } #[test] @@ -2033,16 +1820,20 @@ pub mod tests { let offset = 2; let input = vec![ vec![CalculateHashIntermediate::new_without_slot( - Hash::new_unique(), + Hash::new(&[1u8; 32]), u64::MAX - offset, Pubkey::new_unique(), )], vec![CalculateHashIntermediate::new_without_slot( - Hash::new_unique(), + Hash::new(&[2u8; 32]), offset + 1, Pubkey::new_unique(), )], ]; - AccountsHash::de_dup_and_eliminate_zeros(input, &mut HashStats::default()); + AccountsHash::de_dup_and_eliminate_zeros( + vec![input], + &mut HashStats::default(), + 2, // accounts above are in 2 groups + ); } }
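The reason the per-bin merge in this patch can replace the old global flatten-and-sort is the pubkey binning done during the storage scan: each scanned account is routed to a bin derived from the leading bits of its pubkey, so bin index order is already pubkey order and every bin can be de-duplicated and hashed independently. The sketch below shows one plausible bin calculation; the 16-bit prefix, power-of-two bin count, and test values are assumptions for illustration, not the constants accounts_db.rs necessarily uses.

// One way to map a pubkey to a bin such that bin index order matches pubkey
// order. The prefix width and bin-count limit here are assumptions.
fn pubkey_to_bin_index(pubkey: &[u8; 32], bins: usize) -> usize {
    assert!(bins.is_power_of_two() && bins <= (1usize << 16));
    // take the two most significant bytes of the pubkey, then scale to `bins`
    let prefix = ((pubkey[0] as usize) << 8) | pubkey[1] as usize;
    prefix * bins / (1usize << 16)
}

#[test]
fn bin_index_is_monotonic_in_pubkey() {
    let lo = [0u8; 32];
    let mut hi = [0u8; 32];
    hi[0] = 0xff;
    // smaller pubkeys land in smaller (or equal) bin indexes
    assert_eq!(pubkey_to_bin_index(&lo, 256), 0);
    assert_eq!(pubkey_to_bin_index(&hi, 256), 255);
}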