use files for merkle tree inputs on accounts hash (#28965)

* use files for merkle tree inputs on accounts hash

* remove multi-pass scan, simplify

* cleanup and comments

* rework writing into the hash file for clarity

* add test, cleanup

* rename

* move count into option
Jeff Washington (jwash) 2022-11-30 14:27:27 -06:00 committed by GitHub
parent 07d21d4114
commit 2c912c9049
2 changed files with 323 additions and 533 deletions
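
In short: per-bin account hashes are no longer collected into in-memory Vec<Hash>es and stitched together across two scan passes via PreviousPass. Each pubkey bin now streams its de-duplicated hashes into a temp-file-backed AccountHashesFile, and the merkle-root step reads them back through a memory map (CumulativeHashesFromFiles), so a single pass over all 65,536 bins is enough. Below is a minimal sketch of that write-then-mmap pattern; it is not part of the commit, assumes only the tempfile and memmap2 crates (both imported by the real code), and uses plain [u8; 32] values in place of solana_sdk::hash::Hash.

use {
    memmap2::MmapMut,
    std::io::{BufWriter, Write},
    tempfile::tempfile,
};

type Hash32 = [u8; 32];

fn main() -> std::io::Result<()> {
    // Phase 1: stream hashes into an anonymous temp file (deleted when the File drops).
    let mut writer = BufWriter::new(tempfile()?);
    let hashes: Vec<Hash32> = (0u8..4).map(|i| [i; 32]).collect();
    for hash in &hashes {
        writer.write_all(hash)?;
    }
    let file = writer.into_inner().expect("flush hash file");

    // Phase 2: map the file and reinterpret its bytes as a slice of 32-byte hashes.
    // Sound here because Hash32 is a plain byte array: alignment 1, no padding.
    let mmap = unsafe { MmapMut::map_mut(&file)? };
    let count = mmap.len() / std::mem::size_of::<Hash32>();
    let view = unsafe { std::slice::from_raw_parts(mmap.as_ptr() as *const Hash32, count) };
    assert_eq!(view, &hashes[..]);
    Ok(())
}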

View File

@@ -25,7 +25,7 @@ use {
accounts_cache::{AccountsCache, CachedAccount, SlotCache},
accounts_hash::{
AccountsHash, AccountsHasher, CalcAccountsHashConfig, CalculateHashIntermediate,
HashStats, PreviousPass,
HashStats,
},
accounts_index::{
AccountIndexGetResult, AccountSecondaryIndexes, AccountsIndex, AccountsIndexConfig,
@@ -116,7 +116,6 @@ pub const DEFAULT_NUM_DIRS: u32 = 4;
// When calculating hashes, it is helpful to break the pubkeys found into bins based on the pubkey value.
// More bins means smaller vectors to sort, copy, etc.
pub const PUBKEY_BINS_FOR_CALCULATING_HASHES: usize = 65536;
pub const NUM_SCAN_PASSES_DEFAULT: usize = 2;
// Without chunks, we end up with 1 output vec for each outer snapshot storage.
// This results in too many vectors to be efficient.
@@ -2277,23 +2276,6 @@ impl AccountsDb {
Self::default_with_accounts_index(AccountInfoAccountsIndex::default_for_tests(), None)
}
/// return (num_hash_scan_passes, bins_per_pass)
fn bins_per_pass() -> (usize, usize) {
let num_hash_scan_passes = NUM_SCAN_PASSES_DEFAULT;
let bins_per_pass = PUBKEY_BINS_FOR_CALCULATING_HASHES / num_hash_scan_passes;
assert!(
num_hash_scan_passes <= PUBKEY_BINS_FOR_CALCULATING_HASHES,
"num_hash_scan_passes must be <= {}",
PUBKEY_BINS_FOR_CALCULATING_HASHES
);
assert_eq!(
bins_per_pass * num_hash_scan_passes,
PUBKEY_BINS_FOR_CALCULATING_HASHES
); // evenly divisible
(num_hash_scan_passes, bins_per_pass)
}
fn default_with_accounts_index(
accounts_index: AccountInfoAccountsIndex,
accounts_hash_cache_path: Option<PathBuf>,
@@ -7615,63 +7597,49 @@ impl AccountsDb {
self.mark_old_slots_as_dirty(storages, config.epoch_schedule.slots_per_epoch, &mut stats);
let (num_hash_scan_passes, bins_per_pass) = Self::bins_per_pass();
let use_bg_thread_pool = config.use_bg_thread_pool;
let mut scan_and_hash = || {
let mut previous_pass = PreviousPass::default();
let mut final_result = (Hash::default(), 0);
let cache_hash_data = self.get_cache_hash_data(config, storages.max_slot_inclusive());
for pass in 0..num_hash_scan_passes {
let bounds = Range {
start: pass * bins_per_pass,
end: (pass + 1) * bins_per_pass,
};
let bounds = Range {
start: 0,
end: PUBKEY_BINS_FOR_CALCULATING_HASHES,
};
let hash = AccountsHasher {
filler_account_suffix: if self.filler_accounts_config.count > 0 {
self.filler_account_suffix
} else {
None
},
};
let hash = AccountsHasher {
filler_account_suffix: if self.filler_accounts_config.count > 0 {
self.filler_account_suffix
} else {
None
},
};
// get raw data by scanning
let result = self.scan_snapshot_stores_with_cache(
&cache_hash_data,
storages,
&mut stats,
PUBKEY_BINS_FOR_CALCULATING_HASHES,
&bounds,
config,
hash.filler_account_suffix.as_ref(),
)?;
// get raw data by scanning
let result = self.scan_snapshot_stores_with_cache(
&cache_hash_data,
storages,
&mut stats,
PUBKEY_BINS_FOR_CALCULATING_HASHES,
&bounds,
config,
hash.filler_account_suffix.as_ref(),
)?;
// convert mmapped cache files into slices of data
let slices = result
.iter()
.map(|d| d.get_cache_hash_data())
.collect::<Vec<_>>();
// convert mmapped cache files into slices of data
let slices = result
.iter()
.map(|d| d.get_cache_hash_data())
.collect::<Vec<_>>();
// rework slices of data into bins for parallel processing and to match data shape expected by 'rest_of_hash_calculation'
let result = AccountsHasher::get_binned_data(
&slices,
PUBKEY_BINS_FOR_CALCULATING_HASHES,
&bounds,
);
// rework slices of data into bins for parallel processing and to match data shape expected by 'rest_of_hash_calculation'
let result = AccountsHasher::get_binned_data(
&slices,
PUBKEY_BINS_FOR_CALCULATING_HASHES,
&bounds,
);
// turn raw data into merkle tree hashes and sum of lamports
let (hash, lamports, for_next_pass) = hash.rest_of_hash_calculation(
result,
&mut stats,
pass == num_hash_scan_passes - 1,
previous_pass,
bins_per_pass,
);
previous_pass = for_next_pass;
final_result = (hash, lamports);
}
// turn raw data into merkle tree hashes and sum of lamports
let final_result = hash.rest_of_hash_calculation(result, &mut stats);
info!(
"calculate_accounts_hash_from_storages: slot: {} {:?}",

View File

@@ -1,7 +1,12 @@
use {
crate::{accounts_db::SnapshotStorages, ancestors::Ancestors, rent_collector::RentCollector},
crate::{
accounts_db::{SnapshotStorages, PUBKEY_BINS_FOR_CALCULATING_HASHES},
ancestors::Ancestors,
rent_collector::RentCollector,
},
core::ops::Range,
log::*,
memmap2::MmapMut,
rayon::prelude::*,
solana_measure::measure::Measure,
solana_sdk::{
@@ -13,22 +18,81 @@ use {
std::{
borrow::Borrow,
convert::TryInto,
fs::File,
io::{BufWriter, Write},
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Mutex,
},
},
tempfile::tempfile,
};
pub const MERKLE_FANOUT: usize = 16;
/// the data passed through the processing functions
pub type SortedDataByPubkey<'a> = Vec<&'a [CalculateHashIntermediate]>;
#[derive(Default, Debug)]
pub struct PreviousPass {
pub reduced_hashes: Vec<Vec<Hash>>,
pub remaining_unhashed: Vec<Hash>,
pub lamports: u64,
/// 1 file containing account hashes sorted by pubkey, mapped into memory
struct MmapAccountHashesFile {
mmap: MmapMut,
}
impl MmapAccountHashesFile {
/// return a slice of account hashes starting at 'index'
fn read(&self, index: usize) -> &[Hash] {
let start = std::mem::size_of::<Hash>() * index;
let item_slice: &[u8] = &self.mmap[start..];
let remaining_elements = item_slice.len() / std::mem::size_of::<Hash>();
unsafe {
let item = item_slice.as_ptr() as *const Hash;
std::slice::from_raw_parts(item, remaining_elements)
}
}
}
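
An aside on read(), not part of the diff: it returns every hash from 'index' to the end of the file, not a single element, which is why callers advance by the length of whatever slice they get back (see CumulativeHashesFromFiles::get_slice below). A simplified version of the arithmetic, using an ordinary byte slice in place of the mmap (MmapMut derefs to [u8] the same way) and [u8; 32] in place of Hash:

fn read_from(bytes: &[u8], index: usize) -> &[[u8; 32]] {
    const HASH_SIZE: usize = 32; // std::mem::size_of::<Hash>()
    let tail = &bytes[HASH_SIZE * index..];
    let remaining = tail.len() / HASH_SIZE;
    // Sound because a 32-byte hash is a plain byte array: alignment 1, no padding.
    unsafe { std::slice::from_raw_parts(tail.as_ptr() as *const [u8; 32], remaining) }
}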
/// 1 file containing account hashes sorted by pubkey
#[derive(Default)]
pub struct AccountHashesFile {
/// # hashes and an open file that will be deleted on drop. None if there are zero hashes to represent, and thus, no file.
count_and_writer: Option<(usize, BufWriter<File>)>,
}
impl AccountHashesFile {
/// map the file into memory and return a reader that can access it by slice
fn get_reader(&mut self) -> Option<(usize, MmapAccountHashesFile)> {
std::mem::take(&mut self.count_and_writer).map(|(count, writer)| {
let file = Some(writer.into_inner().unwrap());
(
count,
MmapAccountHashesFile {
mmap: unsafe { MmapMut::map_mut(file.as_ref().unwrap()).unwrap() },
},
)
})
}
/// # hashes stored in this file
pub fn count(&self) -> usize {
self.count_and_writer
.as_ref()
.map(|(count, _)| *count)
.unwrap_or_default()
}
/// write 'hash' to the file
/// If the file isn't open, create it first.
pub fn write(&mut self, hash: &Hash) {
if self.count_and_writer.is_none() {
// we have hashes to write but no file yet, so create a file that will auto-delete on drop
self.count_and_writer = Some((0, BufWriter::new(tempfile().unwrap())));
}
let mut count_and_writer = self.count_and_writer.as_mut().unwrap();
assert_eq!(
std::mem::size_of::<Hash>(),
count_and_writer.1.write(hash.as_ref()).unwrap()
);
count_and_writer.0 += 1;
}
}
#[derive(Debug)]
@@ -325,14 +389,57 @@ pub struct CumulativeOffsets {
total_count: usize,
}
/// used by merkle tree calculation to lookup account hashes by overall index
#[derive(Default)]
pub struct CumulativeHashesFromFiles {
/// source of hashes in order
readers: Vec<MmapAccountHashesFile>,
/// look up reader index and offset by overall index
cumulative: CumulativeOffsets,
}
impl CumulativeHashesFromFiles {
/// Calculate offset from overall index to which file and offset within that file based on the length of each hash file.
/// Also collect readers to access the data.
pub fn from_files(hashes: Vec<AccountHashesFile>) -> Self {
let mut readers = Vec::with_capacity(hashes.len());
let cumulative = CumulativeOffsets::new(hashes.into_iter().filter_map(|mut hash_file| {
// ignores all hashfiles that have zero entries
hash_file.get_reader().map(|(count, reader)| {
readers.push(reader);
count
})
}));
Self {
cumulative,
readers,
}
}
/// total # of items referenced
pub fn total_count(&self) -> usize {
self.cumulative.total_count
}
// return the biggest slice possible that starts at the overall index 'start'
pub fn get_slice(&self, start: usize) -> &[Hash] {
let (start, offset) = self.cumulative.find(start);
let data_source_index = offset.index[0];
let data = &self.readers[data_source_index];
// unwrap here because we should never ask for data that doesn't exist. If we do, then cumulative calculated incorrectly.
data.read(start)
}
}
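
To make the index math concrete, here is a hypothetical, stripped-down stand-in for CumulativeOffsets::new and find (not the real types or signatures): each non-empty source contributes one entry recording where its hashes start in the overall pubkey order, and a binary search maps an overall index back to (source index, offset within that source).

struct Offsets {
    starts: Vec<(usize, usize)>, // (source index, start_offset in the overall order)
    total_count: usize,
}

impl Offsets {
    fn new(lengths: impl Iterator<Item = usize>) -> Self {
        let mut total_count = 0;
        let starts = lengths
            .enumerate()
            .filter_map(|(i, len)| {
                // empty sources are skipped, like AccountHashesFiles with zero hashes
                (len > 0).then(|| {
                    let entry = (i, total_count);
                    total_count += len;
                    entry
                })
            })
            .collect();
        Self { starts, total_count }
    }

    /// map an overall index to (source index, offset within that source)
    fn find(&self, overall: usize) -> (usize, usize) {
        // last entry whose start_offset <= overall; Err gives the insertion point,
        // which is always >= 1 here because the first start_offset is 0
        let pos = match self.starts.binary_search_by(|(_, start)| start.cmp(&overall)) {
            Ok(p) => p,
            Err(p) => p - 1,
        };
        let (source, start_offset) = self.starts[pos];
        (source, overall - start_offset)
    }
}

fn main() {
    // three hash files holding 0, 3 and 2 hashes respectively
    let offsets = Offsets::new([0usize, 3, 2].into_iter());
    assert_eq!(offsets.total_count, 5);
    // overall index 4 lives in source 2 (the third file) at offset 1
    assert_eq!(offsets.find(4), (2, 1));
    assert_eq!(offsets.find(0), (1, 0));
}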
impl CumulativeOffsets {
pub fn from_raw<T>(raw: &[Vec<T>]) -> CumulativeOffsets {
pub fn new<I>(iter: I) -> Self
where
I: Iterator<Item = usize>,
{
let mut total_count: usize = 0;
let cumulative_offsets: Vec<_> = raw
.iter()
let cumulative_offsets: Vec<_> = iter
.enumerate()
.filter_map(|(i, v)| {
let len = v.len();
.filter_map(|(i, len)| {
if len > 0 {
let result = CumulativeOffset::new(vec![i], total_count);
total_count += len;
@@ -349,7 +456,11 @@ impl CumulativeOffsets {
}
}
pub fn from_raw_2d<T>(raw: &[Vec<Vec<T>>]) -> CumulativeOffsets {
pub fn from_raw<T>(raw: &[Vec<T>]) -> Self {
Self::new(raw.iter().map(|v| v.len()))
}
pub fn from_raw_2d<T>(raw: &[Vec<Vec<T>>]) -> Self {
let mut total_count: usize = 0;
let mut cumulative_offsets = Vec::with_capacity(0);
for (i, v_outer) in raw.iter().enumerate() {
@@ -372,6 +483,7 @@ impl CumulativeOffsets {
}
}
/// find the index of the data source that contains 'start'
fn find_index(&self, start: usize) -> usize {
assert!(!self.cumulative_offsets.is_empty());
match self.cumulative_offsets[..].binary_search_by(|index| index.start_offset.cmp(&start)) {
@@ -380,6 +492,9 @@ impl CumulativeOffsets {
}
}
/// given overall start index 'start'
/// return ('start', which is the offset into the data source at 'index',
/// and 'index', which is the data source to use)
fn find(&self, start: usize) -> (usize, &CumulativeOffset) {
let index = self.find_index(start);
let index = &self.cumulative_offsets[index];
@@ -517,6 +632,7 @@ impl AccountsHasher {
specific_level_count: Option<usize>,
) -> (Hash, Vec<Hash>)
where
// returns a slice of hashes starting at the given overall index
F: Fn(usize) -> &'a [T] + std::marker::Sync,
T: Borrow<Hash> + std::marker::Sync + 'a,
{
@@ -765,13 +881,13 @@ impl AccountsHasher {
/// returns:
/// Vec, with one entry per bin
/// for each entry, Vec<Hash> in pubkey order
/// If return Vec<Vec<Hash>> was flattened, it would be all hashes, in pubkey order.
/// If return Vec<AccountHashesFile> was flattened, it would be all hashes, in pubkey order.
fn de_dup_and_eliminate_zeros<'a>(
&self,
sorted_data_by_pubkey: &'a [SortedDataByPubkey<'a>],
stats: &mut HashStats,
max_bin: usize,
) -> (Vec<Vec<&'a Hash>>, u64) {
) -> (Vec<AccountHashesFile>, u64) {
// 1. eliminate zero lamport accounts
// 2. pick the highest slot or (slot = and highest version) of each pubkey
// 3. produce this output:
@@ -780,10 +896,10 @@ impl AccountsHasher {
// b. lamports
let mut zeros = Measure::start("eliminate zeros");
let min_max_sum_entries_hashes = Mutex::new((usize::MAX, usize::MIN, 0u64, 0usize, 0usize));
let hashes: Vec<Vec<&Hash>> = (0..max_bin)
let hashes: Vec<_> = (0..max_bin)
.into_par_iter()
.map(|bin| {
let (hashes, lamports_bin, unreduced_entries_count) =
let (hashes_file, lamports_bin, unreduced_entries_count) =
self.de_dup_accounts_in_parallel(sorted_data_by_pubkey, bin);
{
let mut lock = min_max_sum_entries_hashes.lock().unwrap();
@@ -794,10 +910,10 @@ impl AccountsHasher {
lamports_sum as u128 + lamports_bin as u128,
);
entries += unreduced_entries_count;
hash_total += hashes.len();
hash_total += hashes_file.count();
*lock = (min, max, lamports_sum, entries, hash_total);
}
hashes
hashes_file
})
.collect();
zeros.stop();
@@ -854,14 +970,14 @@ impl AccountsHasher {
// 1. eliminate zero lamport accounts
// 2. pick the highest slot or (slot = and highest version) of each pubkey
// 3. produce this output:
// a. vec: individual hashes in pubkey order
// a. AccountHashesFile: individual account hashes in pubkey order
// b. lamport sum
// c. unreduced count (ie. including duplicates and zero lamport)
fn de_dup_accounts_in_parallel<'a>(
&self,
pubkey_division: &'a [SortedDataByPubkey<'a>],
pubkey_bin: usize,
) -> (Vec<&'a Hash>, u64, usize) {
) -> (AccountHashesFile, u64, usize) {
let len = pubkey_division.len();
let mut unreduced_count = 0;
let mut indexes = vec![0; len];
@@ -869,7 +985,7 @@ impl AccountsHasher {
// map from index of an item in first_items[] to index of the corresponding item in pubkey_division[]
// this will change as items in pubkey_division[] are exhausted
let mut first_item_to_pubkey_division = Vec::with_capacity(len);
let mut hashes = AccountHashesFile::default();
// initialize 'first_items', which holds the current lowest item in each slot group
pubkey_division.iter().enumerate().for_each(|(i, bins)| {
// check to make sure we can do bins[pubkey_bin]
@@ -883,7 +999,6 @@ impl AccountsHasher {
}
});
let mut overall_sum = 0;
let mut hashes: Vec<&Hash> = Vec::with_capacity(unreduced_count);
let mut duplicate_pubkey_indexes = Vec::with_capacity(len);
let filler_accounts_enabled = self.filler_accounts_enabled();
@@ -933,7 +1048,7 @@ impl AccountsHasher {
overall_sum = Self::checked_cast_for_capitalization(
item.lamports as u128 + overall_sum as u128,
);
hashes.push(&item.hash);
hashes.write(&item.hash);
}
if !duplicate_pubkey_indexes.is_empty() {
// skip past duplicate keys in earlier slots
@@ -970,115 +1085,26 @@ impl AccountsHasher {
&self,
data_sections_by_pubkey: Vec<SortedDataByPubkey<'_>>,
mut stats: &mut HashStats,
is_last_pass: bool,
mut previous_state: PreviousPass,
max_bin: usize,
) -> (Hash, u64, PreviousPass) {
let (mut hashes, mut total_lamports) =
self.de_dup_and_eliminate_zeros(&data_sections_by_pubkey, stats, max_bin);
) -> (Hash, u64) {
let (hashes, total_lamports) = self.de_dup_and_eliminate_zeros(
&data_sections_by_pubkey,
stats,
PUBKEY_BINS_FOR_CALCULATING_HASHES,
);
total_lamports += previous_state.lamports;
let cumulative = CumulativeHashesFromFiles::from_files(hashes);
let mut _remaining_unhashed = None;
if !previous_state.remaining_unhashed.is_empty() {
// These items were not hashed last iteration because they didn't divide evenly.
// These are hashes for pubkeys that are < the pubkeys we are looking at now, so their hashes go first in order.
_remaining_unhashed = Some(previous_state.remaining_unhashed);
hashes.insert(
0,
_remaining_unhashed
.as_ref()
.unwrap()
.iter()
.collect::<Vec<_>>(),
);
previous_state.remaining_unhashed = Vec::new();
}
let mut next_pass = PreviousPass::default();
let cumulative = CumulativeOffsets::from_raw(&hashes);
let mut hash_total = cumulative.total_count;
next_pass.reduced_hashes = previous_state.reduced_hashes;
const TARGET_FANOUT_LEVEL: usize = 3;
let target_fanout = MERKLE_FANOUT.pow(TARGET_FANOUT_LEVEL as u32);
if !is_last_pass {
next_pass.lamports = total_lamports;
total_lamports = 0;
// Save hashes that don't evenly hash. They will be combined with hashes from the next pass.
let left_over_hashes = hash_total % target_fanout;
// move tail hashes that don't evenly hash into a 1d vector for next time
let mut i = hash_total - left_over_hashes;
while i < hash_total {
let data = cumulative.get_slice(&hashes, i);
next_pass.remaining_unhashed.extend(data.iter().cloned());
i += data.len();
}
hash_total -= left_over_hashes; // this is enough to cause the hashes at the end of the data set to be ignored
}
// if we have raw hashes to process and
// we are not the last pass (we already modded against target_fanout) OR
// we have previously surpassed target_fanout and hashed some already to the target_fanout level. In that case, we know
// we need to hash whatever is left here to the target_fanout level.
if hash_total != 0 && (!is_last_pass || !next_pass.reduced_hashes.is_empty()) {
let mut hash_time = Measure::start("hash");
let partial_hashes = Self::compute_merkle_root_from_slices(
hash_total, // note this does not include the ones that didn't divide evenly, unless we're in the last iteration
MERKLE_FANOUT,
Some(TARGET_FANOUT_LEVEL),
|start| cumulative.get_slice(&hashes, start),
Some(TARGET_FANOUT_LEVEL),
)
.1;
hash_time.stop();
stats.hash_time_total_us += hash_time.as_us();
stats.hash_time_pre_us += hash_time.as_us();
next_pass.reduced_hashes.push(partial_hashes);
}
let no_progress = is_last_pass && next_pass.reduced_hashes.is_empty() && !hashes.is_empty();
if no_progress {
// we never made partial progress, so hash everything now
hashes.into_iter().for_each(|v| {
if !v.is_empty() {
next_pass
.reduced_hashes
.push(v.into_iter().cloned().collect());
}
});
}
let hash = if is_last_pass {
let cumulative = CumulativeOffsets::from_raw(&next_pass.reduced_hashes);
let hash = if cumulative.total_count == 1 && !no_progress {
// all the passes resulted in a single hash, that means we're done, so we had <= MERKLE_ROOT total hashes
cumulative.get_slice(&next_pass.reduced_hashes, 0)[0]
} else {
let mut hash_time = Measure::start("hash");
// hash all the rest and combine and hash until we have only 1 hash left
let (hash, _) = Self::compute_merkle_root_from_slices(
cumulative.total_count,
MERKLE_FANOUT,
None,
|start| cumulative.get_slice(&next_pass.reduced_hashes, start),
None,
);
hash_time.stop();
stats.hash_time_total_us += hash_time.as_us();
hash
};
next_pass.reduced_hashes = Vec::new();
hash
} else {
Hash::default()
};
(hash, total_lamports, next_pass)
let mut hash_time = Measure::start("hash");
let (hash, _) = Self::compute_merkle_root_from_slices(
cumulative.total_count(),
MERKLE_FANOUT,
None,
|start| cumulative.get_slice(start),
None,
);
hash_time.stop();
stats.hash_time_total_us += hash_time.as_us();
(hash, total_lamports)
}
}
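
With the multi-pass bookkeeping gone, rest_of_hash_calculation reduces to: de-dup into per-bin hash files, wrap them in CumulativeHashesFromFiles, and hand compute_merkle_root_from_slices a closure that serves hash slices by overall index. The sketch below shows only the shape of that final reduction (group up to 16 children per parent, repeat until one root remains); it is not the commit's code, and it uses the sha2 crate as a stand-in for Solana's own hasher over 32-byte values.

use sha2::{Digest, Sha256};

const MERKLE_FANOUT: usize = 16;
type Hash32 = [u8; 32];

// Fold a flat, pubkey-ordered list of hashes to a single root: hash each group of up
// to MERKLE_FANOUT child hashes into one parent, then repeat on the parents.
fn merkle_root(mut level: Vec<Hash32>) -> Hash32 {
    assert!(!level.is_empty(), "sketch assumes at least one leaf");
    while level.len() > 1 {
        level = level
            .chunks(MERKLE_FANOUT)
            .map(|group| {
                let mut hasher = Sha256::new();
                for child in group {
                    hasher.update(child);
                }
                hasher.finalize().into()
            })
            .collect();
    }
    level[0]
}

fn main() {
    let leaves: Vec<Hash32> = (0u8..100).map(|i| [i; 32]).collect();
    println!("root = {:02x?}", merkle_root(leaves));
}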
@@ -1090,6 +1116,97 @@ pub struct AccountsHash(pub Hash);
pub mod tests {
use {super::*, std::str::FromStr};
#[test]
fn test_account_hashes_file() {
// 0 hashes
let mut file = AccountHashesFile::default();
assert!(file.get_reader().is_none());
let hashes = (0..2).map(|i| Hash::new(&[i; 32])).collect::<Vec<_>>();
// 1 hash
file.write(&hashes[0]);
let reader = file.get_reader().unwrap();
assert_eq!(&[hashes[0]][..], reader.1.read(0));
assert!(reader.1.read(1).is_empty());
// multiple hashes
let mut file = AccountHashesFile::default();
assert!(file.get_reader().is_none());
hashes.iter().for_each(|hash| file.write(hash));
let reader = file.get_reader().unwrap();
(0..2).for_each(|i| assert_eq!(&hashes[i..], reader.1.read(i)));
assert!(reader.1.read(2).is_empty());
}
#[test]
fn test_cumulative_hashes_from_files() {
(0..4).for_each(|permutation| {
let hashes = (0..2).map(|i| Hash::new(&[i + 1; 32])).collect::<Vec<_>>();
let mut combined = Vec::default();
// 0 hashes
let file0 = AccountHashesFile::default();
// 1 hash
let mut file1 = AccountHashesFile::default();
file1.write(&hashes[0]);
combined.push(hashes[0]);
// multiple hashes
let mut file2 = AccountHashesFile::default();
hashes.iter().for_each(|hash| {
file2.write(hash);
combined.push(*hash);
});
let hashes = if permutation == 0 {
vec![file0, file1, file2]
} else if permutation == 1 {
// include more empty files
vec![
file0,
file1,
AccountHashesFile::default(),
file2,
AccountHashesFile::default(),
]
} else if permutation == 2 {
vec![file1, file2]
} else {
// swap file2 and 1
let one = combined.remove(0);
combined.push(one);
vec![
file2,
AccountHashesFile::default(),
AccountHashesFile::default(),
file1,
]
};
let cumulative = CumulativeHashesFromFiles::from_files(hashes);
let len = combined.len();
assert_eq!(cumulative.total_count(), len);
(0..combined.len()).for_each(|start| {
let mut retreived = Vec::default();
let mut cumulative_start = start;
// read all data
while retreived.len() < (len - start) {
let this_one = cumulative.get_slice(cumulative_start);
retreived.extend(this_one.iter());
cumulative_start += this_one.len();
assert_ne!(0, this_one.len());
}
assert_eq!(
&combined[start..],
&retreived[..],
"permutation: {permutation}"
);
});
});
}
#[test]
fn test_accountsdb_div_ceil() {
assert_eq!(AccountsHasher::div_ceil(10, 3), 4);
@@ -1127,13 +1244,8 @@ pub mod tests {
account_maps.push(val);
let accounts_hash = AccountsHasher::default();
let result = accounts_hash.rest_of_hash_calculation(
for_rest(&account_maps),
&mut HashStats::default(),
true,
PreviousPass::default(),
one_range(),
);
let result = accounts_hash
.rest_of_hash_calculation(for_rest(&account_maps), &mut HashStats::default());
let expected_hash = Hash::from_str("8j9ARGFv4W2GfML7d3sVJK2MePwrikqYnu6yqer28cCa").unwrap();
assert_eq!((result.0, result.1), (expected_hash, 88));
@@ -1143,13 +1255,8 @@ pub mod tests {
let val = CalculateHashIntermediate::new(hash, 20, key);
account_maps.insert(0, val);
let result = accounts_hash.rest_of_hash_calculation(
for_rest(&account_maps),
&mut HashStats::default(),
true,
PreviousPass::default(),
one_range(),
);
let result = accounts_hash
.rest_of_hash_calculation(for_rest(&account_maps), &mut HashStats::default());
let expected_hash = Hash::from_str("EHv9C5vX7xQjjMpsJMzudnDTzoTSRwYkqLzY8tVMihGj").unwrap();
assert_eq!((result.0, result.1), (expected_hash, 108));
@@ -1159,13 +1266,8 @@ pub mod tests {
let val = CalculateHashIntermediate::new(hash, 30, key);
account_maps.insert(1, val);
let result = accounts_hash.rest_of_hash_calculation(
for_rest(&account_maps),
&mut HashStats::default(),
true,
PreviousPass::default(),
one_range(),
);
let result = accounts_hash
.rest_of_hash_calculation(for_rest(&account_maps), &mut HashStats::default());
let expected_hash = Hash::from_str("7NNPg5A8Xsg1uv4UFm6KZNwsipyyUnmgCrznP6MBWoBZ").unwrap();
assert_eq!((result.0, result.1), (expected_hash, 118));
}
@@ -1178,308 +1280,6 @@ pub mod tests {
0
}
const EMPTY_DATA: [CalculateHashIntermediate; 0] = [];
fn empty_data() -> Vec<SortedDataByPubkey<'static>> {
vec![vec![&EMPTY_DATA]]
}
#[test]
fn test_accountsdb_multi_pass_rest_of_hash_calculation() {
solana_logger::setup();
// passes:
// 0: empty, NON-empty, empty, empty final
// 1: NON-empty, empty final
// 2: NON-empty, empty, empty final
for pass in 0..3 {
let mut account_maps = Vec::new();
let key = Pubkey::new(&[11u8; 32]);
let hash = Hash::new(&[1u8; 32]);
let val = CalculateHashIntermediate::new(hash, 88, key);
account_maps.push(val);
// 2nd key - zero lamports, so will be removed
let key = Pubkey::new(&[12u8; 32]);
let hash = Hash::new(&[2u8; 32]);
let val = CalculateHashIntermediate::new(hash, 0, key);
account_maps.push(val);
let mut previous_pass = PreviousPass::default();
let accounts_index = AccountsHasher::default();
if pass == 0 {
// first pass that is not last and is empty
let result = accounts_index.rest_of_hash_calculation(
empty_data(),
&mut HashStats::default(),
false, // not last pass
previous_pass,
one_range(),
);
assert_eq!(result.0, Hash::default());
assert_eq!(result.1, 0);
previous_pass = result.2;
assert_eq!(previous_pass.remaining_unhashed.len(), 0);
assert_eq!(previous_pass.reduced_hashes.len(), 0);
assert_eq!(previous_pass.lamports, 0);
}
let result = accounts_index.rest_of_hash_calculation(
for_rest(&account_maps),
&mut HashStats::default(),
false, // not last pass
previous_pass,
one_range(),
);
assert_eq!(result.0, Hash::default());
assert_eq!(result.1, 0);
let mut previous_pass = result.2;
assert_eq!(previous_pass.remaining_unhashed, vec![account_maps[0].hash]);
assert_eq!(previous_pass.reduced_hashes.len(), 0);
assert_eq!(previous_pass.lamports, account_maps[0].lamports);
let expected_hash =
Hash::from_str("8j9ARGFv4W2GfML7d3sVJK2MePwrikqYnu6yqer28cCa").unwrap();
let accounts_index = AccountsHasher::default();
if pass == 2 {
let result = accounts_index.rest_of_hash_calculation(
empty_data(),
&mut HashStats::default(),
false,
previous_pass,
one_range(),
);
previous_pass = result.2;
assert_eq!(previous_pass.remaining_unhashed, vec![account_maps[0].hash]);
assert_eq!(previous_pass.reduced_hashes.len(), 0);
assert_eq!(previous_pass.lamports, account_maps[0].lamports);
}
let result = accounts_index.rest_of_hash_calculation(
empty_data(),
&mut HashStats::default(),
true, // finally, last pass
previous_pass,
one_range(),
);
let previous_pass = result.2;
assert_eq!(previous_pass.remaining_unhashed.len(), 0);
assert_eq!(previous_pass.reduced_hashes.len(), 0);
assert_eq!(previous_pass.lamports, 0);
assert_eq!((result.0, result.1), (expected_hash, 88));
}
}
#[test]
fn test_accountsdb_multi_pass_rest_of_hash_calculation_partial() {
solana_logger::setup();
let mut account_maps = Vec::new();
let key = Pubkey::new(&[11u8; 32]);
let hash = Hash::new(&[1u8; 32]);
let val = CalculateHashIntermediate::new(hash, 88, key);
account_maps.push(val);
let key = Pubkey::new(&[12u8; 32]);
let hash = Hash::new(&[2u8; 32]);
let val = CalculateHashIntermediate::new(hash, 20, key);
account_maps.push(val);
let accounts_hash = AccountsHasher::default();
let result = accounts_hash.rest_of_hash_calculation(
for_rest(&[account_maps[0].clone()]),
&mut HashStats::default(),
false, // not last pass
PreviousPass::default(),
one_range(),
);
assert_eq!(result.0, Hash::default());
assert_eq!(result.1, 0);
let previous_pass = result.2;
assert_eq!(previous_pass.remaining_unhashed, vec![account_maps[0].hash]);
assert_eq!(previous_pass.reduced_hashes.len(), 0);
assert_eq!(previous_pass.lamports, account_maps[0].lamports);
let result = accounts_hash.rest_of_hash_calculation(
for_rest(&[account_maps[1].clone()]),
&mut HashStats::default(),
false, // not last pass
previous_pass,
one_range(),
);
assert_eq!(result.0, Hash::default());
assert_eq!(result.1, 0);
let previous_pass = result.2;
assert_eq!(
previous_pass.remaining_unhashed,
vec![account_maps[0].hash, account_maps[1].hash]
);
assert_eq!(previous_pass.reduced_hashes.len(), 0);
let total_lamports_expected = account_maps[0].lamports + account_maps[1].lamports;
assert_eq!(previous_pass.lamports, total_lamports_expected);
let result = accounts_hash.rest_of_hash_calculation(
empty_data(),
&mut HashStats::default(),
true,
previous_pass,
one_range(),
);
let previous_pass = result.2;
assert_eq!(previous_pass.remaining_unhashed.len(), 0);
assert_eq!(previous_pass.reduced_hashes.len(), 0);
assert_eq!(previous_pass.lamports, 0);
let expected_hash = AccountsHasher::compute_merkle_root(
account_maps
.iter()
.map(|a| (a.pubkey, a.hash))
.collect::<Vec<_>>(),
MERKLE_FANOUT,
);
assert_eq!(
(result.0, result.1),
(expected_hash, total_lamports_expected)
);
}
#[test]
fn test_accountsdb_multi_pass_rest_of_hash_calculation_partial_hashes() {
solana_logger::setup();
let mut account_maps = Vec::new();
let accounts_hash = AccountsHasher::default();
const TARGET_FANOUT_LEVEL: usize = 3;
let target_fanout = MERKLE_FANOUT.pow(TARGET_FANOUT_LEVEL as u32);
let mut total_lamports_expected = 0;
let plus1 = target_fanout + 1;
for i in 0..plus1 * 2 {
let lamports = (i + 1) as u64;
total_lamports_expected += lamports;
let key = Pubkey::new_unique();
let hash = Hash::new_unique();
let val = CalculateHashIntermediate::new(hash, lamports, key);
account_maps.push(val);
}
let mut chunk = account_maps[0..plus1].to_vec();
chunk.sort_by(AccountsHasher::compare_two_hash_entries);
let sorted = chunk.clone();
// first 4097 hashes (1 left over)
let result = accounts_hash.rest_of_hash_calculation(
for_rest(&chunk),
&mut HashStats::default(),
false, // not last pass
PreviousPass::default(),
one_range(),
);
assert_eq!(result.0, Hash::default());
assert_eq!(result.1, 0);
let previous_pass = result.2;
let left_over_1 = sorted[plus1 - 1].hash;
assert_eq!(previous_pass.remaining_unhashed, vec![left_over_1]);
assert_eq!(previous_pass.reduced_hashes.len(), 1);
let expected_hash = AccountsHasher::compute_merkle_root(
sorted[0..target_fanout]
.iter()
.map(|a| (a.pubkey, a.hash))
.collect::<Vec<_>>(),
MERKLE_FANOUT,
);
assert_eq!(previous_pass.reduced_hashes[0], vec![expected_hash]);
assert_eq!(
previous_pass.lamports,
account_maps[0..plus1]
.iter()
.map(|i| i.lamports)
.sum::<u64>()
);
let mut chunk = account_maps[plus1..plus1 * 2].to_vec();
chunk.sort_by(AccountsHasher::compare_two_hash_entries);
let sorted2 = chunk.clone();
let mut with_left_over = vec![left_over_1];
with_left_over.extend(sorted2[0..plus1 - 2].iter().cloned().map(|i| i.hash));
let expected_hash2 = AccountsHasher::compute_merkle_root(
with_left_over[0..target_fanout]
.iter()
.map(|a| (Pubkey::default(), *a))
.collect::<Vec<_>>(),
MERKLE_FANOUT,
);
// second 4097 hashes (2 left over)
let result = accounts_hash.rest_of_hash_calculation(
for_rest(&chunk),
&mut HashStats::default(),
false, // not last pass
previous_pass,
one_range(),
);
assert_eq!(result.0, Hash::default());
assert_eq!(result.1, 0);
let previous_pass = result.2;
assert_eq!(
previous_pass.remaining_unhashed,
vec![sorted2[plus1 - 2].hash, sorted2[plus1 - 1].hash]
);
assert_eq!(previous_pass.reduced_hashes.len(), 2);
assert_eq!(
previous_pass.reduced_hashes,
vec![vec![expected_hash], vec![expected_hash2]]
);
assert_eq!(
previous_pass.lamports,
account_maps[0..plus1 * 2]
.iter()
.map(|i| i.lamports)
.sum::<u64>()
);
let result = accounts_hash.rest_of_hash_calculation(
empty_data(),
&mut HashStats::default(),
true,
previous_pass,
one_range(),
);
let previous_pass = result.2;
assert_eq!(previous_pass.remaining_unhashed.len(), 0);
assert_eq!(previous_pass.reduced_hashes.len(), 0);
assert_eq!(previous_pass.lamports, 0);
let mut combined = sorted;
combined.extend(sorted2);
let expected_hash = AccountsHasher::compute_merkle_root(
combined
.iter()
.map(|a| (a.pubkey, a.hash))
.collect::<Vec<_>>(),
MERKLE_FANOUT,
);
assert_eq!(
(result.0, result.1),
(expected_hash, total_lamports_expected)
);
}
#[test]
fn test_accountsdb_de_dup_accounts_zero_chunks() {
let vec = [vec![vec![CalculateHashIntermediate {
@@ -1488,12 +1288,22 @@ pub mod tests {
}]]];
let temp_vec = vec.to_vec();
let slice = convert_to_slice2(&temp_vec);
let (hashes, lamports, _) =
let (mut hashes, lamports, _) =
AccountsHasher::default().de_dup_accounts_in_parallel(&slice, 0);
assert_eq!(vec![&Hash::default()], hashes);
assert_eq!(&[Hash::default()], hashes.get_reader().unwrap().1.read(0));
assert_eq!(lamports, 1);
}
fn get_vec_vec(hashes: Vec<AccountHashesFile>) -> Vec<Vec<Hash>> {
hashes.into_iter().map(get_vec).collect()
}
fn get_vec(mut hashes: AccountHashesFile) -> Vec<Hash> {
hashes
.get_reader()
.map(|r| r.1.read(0).to_vec())
.unwrap_or_default()
}
#[test]
fn test_accountsdb_de_dup_accounts_empty() {
solana_logger::setup();
@@ -1503,23 +1313,26 @@ pub mod tests {
let (hashes, lamports) =
accounts_hash.de_dup_and_eliminate_zeros(&vec, &mut HashStats::default(), one_range());
assert_eq!(
vec![&Hash::default(); 0],
hashes.into_iter().flatten().collect::<Vec<_>>()
vec![Hash::default(); 0],
get_vec_vec(hashes)
.into_iter()
.flatten()
.collect::<Vec<_>>(),
);
assert_eq!(lamports, 0);
let vec = vec![];
let (hashes, lamports) =
accounts_hash.de_dup_and_eliminate_zeros(&vec, &mut HashStats::default(), zero_range());
let empty: Vec<Vec<&Hash>> = Vec::default();
assert_eq!(empty, hashes);
let empty: Vec<Vec<Hash>> = Vec::default();
assert_eq!(empty, get_vec_vec(hashes));
assert_eq!(lamports, 0);
let (hashes, lamports, _) = accounts_hash.de_dup_accounts_in_parallel(&[], 1);
assert_eq!(vec![&Hash::default(); 0], hashes);
assert_eq!(vec![Hash::default(); 0], get_vec(hashes));
assert_eq!(lamports, 0);
let (hashes, lamports, _) = accounts_hash.de_dup_accounts_in_parallel(&[], 2);
assert_eq!(vec![&Hash::default(); 0], hashes);
assert_eq!(vec![Hash::default(); 0], get_vec(hashes));
assert_eq!(lamports, 0);
}
@@ -1626,6 +1439,12 @@ pub mod tests {
end - start,
);
let hashes2 = get_vec(hashes2);
let hashes3 = get_vec(hashes3);
let hashes4 = get_vec_vec(hashes4);
let hashes5 = get_vec_vec(hashes5);
let hashes6 = get_vec_vec(hashes6);
assert_eq!(hashes2, hashes3);
let expected2 = hashes2.clone();
assert_eq!(
@@ -1732,7 +1551,7 @@ pub mod tests {
fn test_de_dup_accounts_in_parallel<'a>(
account_maps: &'a [SortedDataByPubkey<'a>],
) -> (Vec<&'a Hash>, u64, usize) {
) -> (AccountHashesFile, u64, usize) {
AccountsHasher::default().de_dup_accounts_in_parallel(account_maps, 0)
}
@@ -1748,8 +1567,11 @@ pub mod tests {
let vecs = vec![vec![account_maps.to_vec()]];
let slice = convert_to_slice2(&vecs);
let result = test_de_dup_accounts_in_parallel(&slice);
assert_eq!(result, (vec![&val.hash], val.lamports, 1));
let (hashfile, lamports, count) = test_de_dup_accounts_in_parallel(&slice);
assert_eq!(
(get_vec(hashfile), lamports, count),
(vec![val.hash], val.lamports, 1)
);
// zero original lamports, higher version
let val = CalculateHashIntermediate::new(hash, 0, key);
@@ -1757,8 +1579,8 @@ pub mod tests {
let vecs = vec![vec![account_maps.to_vec()]];
let slice = convert_to_slice2(&vecs);
let result = test_de_dup_accounts_in_parallel(&slice);
assert_eq!(result, (vec![], 0, 2));
let (hashfile, lamports, count) = test_de_dup_accounts_in_parallel(&slice);
assert_eq!((get_vec(hashfile), lamports, count), (vec![], 0, 2));
}
#[test]