accounts hash calculation uses files instead of memory (#28065)

Jeff Washington (jwash) 2022-10-18 07:51:38 -07:00 committed by GitHub
parent b669b827fb
commit 682999a423
3 changed files with 182 additions and 142 deletions

View File

@ -43,7 +43,7 @@ use {
},
append_vec::{AppendVec, StoredAccountMeta, StoredMeta, StoredMetaWriteVersion},
bank::Rewrites,
cache_hash_data::CacheHashData,
cache_hash_data::{CacheHashData, CacheHashDataFile},
contains::Contains,
epoch_accounts_hash::EpochAccountsHashManager,
pubkey_bins::PubkeyBinCalculator24,
@ -7042,9 +7042,9 @@ impl AccountsDb {
}
/// Scan through all the account storage in parallel.
/// Returns a Vec of cache data. At this level, the vector is ordered from older slots to newer slots.
/// A single pubkey could be in multiple entries. The pubkey found in the latest entry is the one to use.
/// Each entry in the Vec contains data binned by pubkey according to the various binning parameters.
/// Returns a Vec of open/mmapped files.
/// Each file has serialized hash info, sorted by pubkey and then slot, from scanning the append vecs.
/// A single pubkey could be in multiple entries. The pubkey found in the latest entry is the one to use.
fn scan_account_storage_no_bank<S>(
&self,
cache_hash_data: &CacheHashData,
@ -7052,9 +7052,8 @@ impl AccountsDb {
snapshot_storages: &SortedStorages,
scanner: S,
bin_range: &Range<usize>,
bin_calculator: &PubkeyBinCalculator24,
stats: &HashStats,
) -> Vec<BinnedHashData>
) -> Vec<CacheHashDataFile>
where
S: AppendVecScan,
{
@ -7066,34 +7065,12 @@ impl AccountsDb {
snapshot_storages,
);
let range = snapshot_storages.range();
let start_bin_index = bin_range.start;
(0..splitter.chunk_count)
.into_par_iter()
.map(|chunk| {
let mut scanner = scanner.clone();
let range_this_chunk = splitter.get_slot_range(chunk);
if range_this_chunk.is_none() {
return scanner.scanning_complete();
}
let range_this_chunk = range_this_chunk.unwrap();
let should_cache_hash_data = CalcAccountsHashConfig::get_should_cache_hash_data()
|| config.store_detailed_debug_info_on_failure;
// Single cached slots get cached and full chunks get cached.
// chunks that don't divide evenly would include some cached append vecs that are no longer part of this range and some that are, so we have to ignore caching on non-evenly dividing chunks.
let eligible_for_caching = splitter.is_chunk_ancient(chunk)
|| range_this_chunk.end.saturating_sub(range_this_chunk.start)
== MAX_ITEMS_PER_CHUNK;
if eligible_for_caching || config.store_detailed_debug_info_on_failure {
let range = bin_range.end - bin_range.start;
scanner.init_accum(range);
}
let range_this_chunk = splitter.get_slot_range(chunk)?;
let slots_per_epoch = config
.rent_collector
@ -7104,9 +7081,7 @@ impl AccountsDb {
.end
.saturating_sub(slots_per_epoch);
let file_name = if (should_cache_hash_data && eligible_for_caching)
|| config.store_detailed_debug_info_on_failure
{
let file_name = {
let mut load_from_cache = true;
let mut hasher = std::collections::hash_map::DefaultHasher::new();
bin_range.start.hash(&mut hasher);
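The file_name being assembled here is the cache key for this chunk: it encodes the chunk's slot range, the pubkey bin range, and a hash accumulated from the chunk's per-slot storages (the hashing loop is mostly elided by the diff), so a chunk whose storages have not changed can map back to the same cache file on a later calculation. A rough, self-contained sketch of that naming scheme; 'storage_fingerprints' is a hypothetical stand-in for the per-slot values fed to the hasher:

use std::{
    collections::hash_map::DefaultHasher,
    hash::{Hash, Hasher},
    ops::Range,
};

// Sketch only; not the exact production code.
fn cache_file_name(
    slot_range: &Range<u64>,
    bin_range: &Range<usize>,
    storage_fingerprints: &[u64], // hypothetical per-slot storage identifiers
) -> String {
    let mut hasher = DefaultHasher::new();
    bin_range.start.hash(&mut hasher);
    bin_range.end.hash(&mut hasher);
    storage_fingerprints.hash(&mut hasher);
    let hash = hasher.finish();
    // same "{slot_start}.{slot_end}.{bin_start}.{bin_end}.{hash}" layout as in the diff
    format!(
        "{}.{}.{}.{}.{}",
        slot_range.start, slot_range.end, bin_range.start, bin_range.end, hash
    )
}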
@ -7123,47 +7098,31 @@ impl AccountsDb {
break;
}
}
// we have a hash value for all the storages in this slot
// so, build a file name:
let hash = hasher.finish();
let file_name = format!(
"{}.{}.{}.{}.{}",
range_this_chunk.start,
range_this_chunk.end,
bin_range.start,
bin_range.end,
hash
);
if load_from_cache {
// we have a hash value for all the storages in this slot
// so, build a file name:
let hash = hasher.finish();
let file_name = format!(
"{}.{}.{}.{}.{}",
range_this_chunk.start,
range_this_chunk.end,
bin_range.start,
bin_range.end,
hash
);
let mut retval = scanner.get_accum();
if eligible_for_caching
&& cache_hash_data
.load(
&Path::new(&file_name),
&mut retval,
start_bin_index,
bin_calculator,
)
.is_ok()
{
return retval;
if let Ok(mapped_file) = cache_hash_data.load_map(&Path::new(&file_name)) {
return Some(mapped_file);
}
scanner.set_accum(retval);
}
// fall through and load normally - we failed to load
file_name
} else {
String::default()
}
} else {
for (slot, sub_storages) in snapshot_storages.iter_range(&range_this_chunk) {
if bin_range.start == 0 && slot < one_epoch_old {
self.update_old_slot_stats(stats, sub_storages);
}
}
String::default()
// fall through and load normally - we failed to load from a cache file
file_name
};
// load from cache failed, so create the cache file for this chunk
let range = bin_range.end - bin_range.start;
scanner.init_accum(range);
for (slot, sub_storages) in snapshot_storages.iter_range(&range_this_chunk) {
scanner.set_slot(slot);
if let Some(sub_storages) = sub_storages {
@ -7171,24 +7130,15 @@ impl AccountsDb {
}
}
let r = scanner.scanning_complete();
if !file_name.is_empty() {
let result = cache_hash_data.save(Path::new(&file_name), &r);
if result.is_err() {
info!(
"FAILED_TO_SAVE: {}-{}, {}, first_chunk_start: {}, {:?}, error: {:?}",
range.start,
range.end,
splitter.non_ancient_slot_count,
splitter.first_chunk_start,
file_name,
result,
);
}
}
r
assert!(!file_name.is_empty());
(!r.is_empty() && r.iter().any(|b| !b.is_empty())).then(|| {
// error if we can't write this
let file_name = Path::new(&file_name);
cache_hash_data.save(Path::new(&file_name), &r).unwrap();
cache_hash_data.load_map(&file_name).unwrap()
})
})
.filter(|x| !x.is_empty())
.filter_map(|x| x)
.collect()
}
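Taken together, the new body of scan_account_storage_no_bank reduces each chunk to a load-or-rebuild pattern around the hash cache: mmap an existing cache file if one exists, otherwise scan the append vecs, persist the result, and mmap it back. A condensed sketch, assuming the crate's CacheHashData, CacheHashDataFile, and BinnedHashData types and hiding the scanning loop behind a hypothetical 'scan_chunk' closure:

use std::path::Path;

fn chunk_cache_file(
    cache_hash_data: &CacheHashData,
    file_name: &str,
    scan_chunk: impl FnOnce() -> BinnedHashData,
) -> Option<CacheHashDataFile> {
    // Reuse an existing cache file for this chunk if it can be mmapped.
    if let Ok(mapped_file) = cache_hash_data.load_map(&Path::new(file_name)) {
        return Some(mapped_file);
    }
    // Otherwise scan, then persist and remap any non-empty result.
    let r = scan_chunk();
    (!r.is_empty() && r.iter().any(|b| !b.is_empty())).then(|| {
        cache_hash_data.save(Path::new(file_name), &r).unwrap();
        cache_hash_data.load_map(&Path::new(file_name)).unwrap()
    })
}

Empty chunks yield None, which is why the collection at the end of the function switched from .filter(|x| !x.is_empty()) to .filter_map(|x| x).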
@ -7338,7 +7288,7 @@ impl AccountsDb {
bin_range: &Range<usize>,
config: &CalcAccountsHashConfig<'_>,
filler_account_suffix: Option<&Pubkey>,
) -> Result<Vec<BinnedHashData>, BankHashVerificationError> {
) -> Result<Vec<CacheHashDataFile>, BankHashVerificationError> {
let bin_calculator = PubkeyBinCalculator24::new(bins);
assert!(bin_range.start < bins && bin_range.end <= bins && bin_range.start < bin_range.end);
let mut time = Measure::start("scan all accounts");
@ -7361,13 +7311,12 @@ impl AccountsDb {
pubkey_to_bin_index: 0,
};
let result: Vec<BinnedHashData> = self.scan_account_storage_no_bank(
let result = self.scan_account_storage_no_bank(
cache_hash_data,
config,
storage,
scanner,
bin_range,
&bin_calculator,
stats,
);
@ -7485,6 +7434,19 @@ impl AccountsDb {
hash.filler_account_suffix.as_ref(),
)?;
// convert mmapped cache files into slices of data
let slices = result
.iter()
.map(|d| d.get_cache_hash_data())
.collect::<Vec<_>>();
// rework slices of data into bins for parallel processing and to match data shape expected by 'rest_of_hash_calculation'
let result = AccountsHash::get_binned_data(
&slices,
PUBKEY_BINS_FOR_CALCULATING_HASHES,
&bounds,
);
// turn raw data into merkle tree hashes and sum of lamports
let (hash, lamports, for_next_pass) = hash.rest_of_hash_calculation(
result,
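These two conversion steps reshape the scan output before hashing. The shapes involved, inferred from the types in this diff, are roughly:

// result:  Vec<CacheHashDataFile>
//          one mmapped cache file per storage chunk
// slices:  Vec<&[CalculateHashIntermediate]>
//          zero-copy views into each file via get_cache_hash_data()
// binned:  Vec<Vec<&[CalculateHashIntermediate]>>
//          per file, one sub-slice per pubkey bin, the shape 'rest_of_hash_calculation' expects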
@ -9588,6 +9550,7 @@ pub mod tests {
tests::*, AccountSecondaryIndexesIncludeExclude, ReadAccountMapEntry, RefCount,
},
append_vec::{test_utils::TempFile, AccountMeta},
cache_hash_data_stats::CacheHashDataStats,
inline_spl_token,
},
assert_matches::assert_matches,
@ -9627,7 +9590,7 @@ pub mod tests {
bins: usize,
bin_range: &Range<usize>,
check_hash: bool,
) -> Result<Vec<BinnedHashData>, BankHashVerificationError> {
) -> Result<Vec<CacheHashDataFile>, BankHashVerificationError> {
let temp_dir = TempDir::new().unwrap();
let accounts_hash_cache_path = temp_dir.path();
self.scan_snapshot_stores_with_cache(
@ -9777,9 +9740,35 @@ pub mod tests {
SortedStorages::new(input)
}
/// helper method. Refactoring will soon make this comparison more complicated.
fn assert_scan(result: Vec<BinnedHashData>, expected: Vec<BinnedHashData>) {
assert_eq!(result, expected);
/// helper to compare expected binned data with scan result in cache files
/// result: return from scanning
/// expected: binned data expected
/// bins: # bins total to divide pubkeys into
/// start_bin_index: bin # that was the minimum # we were scanning for 0<=start_bin_index<bins
/// bin_range: end_exclusive-start_bin_index passed to scan
fn assert_scan(
result: Vec<CacheHashDataFile>,
expected: Vec<BinnedHashData>,
bins: usize,
start_bin_index: usize,
bin_range: usize,
) {
assert_eq!(expected.len(), result.len());
for cache_file in &result {
let mut result2 = (0..bin_range).map(|_| Vec::default()).collect::<Vec<_>>();
cache_file.load_all(
&mut result2,
start_bin_index,
&PubkeyBinCalculator24::new(bins),
&mut CacheHashDataStats::default(),
);
assert_eq!(
convert_to_slice(&[result2]),
expected,
"bins: {bins}, start_bin_index: {start_bin_index}"
);
}
}
#[test]
@ -9803,7 +9792,7 @@ pub mod tests {
false,
)
.unwrap();
assert_scan(result, vec![vec![raw_expected.clone()]]);
assert_scan(result, vec![vec![raw_expected.clone()]], bins, 0, bins);
let bins = 2;
let accounts_db = AccountsDb::new_single_for_tests();
@ -9824,7 +9813,7 @@ pub mod tests {
expected[0].push(raw_expected[1].clone());
expected[bins - 1].push(raw_expected[2].clone());
expected[bins - 1].push(raw_expected[3].clone());
assert_scan(result, vec![expected]);
assert_scan(result, vec![expected], bins, 0, bins);
let bins = 4;
let accounts_db = AccountsDb::new_single_for_tests();
@ -9845,7 +9834,7 @@ pub mod tests {
expected[1].push(raw_expected[1].clone());
expected[2].push(raw_expected[2].clone());
expected[bins - 1].push(raw_expected[3].clone());
assert_scan(result, vec![expected]);
assert_scan(result, vec![expected], bins, 0, bins);
let bins = 256;
let accounts_db = AccountsDb::new_single_for_tests();
@ -9866,7 +9855,7 @@ pub mod tests {
expected[127].push(raw_expected[1].clone());
expected[128].push(raw_expected[2].clone());
expected[bins - 1].push(raw_expected.last().unwrap().clone());
assert_scan(result, vec![expected]);
assert_scan(result, vec![expected], bins, 0, bins);
}
#[test]
@ -9894,11 +9883,8 @@ pub mod tests {
false,
)
.unwrap();
assert_eq!(result.len(), 2); // 2 chunks
assert_eq!(result[0].len(), bins);
assert_eq!(0, result[0].iter().map(|x| x.len()).sum::<usize>()); // nothing found in bin 0
assert_eq!(result[1].len(), bins);
assert_eq!(result[1], vec![raw_expected]);
assert_scan(result, vec![vec![raw_expected]], bins, 0, bins);
}
#[test]
@ -9925,7 +9911,7 @@ pub mod tests {
let mut expected = vec![Vec::new(); half_bins];
expected[0].push(raw_expected[0].clone());
expected[0].push(raw_expected[1].clone());
assert_scan(result, vec![expected]);
assert_scan(result, vec![expected], bins, 0, half_bins);
// just the second bin of 2
let accounts_db = AccountsDb::new_single_for_tests();
@ -9946,7 +9932,7 @@ pub mod tests {
let starting_bin_index = 0;
expected[starting_bin_index].push(raw_expected[2].clone());
expected[starting_bin_index].push(raw_expected[3].clone());
assert_scan(result, vec![expected]);
assert_scan(result, vec![expected], bins, 1, bins - 1);
// 1 bin at a time of 4
let bins = 4;
@ -9967,7 +9953,7 @@ pub mod tests {
.unwrap();
let mut expected = vec![Vec::new(); 1];
expected[0].push(expected_item.clone());
assert_scan(result, vec![expected]);
assert_scan(result, vec![expected], bins, bin, 1);
}
let bins = 256;
@ -9989,10 +9975,22 @@ pub mod tests {
.unwrap();
let mut expected = vec![];
if let Some(index) = bin_locations.iter().position(|&r| r == bin) {
expected = vec![vec![Vec::new(); range]];
expected[0][0].push(raw_expected[index].clone());
expected = vec![Vec::new(); range];
expected[0].push(raw_expected[index].clone());
}
assert_eq!(result, expected);
let mut result2 = (0..range).map(|_| Vec::default()).collect::<Vec<_>>();
if let Some(m) = result.get(0) {
m.load_all(
&mut result2,
bin,
&PubkeyBinCalculator24::new(bins),
&mut CacheHashDataStats::default(),
);
} else {
result2 = vec![];
}
assert_eq!(result2, expected);
}
}
@ -10024,13 +10022,18 @@ pub mod tests {
false,
)
.unwrap();
assert_eq!(result.len(), 2); // 2 chunks
assert_eq!(result[0].len(), range);
assert_eq!(0, result[0].iter().map(|x| x.len()).sum::<usize>()); // nothing found in bin 0
assert_eq!(result.len(), 1); // 2 chunks, but 1 is empty so not included
let mut expected = vec![Vec::new(); range];
expected[0].push(raw_expected[1].clone());
assert_eq!(result[1].len(), 1);
assert_eq!(result[1], expected);
let mut result2 = (0..range).map(|_| Vec::default()).collect::<Vec<_>>();
result[0].load_all(
&mut result2,
0,
&PubkeyBinCalculator24::new(range),
&mut CacheHashDataStats::default(),
);
assert_eq!(result2.len(), 1);
assert_eq!(result2, expected);
}
#[test]
@ -10170,7 +10173,6 @@ pub mod tests {
&get_storage_refs(&storages),
test_scan,
&Range { start: 0, end: 1 },
&PubkeyBinCalculator24::new(1),
&HashStats::default(),
);
assert_eq!(calls.load(Ordering::Relaxed), 1);
@ -10181,9 +10183,21 @@ pub mod tests {
expected,
pubkey,
)]]],
1,
0,
1,
);
}
fn convert_to_slice(
input: &[Vec<Vec<CalculateHashIntermediate>>],
) -> Vec<Vec<&[CalculateHashIntermediate]>> {
input
.iter()
.map(|v| v.iter().map(|v| &v[..]).collect::<Vec<_>>())
.collect::<Vec<_>>()
}
#[test]
fn test_accountsdb_scan_account_storage_no_bank_one_slot() {
solana_logger::setup();

View File

@ -22,7 +22,7 @@ use {
pub const MERKLE_FANOUT: usize = 16;
/// the data passed through the processing functions
pub type SortedDataByPubkey = Vec<Vec<CalculateHashIntermediate>>;
pub type SortedDataByPubkey<'a> = Vec<&'a [CalculateHashIntermediate]>;
#[derive(Default, Debug)]
pub struct PreviousPass {
@ -695,8 +695,7 @@ impl AccountsHash {
/// return references to cache hash data, grouped by bin, sourced from 'sorted_data_by_pubkey',
/// which is probably a mmapped file.
#[allow(dead_code)]
fn get_binned_data<'a>(
pub(crate) fn get_binned_data<'a>(
sorted_data_by_pubkey: &'a Vec<&'a [CalculateHashIntermediate]>,
bins: usize,
bin_range: &Range<usize>,
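get_binned_data regroups each pubkey-sorted slice into one contiguous sub-slice per bin, without copying any entries. A simplified sketch of the idea for a single slice, with a generic 'bin_of' closure standing in for PubkeyBinCalculator24 (this is not the production implementation):

use std::ops::Range;

// Because 'sorted' is ordered by pubkey, each bin's entries form one
// contiguous run, so every sub-slice can be found with two binary searches.
fn split_by_bin<'a, T>(
    sorted: &'a [T],
    bin_range: &Range<usize>,
    bin_of: impl Fn(&T) -> usize,
) -> Vec<&'a [T]> {
    (bin_range.start..bin_range.end)
        .map(|bin| {
            let start = sorted.partition_point(|item| bin_of(item) < bin);
            let end = sorted.partition_point(|item| bin_of(item) <= bin);
            &sorted[start..end]
        })
        .collect()
}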
@ -744,7 +743,7 @@ impl AccountsHash {
fn de_dup_and_eliminate_zeros<'a>(
&self,
sorted_data_by_pubkey: &'a [SortedDataByPubkey],
sorted_data_by_pubkey: &'a [SortedDataByPubkey<'a>],
stats: &mut HashStats,
max_bin: usize,
) -> (Vec<Vec<&'a Hash>>, u64) {
@ -792,7 +791,7 @@ impl AccountsHash {
min_index: usize,
bin: usize,
first_items: &'a mut Vec<Pubkey>,
pubkey_division: &'b [SortedDataByPubkey],
pubkey_division: &'b [SortedDataByPubkey<'b>],
indexes: &'a mut [usize],
first_item_to_pubkey_division: &'a mut Vec<usize>,
) -> &'b CalculateHashIntermediate {
@ -835,7 +834,7 @@ impl AccountsHash {
// c. unreduced count (ie. including duplicates and zero lamport)
fn de_dup_accounts_in_parallel<'a>(
&self,
pubkey_division: &'a [SortedDataByPubkey],
pubkey_division: &'a [SortedDataByPubkey<'a>],
pubkey_bin: usize,
) -> (Vec<&'a Hash>, u64, usize) {
let len = pubkey_division.len();
@ -850,7 +849,7 @@ impl AccountsHash {
pubkey_division.iter().enumerate().for_each(|(i, bins)| {
// check to make sure we can do bins[pubkey_bin]
if bins.len() > pubkey_bin {
let sub = &bins[pubkey_bin];
let sub = bins[pubkey_bin];
if !sub.is_empty() {
item_len += bins[pubkey_bin].len(); // sum for metrics
first_items.push(bins[pubkey_bin][0].pubkey);
@ -944,7 +943,7 @@ impl AccountsHash {
// vec: [..] - items which fit in the containing bin. Sorted by: Pubkey, higher Slot, higher Write version (if pubkey =)
pub fn rest_of_hash_calculation(
&self,
data_sections_by_pubkey: Vec<SortedDataByPubkey>,
data_sections_by_pubkey: Vec<SortedDataByPubkey<'_>>,
mut stats: &mut HashStats,
is_last_pass: bool,
mut previous_state: PreviousPass,
@ -1077,8 +1076,8 @@ pub mod tests {
assert_eq!(AccountsHash::div_ceil(10, 0), 0);
}
fn for_rest(original: &[CalculateHashIntermediate]) -> Vec<SortedDataByPubkey> {
vec![vec![original.to_vec()]]
fn for_rest(original: &[CalculateHashIntermediate]) -> Vec<SortedDataByPubkey<'_>> {
vec![vec![original]]
}
#[test]
@ -1150,8 +1149,10 @@ pub mod tests {
0
}
fn empty_data() -> Vec<SortedDataByPubkey> {
vec![vec![vec![]]]
const EMPTY_DATA: [CalculateHashIntermediate; 0] = [];
fn empty_data() -> Vec<SortedDataByPubkey<'static>> {
vec![vec![&EMPTY_DATA]]
}
#[test]
@ -1456,7 +1457,9 @@ pub mod tests {
lamports: 1,
..CalculateHashIntermediate::default()
}]]];
let (hashes, lamports, _) = AccountsHash::default().de_dup_accounts_in_parallel(&vec, 0);
let temp_vec = vec.to_vec();
let slice = convert_to_slice2(&temp_vec);
let (hashes, lamports, _) = AccountsHash::default().de_dup_accounts_in_parallel(&slice, 0);
assert_eq!(vec![&Hash::default()], hashes);
assert_eq!(lamports, 1);
}
@ -1567,23 +1570,28 @@ pub mod tests {
let slice2 = vec![vec![slice.to_vec()]];
let slice = &slice2[..];
let (hashes2, lamports2, _) = hash.de_dup_accounts_in_parallel(slice, 0);
let (hashes3, lamports3, _) = hash.de_dup_accounts_in_parallel(slice, 0);
let slice_temp = convert_to_slice2(&slice2);
let (hashes2, lamports2, _) = hash.de_dup_accounts_in_parallel(&slice_temp, 0);
let slice3 = convert_to_slice2(&slice2);
let (hashes3, lamports3, _) = hash.de_dup_accounts_in_parallel(&slice3, 0);
let vec = slice.to_vec();
let slice4 = convert_to_slice2(&vec);
let (hashes4, lamports4) = hash.de_dup_and_eliminate_zeros(
&vec,
&slice4,
&mut HashStats::default(),
end - start,
);
let vec = slice.to_vec();
let slice5 = convert_to_slice2(&vec);
let (hashes5, lamports5) = hash.de_dup_and_eliminate_zeros(
&vec,
&slice5,
&mut HashStats::default(),
end - start,
);
let vec = slice.to_vec();
let slice5 = convert_to_slice2(&vec);
let (hashes6, lamports6) = hash.de_dup_and_eliminate_zeros(
&vec,
&slice5,
&mut HashStats::default(),
end - start,
);
@ -1692,9 +1700,9 @@ pub mod tests {
);
}
fn test_de_dup_accounts_in_parallel(
account_maps: &[SortedDataByPubkey],
) -> (Vec<&Hash>, u64, usize) {
fn test_de_dup_accounts_in_parallel<'a>(
account_maps: &'a [SortedDataByPubkey<'a>],
) -> (Vec<&'a Hash>, u64, usize) {
AccountsHash::default().de_dup_accounts_in_parallel(account_maps, 0)
}
@ -1709,7 +1717,8 @@ pub mod tests {
account_maps.push(val.clone());
let vecs = vec![vec![account_maps.to_vec()]];
let result = test_de_dup_accounts_in_parallel(&vecs);
let slice = convert_to_slice2(&vecs);
let result = test_de_dup_accounts_in_parallel(&slice);
assert_eq!(result, (vec![&val.hash], val.lamports as u64, 1));
// zero original lamports, higher version
@ -1717,7 +1726,8 @@ pub mod tests {
account_maps.push(val); // has to be after previous entry since account_maps are in slot order
let vecs = vec![vec![account_maps.to_vec()]];
let result = test_de_dup_accounts_in_parallel(&vecs);
let slice = convert_to_slice2(&vecs);
let result = test_de_dup_accounts_in_parallel(&slice);
assert_eq!(result, (vec![], 0, 2));
}
@ -2087,7 +2097,22 @@ pub mod tests {
),
CalculateHashIntermediate::new(Hash::new(&[2u8; 32]), offset + 1, Pubkey::new_unique()),
];
AccountsHash::default().de_dup_accounts_in_parallel(&[vec![input]], 0);
AccountsHash::default().de_dup_accounts_in_parallel(&[convert_to_slice(&[input])], 0);
}
fn convert_to_slice(
input: &[Vec<CalculateHashIntermediate>],
) -> Vec<&[CalculateHashIntermediate]> {
input.iter().map(|v| &v[..]).collect::<Vec<_>>()
}
fn convert_to_slice2(
input: &[Vec<Vec<CalculateHashIntermediate>>],
) -> Vec<Vec<&[CalculateHashIntermediate]>> {
input
.iter()
.map(|v| v.iter().map(|v| &v[..]).collect::<Vec<_>>())
.collect::<Vec<_>>()
}
#[test]
@ -2109,7 +2134,7 @@ pub mod tests {
)],
];
AccountsHash::default().de_dup_and_eliminate_zeros(
&[input],
&[convert_to_slice(&input)],
&mut HashStats::default(),
2, // accounts above are in 2 groups
);
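The signature changes above only switch these functions to borrowed slices; the dedup rule itself is unchanged: when a pubkey shows up in more than one source, only the entry from the newest source is kept, and zero-lamport entries are dropped from the hash input. A toy, single-threaded illustration of that rule (not the parallel implementation), with Entry standing in for CalculateHashIntermediate:

use std::collections::BTreeMap;

#[derive(Clone)]
struct Entry {
    pubkey: [u8; 32],
    lamports: u64,
    hash: [u8; 32],
}

// Sources are ordered oldest -> newest, so later inserts win per pubkey.
fn de_dup(sources_old_to_new: &[&[Entry]]) -> (Vec<[u8; 32]>, u64) {
    let mut latest: BTreeMap<[u8; 32], Entry> = BTreeMap::new();
    for source in sources_old_to_new {
        for entry in *source {
            latest.insert(entry.pubkey, entry.clone());
        }
    }
    let mut total_lamports = 0u64;
    let hashes = latest
        .values()
        .filter(|e| e.lamports != 0) // eliminate zero-lamport accounts
        .map(|e| {
            total_lamports += e.lamports;
            e.hash
        })
        .collect();
    (hashes, total_lamports)
}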

View File

@ -1,9 +1,8 @@
//! Cached data for hashing accounts
#[cfg(test)]
use crate::pubkey_bins::PubkeyBinCalculator24;
use {
crate::{
accounts_hash::CalculateHashIntermediate, cache_hash_data_stats::CacheHashDataStats,
pubkey_bins::PubkeyBinCalculator24,
},
crate::{accounts_hash::CalculateHashIntermediate, cache_hash_data_stats::CacheHashDataStats},
memmap2::MmapMut,
solana_measure::measure::Measure,
std::{
@ -36,6 +35,7 @@ impl CacheHashDataFile {
self.get_slice(0)
}
#[cfg(test)]
/// Populate 'accumulator' from entire contents of the cache file.
pub(crate) fn load_all(
&self,
@ -201,6 +201,7 @@ impl CacheHashData {
parent_folder.as_ref().join("calculate_accounts_hash_cache")
}
#[cfg(test)]
/// load from 'file_name' into 'accumulator'
pub(crate) fn load<P: AsRef<Path> + std::fmt::Debug>(
&self,
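The cache_hash_data changes above move the copying load()/load_all() path behind #[cfg(test)]; the production path now returns a CacheHashDataFile whose entries are read straight out of the memory map. A hypothetical illustration of that zero-copy idea (the real file also carries a header and capacity checks that this sketch omits, and Entry merely stands in for CalculateHashIntermediate):

use memmap2::MmapMut;

#[repr(C)]
#[derive(Clone, Copy)]
struct Entry {
    hash: [u8; 32],
    lamports: u64,
    pubkey: [u8; 32],
}

struct MappedCacheFile {
    mmap: MmapMut,
}

impl MappedCacheFile {
    // View the mapped bytes as a slice of entries without copying them out.
    fn entries(&self) -> &[Entry] {
        let len = self.mmap.len() / std::mem::size_of::<Entry>();
        // Safety (sketch only): assumes the file was written with this exact
        // layout and that its length is a multiple of the entry size.
        unsafe { std::slice::from_raw_parts(self.mmap.as_ptr() as *const Entry, len) }
    }
}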