diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index efcd2cd5ea..2367bbf47b 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -25,7 +25,7 @@ use {
         accounts_cache::{AccountsCache, CachedAccount, SlotCache},
         accounts_hash::{
            AccountsHash, AccountsHasher, CalcAccountsHashConfig, CalculateHashIntermediate,
-            HashStats, PreviousPass,
+            HashStats,
         },
         accounts_index::{
             AccountIndexGetResult, AccountSecondaryIndexes, AccountsIndex, AccountsIndexConfig,
@@ -116,7 +116,6 @@ pub const DEFAULT_NUM_DIRS: u32 = 4;
 // When calculating hashes, it is helpful to break the pubkeys found into bins based on the pubkey value.
 // More bins means smaller vectors to sort, copy, etc.
 pub const PUBKEY_BINS_FOR_CALCULATING_HASHES: usize = 65536;
-pub const NUM_SCAN_PASSES_DEFAULT: usize = 2;
 
 // Without chunks, we end up with 1 output vec for each outer snapshot storage.
 // This results in too many vectors to be efficient.
@@ -2277,23 +2276,6 @@ impl AccountsDb {
         Self::default_with_accounts_index(AccountInfoAccountsIndex::default_for_tests(), None)
     }
 
-    /// return (num_hash_scan_passes, bins_per_pass)
-    fn bins_per_pass() -> (usize, usize) {
-        let num_hash_scan_passes = NUM_SCAN_PASSES_DEFAULT;
-        let bins_per_pass = PUBKEY_BINS_FOR_CALCULATING_HASHES / num_hash_scan_passes;
-        assert!(
-            num_hash_scan_passes <= PUBKEY_BINS_FOR_CALCULATING_HASHES,
-            "num_hash_scan_passes must be <= {}",
-            PUBKEY_BINS_FOR_CALCULATING_HASHES
-        );
-        assert_eq!(
-            bins_per_pass * num_hash_scan_passes,
-            PUBKEY_BINS_FOR_CALCULATING_HASHES
-        ); // evenly divisible
-
-        (num_hash_scan_passes, bins_per_pass)
-    }
-
     fn default_with_accounts_index(
         accounts_index: AccountInfoAccountsIndex,
         accounts_hash_cache_path: Option<PathBuf>,
@@ -7615,63 +7597,49 @@ impl AccountsDb {
         self.mark_old_slots_as_dirty(storages, config.epoch_schedule.slots_per_epoch, &mut stats);
-        let (num_hash_scan_passes, bins_per_pass) = Self::bins_per_pass();
         let use_bg_thread_pool = config.use_bg_thread_pool;
         let mut scan_and_hash = || {
-            let mut previous_pass = PreviousPass::default();
-            let mut final_result = (Hash::default(), 0);
-
             let cache_hash_data = self.get_cache_hash_data(config, storages.max_slot_inclusive());
-            for pass in 0..num_hash_scan_passes {
-                let bounds = Range {
-                    start: pass * bins_per_pass,
-                    end: (pass + 1) * bins_per_pass,
-                };
+            let bounds = Range {
+                start: 0,
+                end: PUBKEY_BINS_FOR_CALCULATING_HASHES,
+            };
 
-                let hash = AccountsHasher {
-                    filler_account_suffix: if self.filler_accounts_config.count > 0 {
-                        self.filler_account_suffix
-                    } else {
-                        None
-                    },
-                };
+            let hash = AccountsHasher {
+                filler_account_suffix: if self.filler_accounts_config.count > 0 {
+                    self.filler_account_suffix
+                } else {
+                    None
+                },
+            };
 
-                // get raw data by scanning
-                let result = self.scan_snapshot_stores_with_cache(
-                    &cache_hash_data,
-                    storages,
-                    &mut stats,
-                    PUBKEY_BINS_FOR_CALCULATING_HASHES,
-                    &bounds,
-                    config,
-                    hash.filler_account_suffix.as_ref(),
-                )?;
+            // get raw data by scanning
+            let result = self.scan_snapshot_stores_with_cache(
+                &cache_hash_data,
+                storages,
+                &mut stats,
+                PUBKEY_BINS_FOR_CALCULATING_HASHES,
+                &bounds,
+                config,
+                hash.filler_account_suffix.as_ref(),
+            )?;
 
-                // convert mmapped cache files into slices of data
-                let slices = result
-                    .iter()
-                    .map(|d| d.get_cache_hash_data())
-                    .collect::<Vec<_>>();
+            // convert mmapped cache files into slices of data
+            let slices = result
+                .iter()
+                .map(|d| d.get_cache_hash_data())
+                .collect::<Vec<_>>();
 
-                // rework slices of data into bins for parallel processing and to match
data shape expected by 'rest_of_hash_calculation' - let result = AccountsHasher::get_binned_data( - &slices, - PUBKEY_BINS_FOR_CALCULATING_HASHES, - &bounds, - ); + // rework slices of data into bins for parallel processing and to match data shape expected by 'rest_of_hash_calculation' + let result = AccountsHasher::get_binned_data( + &slices, + PUBKEY_BINS_FOR_CALCULATING_HASHES, + &bounds, + ); - // turn raw data into merkle tree hashes and sum of lamports - let (hash, lamports, for_next_pass) = hash.rest_of_hash_calculation( - result, - &mut stats, - pass == num_hash_scan_passes - 1, - previous_pass, - bins_per_pass, - ); - previous_pass = for_next_pass; - final_result = (hash, lamports); - } + // turn raw data into merkle tree hashes and sum of lamports + let final_result = hash.rest_of_hash_calculation(result, &mut stats); info!( "calculate_accounts_hash_from_storages: slot: {} {:?}", diff --git a/runtime/src/accounts_hash.rs b/runtime/src/accounts_hash.rs index f497f2658b..8146eb1c25 100644 --- a/runtime/src/accounts_hash.rs +++ b/runtime/src/accounts_hash.rs @@ -1,7 +1,12 @@ use { - crate::{accounts_db::SnapshotStorages, ancestors::Ancestors, rent_collector::RentCollector}, + crate::{ + accounts_db::{SnapshotStorages, PUBKEY_BINS_FOR_CALCULATING_HASHES}, + ancestors::Ancestors, + rent_collector::RentCollector, + }, core::ops::Range, log::*, + memmap2::MmapMut, rayon::prelude::*, solana_measure::measure::Measure, solana_sdk::{ @@ -13,22 +18,81 @@ use { std::{ borrow::Borrow, convert::TryInto, + fs::File, + io::{BufWriter, Write}, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Mutex, }, }, + tempfile::tempfile, }; pub const MERKLE_FANOUT: usize = 16; /// the data passed through the processing functions pub type SortedDataByPubkey<'a> = Vec<&'a [CalculateHashIntermediate]>; -#[derive(Default, Debug)] -pub struct PreviousPass { - pub reduced_hashes: Vec>, - pub remaining_unhashed: Vec, - pub lamports: u64, +/// 1 file containing account hashes sorted by pubkey, mapped into memory +struct MmapAccountHashesFile { + mmap: MmapMut, +} + +impl MmapAccountHashesFile { + /// return a slice of account hashes starting at 'index' + fn read(&self, index: usize) -> &[Hash] { + let start = std::mem::size_of::() * index; + let item_slice: &[u8] = &self.mmap[start..]; + let remaining_elements = item_slice.len() / std::mem::size_of::(); + unsafe { + let item = item_slice.as_ptr() as *const Hash; + std::slice::from_raw_parts(item, remaining_elements) + } + } +} + +/// 1 file containing account hashes sorted by pubkey +#[derive(Default)] +pub struct AccountHashesFile { + /// # hashes and an open file that will be deleted on drop. None if there are zero hashes to represent, and thus, no file. + count_and_writer: Option<(usize, BufWriter)>, +} + +impl AccountHashesFile { + /// map the file into memory and return a reader that can access it by slice + fn get_reader(&mut self) -> Option<(usize, MmapAccountHashesFile)> { + std::mem::take(&mut self.count_and_writer).map(|(count, writer)| { + let file = Some(writer.into_inner().unwrap()); + ( + count, + MmapAccountHashesFile { + mmap: unsafe { MmapMut::map_mut(file.as_ref().unwrap()).unwrap() }, + }, + ) + }) + } + + /// # hashes stored in this file + pub fn count(&self) -> usize { + self.count_and_writer + .as_ref() + .map(|(count, _)| *count) + .unwrap_or_default() + } + + /// write 'hash' to the file + /// If the file isn't open, create it first. 
+ pub fn write(&mut self, hash: &Hash) { + if self.count_and_writer.is_none() { + // we have hashes to write but no file yet, so create a file that will auto-delete on drop + self.count_and_writer = Some((0, BufWriter::new(tempfile().unwrap()))); + } + let mut count_and_writer = self.count_and_writer.as_mut().unwrap(); + assert_eq!( + std::mem::size_of::(), + count_and_writer.1.write(hash.as_ref()).unwrap() + ); + count_and_writer.0 += 1; + } } #[derive(Debug)] @@ -325,14 +389,57 @@ pub struct CumulativeOffsets { total_count: usize, } +/// used by merkle tree calculation to lookup account hashes by overall index +#[derive(Default)] +pub struct CumulativeHashesFromFiles { + /// source of hashes in order + readers: Vec, + /// look up reader index and offset by overall index + cumulative: CumulativeOffsets, +} + +impl CumulativeHashesFromFiles { + /// Calculate offset from overall index to which file and offset within that file based on the length of each hash file. + /// Also collect readers to access the data. + pub fn from_files(hashes: Vec) -> Self { + let mut readers = Vec::with_capacity(hashes.len()); + let cumulative = CumulativeOffsets::new(hashes.into_iter().filter_map(|mut hash_file| { + // ignores all hashfiles that have zero entries + hash_file.get_reader().map(|(count, reader)| { + readers.push(reader); + count + }) + })); + Self { + cumulative, + readers, + } + } + + /// total # of items referenced + pub fn total_count(&self) -> usize { + self.cumulative.total_count + } + + // return the biggest slice possible that starts at the overall index 'start' + pub fn get_slice(&self, start: usize) -> &[Hash] { + let (start, offset) = self.cumulative.find(start); + let data_source_index = offset.index[0]; + let data = &self.readers[data_source_index]; + // unwrap here because we should never ask for data that doesn't exist. If we do, then cumulative calculated incorrectly. 
+ data.read(start) + } +} + impl CumulativeOffsets { - pub fn from_raw(raw: &[Vec]) -> CumulativeOffsets { + pub fn new(iter: I) -> Self + where + I: Iterator, + { let mut total_count: usize = 0; - let cumulative_offsets: Vec<_> = raw - .iter() + let cumulative_offsets: Vec<_> = iter .enumerate() - .filter_map(|(i, v)| { - let len = v.len(); + .filter_map(|(i, len)| { if len > 0 { let result = CumulativeOffset::new(vec![i], total_count); total_count += len; @@ -349,7 +456,11 @@ impl CumulativeOffsets { } } - pub fn from_raw_2d(raw: &[Vec>]) -> CumulativeOffsets { + pub fn from_raw(raw: &[Vec]) -> Self { + Self::new(raw.iter().map(|v| v.len())) + } + + pub fn from_raw_2d(raw: &[Vec>]) -> Self { let mut total_count: usize = 0; let mut cumulative_offsets = Vec::with_capacity(0); for (i, v_outer) in raw.iter().enumerate() { @@ -372,6 +483,7 @@ impl CumulativeOffsets { } } + /// find the index of the data source that contains 'start' fn find_index(&self, start: usize) -> usize { assert!(!self.cumulative_offsets.is_empty()); match self.cumulative_offsets[..].binary_search_by(|index| index.start_offset.cmp(&start)) { @@ -380,6 +492,9 @@ impl CumulativeOffsets { } } + /// given overall start index 'start' + /// return ('start', which is the offset into the data source at 'index', + /// and 'index', which is the data source to use) fn find(&self, start: usize) -> (usize, &CumulativeOffset) { let index = self.find_index(start); let index = &self.cumulative_offsets[index]; @@ -517,6 +632,7 @@ impl AccountsHasher { specific_level_count: Option, ) -> (Hash, Vec) where + // returns a slice of hashes starting at the given overall index F: Fn(usize) -> &'a [T] + std::marker::Sync, T: Borrow + std::marker::Sync + 'a, { @@ -765,13 +881,13 @@ impl AccountsHasher { /// returns: /// Vec, with one entry per bin /// for each entry, Vec in pubkey order - /// If return Vec> was flattened, it would be all hashes, in pubkey order. + /// If return Vec was flattened, it would be all hashes, in pubkey order. fn de_dup_and_eliminate_zeros<'a>( &self, sorted_data_by_pubkey: &'a [SortedDataByPubkey<'a>], stats: &mut HashStats, max_bin: usize, - ) -> (Vec>, u64) { + ) -> (Vec, u64) { // 1. eliminate zero lamport accounts // 2. pick the highest slot or (slot = and highest version) of each pubkey // 3. produce this output: @@ -780,10 +896,10 @@ impl AccountsHasher { // b. lamports let mut zeros = Measure::start("eliminate zeros"); let min_max_sum_entries_hashes = Mutex::new((usize::MAX, usize::MIN, 0u64, 0usize, 0usize)); - let hashes: Vec> = (0..max_bin) + let hashes: Vec<_> = (0..max_bin) .into_par_iter() .map(|bin| { - let (hashes, lamports_bin, unreduced_entries_count) = + let (hashes_file, lamports_bin, unreduced_entries_count) = self.de_dup_accounts_in_parallel(sorted_data_by_pubkey, bin); { let mut lock = min_max_sum_entries_hashes.lock().unwrap(); @@ -794,10 +910,10 @@ impl AccountsHasher { lamports_sum as u128 + lamports_bin as u128, ); entries += unreduced_entries_count; - hash_total += hashes.len(); + hash_total += hashes_file.count(); *lock = (min, max, lamports_sum, entries, hash_total); } - hashes + hashes_file }) .collect(); zeros.stop(); @@ -854,14 +970,14 @@ impl AccountsHasher { // 1. eliminate zero lamport accounts // 2. pick the highest slot or (slot = and highest version) of each pubkey // 3. produce this output: - // a. vec: individual hashes in pubkey order + // a. AccountHashesFile: individual account hashes in pubkey order // b. lamport sum // c. unreduced count (ie. 
including duplicates and zero lamport) fn de_dup_accounts_in_parallel<'a>( &self, pubkey_division: &'a [SortedDataByPubkey<'a>], pubkey_bin: usize, - ) -> (Vec<&'a Hash>, u64, usize) { + ) -> (AccountHashesFile, u64, usize) { let len = pubkey_division.len(); let mut unreduced_count = 0; let mut indexes = vec![0; len]; @@ -869,7 +985,7 @@ impl AccountsHasher { // map from index of an item in first_items[] to index of the corresponding item in pubkey_division[] // this will change as items in pubkey_division[] are exhausted let mut first_item_to_pubkey_division = Vec::with_capacity(len); - + let mut hashes = AccountHashesFile::default(); // initialize 'first_items', which holds the current lowest item in each slot group pubkey_division.iter().enumerate().for_each(|(i, bins)| { // check to make sure we can do bins[pubkey_bin] @@ -883,7 +999,6 @@ impl AccountsHasher { } }); let mut overall_sum = 0; - let mut hashes: Vec<&Hash> = Vec::with_capacity(unreduced_count); let mut duplicate_pubkey_indexes = Vec::with_capacity(len); let filler_accounts_enabled = self.filler_accounts_enabled(); @@ -933,7 +1048,7 @@ impl AccountsHasher { overall_sum = Self::checked_cast_for_capitalization( item.lamports as u128 + overall_sum as u128, ); - hashes.push(&item.hash); + hashes.write(&item.hash); } if !duplicate_pubkey_indexes.is_empty() { // skip past duplicate keys in earlier slots @@ -970,115 +1085,26 @@ impl AccountsHasher { &self, data_sections_by_pubkey: Vec>, mut stats: &mut HashStats, - is_last_pass: bool, - mut previous_state: PreviousPass, - max_bin: usize, - ) -> (Hash, u64, PreviousPass) { - let (mut hashes, mut total_lamports) = - self.de_dup_and_eliminate_zeros(&data_sections_by_pubkey, stats, max_bin); + ) -> (Hash, u64) { + let (hashes, total_lamports) = self.de_dup_and_eliminate_zeros( + &data_sections_by_pubkey, + stats, + PUBKEY_BINS_FOR_CALCULATING_HASHES, + ); - total_lamports += previous_state.lamports; + let cumulative = CumulativeHashesFromFiles::from_files(hashes); - let mut _remaining_unhashed = None; - if !previous_state.remaining_unhashed.is_empty() { - // These items were not hashed last iteration because they didn't divide evenly. - // These are hashes for pubkeys that are < the pubkeys we are looking at now, so their hashes go first in order. - _remaining_unhashed = Some(previous_state.remaining_unhashed); - hashes.insert( - 0, - _remaining_unhashed - .as_ref() - .unwrap() - .iter() - .collect::>(), - ); - previous_state.remaining_unhashed = Vec::new(); - } - - let mut next_pass = PreviousPass::default(); - let cumulative = CumulativeOffsets::from_raw(&hashes); - let mut hash_total = cumulative.total_count; - next_pass.reduced_hashes = previous_state.reduced_hashes; - - const TARGET_FANOUT_LEVEL: usize = 3; - let target_fanout = MERKLE_FANOUT.pow(TARGET_FANOUT_LEVEL as u32); - - if !is_last_pass { - next_pass.lamports = total_lamports; - total_lamports = 0; - - // Save hashes that don't evenly hash. They will be combined with hashes from the next pass. 
- let left_over_hashes = hash_total % target_fanout; - - // move tail hashes that don't evenly hash into a 1d vector for next time - let mut i = hash_total - left_over_hashes; - while i < hash_total { - let data = cumulative.get_slice(&hashes, i); - next_pass.remaining_unhashed.extend(data.iter().cloned()); - i += data.len(); - } - - hash_total -= left_over_hashes; // this is enough to cause the hashes at the end of the data set to be ignored - } - - // if we have raw hashes to process and - // we are not the last pass (we already modded against target_fanout) OR - // we have previously surpassed target_fanout and hashed some already to the target_fanout level. In that case, we know - // we need to hash whatever is left here to the target_fanout level. - if hash_total != 0 && (!is_last_pass || !next_pass.reduced_hashes.is_empty()) { - let mut hash_time = Measure::start("hash"); - let partial_hashes = Self::compute_merkle_root_from_slices( - hash_total, // note this does not include the ones that didn't divide evenly, unless we're in the last iteration - MERKLE_FANOUT, - Some(TARGET_FANOUT_LEVEL), - |start| cumulative.get_slice(&hashes, start), - Some(TARGET_FANOUT_LEVEL), - ) - .1; - hash_time.stop(); - stats.hash_time_total_us += hash_time.as_us(); - stats.hash_time_pre_us += hash_time.as_us(); - next_pass.reduced_hashes.push(partial_hashes); - } - - let no_progress = is_last_pass && next_pass.reduced_hashes.is_empty() && !hashes.is_empty(); - if no_progress { - // we never made partial progress, so hash everything now - hashes.into_iter().for_each(|v| { - if !v.is_empty() { - next_pass - .reduced_hashes - .push(v.into_iter().cloned().collect()); - } - }); - } - - let hash = if is_last_pass { - let cumulative = CumulativeOffsets::from_raw(&next_pass.reduced_hashes); - - let hash = if cumulative.total_count == 1 && !no_progress { - // all the passes resulted in a single hash, that means we're done, so we had <= MERKLE_ROOT total hashes - cumulative.get_slice(&next_pass.reduced_hashes, 0)[0] - } else { - let mut hash_time = Measure::start("hash"); - // hash all the rest and combine and hash until we have only 1 hash left - let (hash, _) = Self::compute_merkle_root_from_slices( - cumulative.total_count, - MERKLE_FANOUT, - None, - |start| cumulative.get_slice(&next_pass.reduced_hashes, start), - None, - ); - hash_time.stop(); - stats.hash_time_total_us += hash_time.as_us(); - hash - }; - next_pass.reduced_hashes = Vec::new(); - hash - } else { - Hash::default() - }; - (hash, total_lamports, next_pass) + let mut hash_time = Measure::start("hash"); + let (hash, _) = Self::compute_merkle_root_from_slices( + cumulative.total_count(), + MERKLE_FANOUT, + None, + |start| cumulative.get_slice(start), + None, + ); + hash_time.stop(); + stats.hash_time_total_us += hash_time.as_us(); + (hash, total_lamports) } } @@ -1090,6 +1116,97 @@ pub struct AccountsHash(pub Hash); pub mod tests { use {super::*, std::str::FromStr}; + #[test] + fn test_account_hashes_file() { + // 0 hashes + let mut file = AccountHashesFile::default(); + assert!(file.get_reader().is_none()); + let hashes = (0..2).map(|i| Hash::new(&[i; 32])).collect::>(); + + // 1 hash + file.write(&hashes[0]); + let reader = file.get_reader().unwrap(); + assert_eq!(&[hashes[0]][..], reader.1.read(0)); + assert!(reader.1.read(1).is_empty()); + + // multiple hashes + let mut file = AccountHashesFile::default(); + assert!(file.get_reader().is_none()); + hashes.iter().for_each(|hash| file.write(hash)); + let reader = file.get_reader().unwrap(); + 
(0..2).for_each(|i| assert_eq!(&hashes[i..], reader.1.read(i))); + assert!(reader.1.read(2).is_empty()); + } + + #[test] + fn test_cumulative_hashes_from_files() { + (0..4).for_each(|permutation| { + let hashes = (0..2).map(|i| Hash::new(&[i + 1; 32])).collect::>(); + + let mut combined = Vec::default(); + + // 0 hashes + let file0 = AccountHashesFile::default(); + + // 1 hash + let mut file1 = AccountHashesFile::default(); + file1.write(&hashes[0]); + combined.push(hashes[0]); + + // multiple hashes + let mut file2 = AccountHashesFile::default(); + hashes.iter().for_each(|hash| { + file2.write(hash); + combined.push(*hash); + }); + + let hashes = if permutation == 0 { + vec![file0, file1, file2] + } else if permutation == 1 { + // include more empty files + vec![ + file0, + file1, + AccountHashesFile::default(), + file2, + AccountHashesFile::default(), + ] + } else if permutation == 2 { + vec![file1, file2] + } else { + // swap file2 and 1 + let one = combined.remove(0); + combined.push(one); + vec![ + file2, + AccountHashesFile::default(), + AccountHashesFile::default(), + file1, + ] + }; + + let cumulative = CumulativeHashesFromFiles::from_files(hashes); + let len = combined.len(); + assert_eq!(cumulative.total_count(), len); + (0..combined.len()).for_each(|start| { + let mut retreived = Vec::default(); + let mut cumulative_start = start; + // read all data + while retreived.len() < (len - start) { + let this_one = cumulative.get_slice(cumulative_start); + retreived.extend(this_one.iter()); + cumulative_start += this_one.len(); + assert_ne!(0, this_one.len()); + } + assert_eq!( + &combined[start..], + &retreived[..], + "permutation: {permutation}" + ); + }); + }); + } + #[test] fn test_accountsdb_div_ceil() { assert_eq!(AccountsHasher::div_ceil(10, 3), 4); @@ -1127,13 +1244,8 @@ pub mod tests { account_maps.push(val); let accounts_hash = AccountsHasher::default(); - let result = accounts_hash.rest_of_hash_calculation( - for_rest(&account_maps), - &mut HashStats::default(), - true, - PreviousPass::default(), - one_range(), - ); + let result = accounts_hash + .rest_of_hash_calculation(for_rest(&account_maps), &mut HashStats::default()); let expected_hash = Hash::from_str("8j9ARGFv4W2GfML7d3sVJK2MePwrikqYnu6yqer28cCa").unwrap(); assert_eq!((result.0, result.1), (expected_hash, 88)); @@ -1143,13 +1255,8 @@ pub mod tests { let val = CalculateHashIntermediate::new(hash, 20, key); account_maps.insert(0, val); - let result = accounts_hash.rest_of_hash_calculation( - for_rest(&account_maps), - &mut HashStats::default(), - true, - PreviousPass::default(), - one_range(), - ); + let result = accounts_hash + .rest_of_hash_calculation(for_rest(&account_maps), &mut HashStats::default()); let expected_hash = Hash::from_str("EHv9C5vX7xQjjMpsJMzudnDTzoTSRwYkqLzY8tVMihGj").unwrap(); assert_eq!((result.0, result.1), (expected_hash, 108)); @@ -1159,13 +1266,8 @@ pub mod tests { let val = CalculateHashIntermediate::new(hash, 30, key); account_maps.insert(1, val); - let result = accounts_hash.rest_of_hash_calculation( - for_rest(&account_maps), - &mut HashStats::default(), - true, - PreviousPass::default(), - one_range(), - ); + let result = accounts_hash + .rest_of_hash_calculation(for_rest(&account_maps), &mut HashStats::default()); let expected_hash = Hash::from_str("7NNPg5A8Xsg1uv4UFm6KZNwsipyyUnmgCrznP6MBWoBZ").unwrap(); assert_eq!((result.0, result.1), (expected_hash, 118)); } @@ -1178,308 +1280,6 @@ pub mod tests { 0 } - const EMPTY_DATA: [CalculateHashIntermediate; 0] = []; - - fn empty_data() -> Vec> 
{ - vec![vec![&EMPTY_DATA]] - } - - #[test] - fn test_accountsdb_multi_pass_rest_of_hash_calculation() { - solana_logger::setup(); - - // passes: - // 0: empty, NON-empty, empty, empty final - // 1: NON-empty, empty final - // 2: NON-empty, empty, empty final - for pass in 0..3 { - let mut account_maps = Vec::new(); - - let key = Pubkey::new(&[11u8; 32]); - let hash = Hash::new(&[1u8; 32]); - let val = CalculateHashIntermediate::new(hash, 88, key); - account_maps.push(val); - - // 2nd key - zero lamports, so will be removed - let key = Pubkey::new(&[12u8; 32]); - let hash = Hash::new(&[2u8; 32]); - let val = CalculateHashIntermediate::new(hash, 0, key); - account_maps.push(val); - - let mut previous_pass = PreviousPass::default(); - - let accounts_index = AccountsHasher::default(); - if pass == 0 { - // first pass that is not last and is empty - let result = accounts_index.rest_of_hash_calculation( - empty_data(), - &mut HashStats::default(), - false, // not last pass - previous_pass, - one_range(), - ); - assert_eq!(result.0, Hash::default()); - assert_eq!(result.1, 0); - previous_pass = result.2; - assert_eq!(previous_pass.remaining_unhashed.len(), 0); - assert_eq!(previous_pass.reduced_hashes.len(), 0); - assert_eq!(previous_pass.lamports, 0); - } - - let result = accounts_index.rest_of_hash_calculation( - for_rest(&account_maps), - &mut HashStats::default(), - false, // not last pass - previous_pass, - one_range(), - ); - - assert_eq!(result.0, Hash::default()); - assert_eq!(result.1, 0); - let mut previous_pass = result.2; - assert_eq!(previous_pass.remaining_unhashed, vec![account_maps[0].hash]); - assert_eq!(previous_pass.reduced_hashes.len(), 0); - assert_eq!(previous_pass.lamports, account_maps[0].lamports); - - let expected_hash = - Hash::from_str("8j9ARGFv4W2GfML7d3sVJK2MePwrikqYnu6yqer28cCa").unwrap(); - let accounts_index = AccountsHasher::default(); - if pass == 2 { - let result = accounts_index.rest_of_hash_calculation( - empty_data(), - &mut HashStats::default(), - false, - previous_pass, - one_range(), - ); - - previous_pass = result.2; - assert_eq!(previous_pass.remaining_unhashed, vec![account_maps[0].hash]); - assert_eq!(previous_pass.reduced_hashes.len(), 0); - assert_eq!(previous_pass.lamports, account_maps[0].lamports); - } - - let result = accounts_index.rest_of_hash_calculation( - empty_data(), - &mut HashStats::default(), - true, // finally, last pass - previous_pass, - one_range(), - ); - let previous_pass = result.2; - - assert_eq!(previous_pass.remaining_unhashed.len(), 0); - assert_eq!(previous_pass.reduced_hashes.len(), 0); - assert_eq!(previous_pass.lamports, 0); - - assert_eq!((result.0, result.1), (expected_hash, 88)); - } - } - - #[test] - fn test_accountsdb_multi_pass_rest_of_hash_calculation_partial() { - solana_logger::setup(); - - let mut account_maps = Vec::new(); - - let key = Pubkey::new(&[11u8; 32]); - let hash = Hash::new(&[1u8; 32]); - let val = CalculateHashIntermediate::new(hash, 88, key); - account_maps.push(val); - - let key = Pubkey::new(&[12u8; 32]); - let hash = Hash::new(&[2u8; 32]); - let val = CalculateHashIntermediate::new(hash, 20, key); - account_maps.push(val); - let accounts_hash = AccountsHasher::default(); - let result = accounts_hash.rest_of_hash_calculation( - for_rest(&[account_maps[0].clone()]), - &mut HashStats::default(), - false, // not last pass - PreviousPass::default(), - one_range(), - ); - - assert_eq!(result.0, Hash::default()); - assert_eq!(result.1, 0); - let previous_pass = result.2; - 
assert_eq!(previous_pass.remaining_unhashed, vec![account_maps[0].hash]); - assert_eq!(previous_pass.reduced_hashes.len(), 0); - assert_eq!(previous_pass.lamports, account_maps[0].lamports); - - let result = accounts_hash.rest_of_hash_calculation( - for_rest(&[account_maps[1].clone()]), - &mut HashStats::default(), - false, // not last pass - previous_pass, - one_range(), - ); - - assert_eq!(result.0, Hash::default()); - assert_eq!(result.1, 0); - let previous_pass = result.2; - assert_eq!( - previous_pass.remaining_unhashed, - vec![account_maps[0].hash, account_maps[1].hash] - ); - assert_eq!(previous_pass.reduced_hashes.len(), 0); - let total_lamports_expected = account_maps[0].lamports + account_maps[1].lamports; - assert_eq!(previous_pass.lamports, total_lamports_expected); - - let result = accounts_hash.rest_of_hash_calculation( - empty_data(), - &mut HashStats::default(), - true, - previous_pass, - one_range(), - ); - - let previous_pass = result.2; - assert_eq!(previous_pass.remaining_unhashed.len(), 0); - assert_eq!(previous_pass.reduced_hashes.len(), 0); - assert_eq!(previous_pass.lamports, 0); - - let expected_hash = AccountsHasher::compute_merkle_root( - account_maps - .iter() - .map(|a| (a.pubkey, a.hash)) - .collect::>(), - MERKLE_FANOUT, - ); - - assert_eq!( - (result.0, result.1), - (expected_hash, total_lamports_expected) - ); - } - - #[test] - fn test_accountsdb_multi_pass_rest_of_hash_calculation_partial_hashes() { - solana_logger::setup(); - - let mut account_maps = Vec::new(); - let accounts_hash = AccountsHasher::default(); - - const TARGET_FANOUT_LEVEL: usize = 3; - let target_fanout = MERKLE_FANOUT.pow(TARGET_FANOUT_LEVEL as u32); - let mut total_lamports_expected = 0; - let plus1 = target_fanout + 1; - for i in 0..plus1 * 2 { - let lamports = (i + 1) as u64; - total_lamports_expected += lamports; - let key = Pubkey::new_unique(); - let hash = Hash::new_unique(); - let val = CalculateHashIntermediate::new(hash, lamports, key); - account_maps.push(val); - } - - let mut chunk = account_maps[0..plus1].to_vec(); - chunk.sort_by(AccountsHasher::compare_two_hash_entries); - let sorted = chunk.clone(); - - // first 4097 hashes (1 left over) - let result = accounts_hash.rest_of_hash_calculation( - for_rest(&chunk), - &mut HashStats::default(), - false, // not last pass - PreviousPass::default(), - one_range(), - ); - - assert_eq!(result.0, Hash::default()); - assert_eq!(result.1, 0); - let previous_pass = result.2; - let left_over_1 = sorted[plus1 - 1].hash; - assert_eq!(previous_pass.remaining_unhashed, vec![left_over_1]); - assert_eq!(previous_pass.reduced_hashes.len(), 1); - let expected_hash = AccountsHasher::compute_merkle_root( - sorted[0..target_fanout] - .iter() - .map(|a| (a.pubkey, a.hash)) - .collect::>(), - MERKLE_FANOUT, - ); - assert_eq!(previous_pass.reduced_hashes[0], vec![expected_hash]); - assert_eq!( - previous_pass.lamports, - account_maps[0..plus1] - .iter() - .map(|i| i.lamports) - .sum::() - ); - - let mut chunk = account_maps[plus1..plus1 * 2].to_vec(); - chunk.sort_by(AccountsHasher::compare_two_hash_entries); - let sorted2 = chunk.clone(); - - let mut with_left_over = vec![left_over_1]; - with_left_over.extend(sorted2[0..plus1 - 2].iter().cloned().map(|i| i.hash)); - let expected_hash2 = AccountsHasher::compute_merkle_root( - with_left_over[0..target_fanout] - .iter() - .map(|a| (Pubkey::default(), *a)) - .collect::>(), - MERKLE_FANOUT, - ); - - // second 4097 hashes (2 left over) - let result = accounts_hash.rest_of_hash_calculation( - 
for_rest(&chunk), - &mut HashStats::default(), - false, // not last pass - previous_pass, - one_range(), - ); - - assert_eq!(result.0, Hash::default()); - assert_eq!(result.1, 0); - let previous_pass = result.2; - assert_eq!( - previous_pass.remaining_unhashed, - vec![sorted2[plus1 - 2].hash, sorted2[plus1 - 1].hash] - ); - assert_eq!(previous_pass.reduced_hashes.len(), 2); - assert_eq!( - previous_pass.reduced_hashes, - vec![vec![expected_hash], vec![expected_hash2]] - ); - assert_eq!( - previous_pass.lamports, - account_maps[0..plus1 * 2] - .iter() - .map(|i| i.lamports) - .sum::() - ); - - let result = accounts_hash.rest_of_hash_calculation( - empty_data(), - &mut HashStats::default(), - true, - previous_pass, - one_range(), - ); - - let previous_pass = result.2; - assert_eq!(previous_pass.remaining_unhashed.len(), 0); - assert_eq!(previous_pass.reduced_hashes.len(), 0); - assert_eq!(previous_pass.lamports, 0); - - let mut combined = sorted; - combined.extend(sorted2); - let expected_hash = AccountsHasher::compute_merkle_root( - combined - .iter() - .map(|a| (a.pubkey, a.hash)) - .collect::>(), - MERKLE_FANOUT, - ); - - assert_eq!( - (result.0, result.1), - (expected_hash, total_lamports_expected) - ); - } - #[test] fn test_accountsdb_de_dup_accounts_zero_chunks() { let vec = [vec![vec![CalculateHashIntermediate { @@ -1488,12 +1288,22 @@ pub mod tests { }]]]; let temp_vec = vec.to_vec(); let slice = convert_to_slice2(&temp_vec); - let (hashes, lamports, _) = + let (mut hashes, lamports, _) = AccountsHasher::default().de_dup_accounts_in_parallel(&slice, 0); - assert_eq!(vec![&Hash::default()], hashes); + assert_eq!(&[Hash::default()], hashes.get_reader().unwrap().1.read(0)); assert_eq!(lamports, 1); } + fn get_vec_vec(hashes: Vec) -> Vec> { + hashes.into_iter().map(get_vec).collect() + } + fn get_vec(mut hashes: AccountHashesFile) -> Vec { + hashes + .get_reader() + .map(|r| r.1.read(0).to_vec()) + .unwrap_or_default() + } + #[test] fn test_accountsdb_de_dup_accounts_empty() { solana_logger::setup(); @@ -1503,23 +1313,26 @@ pub mod tests { let (hashes, lamports) = accounts_hash.de_dup_and_eliminate_zeros(&vec, &mut HashStats::default(), one_range()); assert_eq!( - vec![&Hash::default(); 0], - hashes.into_iter().flatten().collect::>() + vec![Hash::default(); 0], + get_vec_vec(hashes) + .into_iter() + .flatten() + .collect::>(), ); assert_eq!(lamports, 0); let vec = vec![]; let (hashes, lamports) = accounts_hash.de_dup_and_eliminate_zeros(&vec, &mut HashStats::default(), zero_range()); - let empty: Vec> = Vec::default(); - assert_eq!(empty, hashes); + let empty: Vec> = Vec::default(); + assert_eq!(empty, get_vec_vec(hashes)); assert_eq!(lamports, 0); let (hashes, lamports, _) = accounts_hash.de_dup_accounts_in_parallel(&[], 1); - assert_eq!(vec![&Hash::default(); 0], hashes); + assert_eq!(vec![Hash::default(); 0], get_vec(hashes)); assert_eq!(lamports, 0); let (hashes, lamports, _) = accounts_hash.de_dup_accounts_in_parallel(&[], 2); - assert_eq!(vec![&Hash::default(); 0], hashes); + assert_eq!(vec![Hash::default(); 0], get_vec(hashes)); assert_eq!(lamports, 0); } @@ -1626,6 +1439,12 @@ pub mod tests { end - start, ); + let hashes2 = get_vec(hashes2); + let hashes3 = get_vec(hashes3); + let hashes4 = get_vec_vec(hashes4); + let hashes5 = get_vec_vec(hashes5); + let hashes6 = get_vec_vec(hashes6); + assert_eq!(hashes2, hashes3); let expected2 = hashes2.clone(); assert_eq!( @@ -1732,7 +1551,7 @@ pub mod tests { fn test_de_dup_accounts_in_parallel<'a>( account_maps: &'a 
[SortedDataByPubkey<'a>],
-    ) -> (Vec<&'a Hash>, u64, usize) {
+    ) -> (AccountHashesFile, u64, usize) {
         AccountsHasher::default().de_dup_accounts_in_parallel(account_maps, 0)
     }
@@ -1748,8 +1567,11 @@
         let vecs = vec![vec![account_maps.to_vec()]];
         let slice = convert_to_slice2(&vecs);
-        let result = test_de_dup_accounts_in_parallel(&slice);
-        assert_eq!(result, (vec![&val.hash], val.lamports, 1));
+        let (hashfile, lamports, count) = test_de_dup_accounts_in_parallel(&slice);
+        assert_eq!(
+            (get_vec(hashfile), lamports, count),
+            (vec![val.hash], val.lamports, 1)
+        );
 
         // zero original lamports, higher version
         let val = CalculateHashIntermediate::new(hash, 0, key);
@@ -1757,8 +1579,8 @@
         let vecs = vec![vec![account_maps.to_vec()]];
         let slice = convert_to_slice2(&vecs);
-        let result = test_de_dup_accounts_in_parallel(&slice);
-        assert_eq!(result, (vec![], 0, 2));
+        let (hashfile, lamports, count) = test_de_dup_accounts_in_parallel(&slice);
+        assert_eq!((get_vec(hashfile), lamports, count), (vec![], 0, 2));
     }
 
 #[test]
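
The sketches below illustrate the mechanisms this diff relies on, in isolation from AccountsDb. First, the scan-range change in calculate_accounts_hash_from_storages: the deleted bins_per_pass() split the 65536 pubkey bins across NUM_SCAN_PASSES_DEFAULT = 2 passes and stitched the passes together through PreviousPass, while the new code scans the whole bin range once. A minimal sketch of that arithmetic, in plain Rust with no AccountsDb types:

```rust
// Old vs. new scan bounds for the accounts-hash calculation.
const PUBKEY_BINS_FOR_CALCULATING_HASHES: usize = 65536;

fn main() {
    // old: two half-range passes, each feeding partial results to the next via PreviousPass
    let num_passes = 2;
    let bins_per_pass = PUBKEY_BINS_FOR_CALCULATING_HASHES / num_passes;
    let old_passes: Vec<std::ops::Range<usize>> = (0..num_passes)
        .map(|pass| pass * bins_per_pass..(pass + 1) * bins_per_pass)
        .collect();
    assert_eq!(old_passes, vec![0..32768, 32768..65536]);

    // new: a single pass over every bin
    let new_pass = 0..PUBKEY_BINS_FOR_CALCULATING_HASHES;
    assert_eq!(new_pass, 0..65536);
}
```

With a single pass there is nothing left to carry between passes, which is why PreviousPass and its carry-over fields (remaining_unhashed, reduced_hashes, lamports) can be deleted along with the multi-pass tests.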
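
Second, the per-bin Vec<&Hash> is replaced by AccountHashesFile: hashes are streamed through a BufWriter into an unnamed temp file (created lazily on the first write and deleted on drop), and the merkle step later maps that file with memmap2 and reads the hashes back as slices. A self-contained sketch of the same write-then-mmap round trip, assuming the tempfile and memmap2 crates are available and using [u8; 32] in place of solana_sdk::hash::Hash:

```rust
use {
    memmap2::MmapMut,
    std::{
        fs::File,
        io::{BufWriter, Write},
    },
    tempfile::tempfile,
};

type Hash32 = [u8; 32];

#[derive(Default)]
struct HashesFile {
    /// number of hashes written and the open writer; None until the first write
    count_and_writer: Option<(usize, BufWriter<File>)>,
}

impl HashesFile {
    fn write(&mut self, hash: &Hash32) {
        if self.count_and_writer.is_none() {
            // tempfile() returns an unnamed file that is removed when the handle is dropped
            self.count_and_writer = Some((0, BufWriter::new(tempfile().unwrap())));
        }
        let (count, writer) = self.count_and_writer.as_mut().unwrap();
        writer.write_all(hash).unwrap();
        *count += 1;
    }

    /// flush the writer, map the backing file, and return (count, mmap); None if nothing was written
    fn into_reader(mut self) -> Option<(usize, MmapMut)> {
        self.count_and_writer.take().map(|(count, writer)| {
            let file = writer.into_inner().unwrap();
            (count, unsafe { MmapMut::map_mut(&file).unwrap() })
        })
    }
}

/// reinterpret the mapped bytes as hashes, starting at `index`
fn read(mmap: &MmapMut, index: usize) -> &[Hash32] {
    let bytes = &mmap[index * std::mem::size_of::<Hash32>()..];
    let len = bytes.len() / std::mem::size_of::<Hash32>();
    unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const Hash32, len) }
}

fn main() {
    let mut file = HashesFile::default();
    file.write(&[1; 32]);
    file.write(&[2; 32]);
    let (count, mmap) = file.into_reader().unwrap();
    assert_eq!(count, 2);
    assert_eq!(read(&mmap, 1), &[[2u8; 32]]);
    assert!(read(&mmap, 2).is_empty());
}
```

This mirrors the AccountHashesFile::write / get_reader and MmapAccountHashesFile::read flow in the diff, and the behavior exercised by the new test_account_hashes_file.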
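
Third, CumulativeHashesFromFiles uses the generalized CumulativeOffsets::new (which now accepts any iterator of lengths) to translate an overall hash index into a (reader, offset-within-reader) pair via a binary search over cumulative start offsets; empty files are skipped while the offsets are built. A simplified sketch of that lookup, with illustrative names rather than the exact structs from the diff:

```rust
struct Offset {
    source: usize,       // index of the data source this entry describes
    start_offset: usize, // overall index at which that source begins
}

struct Cumulative {
    offsets: Vec<Offset>,
    total_count: usize,
}

impl Cumulative {
    fn new(lens: impl Iterator<Item = usize>) -> Self {
        let mut total_count = 0;
        let offsets = lens
            .enumerate()
            .filter_map(|(source, len)| {
                // sources with zero items are skipped entirely
                (len > 0).then(|| {
                    let offset = Offset { source, start_offset: total_count };
                    total_count += len;
                    offset
                })
            })
            .collect();
        Self { offsets, total_count }
    }

    /// map an overall index to (offset within the source, source index)
    fn find(&self, start: usize) -> (usize, usize) {
        let i = match self
            .offsets
            .binary_search_by(|o| o.start_offset.cmp(&start))
        {
            Ok(i) => i,
            Err(i) => i - 1, // start falls inside the source that begins just before it
        };
        (start - self.offsets[i].start_offset, self.offsets[i].source)
    }
}

fn main() {
    // three sources holding 0, 2, and 3 hashes; overall indexes 0..5 span the non-empty ones
    let cumulative = Cumulative::new(vec![0usize, 2, 3].into_iter());
    assert_eq!(cumulative.total_count, 5);
    assert_eq!(cumulative.find(0), (0, 1)); // overall index 0 is item 0 of source 1
    assert_eq!(cumulative.find(3), (1, 2)); // overall index 3 is item 1 of source 2
}
```

CumulativeHashesFromFiles::get_slice is this lookup followed by MmapAccountHashesFile::read at the local offset, so a caller (the merkle step, or the new test_cumulative_hashes_from_files) can walk every hash by repeatedly advancing the overall index by the length of the returned slice.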
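
Fourth, with nothing carried between passes, rest_of_hash_calculation feeds the cumulative hashes directly to compute_merkle_root_from_slices with MERKLE_FANOUT (16) and no intermediate target-fanout level. The sketch below shows the shape of a fanout-16 reduction over an in-memory list, assuming solana_sdk's hashv helper; the real code instead pulls its leaves lazily through CumulativeHashesFromFiles::get_slice and parallelizes the levels:

```rust
use solana_sdk::hash::{hashv, Hash};

const MERKLE_FANOUT: usize = 16;

/// reduce a flat, pubkey-ordered list of account hashes to a single root
fn merkle_root(mut hashes: Vec<Hash>) -> Hash {
    if hashes.is_empty() {
        return Hash::default();
    }
    loop {
        // hash each group of up to 16 children into one parent node
        hashes = hashes
            .chunks(MERKLE_FANOUT)
            .map(|chunk| {
                let refs: Vec<&[u8]> = chunk.iter().map(|hash| hash.as_ref()).collect();
                hashv(&refs)
            })
            .collect();
        if hashes.len() == 1 {
            return hashes[0];
        }
    }
}

fn main() {
    // 100 leaves -> 7 parents -> 1 root
    let leaves: Vec<Hash> = (0u8..100).map(|i| hashv(&[&[i][..]])).collect();
    println!("root: {}", merkle_root(leaves));
}
```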
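
Finally, de_dup_accounts_in_parallel keeps its documented semantics (the newest version of each pubkey wins, zero-lamport accounts are eliminated, and the surviving hashes stay in pubkey order alongside a checked lamport sum) but writes the winners into an AccountHashesFile instead of collecting Vec<&Hash>. A simplified in-memory sketch of those de-dup rules, with a BTreeMap standing in for the streaming per-bin merge the real code performs:

```rust
use std::collections::BTreeMap;

#[derive(Clone)]
struct Entry {
    pubkey: [u8; 32],
    hash: [u8; 32],
    lamports: u64,
}

fn de_dup(groups_oldest_to_newest: &[Vec<Entry>]) -> (Vec<[u8; 32]>, u64) {
    let mut latest: BTreeMap<[u8; 32], Entry> = BTreeMap::new();
    for group in groups_oldest_to_newest {
        for entry in group {
            // later groups come from newer slots, so a later insert wins for the same pubkey
            latest.insert(entry.pubkey, entry.clone());
        }
    }
    let mut lamports: u64 = 0;
    let mut hashes = Vec::new();
    for entry in latest.values() {
        if entry.lamports == 0 {
            continue; // zero-lamport accounts are eliminated
        }
        lamports = lamports.checked_add(entry.lamports).unwrap();
        hashes.push(entry.hash); // BTreeMap iteration yields pubkey order
    }
    (hashes, lamports)
}

fn main() {
    let old = vec![Entry { pubkey: [1; 32], hash: [10; 32], lamports: 5 }];
    let new = vec![
        Entry { pubkey: [1; 32], hash: [11; 32], lamports: 0 }, // newer version, now zero-lamport
        Entry { pubkey: [2; 32], hash: [12; 32], lamports: 7 },
    ];
    let (hashes, lamports) = de_dup(&[old, new]);
    assert_eq!(hashes, vec![[12u8; 32]]); // pubkey [1; 32] dropped: its newest version has 0 lamports
    assert_eq!(lamports, 7);
}
```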