From 682999a42317680156f0c78a32adb9fdc3f8d5b3 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Tue, 18 Oct 2022 07:51:38 -0700
Subject: [PATCH] accounts hash calculation uses files instead of memory (#28065)

---
 runtime/src/accounts_db.rs     | 240 +++++++++++++++++----------
 runtime/src/accounts_hash.rs   |  75 +++++++----
 runtime/src/cache_hash_data.rs |   9 +-
 3 files changed, 182 insertions(+), 142 deletions(-)

diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 3499d621ef..2d90aadffe 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -43,7 +43,7 @@ use {
     },
     append_vec::{AppendVec, StoredAccountMeta, StoredMeta, StoredMetaWriteVersion},
     bank::Rewrites,
-    cache_hash_data::CacheHashData,
+    cache_hash_data::{CacheHashData, CacheHashDataFile},
     contains::Contains,
     epoch_accounts_hash::EpochAccountsHashManager,
     pubkey_bins::PubkeyBinCalculator24,
@@ -7042,9 +7042,9 @@ impl AccountsDb {
     }
 
     /// Scan through all the account storage in parallel.
-    /// Returns a Vec of cache data. At this level, the vector is ordered from older slots to newer slots.
-    /// A single pubkey could be in multiple entries. The pubkey found in the latest entry is the one to use.
-    /// Each entry in the Vec contains data binned by pubkey according to the various binning parameters.
+    /// Returns a Vec of open/mmapped files.
+    /// Each file has serialized hash info, sorted by pubkey and then slot, from scanning the append vecs.
+    /// A single pubkey could be in multiple entries. The pubkey found in the latest entry is the one to use.
     fn scan_account_storage_no_bank<S>(
         &self,
         cache_hash_data: &CacheHashData,
@@ -7052,9 +7052,8 @@ impl AccountsDb {
         snapshot_storages: &SortedStorages,
         scanner: S,
         bin_range: &Range<usize>,
-        bin_calculator: &PubkeyBinCalculator24,
         stats: &HashStats,
-    ) -> Vec<BinnedHashData>
+    ) -> Vec<CacheHashDataFile>
     where
         S: AppendVecScan,
     {
@@ -7066,34 +7065,12 @@ impl AccountsDb {
             snapshot_storages,
         );
 
-        let range = snapshot_storages.range();
-        let start_bin_index = bin_range.start;
-
         (0..splitter.chunk_count)
             .into_par_iter()
             .map(|chunk| {
                 let mut scanner = scanner.clone();
-                let range_this_chunk = splitter.get_slot_range(chunk);
-
-                if range_this_chunk.is_none() {
-                    return scanner.scanning_complete();
-                }
-                let range_this_chunk = range_this_chunk.unwrap();
-
-                let should_cache_hash_data = CalcAccountsHashConfig::get_should_cache_hash_data()
-                    || config.store_detailed_debug_info_on_failure;
-
-                // Single cached slots get cached and full chunks get cached.
-                // chunks that don't divide evenly would include some cached append vecs that are no longer part of this range and some that are, so we have to ignore caching on non-evenly dividing chunks.
-                let eligible_for_caching = splitter.is_chunk_ancient(chunk)
-                    || range_this_chunk.end.saturating_sub(range_this_chunk.start)
-                        == MAX_ITEMS_PER_CHUNK;
-
-                if eligible_for_caching || config.store_detailed_debug_info_on_failure {
-                    let range = bin_range.end - bin_range.start;
-                    scanner.init_accum(range);
-                }
+                let range_this_chunk = splitter.get_slot_range(chunk)?;
 
                 let slots_per_epoch = config
                     .rent_collector
@@ -7104,9 +7081,7 @@ impl AccountsDb {
                     .end
                     .saturating_sub(slots_per_epoch);
 
-                let file_name = if (should_cache_hash_data && eligible_for_caching)
-                    || config.store_detailed_debug_info_on_failure
-                {
+                let file_name = {
                     let mut load_from_cache = true;
                     let mut hasher = std::collections::hash_map::DefaultHasher::new();
                     bin_range.start.hash(&mut hasher);
@@ -7123,47 +7098,31 @@ impl AccountsDb {
                             break;
                         }
                     }
+                    // we have a hash value for all the storages in this slot
+                    // so, build a file name:
+                    let hash = hasher.finish();
+                    let file_name = format!(
+                        "{}.{}.{}.{}.{}",
+                        range_this_chunk.start,
+                        range_this_chunk.end,
+                        bin_range.start,
+                        bin_range.end,
+                        hash
+                    );
                     if load_from_cache {
-                        // we have a hash value for all the storages in this slot
-                        // so, build a file name:
-                        let hash = hasher.finish();
-                        let file_name = format!(
-                            "{}.{}.{}.{}.{}",
-                            range_this_chunk.start,
-                            range_this_chunk.end,
-                            bin_range.start,
-                            bin_range.end,
-                            hash
-                        );
-                        let mut retval = scanner.get_accum();
-                        if eligible_for_caching
-                            && cache_hash_data
-                                .load(
-                                    &Path::new(&file_name),
-                                    &mut retval,
-                                    start_bin_index,
-                                    bin_calculator,
-                                )
-                                .is_ok()
-                        {
-                            return retval;
+                        if let Ok(mapped_file) = cache_hash_data.load_map(&Path::new(&file_name)) {
+                            return Some(mapped_file);
                         }
-                        scanner.set_accum(retval);
+                    }
 
-                        // fall through and load normally - we failed to load
-                        file_name
-                    } else {
-                        String::default()
-                    }
-                } else {
-                    for (slot, sub_storages) in snapshot_storages.iter_range(&range_this_chunk) {
-                        if bin_range.start == 0 && slot < one_epoch_old {
-                            self.update_old_slot_stats(stats, sub_storages);
-                        }
-                    }
-                    String::default()
+                    // fall through and load normally - we failed to load from a cache file
+                    file_name
                 };
 
+                // load from cache failed, so create the cache file for this chunk
+                let range = bin_range.end - bin_range.start;
+                scanner.init_accum(range);
+
                 for (slot, sub_storages) in snapshot_storages.iter_range(&range_this_chunk) {
                     scanner.set_slot(slot);
                     if let Some(sub_storages) = sub_storages {
@@ -7171,24 +7130,15 @@ impl AccountsDb {
                     }
                 }
                 let r = scanner.scanning_complete();
-                if !file_name.is_empty() {
-                    let result = cache_hash_data.save(Path::new(&file_name), &r);
-
-                    if result.is_err() {
-                        info!(
-                            "FAILED_TO_SAVE: {}-{}, {}, first_chunk_start: {}, {:?}, error: {:?}",
-                            range.start,
-                            range.end,
-                            splitter.non_ancient_slot_count,
-                            splitter.first_chunk_start,
-                            file_name,
-                            result,
-                        );
-                    }
-                }
-                r
+                assert!(!file_name.is_empty());
+                (!r.is_empty() && r.iter().any(|b| !b.is_empty())).then(|| {
+                    // error if we can't write this
+                    let file_name = Path::new(&file_name);
+                    cache_hash_data.save(Path::new(&file_name), &r).unwrap();
+                    cache_hash_data.load_map(&file_name).unwrap()
+                })
             })
-            .filter(|x| !x.is_empty())
+            .filter_map(|x| x)
             .collect()
     }
 
@@ -7338,7 +7288,7 @@ impl AccountsDb {
         bin_range: &Range<usize>,
         config: &CalcAccountsHashConfig<'_>,
         filler_account_suffix: Option<&Pubkey>,
-    ) -> Result<Vec<BinnedHashData>, BankHashVerificationError> {
+    ) -> Result<Vec<CacheHashDataFile>, BankHashVerificationError> {
         let bin_calculator = PubkeyBinCalculator24::new(bins);
         assert!(bin_range.start < bins && bin_range.end <= bins && bin_range.start < bin_range.end);
         let mut time = Measure::start("scan all accounts");
@@ -7361,13 +7311,12 @@ impl AccountsDb {
             pubkey_to_bin_index: 0,
         };
 
-        let result: Vec<BinnedHashData> = self.scan_account_storage_no_bank(
+        let result = self.scan_account_storage_no_bank(
             cache_hash_data,
             config,
             storage,
             scanner,
             bin_range,
-            &bin_calculator,
             stats,
         );
 
@@ -7485,6 +7434,19 @@ impl AccountsDb {
             hash.filler_account_suffix.as_ref(),
         )?;
 
+        // convert mmapped cache files into slices of data
+        let slices = result
+            .iter()
+            .map(|d| d.get_cache_hash_data())
+            .collect::<Vec<_>>();
+
+        // rework slices of data into bins for parallel processing and to match data shape expected by 'rest_of_hash_calculation'
+        let result = AccountsHash::get_binned_data(
+            &slices,
+            PUBKEY_BINS_FOR_CALCULATING_HASHES,
+            &bounds,
+        );
+
         // turn raw data into merkle tree hashes and sum of lamports
         let (hash, lamports, for_next_pass) = hash.rest_of_hash_calculation(
             result,
@@ -9588,6 +9550,7 @@ pub mod tests {
             tests::*, AccountSecondaryIndexesIncludeExclude, ReadAccountMapEntry, RefCount,
         },
         append_vec::{test_utils::TempFile, AccountMeta},
+        cache_hash_data_stats::CacheHashDataStats,
         inline_spl_token,
     },
     assert_matches::assert_matches,
@@ -9627,7 +9590,7 @@ pub mod tests {
         bins: usize,
         bin_range: &Range<usize>,
         check_hash: bool,
-    ) -> Result<Vec<BinnedHashData>, BankHashVerificationError> {
+    ) -> Result<Vec<CacheHashDataFile>, BankHashVerificationError> {
         let temp_dir = TempDir::new().unwrap();
         let accounts_hash_cache_path = temp_dir.path();
         self.scan_snapshot_stores_with_cache(
@@ -9777,9 +9740,35 @@ pub mod tests {
         SortedStorages::new(input)
     }
 
-    /// helper method. Refactoring will soon make this comparison more complicated.
-    fn assert_scan(result: Vec<BinnedHashData>, expected: Vec<BinnedHashData>) {
-        assert_eq!(result, expected);
+    /// helper to compare expected binned data with scan result in cache files
+    /// result: return from scanning
+    /// expected: binned data expected
+    /// bins: # bins total to divide pubkeys into
+    /// start_bin_index: bin # that was the minimum # we were scanning for 0<=start_bin_index<bins
+    fn assert_scan(
+        result: Vec<CacheHashDataFile>,
+        expected: Vec<BinnedHashData>,
+        bins: usize,
+        start_bin_index: usize,
+        bin_range: usize,
+    ) {
+        assert_eq!(expected.len(), result.len());
+
+        for cache_file in &result {
+            let mut result2 = (0..bin_range).map(|_| Vec::default()).collect::<Vec<_>>();
+            cache_file.load_all(
+                &mut result2,
+                start_bin_index,
+                &PubkeyBinCalculator24::new(bins),
+                &mut CacheHashDataStats::default(),
+            );
+            assert_eq!(
+                convert_to_slice(&[result2]),
+                expected,
+                "bins: {bins}, start_bin_index: {start_bin_index}"
+            );
+        }
    }
 
     #[test]
@@ -9803,7 +9792,7 @@ pub mod tests {
             false,
         )
         .unwrap();
-        assert_scan(result, vec![vec![raw_expected.clone()]]);
+        assert_scan(result, vec![vec![raw_expected.clone()]], bins, 0, bins);
 
         let bins = 2;
         let accounts_db = AccountsDb::new_single_for_tests();
@@ -9824,7 +9813,7 @@ pub mod tests {
         expected[0].push(raw_expected[1].clone());
         expected[bins - 1].push(raw_expected[2].clone());
         expected[bins - 1].push(raw_expected[3].clone());
-        assert_scan(result, vec![expected]);
+        assert_scan(result, vec![expected], bins, 0, bins);
 
         let bins = 4;
         let accounts_db = AccountsDb::new_single_for_tests();
@@ -9845,7 +9834,7 @@ pub mod tests {
         expected[1].push(raw_expected[1].clone());
         expected[2].push(raw_expected[2].clone());
         expected[bins - 1].push(raw_expected[3].clone());
-        assert_scan(result, vec![expected]);
+        assert_scan(result, vec![expected], bins, 0, bins);
 
         let bins = 256;
         let accounts_db = AccountsDb::new_single_for_tests();
@@ -9866,7 +9855,7 @@ pub mod tests {
         expected[127].push(raw_expected[1].clone());
         expected[128].push(raw_expected[2].clone());
         expected[bins - 1].push(raw_expected.last().unwrap().clone());
-        assert_scan(result, vec![expected]);
+        assert_scan(result, vec![expected], bins, 0, bins);
     }
 
     #[test]
@@ -9894,11 +9883,8 @@ pub mod tests {
             false,
         )
         .unwrap();
-        assert_eq!(result.len(), 2); // 2 chunks
-        assert_eq!(result[0].len(), bins);
-        assert_eq!(0, result[0].iter().map(|x| x.len()).sum::<usize>()); // nothing found in bin 0
-        assert_eq!(result[1].len(), bins);
-        assert_eq!(result[1], vec![raw_expected]);
+
+        assert_scan(result, vec![vec![raw_expected]], bins, 0, bins);
     }
 
     #[test]
@@ -9925,7 +9911,7 @@ pub mod tests {
         let mut expected = vec![Vec::new(); half_bins];
         expected[0].push(raw_expected[0].clone());
         expected[0].push(raw_expected[1].clone());
-        assert_scan(result, vec![expected]);
+        assert_scan(result, vec![expected], bins, 0, half_bins);
 
         // just the second bin of 2
         let accounts_db = AccountsDb::new_single_for_tests();
@@ -9946,7 +9932,7 @@ pub mod tests {
         let starting_bin_index = 0;
         expected[starting_bin_index].push(raw_expected[2].clone());
         expected[starting_bin_index].push(raw_expected[3].clone());
-        assert_scan(result, vec![expected]);
+        assert_scan(result, vec![expected], bins, 1, bins - 1);
 
         // 1 bin at a time of 4
         let bins = 4;
@@ -9967,7 +9953,7 @@ pub mod tests {
             .unwrap();
             let mut expected = vec![Vec::new(); 1];
             expected[0].push(expected_item.clone());
-            assert_scan(result, vec![expected]);
+            assert_scan(result, vec![expected], bins, bin, 1);
         }
 
         let bins = 256;
@@ -9989,10 +9975,22 @@ pub mod tests {
                 .unwrap();
             let mut expected = vec![];
             if let Some(index) = bin_locations.iter().position(|&r| r == bin) {
-                expected = vec![vec![Vec::new(); range]];
-                expected[0][0].push(raw_expected[index].clone());
+                expected = vec![Vec::new(); range];
+                expected[0].push(raw_expected[index].clone());
             }
-            assert_eq!(result, expected);
+            let mut result2 = (0..range).map(|_| Vec::default()).collect::<Vec<_>>();
+            if let Some(m) = result.get(0) {
+                m.load_all(
+                    &mut result2,
+                    bin,
+                    &PubkeyBinCalculator24::new(bins),
+                    &mut CacheHashDataStats::default(),
+                );
+            } else {
+                result2 = vec![];
+            }
+
+            assert_eq!(result2, expected);
         }
     }
 
     #[test]
@@ -10024,13 +10022,18 @@ pub mod tests {
             false,
         )
         .unwrap();
-        assert_eq!(result.len(), 2); // 2 chunks
-        assert_eq!(result[0].len(), range);
-        assert_eq!(0, result[0].iter().map(|x| x.len()).sum::<usize>()); // nothing found in bin 0
+        assert_eq!(result.len(), 1); // 2 chunks, but 1 is empty so not included
         let mut expected = vec![Vec::new(); range];
         expected[0].push(raw_expected[1].clone());
-        assert_eq!(result[1].len(), 1);
-        assert_eq!(result[1], expected);
+        let mut result2 = (0..range).map(|_| Vec::default()).collect::<Vec<_>>();
+        result[0].load_all(
+            &mut result2,
+            0,
+            &PubkeyBinCalculator24::new(range),
+            &mut CacheHashDataStats::default(),
+        );
+        assert_eq!(result2.len(), 1);
+        assert_eq!(result2, expected);
    }
 
     #[test]
@@ -10170,7 +10173,6 @@ pub mod tests {
             &get_storage_refs(&storages),
             test_scan,
             &Range { start: 0, end: 1 },
-            &PubkeyBinCalculator24::new(1),
             &HashStats::default(),
         );
         assert_eq!(calls.load(Ordering::Relaxed), 1);
@@ -10181,9 +10183,21 @@ pub mod tests {
                 expected,
                 pubkey,
             )]]],
+            1,
+            0,
+            1,
         );
     }
 
+    fn convert_to_slice(
+        input: &[Vec<Vec<CalculateHashIntermediate>>],
+    ) -> Vec<Vec<&[CalculateHashIntermediate]>> {
+        input
+            .iter()
+            .map(|v| v.iter().map(|v| &v[..]).collect::<Vec<_>>())
+            .collect::<Vec<_>>()
+    }
+
     #[test]
     fn test_accountsdb_scan_account_storage_no_bank_one_slot() {
         solana_logger::setup();
diff --git a/runtime/src/accounts_hash.rs b/runtime/src/accounts_hash.rs
index d6ff84fefc..3c44a0a53b 100644
--- a/runtime/src/accounts_hash.rs
+++ b/runtime/src/accounts_hash.rs
@@ -22,7 +22,7 @@ use {
 pub const MERKLE_FANOUT: usize = 16;
 
 /// the data passed through the processing functions
-pub type SortedDataByPubkey = Vec<Vec<CalculateHashIntermediate>>;
+pub type SortedDataByPubkey<'a> = Vec<&'a [CalculateHashIntermediate]>;
 
 #[derive(Default, Debug)]
 pub struct PreviousPass {
@@ -695,8 +695,7 @@ impl AccountsHash {
 
     /// return references to cache hash data, grouped by bin, sourced from 'sorted_data_by_pubkey',
     /// which is probably a mmapped file.
-    #[allow(dead_code)]
-    fn get_binned_data<'a>(
+    pub(crate) fn get_binned_data<'a>(
         sorted_data_by_pubkey: &'a Vec<&'a [CalculateHashIntermediate]>,
         bins: usize,
         bin_range: &Range<usize>,
@@ -744,7 +743,7 @@ impl AccountsHash {
 
     fn de_dup_and_eliminate_zeros<'a>(
         &self,
-        sorted_data_by_pubkey: &'a [SortedDataByPubkey],
+        sorted_data_by_pubkey: &'a [SortedDataByPubkey<'a>],
         stats: &mut HashStats,
         max_bin: usize,
     ) -> (Vec<Vec<&'a Hash>>, u64) {
@@ -792,7 +791,7 @@ impl AccountsHash {
         min_index: usize,
         bin: usize,
         first_items: &'a mut Vec<Pubkey>,
-        pubkey_division: &'b [SortedDataByPubkey],
+        pubkey_division: &'b [SortedDataByPubkey<'b>],
         indexes: &'a mut [usize],
         first_item_to_pubkey_division: &'a mut Vec<usize>,
     ) -> &'b CalculateHashIntermediate {
@@ -835,7 +834,7 @@ impl AccountsHash {
     // c. unreduced count (ie. including duplicates and zero lamport)
     fn de_dup_accounts_in_parallel<'a>(
         &self,
-        pubkey_division: &'a [SortedDataByPubkey],
+        pubkey_division: &'a [SortedDataByPubkey<'a>],
         pubkey_bin: usize,
     ) -> (Vec<&'a Hash>, u64, usize) {
         let len = pubkey_division.len();
@@ -850,7 +849,7 @@ impl AccountsHash {
         pubkey_division.iter().enumerate().for_each(|(i, bins)| {
             // check to make sure we can do bins[pubkey_bin]
             if bins.len() > pubkey_bin {
-                let sub = &bins[pubkey_bin];
+                let sub = bins[pubkey_bin];
                 if !sub.is_empty() {
                     item_len += bins[pubkey_bin].len(); // sum for metrics
                     first_items.push(bins[pubkey_bin][0].pubkey);
@@ -944,7 +943,7 @@ impl AccountsHash {
     // vec: [..] - items which fit in the containing bin. Sorted by: Pubkey, higher Slot, higher Write version (if pubkey =)
     pub fn rest_of_hash_calculation(
         &self,
-        data_sections_by_pubkey: Vec<SortedDataByPubkey>,
+        data_sections_by_pubkey: Vec<SortedDataByPubkey<'_>>,
         mut stats: &mut HashStats,
         is_last_pass: bool,
         mut previous_state: PreviousPass,
@@ -1077,8 +1076,8 @@ pub mod tests {
         assert_eq!(AccountsHash::div_ceil(10, 0), 0);
     }
 
-    fn for_rest(original: &[CalculateHashIntermediate]) -> Vec<SortedDataByPubkey> {
-        vec![vec![original.to_vec()]]
+    fn for_rest(original: &[CalculateHashIntermediate]) -> Vec<SortedDataByPubkey<'_>> {
+        vec![vec![original]]
     }
 
     #[test]
@@ -1150,8 +1149,10 @@ pub mod tests {
         0
     }
 
-    fn empty_data() -> Vec<SortedDataByPubkey> {
-        vec![vec![vec![]]]
+    const EMPTY_DATA: [CalculateHashIntermediate; 0] = [];
+
+    fn empty_data() -> Vec<SortedDataByPubkey<'static>> {
+        vec![vec![&EMPTY_DATA]]
     }
 
     #[test]
@@ -1456,7 +1457,9 @@ pub mod tests {
             lamports: 1,
             ..CalculateHashIntermediate::default()
         }]]];
-        let (hashes, lamports, _) = AccountsHash::default().de_dup_accounts_in_parallel(&vec, 0);
+        let temp_vec = vec.to_vec();
+        let slice = convert_to_slice2(&temp_vec);
+        let (hashes, lamports, _) = AccountsHash::default().de_dup_accounts_in_parallel(&slice, 0);
         assert_eq!(vec![&Hash::default()], hashes);
         assert_eq!(lamports, 1);
     }
@@ -1567,23 +1570,28 @@ pub mod tests {
         let slice2 = vec![vec![slice.to_vec()]];
         let slice = &slice2[..];
-        let (hashes2, lamports2, _) = hash.de_dup_accounts_in_parallel(slice, 0);
-        let (hashes3, lamports3, _) = hash.de_dup_accounts_in_parallel(slice, 0);
+        let slice_temp = convert_to_slice2(&slice2);
+        let (hashes2, lamports2, _) = hash.de_dup_accounts_in_parallel(&slice_temp, 0);
+        let slice3 = convert_to_slice2(&slice2);
+        let (hashes3, lamports3, _) = hash.de_dup_accounts_in_parallel(&slice3, 0);
         let vec = slice.to_vec();
+        let slice4 = convert_to_slice2(&vec);
         let (hashes4, lamports4) = hash.de_dup_and_eliminate_zeros(
-            &vec,
+            &slice4,
             &mut HashStats::default(),
             end - start,
         );
         let vec = slice.to_vec();
+        let slice5 = convert_to_slice2(&vec);
         let (hashes5, lamports5) = hash.de_dup_and_eliminate_zeros(
-            &vec,
+            &slice5,
             &mut HashStats::default(),
             end - start,
         );
         let vec = slice.to_vec();
+        let slice5 = convert_to_slice2(&vec);
         let (hashes6, lamports6) = hash.de_dup_and_eliminate_zeros(
-            &vec,
+            &slice5,
             &mut HashStats::default(),
             end - start,
         );
@@ -1692,9 +1700,9 @@ pub mod tests {
         );
     }
 
-    fn test_de_dup_accounts_in_parallel(
-        account_maps: &[SortedDataByPubkey],
-    ) -> (Vec<&Hash>, u64, usize) {
+    fn test_de_dup_accounts_in_parallel<'a>(
+        account_maps: &'a [SortedDataByPubkey<'a>],
+    ) -> (Vec<&'a Hash>, u64, usize) {
         AccountsHash::default().de_dup_accounts_in_parallel(account_maps, 0)
     }
 
@@ -1709,7 +1717,8 @@ pub mod tests {
         account_maps.push(val.clone());
 
         let vecs = vec![vec![account_maps.to_vec()]];
-        let result = test_de_dup_accounts_in_parallel(&vecs);
+        let slice = convert_to_slice2(&vecs);
+        let result = test_de_dup_accounts_in_parallel(&slice);
         assert_eq!(result, (vec![&val.hash], val.lamports as u64, 1));
 
         // zero original lamports, higher version
@@ -1717,7 +1726,8 @@ pub mod tests {
         account_maps.push(val); // has to be after previous entry since account_maps are in slot order
 
         let vecs = vec![vec![account_maps.to_vec()]];
-        let result = test_de_dup_accounts_in_parallel(&vecs);
+        let slice = convert_to_slice2(&vecs);
+        let result = test_de_dup_accounts_in_parallel(&slice);
         assert_eq!(result, (vec![], 0, 2));
     }
 
@@ -2087,7 +2097,22 @@ pub mod tests {
             ),
             CalculateHashIntermediate::new(Hash::new(&[2u8; 32]), offset + 1, Pubkey::new_unique()),
         ];
-        AccountsHash::default().de_dup_accounts_in_parallel(&[vec![input]], 0);
+        AccountsHash::default().de_dup_accounts_in_parallel(&[convert_to_slice(&[input])], 0);
     }
 
+    fn convert_to_slice(
+        input: &[Vec<CalculateHashIntermediate>],
+    ) -> Vec<&[CalculateHashIntermediate]> {
+        input.iter().map(|v| &v[..]).collect::<Vec<_>>()
+    }
+
+    fn convert_to_slice2(
+        input: &[Vec<Vec<CalculateHashIntermediate>>],
+    ) -> Vec<Vec<&[CalculateHashIntermediate]>> {
+        input
+            .iter()
+            .map(|v| v.iter().map(|v| &v[..]).collect::<Vec<_>>())
+            .collect::<Vec<_>>()
+    }
+
     #[test]
@@ -2109,7 +2134,7 @@ pub mod tests {
             )],
         ];
         AccountsHash::default().de_dup_and_eliminate_zeros(
-            &[input],
+            &[convert_to_slice(&input)],
             &mut HashStats::default(),
             2, // accounts above are in 2 groups
         );
diff --git a/runtime/src/cache_hash_data.rs b/runtime/src/cache_hash_data.rs
index 91b9baff44..333d455df2 100644
--- a/runtime/src/cache_hash_data.rs
+++ b/runtime/src/cache_hash_data.rs
@@ -1,9 +1,8 @@
 //! Cached data for hashing accounts
+#[cfg(test)]
+use crate::pubkey_bins::PubkeyBinCalculator24;
 use {
-    crate::{
-        accounts_hash::CalculateHashIntermediate, cache_hash_data_stats::CacheHashDataStats,
-        pubkey_bins::PubkeyBinCalculator24,
-    },
+    crate::{accounts_hash::CalculateHashIntermediate, cache_hash_data_stats::CacheHashDataStats},
     memmap2::MmapMut,
     solana_measure::measure::Measure,
     std::{
@@ -36,6 +35,7 @@ impl CacheHashDataFile {
         self.get_slice(0)
     }
 
+    #[cfg(test)]
     /// Populate 'accumulator' from entire contents of the cache file.
     pub(crate) fn load_all(
         &self,
@@ -201,6 +201,7 @@ impl CacheHashData {
         parent_folder.as_ref().join("calculate_accounts_hash_cache")
    }
 
+    #[cfg(test)]
     /// load from 'file_name' into 'accumulator'
     pub(crate) fn load<P: AsRef<Path> + std::fmt::Debug>(
         &self,
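The shape change at the heart of this patch: scan results now come back as pubkey-sorted, mmapped cache files, and `AccountsHash::get_binned_data` hands `rest_of_hash_calculation` borrowed subslices of those files (the new `SortedDataByPubkey<'a>` alias) rather than owned vectors. Because each cache file is sorted by pubkey, every bin occupies a contiguous run of entries, so binning reduces to one binary search per bin boundary. Below is a minimal, self-contained sketch of that idea only; `Entry` and `bin_of` are hypothetical stand-ins for `CalculateHashIntermediate` and `PubkeyBinCalculator24`, not the crate's real types.

// Sketch: split one pubkey-sorted slice into `bins` contiguous subslices,
// finding each bin boundary with a binary search (`partition_point`).

#[derive(Debug)]
struct Entry {
    pubkey: u8, // stand-in for a real 32-byte pubkey
}

fn bin_sorted_slice<'a>(
    sorted: &'a [Entry],
    bins: usize,
    bin_of: impl Fn(&Entry) -> usize,
) -> Vec<&'a [Entry]> {
    let mut binned = Vec::with_capacity(bins);
    let mut start = 0;
    for bin in 0..bins {
        // first index (relative to `start`) whose entry falls in a later bin
        let end = start + sorted[start..].partition_point(|e| bin_of(e) <= bin);
        binned.push(&sorted[start..end]);
        start = end;
    }
    binned
}

fn main() {
    // 4 bins over the u8 pubkey space: bin = pubkey / 64
    let data = vec![Entry { pubkey: 3 }, Entry { pubkey: 70 }, Entry { pubkey: 200 }];
    let binned = bin_sorted_slice(&data, 4, |e| (e.pubkey / 64) as usize);
    assert_eq!(
        binned.iter().map(|b| b.len()).collect::<Vec<_>>(),
        vec![1, 1, 0, 1]
    );
    println!("{binned:?}");
}

Returning borrowed subslices instead of copies is what lets the reworked pipeline point straight into data that lives only in the mmapped file.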
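The save-then-map round trip behind `CacheHashData::save` and `load_map` (write a chunk's entries once to a file named "{start}.{end}.{bin_start}.{bin_end}.{hash}", then map that file back on later passes instead of re-scanning the append vecs) can be sketched with the memmap2 crate this module already imports. The bare little-endian u64 records and the demo file name below are illustrative stand-ins for the real serialized entry layout:

use memmap2::Mmap;
use std::{fs::File, io::Write};

fn main() -> std::io::Result<()> {
    // Illustrative name; the real cache derives its file names from the
    // chunk's slot range, bin range, and a hash of its storages.
    let path = std::env::temp_dir().join("cache_hash_demo.bin");

    // "save": serialize fixed-size records once, after a chunk is scanned.
    let entries: Vec<u64> = vec![1, 2, 3];
    let mut file = File::create(&path)?;
    for entry in &entries {
        file.write_all(&entry.to_le_bytes())?;
    }
    drop(file); // flush and close before mapping

    // "load_map": on later passes, map the file and read records in place
    // instead of rebuilding them in memory.
    let file = File::open(&path)?;
    let map = unsafe { Mmap::map(&file)? }; // unsafe: file must stay unmodified while mapped
    let loaded: Vec<u64> = map
        .chunks_exact(8)
        .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
        .collect();
    assert_eq!(loaded, entries);
    Ok(())
}

On a cache hit the chunk is never re-scanned, since `load_map` returns the mapped file early; chunks that produce no data never write a file at all, and the trailing `filter_map` simply drops them.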