From 8944efddf7f80e1bf7cb52f9f00d07a9a8378729 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com>
Date: Fri, 12 Feb 2021 11:35:54 -0600
Subject: [PATCH] combine flatten and hash (#15269)

---
 runtime/src/accounts_db.rs | 345 ++++++++++++++++++++++++++++++++-----
 1 file changed, 298 insertions(+), 47 deletions(-)

diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 9cee6f985..e349765a5 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -176,7 +176,6 @@ pub struct HashStats {
     pub hash_time_total_us: u64,
     pub sort_time_total_us: u64,
     pub flatten_time_total_us: u64,
-    pub flatten_after_zeros_time_total_us: u64,
     pub pre_scan_flatten_time_total_us: u64,
     pub hash_total: usize,
     pub unreduced_entries: usize,
@@ -189,7 +188,6 @@ impl HashStats {
             + self.hash_time_total_us
             + self.sort_time_total_us
             + self.flatten_time_total_us
-            + self.flatten_after_zeros_time_total_us
             + self.pre_scan_flatten_time_total_us;
         datapoint_info!(
             "calculate_accounts_hash_without_index",
@@ -199,11 +197,6 @@ impl HashStats {
             ("sort", self.sort_time_total_us, i64),
             ("hash_total", self.hash_total, i64),
             ("flatten", self.flatten_time_total_us, i64),
-            (
-                "flatten_after_zeros",
-                self.flatten_after_zeros_time_total_us,
-                i64
-            ),
             ("unreduced_entries", self.unreduced_entries as i64, i64),
             (
                 "num_snapshot_storage",
@@ -241,6 +234,70 @@ impl CalculateHashIntermediate {
     }
 }
 
+#[derive(Default, Debug)]
+struct CumulativeOffset1D {
+    pub index: usize,
+    pub start_offset: usize,
+}
+
+impl CumulativeOffset1D {
+    pub fn new(index: usize, start_offset: usize) -> CumulativeOffset1D {
+        Self {
+            index,
+            start_offset,
+        }
+    }
+}
+
+// Allow retrieving &[start..end] from a logical src: Vec<T>, where src is really Vec<Vec<T>> (or later Vec<Vec<Vec<T>>>)
+// This model prevents callers from having to flatten, which saves both working memory and time.
+#[derive(Default, Debug)]
+struct CumulativeOffsets1D {
+    cumulative_offsets: Vec<CumulativeOffset1D>,
+    total_count: usize,
+}
+
+impl CumulativeOffsets1D {
+    pub fn from_raw<T>(raw: &[Vec<T>]) -> CumulativeOffsets1D {
+        let mut total_count: usize = 0;
+        let cumulative_offsets: Vec<_> = raw
+            .iter()
+            .enumerate()
+            .filter_map(|(i, v)| {
+                let len = v.len();
+                if len > 0 {
+                    let result = CumulativeOffset1D::new(i, total_count);
+                    total_count += len;
+                    Some(result)
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        Self {
+            cumulative_offsets,
+            total_count,
+        }
+    }
+
+    // return the biggest slice possible that starts at 'start'
+    pub fn get_slice<'a, T>(&self, raw: &'a [Vec<T>], start: usize) -> &'a [T] {
+        // This could be binary search, but we expect a small number of vectors.
+        for i in (0..self.cumulative_offsets.len()).into_iter().rev() {
+            let index = &self.cumulative_offsets[i];
+            if start >= index.start_offset {
+                let start = start - index.start_offset;
+                return &raw[index.index][start..];
+            }
+        }
+        panic!(
+            "get_slice didn't find: {}, len: {}",
+            start, self.total_count
+        );
+    }
+}
+
 trait Versioned {
     fn version(&self) -> u64;
 }
@@ -3504,6 +3561,64 @@ impl AccountsDB {
         }
     }
 
+    // This function is designed to allow hashes to be located in multiple, perhaps multiply deep vecs.
+    // The caller provides a function to return a slice from the source data.
+    fn compute_merkle_root_from_slices<'a, F>(
+        total_hashes: usize,
+        fanout: usize,
+        get_hashes: F,
+    ) -> Hash
+    where
+        F: Fn(usize) -> &'a [Hash] + std::marker::Sync,
+    {
+        if total_hashes == 0 {
+            return Hasher::default().result();
+        }
+
+        let mut time = Measure::start("time");
+
+        let chunks = Self::div_ceil(total_hashes, fanout);
+
+        // initial fetch - could return entire slice
+        let data: &[Hash] = get_hashes(0);
+        let data_len = data.len();
+
+        let result: Vec<_> = (0..chunks)
+            .into_par_iter()
+            .map(|i| {
+                let start_index = i * fanout;
+                let end_index = std::cmp::min(start_index + fanout, total_hashes);
+
+                let mut hasher = Hasher::default();
+                let mut data_index = start_index;
+                let mut data = data;
+                let mut data_len = data_len;
+
+                for i in start_index..end_index {
+                    if data_index >= data_len {
+                        // fetch next slice
+                        data = get_hashes(i);
+                        data_len = data.len();
+                        data_index = 0;
+                    }
+
+                    hasher.hash(data[data_index].as_ref());
+                    data_index += 1;
+                }
+
+                (hasher.result(), 0)
+            })
+            .collect();
+        time.stop();
+        debug!("hashing {} {}", total_hashes, time);
+
+        if result.len() == 1 {
+            result[0].0
+        } else {
+            Self::compute_merkle_root_and_capitalization_recurse(result, fanout).0
+        }
+    }
+
     fn accumulate_account_hashes(mut hashes: Vec<(Pubkey, Hash)>) -> Hash {
         Self::sort_hashes_by_pubkey(&mut hashes);
@@ -3573,16 +3688,25 @@ impl AccountsDB {
             .cloned()
             .collect();
         let mismatch_found = AtomicU64::new(0);
-        let hashes: Vec<(Hash, u64)> = {
+        // Pick a chunk size big enough to allow us to produce output vectors that are smaller than the overall size.
+        // We'll also accumulate the lamports within each chunk; fewer chunks results in less contention to accumulate the sum.
+        let chunks = MERKLE_FANOUT.pow(4);
+        let total_lamports = Mutex::<u64>::new(0);
+        let hashes: Vec<Vec<Hash>> = {
             self.thread_pool_clean.install(|| {
-                keys.par_iter()
-                    .filter_map(|pubkey| {
-                        if let Some((lock, index)) =
-                            self.accounts_index.get(pubkey, Some(ancestors), Some(slot))
-                        {
-                            let (slot, account_info) = &lock.slot_list()[index];
-                            if account_info.lamports != 0 {
-                                self.get_account_accessor_from_cache_or_storage(
+                keys.par_chunks(chunks)
+                    .map(|pubkeys| {
+                        let mut sum = 0u128;
+                        let result: Vec<Hash> = pubkeys
+                            .iter()
+                            .filter_map(|pubkey| {
+                                if let Some((lock, index)) =
+                                    self.accounts_index.get(pubkey, Some(ancestors), Some(slot))
+                                {
+                                    let (slot, account_info) = &lock.slot_list()[index];
+                                    if account_info.lamports != 0 {
+                                        self.get_account_accessor_from_cache_or_storage(
                                             *slot,
                                             pubkey,
                                             account_info.store_id,
@@ -3612,14 +3736,20 @@ impl AccountsDB {
                                             }
                                         }
-                                        Some((*loaded_hash, balance))
+                                        sum += balance as u128;
+                                        Some(*loaded_hash)
                                     })
-                        } else {
-                            None
-                        }
-                    } else {
-                        None
-                    }
+                                } else {
+                                    None
+                                }
+                            } else {
+                                None
+                            }
+                        })
+                        .collect();
+                        let mut total = total_lamports.lock().unwrap();
+                        *total = Self::checked_cast_for_capitalization(*total as u128 + sum);
+                        result
                     })
                     .collect()
             })
@@ -3632,11 +3762,16 @@ impl AccountsDB {
             return Err(MismatchedAccountHash);
         }
 
+        let cumulative_offsets = CumulativeOffsets1D::from_raw(&hashes);
+
         scan.stop();
-        let hash_total = hashes.len();
+        let hash_total = cumulative_offsets.total_count;
+        let total_lamports = *total_lamports.lock().unwrap();
         let mut hash_time = Measure::start("hash");
-        let (accumulated_hash, total_lamports) =
-            Self::compute_merkle_root_and_capitalization_recurse(hashes, MERKLE_FANOUT);
+        let accumulated_hash =
+            Self::compute_merkle_root_from_slices(hash_total, MERKLE_FANOUT, |start: usize| {
+                cumulative_offsets.get_slice(&hashes, start)
+            });
         hash_time.stop();
         datapoint_info!(
             "update_accounts_hash",
@@ -3861,15 +3996,23 @@ impl AccountsDB {
         (result, sum)
     }
 
-    fn flatten_hashes(hashes: Vec<Vec<Hash>>, stats: &mut HashStats) -> Vec<Hash> {
+    fn flatten_hashes_and_hash(
+        hashes: Vec<Vec<Hash>>,
+        fanout: usize,
+        stats: &mut HashStats,
+    ) -> Hash {
         // flatten vec/vec into 1d vec of hashes in order
-        let mut flat2_time = Measure::start("flat2");
-        let hashes: Vec<Hash> = hashes.into_iter().flatten().collect();
-        flat2_time.stop();
-        stats.flatten_after_zeros_time_total_us += flat2_time.as_us();
-        stats.hash_total = hashes.len();
+        let mut hash_time = Measure::start("flat2");
 
-        hashes
+        let offsets = CumulativeOffsets1D::from_raw(&hashes);
+
+        let get_slice = |start: usize| -> &[Hash] { offsets.get_slice(&hashes, start) };
+        let hash = Self::compute_merkle_root_from_slices(offsets.total_count, fanout, get_slice);
+        hash_time.stop();
+        stats.hash_time_total_us += hash_time.as_us();
+        stats.hash_total = offsets.total_count;
+
+        hash
     }
 
     // input:
@@ -3888,16 +4031,10 @@ impl AccountsDB {
         let (hashes, total_lamports) =
            Self::de_dup_and_eliminate_zeros(sorted_data_by_pubkey, &mut stats);
 
-        let hashes = Self::flatten_hashes(hashes, &mut stats);
+        let hash = Self::flatten_hashes_and_hash(hashes, MERKLE_FANOUT, &mut stats);
 
-        let mut hash_time = Measure::start("hashes");
-        let (hash, _) =
-            Self::compute_merkle_root_and_capitalization_loop(hashes, MERKLE_FANOUT, |t: &Hash| {
-                (*t, 0)
-            });
-        hash_time.stop();
-        stats.hash_time_total_us += hash_time.as_us();
         stats.log();
+
         (hash, total_lamports)
     }
@@ -5240,6 +5377,59 @@ pub mod tests {
         ancestors
     }
 
+    #[test]
+    fn test_accountsdb_cumulative_offsets1_d() {
+        let input = vec![vec![0, 1], vec![], vec![2, 3, 4], vec![]];
+        let cumulative = CumulativeOffsets1D::from_raw(&input);
+
+        let src: Vec<_> = input.clone().into_iter().flatten().collect();
+        let len = src.len();
+        assert_eq!(cumulative.total_count, len);
+        assert_eq!(cumulative.cumulative_offsets.len(), 2); // 2 non-empty vectors
+
+        assert_eq!(cumulative.cumulative_offsets[0].index, 0);
+        assert_eq!(cumulative.cumulative_offsets[1].index, 2);
+
+        assert_eq!(cumulative.cumulative_offsets[0].start_offset, 0);
+        assert_eq!(cumulative.cumulative_offsets[1].start_offset, 2);
+
+        for start in 0..len {
+            let slice = cumulative.get_slice(&input, start);
+            let len = slice.len();
+            assert!(len > 0);
+            assert_eq!(&src[start..(start + len)], slice);
+        }
+
+        let input = vec![vec![], vec![0, 1], vec![], vec![2, 3, 4], vec![]];
+        let cumulative = CumulativeOffsets1D::from_raw(&input);
+
+        let src: Vec<_> = input.clone().into_iter().flatten().collect();
+        let len = src.len();
+        assert_eq!(cumulative.total_count, len);
+        assert_eq!(cumulative.cumulative_offsets.len(), 2); // 2 non-empty vectors
+
+        assert_eq!(cumulative.cumulative_offsets[0].index, 1);
+        assert_eq!(cumulative.cumulative_offsets[1].index, 3);
+
+        assert_eq!(cumulative.cumulative_offsets[0].start_offset, 0);
+        assert_eq!(cumulative.cumulative_offsets[1].start_offset, 2);
+
+        for start in 0..len {
+            let slice = cumulative.get_slice(&input, start);
+            let len = slice.len();
+            assert!(len > 0);
+            assert_eq!(&src[start..(start + len)], slice);
+        }
+
+        let input: Vec<Vec<u32>> = vec![vec![]];
+        let cumulative = CumulativeOffsets1D::from_raw(&input);
+
+        let src: Vec<_> = input.into_iter().flatten().collect();
+        let len = src.len();
+        assert_eq!(cumulative.total_count, len);
+        assert_eq!(cumulative.cumulative_offsets.len(), 0); // 0 non-empty vectors
+    }
+
     #[test]
     fn test_accountsdb_div_ceil() {
         assert_eq!(AccountsDB::div_ceil(10, 3), 4);
@@ -5467,26 +5657,36 @@ pub mod tests {
     }
 
     #[test]
-    fn test_accountsdb_flatten_hashes() {
+    fn test_accountsdb_flatten_hashes_and_hash() {
         solana_logger::setup();
         const COUNT: usize = 4;
         let hashes: Vec<_> = (0..COUNT)
             .into_iter()
             .map(|i| Hash::new(&[(i) as u8; 32]))
             .collect();
-        let expected = hashes.clone();
+        let expected = AccountsDB::compute_merkle_root_and_capitalization_loop(
+            hashes.clone(),
+            MERKLE_FANOUT,
+            |i| (*i, 0),
+        )
+        .0;
         assert_eq!(
-            AccountsDB::flatten_hashes(vec![hashes.clone()], &mut HashStats::default()),
-            expected
+            AccountsDB::flatten_hashes_and_hash(
+                vec![hashes.clone()],
+                MERKLE_FANOUT,
+                &mut HashStats::default()
+            ),
+            expected,
         );
         for in_first in 1..COUNT - 1 {
             assert_eq!(
-                AccountsDB::flatten_hashes(
+                AccountsDB::flatten_hashes_and_hash(
                     vec![
                         hashes.clone()[0..in_first].to_vec(),
                        hashes.clone()[in_first..COUNT].to_vec()
                     ],
+                    MERKLE_FANOUT,
                     &mut HashStats::default()
                 ),
                 expected
@@ -5684,6 +5884,56 @@ pub mod tests {
         hashes.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
     }
 
+    fn test_hashing_larger(hashes: Vec<(Pubkey, Hash, u64)>, fanout: usize) -> (Hash, u64) {
+        let result = AccountsDB::compute_merkle_root_and_capitalization(hashes.clone(), fanout);
+        if hashes.len() >= fanout * fanout * fanout {
+            let reduced: Vec<_> = hashes.iter().map(|x| x.1).collect();
+            let result2 =
+                AccountsDB::compute_merkle_root_from_slices(hashes.len(), fanout, |start| {
+                    &reduced[start..]
+                });
+            assert_eq!(result.0, result2);
+
+            let reduced2: Vec<_> = hashes.iter().map(|x| vec![x.1]).collect();
+            let result2 =
+                AccountsDB::flatten_hashes_and_hash(reduced2, fanout, &mut HashStats::default());
+            assert_eq!(result.0, result2);
+        }
+        result
+    }
+
+    fn test_hashing(hashes: Vec<Hash>, fanout: usize) -> (Hash, u64) {
+        let temp: Vec<_> = hashes.iter().map(|h| (Pubkey::default(), *h, 0)).collect();
+        let result = AccountsDB::compute_merkle_root_and_capitalization(temp, fanout);
+        if hashes.len() >= fanout * fanout * fanout {
+            let reduced: Vec<_> = hashes.clone();
+            let result2 =
+                AccountsDB::compute_merkle_root_from_slices(hashes.len(), fanout, |start| {
+                    &reduced[start..]
+                });
+            assert_eq!(result.0, result2, "len: {}", hashes.len());
+
+            let reduced2: Vec<_> = hashes.iter().map(|x| vec![*x]).collect();
+            let result2 =
+                AccountsDB::flatten_hashes_and_hash(reduced2, fanout, &mut HashStats::default());
+            assert_eq!(result.0, result2, "len: {}", hashes.len());
+        }
+        result
+    }
+
+    #[test]
+    fn test_accountsdb_compute_merkle_root_and_capitalization_large() {
+        solana_logger::setup();
+
+        let mut num = 100;
+        for _pass in 0..2 {
+            num *= 10;
+            let hashes: Vec<_> = (0..num).into_iter().map(|_| Hash::new_unique()).collect();
+
+            test_hashing(hashes, MERKLE_FANOUT);
+        }
+    }
+
     #[test]
     fn test_accountsdb_compute_merkle_root_and_capitalization() {
         solana_logger::setup();
@@ -5730,8 +5980,9 @@ pub mod tests {
                 (key, hash, i as u64)
             })
             .collect();
+
         let result = if pass == 0 {
-            AccountsDB::compute_merkle_root_and_capitalization(input.clone(), fanout)
+            test_hashing_larger(input.clone(), fanout)
         } else {
             // this sorts inside
             let early_result = AccountsDB::accumulate_account_hashes(
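
Afterword: the core data structure in this patch is CumulativeOffsets1D, which lets a Vec<Vec<Hash>> be read as one logical slice without flattening it. What follows is a minimal standalone sketch of that indexing scheme, assuming only std Rust; the names CumulativeOffset/CumulativeOffsets are stand-ins of mine, not the patch's exact code.

struct CumulativeOffset {
    index: usize,        // which inner vec this entry refers to
    start_offset: usize, // logical offset at which that inner vec begins
}

struct CumulativeOffsets {
    offsets: Vec<CumulativeOffset>,
    total_count: usize,
}

impl CumulativeOffsets {
    // Record, for each non-empty inner vec, where it starts in the flattened view.
    fn from_raw<T>(raw: &[Vec<T>]) -> Self {
        let mut total_count = 0;
        let mut offsets = Vec::new();
        for (index, v) in raw.iter().enumerate() {
            if !v.is_empty() {
                offsets.push(CumulativeOffset { index, start_offset: total_count });
                total_count += v.len();
            }
        }
        Self { offsets, total_count }
    }

    // Longest contiguous slice starting at logical index `start`.
    fn get_slice<'a, T>(&self, raw: &'a [Vec<T>], start: usize) -> &'a [T] {
        // Scan from the back: the last entry whose start_offset <= start owns `start`.
        for o in self.offsets.iter().rev() {
            if start >= o.start_offset {
                return &raw[o.index][start - o.start_offset..];
            }
        }
        panic!("start {} out of range, len {}", start, self.total_count);
    }
}

fn main() {
    let raw = vec![vec![0, 1], vec![], vec![2, 3, 4]];
    let cum = CumulativeOffsets::from_raw(&raw);
    assert_eq!(cum.total_count, 5);
    assert_eq!(cum.get_slice(&raw, 1), &[1][..]);    // tail of the first inner vec
    assert_eq!(cum.get_slice(&raw, 3), &[3, 4][..]); // middle of the third inner vec
    println!("ok");
}

The reverse linear scan in get_slice matches the trade-off the patch states in its own comment: with a small number of backing vectors, a binary search would not pay for itself.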
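
compute_merkle_root_from_slices then walks those slices chunk by chunk, fetching a new slice only when the current one is exhausted, so no flattened copy is ever built. Below is a sketch of just that walking loop, with std's DefaultHasher standing in for solana_sdk::hash::Hasher and a hand-rolled get closure like the one the patch builds from get_slice; it performs one reduction level only, whereas the real function recurses via compute_merkle_root_and_capitalization_recurse. All names here are illustrative.

use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

fn reduce_one_level<'a, F>(total: usize, fanout: usize, get: F) -> Vec<u64>
where
    F: Fn(usize) -> &'a [u64],
{
    let chunks = (total + fanout - 1) / fanout; // div_ceil, as in AccountsDB::div_ceil
    (0..chunks)
        .map(|chunk| {
            let start = chunk * fanout;
            let end = std::cmp::min(start + fanout, total);
            let mut hasher = DefaultHasher::new();
            let mut data = get(start);
            let mut data_index = 0;
            for i in start..end {
                if data_index >= data.len() {
                    // current source slice exhausted; fetch the next one
                    data = get(i);
                    data_index = 0;
                }
                hasher.write_u64(data[data_index]);
                data_index += 1;
            }
            hasher.finish()
        })
        .collect()
}

fn main() {
    // Two backing vecs standing in for per-chunk hash vectors.
    let raw = vec![vec![1u64, 2, 3], vec![4, 5, 6, 7]];
    let first_len = raw[0].len();
    let get = |start: usize| -> &[u64] {
        if start < first_len { &raw[0][start..] } else { &raw[1][start - first_len..] }
    };
    let parents = reduce_one_level(7, 3, get);
    assert_eq!(parents.len(), 3); // ceil(7 / 3) parent nodes
    println!("{:?}", parents);
}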
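
The verify path also changes how lamports are summed: keys are processed in MERKLE_FANOUT.pow(4)-sized chunks, each chunk accumulating a local sum and taking the Mutex once, so lock contention scales with the number of chunks rather than the number of accounts. A sketch of that contention pattern, assuming the rayon crate (which accounts_db already uses); the data and chunk size here are made up for the demo.

use rayon::prelude::*;
use std::sync::Mutex;

fn main() {
    let lamports: Vec<u64> = (1..=10_000).collect();
    let total = Mutex::new(0u128);
    // The patch uses MERKLE_FANOUT.pow(4); smaller here so the demo has several chunks.
    let chunk = 4096;
    lamports.par_chunks(chunk).for_each(|c| {
        let sum: u128 = c.iter().map(|&v| v as u128).sum();
        *total.lock().unwrap() += sum; // one lock per chunk, not per account
    });
    assert_eq!(*total.lock().unwrap(), 10_000u128 * 10_001 / 2);
}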