combine flatten and hash (#15269)

Jeff Washington (jwash) 2021-02-12 11:35:54 -06:00 committed by GitHub
parent 830ef2bae4
commit 8944efddf7
1 changed file with 298 additions and 47 deletions


@@ -176,7 +176,6 @@ pub struct HashStats {
pub hash_time_total_us: u64,
pub sort_time_total_us: u64,
pub flatten_time_total_us: u64,
pub flatten_after_zeros_time_total_us: u64,
pub pre_scan_flatten_time_total_us: u64,
pub hash_total: usize,
pub unreduced_entries: usize,
@@ -189,7 +188,6 @@ impl HashStats {
+ self.hash_time_total_us
+ self.sort_time_total_us
+ self.flatten_time_total_us
+ self.flatten_after_zeros_time_total_us
+ self.pre_scan_flatten_time_total_us;
datapoint_info!(
"calculate_accounts_hash_without_index",
@@ -199,11 +197,6 @@ impl HashStats {
("sort", self.sort_time_total_us, i64),
("hash_total", self.hash_total, i64),
("flatten", self.flatten_time_total_us, i64),
(
"flatten_after_zeros",
self.flatten_after_zeros_time_total_us,
i64
),
("unreduced_entries", self.unreduced_entries as i64, i64),
(
"num_snapshot_storage",
@@ -241,6 +234,70 @@ impl CalculateHashIntermediate {
}
}
#[derive(Default, Debug)]
struct CumulativeOffset1D {
pub index: usize,
pub start_offset: usize,
}
impl CumulativeOffset1D {
pub fn new(index: usize, start_offset: usize) -> CumulativeOffset1D {
Self {
index,
start_offset,
}
}
}
// Allow retrieving &[start..end] from a logical src: Vec<T>, where src is really Vec<Vec<T>> (or later Vec<Vec<Vec<T>>>).
// This model prevents callers from having to flatten, which saves both working memory and time. (A usage sketch follows below.)
#[derive(Default, Debug)]
struct CumulativeOffsets1D {
cumulative_offsets: Vec<CumulativeOffset1D>,
total_count: usize,
}
impl CumulativeOffsets1D {
pub fn from_raw<T>(raw: &[Vec<T>]) -> CumulativeOffsets1D {
let mut total_count: usize = 0;
let cumulative_offsets: Vec<_> = raw
.iter()
.enumerate()
.filter_map(|(i, v)| {
let len = v.len();
if len > 0 {
let result = CumulativeOffset1D::new(i, total_count);
total_count += len;
Some(result)
} else {
None
}
})
.collect();
Self {
cumulative_offsets,
total_count,
}
}
// return the biggest slice possible that starts at 'start'
pub fn get_slice<'a, T>(&self, raw: &'a [Vec<T>], start: usize) -> &'a [T] {
// This could be binary search, but we expect a small number of vectors.
for i in (0..self.cumulative_offsets.len()).rev() {
let index = &self.cumulative_offsets[i];
if start >= index.start_offset {
let start = start - index.start_offset;
return &raw[index.index][start..];
}
}
panic!(
"get_slice didn't find: {}, len: {}",
start, self.total_count
);
}
}
trait Versioned {
fn version(&self) -> u64;
}
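// Illustrative sketch, not part of this commit: how the cumulative offsets
// map a flat logical index back into the jagged source. With
// raw = [[10, 11], [], [12, 13, 14]], from_raw records only the non-empty
// inner vecs as [{index: 0, start_offset: 0}, {index: 2, start_offset: 2}]
// with total_count = 5. get_slice(&raw, 3) scans the offsets in reverse,
// finds start_offset 2 <= 3, and returns &raw[2][1..], i.e. [13, 14].
fn cumulative_offsets_sketch() {
    let raw: Vec<Vec<u32>> = vec![vec![10, 11], vec![], vec![12, 13, 14]];
    let offsets = CumulativeOffsets1D::from_raw(&raw);
    assert_eq!(offsets.total_count, 5);
    assert_eq!(offsets.get_slice(&raw, 0), &[10, 11][..]);
    assert_eq!(offsets.get_slice(&raw, 3), &[13, 14][..]);
}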
@@ -3504,6 +3561,64 @@ impl AccountsDB {
}
}
// This function is designed to allow hashes to be located in multiple, possibly nested vecs.
// The caller provides a function that returns a slice of the source data starting at a given flat index (see the usage sketch below).
fn compute_merkle_root_from_slices<'a, F>(
total_hashes: usize,
fanout: usize,
get_hashes: F,
) -> Hash
where
F: Fn(usize) -> &'a [Hash] + std::marker::Sync,
{
if total_hashes == 0 {
return Hasher::default().result();
}
let mut time = Measure::start("time");
let chunks = Self::div_ceil(total_hashes, fanout);
// initial fetch - this may return the entire remaining slice
let data: &[Hash] = get_hashes(0);
let data_len = data.len();
let result: Vec<_> = (0..chunks)
.into_par_iter()
.map(|i| {
let start_index = i * fanout;
let end_index = std::cmp::min(start_index + fanout, total_hashes);
let mut hasher = Hasher::default();
let mut data_index = start_index;
let mut data = data;
let mut data_len = data_len;
for i in start_index..end_index {
if data_index >= data_len {
// fetch next slice
data = get_hashes(i);
data_len = data.len();
data_index = 0;
}
hasher.hash(data[data_index].as_ref());
data_index += 1;
}
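// capitalization is not computed on this path; the 0 just satisfies the
// (hash, capitalization) shape expected by the recursive reducer below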
(hasher.result(), 0)
})
.collect();
time.stop();
debug!("hashing {} {}", total_hashes, time);
if result.len() == 1 {
result[0].0
} else {
Self::compute_merkle_root_and_capitalization_recurse(result, fanout).0
}
}
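// Illustrative usage sketch, not part of this commit: with a single
// contiguous Vec<Hash>, the get_hashes callback can simply return the tail
// slice starting at the requested flat index; each parallel chunk only reads
// forward from there. With total_hashes = 20 and fanout = 3,
// div_ceil(20, 3) = 7 first-level nodes are hashed in parallel and then
// reduced by compute_merkle_root_and_capitalization_recurse.
fn merkle_root_of_contiguous(hashes: &[Hash], fanout: usize) -> Hash {
    AccountsDB::compute_merkle_root_from_slices(hashes.len(), fanout, |start| &hashes[start..])
}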
fn accumulate_account_hashes(mut hashes: Vec<(Pubkey, Hash)>) -> Hash {
Self::sort_hashes_by_pubkey(&mut hashes);
@@ -3573,16 +3688,25 @@ impl AccountsDB {
.cloned()
.collect();
let mismatch_found = AtomicU64::new(0);
let hashes: Vec<(Hash, u64)> = {
// Pick a chunk size big enough to allow us to produce output vectors that are smaller than the overall size.
// We also accumulate the lamports within each chunk; fewer chunks means less contention when accumulating the sum (a sketch of this pattern follows the scan below).
let chunks = MERKLE_FANOUT.pow(4);
let total_lamports = Mutex::<u64>::new(0);
let hashes: Vec<Vec<Hash>> = {
self.thread_pool_clean.install(|| {
keys.par_iter()
.filter_map(|pubkey| {
if let Some((lock, index)) =
self.accounts_index.get(pubkey, Some(ancestors), Some(slot))
{
let (slot, account_info) = &lock.slot_list()[index];
if account_info.lamports != 0 {
self.get_account_accessor_from_cache_or_storage(
keys.par_chunks(chunks)
.map(|pubkeys| {
let mut sum = 0u128;
let result: Vec<Hash> =
pubkeys
.iter()
.filter_map(|pubkey| {
if let Some((lock, index)) =
self.accounts_index.get(pubkey, Some(ancestors), Some(slot))
{
let (slot, account_info) = &lock.slot_list()[index];
if account_info.lamports != 0 {
self.get_account_accessor_from_cache_or_storage(
*slot,
pubkey,
account_info.store_id,
@@ -3612,14 +3736,20 @@ impl AccountsDB {
}
}
Some((*loaded_hash, balance))
sum += balance as u128;
Some(*loaded_hash)
})
} else {
None
}
} else {
None
}
} else {
None
}
} else {
None
}
})
.collect();
let mut total = total_lamports.lock().unwrap();
*total = Self::checked_cast_for_capitalization(*total as u128 + sum);
result
})
.collect()
})
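// Illustrative sketch, not part of this commit, of the contention-reduction
// pattern used in the scan above: each parallel chunk folds its balances into
// a local u128 and takes the shared Mutex once per chunk rather than once per
// account. 'balances' and 'chunk_size' are hypothetical stand-ins for the
// per-account lamports and MERKLE_FANOUT.pow(4).
fn sum_with_low_contention(balances: &[u64], chunk_size: usize) -> u128 {
    use rayon::prelude::*;
    use std::sync::Mutex;
    let total = Mutex::new(0u128);
    balances.par_chunks(chunk_size).for_each(|chunk| {
        let local: u128 = chunk.iter().map(|b| *b as u128).sum();
        *total.lock().unwrap() += local;
    });
    total.into_inner().unwrap()
}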
@@ -3632,11 +3762,16 @@ impl AccountsDB {
return Err(MismatchedAccountHash);
}
let cumulative_offsets = CumulativeOffsets1D::from_raw(&hashes);
scan.stop();
let hash_total = hashes.len();
let hash_total = cumulative_offsets.total_count;
let total_lamports = *total_lamports.lock().unwrap();
let mut hash_time = Measure::start("hash");
let (accumulated_hash, total_lamports) =
Self::compute_merkle_root_and_capitalization_recurse(hashes, MERKLE_FANOUT);
let accumulated_hash =
Self::compute_merkle_root_from_slices(hash_total, MERKLE_FANOUT, |start: usize| {
cumulative_offsets.get_slice(&hashes, start)
});
hash_time.stop();
datapoint_info!(
"update_accounts_hash",
@@ -3861,15 +3996,23 @@ impl AccountsDB {
(result, sum)
}
fn flatten_hashes(hashes: Vec<Vec<Hash>>, stats: &mut HashStats) -> Vec<Hash> {
fn flatten_hashes_and_hash(
hashes: Vec<Vec<Hash>>,
fanout: usize,
stats: &mut HashStats,
) -> Hash {
// flatten vec/vec into 1d vec of hashes in order
let mut flat2_time = Measure::start("flat2");
let hashes: Vec<Hash> = hashes.into_iter().flatten().collect();
flat2_time.stop();
stats.flatten_after_zeros_time_total_us += flat2_time.as_us();
stats.hash_total = hashes.len();
let mut hash_time = Measure::start("hash");
hashes
let offsets = CumulativeOffsets1D::from_raw(&hashes);
let get_slice = |start: usize| -> &[Hash] { offsets.get_slice(&hashes, start) };
let hash = Self::compute_merkle_root_from_slices(offsets.total_count, fanout, get_slice);
hash_time.stop();
stats.hash_time_total_us += hash_time.as_us();
stats.hash_total = offsets.total_count;
hash
}
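// Illustrative sketch, not part of this commit: hashing through the
// cumulative offsets must agree with flattening first and hashing second;
// this hypothetical helper states that invariant (the tests below exercise
// it against compute_merkle_root_and_capitalization as well).
fn check_flatten_equivalence(hashes: Vec<Vec<Hash>>, fanout: usize) {
    let flat: Vec<Hash> = hashes.iter().flatten().cloned().collect();
    let direct =
        AccountsDB::compute_merkle_root_from_slices(flat.len(), fanout, |start| &flat[start..]);
    let combined = AccountsDB::flatten_hashes_and_hash(hashes, fanout, &mut HashStats::default());
    assert_eq!(direct, combined);
}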
// input:
@@ -3888,16 +4031,10 @@ impl AccountsDB {
let (hashes, total_lamports) =
Self::de_dup_and_eliminate_zeros(sorted_data_by_pubkey, &mut stats);
let hashes = Self::flatten_hashes(hashes, &mut stats);
let hash = Self::flatten_hashes_and_hash(hashes, MERKLE_FANOUT, &mut stats);
let mut hash_time = Measure::start("hashes");
let (hash, _) =
Self::compute_merkle_root_and_capitalization_loop(hashes, MERKLE_FANOUT, |t: &Hash| {
(*t, 0)
});
hash_time.stop();
stats.hash_time_total_us += hash_time.as_us();
stats.log();
(hash, total_lamports)
}
@@ -5240,6 +5377,59 @@ pub mod tests {
ancestors
}
#[test]
fn test_accountsdb_cumulative_offsets1_d() {
let input = vec![vec![0, 1], vec![], vec![2, 3, 4], vec![]];
let cumulative = CumulativeOffsets1D::from_raw(&input);
let src: Vec<_> = input.clone().into_iter().flatten().collect();
let len = src.len();
assert_eq!(cumulative.total_count, len);
assert_eq!(cumulative.cumulative_offsets.len(), 2); // 2 non-empty vectors
assert_eq!(cumulative.cumulative_offsets[0].index, 0);
assert_eq!(cumulative.cumulative_offsets[1].index, 2);
assert_eq!(cumulative.cumulative_offsets[0].start_offset, 0);
assert_eq!(cumulative.cumulative_offsets[1].start_offset, 2);
for start in 0..len {
let slice = cumulative.get_slice(&input, start);
let len = slice.len();
assert!(len > 0);
assert_eq!(&src[start..(start + len)], slice);
}
let input = vec![vec![], vec![0, 1], vec![], vec![2, 3, 4], vec![]];
let cumulative = CumulativeOffsets1D::from_raw(&input);
let src: Vec<_> = input.clone().into_iter().flatten().collect();
let len = src.len();
assert_eq!(cumulative.total_count, len);
assert_eq!(cumulative.cumulative_offsets.len(), 2); // 2 non-empty vectors
assert_eq!(cumulative.cumulative_offsets[0].index, 1);
assert_eq!(cumulative.cumulative_offsets[1].index, 3);
assert_eq!(cumulative.cumulative_offsets[0].start_offset, 0);
assert_eq!(cumulative.cumulative_offsets[1].start_offset, 2);
for start in 0..len {
let slice = cumulative.get_slice(&input, start);
let len = slice.len();
assert!(len > 0);
assert_eq!(&src[start..(start + len)], slice);
}
let input: Vec<Vec<u32>> = vec![vec![]];
let cumulative = CumulativeOffsets1D::from_raw(&input);
let src: Vec<_> = input.into_iter().flatten().collect();
let len = src.len();
assert_eq!(cumulative.total_count, len);
assert_eq!(cumulative.cumulative_offsets.len(), 0); // 0 non-empty vectors
}
#[test]
fn test_accountsdb_div_ceil() {
assert_eq!(AccountsDB::div_ceil(10, 3), 4);
@@ -5467,26 +5657,36 @@ pub mod tests {
}
#[test]
fn test_accountsdb_flatten_hashes() {
fn test_accountsdb_flatten_hashes_and_hash() {
solana_logger::setup();
const COUNT: usize = 4;
let hashes: Vec<_> = (0..COUNT)
.map(|i| Hash::new(&[i as u8; 32]))
.collect();
let expected = hashes.clone();
let expected = AccountsDB::compute_merkle_root_and_capitalization_loop(
hashes.clone(),
MERKLE_FANOUT,
|i| (*i, 0),
)
.0;
assert_eq!(
AccountsDB::flatten_hashes(vec![hashes.clone()], &mut HashStats::default()),
expected
AccountsDB::flatten_hashes_and_hash(
vec![hashes.clone()],
MERKLE_FANOUT,
&mut HashStats::default()
),
expected,
);
for in_first in 1..COUNT - 1 {
assert_eq!(
AccountsDB::flatten_hashes(
AccountsDB::flatten_hashes_and_hash(
vec![
hashes.clone()[0..in_first].to_vec(),
hashes.clone()[in_first..COUNT].to_vec()
],
MERKLE_FANOUT,
&mut HashStats::default()
),
expected
@@ -5684,6 +5884,56 @@ pub mod tests {
hashes.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
}
fn test_hashing_larger(hashes: Vec<(Pubkey, Hash, u64)>, fanout: usize) -> (Hash, u64) {
let result = AccountsDB::compute_merkle_root_and_capitalization(hashes.clone(), fanout);
if hashes.len() >= fanout * fanout * fanout {
let reduced: Vec<_> = hashes.iter().map(|x| x.1).collect();
let result2 =
AccountsDB::compute_merkle_root_from_slices(hashes.len(), fanout, |start| {
&reduced[start..]
});
assert_eq!(result.0, result2);
let reduced2: Vec<_> = hashes.iter().map(|x| vec![x.1]).collect();
let result2 =
AccountsDB::flatten_hashes_and_hash(reduced2, fanout, &mut HashStats::default());
assert_eq!(result.0, result2);
}
result
}
fn test_hashing(hashes: Vec<Hash>, fanout: usize) -> (Hash, u64) {
let temp: Vec<_> = hashes.iter().map(|h| (Pubkey::default(), *h, 0)).collect();
let result = AccountsDB::compute_merkle_root_and_capitalization(temp, fanout);
if hashes.len() >= fanout * fanout * fanout {
let reduced: Vec<_> = hashes.clone();
let result2 =
AccountsDB::compute_merkle_root_from_slices(hashes.len(), fanout, |start| {
&reduced[start..]
});
assert_eq!(result.0, result2, "len: {}", hashes.len());
let reduced2: Vec<_> = hashes.iter().map(|x| vec![*x]).collect();
let result2 =
AccountsDB::flatten_hashes_and_hash(reduced2, fanout, &mut HashStats::default());
assert_eq!(result.0, result2, "len: {}", hashes.len());
}
result
}
#[test]
fn test_accountsdb_compute_merkle_root_and_capitalization_large() {
solana_logger::setup();
let mut num = 100;
for _pass in 0..2 {
num *= 10;
let hashes: Vec<_> = (0..num).map(|_| Hash::new_unique()).collect();
test_hashing(hashes, MERKLE_FANOUT);
}
}
#[test]
fn test_accountsdb_compute_merkle_root_and_capitalization() {
solana_logger::setup();
@@ -5730,8 +5980,9 @@ pub mod tests {
(key, hash, i as u64)
})
.collect();
let result = if pass == 0 {
AccountsDB::compute_merkle_root_and_capitalization(input.clone(), fanout)
test_hashing_larger(input.clone(), fanout)
} else {
// this sorts inside
let early_result = AccountsDB::accumulate_account_hashes(