From b57e86abf2ed48ba8e5a0d096bcbd87eb1e648d5 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com>
Date: Mon, 13 Sep 2021 20:39:26 -0500
Subject: [PATCH] cache account hash info (#19426)

* cache account hash info

* ledger_path -> accounts_hash_cache_path
---
 core/src/accounts_hash_verifier.rs   |  11 +-
 core/src/tvu.rs                      |   1 +
 core/tests/snapshots.rs              |   2 +
 ledger-tool/src/main.rs              |   6 +-
 runtime/src/accounts_db.rs           | 261 +++++++++++++---
 runtime/src/cache_hash_data.rs       | 437 +++++++++++++++++++++++++++
 runtime/src/cache_hash_data_stats.rs |  59 ++++
 runtime/src/lib.rs                   |   2 +
 validator/src/main.rs                |   6 +-
 9 files changed, 737 insertions(+), 48 deletions(-)
 create mode 100644 runtime/src/cache_hash_data.rs
 create mode 100644 runtime/src/cache_hash_data_stats.rs

diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs
index eb2d5851b..1e345a296 100644
--- a/core/src/accounts_hash_verifier.rs
+++ b/core/src/accounts_hash_verifier.rs
@@ -20,6 +20,7 @@ use solana_runtime::{
 use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};
 use std::collections::{HashMap, HashSet};
 use std::{
+    path::{Path, PathBuf},
     sync::{
         atomic::{AtomicBool, Ordering},
         mpsc::RecvTimeoutError,
@@ -43,6 +44,7 @@ impl AccountsHashVerifier {
         halt_on_trusted_validators_accounts_hash_mismatch: bool,
         fault_injection_rate_slots: u64,
         snapshot_config: Option<SnapshotConfig>,
+        ledger_path: PathBuf,
     ) -> Self {
         let exit = exit.clone();
         let cluster_info = cluster_info.clone();
@@ -74,6 +76,7 @@ impl AccountsHashVerifier {
                             fault_injection_rate_slots,
                             snapshot_config.as_ref(),
                             thread_pool.as_ref(),
+                            &ledger_path,
                         );
                     }
                     Err(RecvTimeoutError::Disconnected) => break,
@@ -99,8 +102,9 @@ impl AccountsHashVerifier {
         fault_injection_rate_slots: u64,
         snapshot_config: Option<&SnapshotConfig>,
         thread_pool: Option<&ThreadPool>,
+        ledger_path: &Path,
     ) {
-        Self::verify_accounts_package_hash(&accounts_package, thread_pool);
+        Self::verify_accounts_package_hash(&accounts_package, thread_pool, ledger_path);

         Self::push_accounts_hashes_to_cluster(
             &accounts_package,
@@ -118,11 +122,13 @@ impl AccountsHashVerifier {
     fn verify_accounts_package_hash(
         accounts_package: &AccountsPackage,
         thread_pool: Option<&ThreadPool>,
+        ledger_path: &Path,
     ) {
         let mut measure_hash = Measure::start("hash");
         if let Some(expected_hash) = accounts_package.hash_for_testing {
             let sorted_storages = SortedStorages::new(&accounts_package.snapshot_storages);
             let (hash, lamports) = AccountsDb::calculate_accounts_hash_without_index(
+                ledger_path,
                 &sorted_storages,
                 thread_pool,
                 HashStats::default(),
@@ -357,6 +363,8 @@ mod tests {
             snapshot_type: None,
         };

+        let ledger_path = TempDir::new().unwrap();
+
         AccountsHashVerifier::process_accounts_package(
             accounts_package,
             &cluster_info,
@@ -368,6 +376,7 @@ mod tests {
             0,
             Some(&snapshot_config),
             None,
+            ledger_path.path(),
         );

         // sleep for 1ms to create a newer timestamp for gossip entry
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index d066b8834..256060570 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -224,6 +224,7 @@ impl Tvu {
             tvu_config.halt_on_trusted_validators_accounts_hash_mismatch,
             tvu_config.accounts_hash_fault_injection_slots,
             snapshot_config.clone(),
+            blockstore.ledger_path().to_path_buf(),
         );

         let (snapshot_request_sender, snapshot_request_handler) = match snapshot_config {
diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs
index ae5d59452..1cb323ee5 100644
--- a/core/tests/snapshots.rs
+++ b/core/tests/snapshots.rs
@@ -928,6 +928,7 @@ mod tests {
             snapshot_test_config.snapshot_config.clone(),
         );

+        let tmpdir = TempDir::new().unwrap();
         let accounts_hash_verifier = AccountsHashVerifier::new(
             accounts_package_receiver,
             Some(pending_snapshot_package),
@@ -937,6 +938,7 @@ mod tests {
             false,
             0,
             Some(snapshot_test_config.snapshot_config.clone()),
+            tmpdir.path().to_path_buf(),
         );

         let accounts_background_service = AccountsBackgroundService::new(
diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs
index 46520761a..ef7f79196 100644
--- a/ledger-tool/src/main.rs
+++ b/ledger-tool/src/main.rs
@@ -1893,8 +1893,10 @@ fn main() {
                 .ok()
                 .map(|bins| AccountsIndexConfig { bins: Some(bins) });

-            let accounts_db_config =
-                accounts_index_config.map(|x| AccountsDbConfig { index: Some(x) });
+            let accounts_db_config = Some(AccountsDbConfig {
+                index: accounts_index_config,
+                accounts_hash_cache_path: Some(ledger_path.clone()),
+            });

             let process_options = ProcessOptions {
                 dev_halt_at_slot: value_t!(arg_matches, "halt_at_slot", Slot).ok(),
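For orientation before the runtime changes: both ledger-tool (above) and the validator (at the end of this patch) now always build Some(AccountsDbConfig), so the cache path rides along even when no index bins were requested. A minimal compiling sketch of that wiring — the struct bodies here are reduced stand-ins for the real definitions in the accounts_db.rs diff below, and build_config is a name invented for illustration:

    use std::path::{Path, PathBuf};

    #[derive(Debug, Default, Clone)]
    pub struct AccountsIndexConfig {
        pub bins: Option<usize>,
    }

    #[derive(Debug, Default, Clone)]
    pub struct AccountsDbConfig {
        pub index: Option<AccountsIndexConfig>,
        pub accounts_hash_cache_path: Option<PathBuf>,
    }

    fn build_config(ledger_path: &Path, bins: Option<usize>) -> Option<AccountsDbConfig> {
        // Previously the config was None unless index bins were set; now it is
        // always Some(..) because the hash-cache path must be carried through.
        Some(AccountsDbConfig {
            index: bins.map(|bins| AccountsIndexConfig { bins: Some(bins) }),
            accounts_hash_cache_path: Some(ledger_path.to_path_buf()),
        })
    }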
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 0c78a1a2a..5febd52a9 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -29,6 +29,7 @@ use crate::{
     },
     ancestors::Ancestors,
     append_vec::{AppendVec, StoredAccountMeta, StoredMeta, StoredMetaWriteVersion},
+    cache_hash_data::CacheHashData,
     contains::Contains,
     pubkey_bins::PubkeyBinCalculator16,
     read_only_accounts_cache::ReadOnlyAccountsCache,
@@ -61,6 +62,7 @@ use std::{
     boxed::Box,
     collections::{hash_map::Entry, BTreeSet, HashMap, HashSet},
     convert::TryFrom,
+    hash::{Hash as StdHash, Hasher as StdHasher},
     io::{Error as IoError, Result as IoResult},
     ops::{Range, RangeBounds},
     path::{Path, PathBuf},
@@ -101,7 +103,7 @@ pub const BINS_PER_PASS: usize = PUBKEY_BINS_FOR_CALCULATING_HASHES / NUM_SCAN_P
 // If this is too big, we don't get enough parallelism of scanning storages.
 // If this is too small, then we produce too many output vectors to iterate.
 // Metrics indicate a sweet spot in the 2.5k-5k range for mnb.
-const MAX_ITEMS_PER_CHUNK: Slot = 5_000;
+const MAX_ITEMS_PER_CHUNK: Slot = 2_500;

 // A specially reserved storage id just for entries in the cache, so that
 // operations that take a storage entry can maintain a common interface
@@ -124,9 +126,11 @@ const CACHE_VIRTUAL_STORED_SIZE: usize = 0;

 pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig {
     index: Some(ACCOUNTS_INDEX_CONFIG_FOR_TESTING),
+    accounts_hash_cache_path: None,
 };
 pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig {
     index: Some(ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS),
+    accounts_hash_cache_path: None,
 };

 pub type BinnedHashData = Vec<Vec<CalculateHashIntermediate>>;
@@ -134,6 +138,7 @@ pub type BinnedHashData = Vec<Vec<CalculateHashIntermediate>>;
 #[derive(Debug, Default, Clone)]
 pub struct AccountsDbConfig {
     pub index: Option<AccountsIndexConfig>,
+    pub accounts_hash_cache_path: Option<PathBuf>,
 }

 struct FoundStoredAccount<'a> {
@@ -956,6 +961,12 @@ pub struct AccountsDb {
     /// Set of storage paths to pick from
     pub(crate) paths: Vec<PathBuf>,

+    accounts_hash_cache_path: PathBuf,
+
+    // used by tests
+    // holds this until we are dropped
+    temp_accounts_hash_cache_path: Option<TempDir>,
+
     pub shrink_paths: RwLock<Option<Vec<PathBuf>>>,

     /// Directory of paths this accounts_db needs to hold/remove
@@ -1423,13 +1434,26 @@ type GenerateIndexAccountsMap<'a> = HashMap<Pubkey, FoundStoredAccount<'a>>;

 impl AccountsDb {
     pub fn default_for_tests() -> Self {
-        Self::default_with_accounts_index(AccountInfoAccountsIndex::default_for_tests())
+        Self::default_with_accounts_index(AccountInfoAccountsIndex::default_for_tests(), None)
     }

-    fn default_with_accounts_index(accounts_index: AccountInfoAccountsIndex) -> Self {
+    fn default_with_accounts_index(
+        accounts_index: AccountInfoAccountsIndex,
+        accounts_hash_cache_path: Option<PathBuf>,
+    ) -> Self {
         let num_threads = get_thread_count();
         const MAX_READ_ONLY_CACHE_DATA_SIZE: usize = 200_000_000;

+        let mut temp_accounts_hash_cache_path = None;
+        let accounts_hash_cache_path = accounts_hash_cache_path.unwrap_or_else(|| {
+            temp_accounts_hash_cache_path = Some(TempDir::new().unwrap());
+            temp_accounts_hash_cache_path
+                .as_ref()
+                .unwrap()
+                .path()
+                .to_path_buf()
+        });
+
         let mut bank_hashes = HashMap::new();
         bank_hashes.insert(0, BankHashInfo::default());
         AccountsDb {
@@ -1445,6 +1469,8 @@ impl AccountsDb {
             shrink_candidate_slots: Mutex::new(HashMap::new()),
             write_version: AtomicU64::new(0),
             paths: vec![],
+            accounts_hash_cache_path,
+            temp_accounts_hash_cache_path,
             shrink_paths: RwLock::new(None),
             temp_paths: None,
             file_size: DEFAULT_FILE_SIZE,
@@ -1495,7 +1521,11 @@ impl AccountsDb {
         shrink_ratio: AccountShrinkThreshold,
         accounts_db_config: Option<AccountsDbConfig>,
     ) -> Self {
-        let accounts_index = AccountsIndex::new(accounts_db_config.and_then(|x| x.index));
+        let accounts_index =
+            AccountsIndex::new(accounts_db_config.as_ref().and_then(|x| x.index.clone()));
+        let accounts_hash_cache_path = accounts_db_config
+            .as_ref()
+            .and_then(|x| x.accounts_hash_cache_path.clone());
         let mut new = if !paths.is_empty() {
             Self {
                 paths,
@@ -1504,7 +1534,7 @@ impl AccountsDb {
                 account_indexes,
                 caching_enabled,
                 shrink_ratio,
-                ..Self::default_with_accounts_index(accounts_index)
+                ..Self::default_with_accounts_index(accounts_index, accounts_hash_cache_path)
             }
         } else {
             // Create a temporary set of accounts directories, used primarily
@@ -1517,7 +1547,7 @@ impl AccountsDb {
                 account_indexes,
                 caching_enabled,
                 shrink_ratio,
-                ..Self::default_with_accounts_index(accounts_index)
+                ..Self::default_with_accounts_index(accounts_index, accounts_hash_cache_path)
             }
         };
@@ -4939,7 +4969,8 @@ impl AccountsDb {
     }

     /// Scan through all the account storage in parallel
-    fn scan_account_storage_no_bank<F, F2, B, C>(
+    fn scan_account_storage_no_bank<F, F2>(
+        cache_hash_data: &CacheHashData,
         accounts_cache_and_ancestors: Option<(
             &AccountsCache,
             &Ancestors,
@@ -4948,32 +4979,113 @@ impl AccountsDb {
         snapshot_storages: &SortedStorages,
         scan_func: F,
         after_func: F2,
-    ) -> Vec<C>
+        bin_range: &Range<usize>,
+        bin_calculator: &PubkeyBinCalculator16,
+    ) -> Vec<BinnedHashData>
     where
-        F: Fn(LoadedAccount, &mut B, Slot) + Send + Sync,
-        F2: Fn(B) -> C + Send + Sync,
-        B: Send + Default,
-        C: Send + Default,
+        F: Fn(LoadedAccount, &mut BinnedHashData, Slot) + Send + Sync,
+        F2: Fn(BinnedHashData) -> BinnedHashData + Send + Sync,
     {
-        let chunks = 1 + (snapshot_storages.range_width() as Slot / MAX_ITEMS_PER_CHUNK);
+        let start_bin_index = bin_range.start;
+
+        let width = snapshot_storages.range_width();
+        // 2 is for 2 special chunks - unaligned slots at the beginning and end
+        let chunks = 2 + (width as Slot / MAX_ITEMS_PER_CHUNK);
+        let range = snapshot_storages.range();
+        let slot0 = range.start;
+        let first_boundary =
+            ((slot0 + MAX_ITEMS_PER_CHUNK) / MAX_ITEMS_PER_CHUNK) * MAX_ITEMS_PER_CHUNK;
         (0..chunks)
             .into_par_iter()
             .map(|chunk| {
-                let mut retval = B::default();
-                let start = snapshot_storages.range().start + chunk * MAX_ITEMS_PER_CHUNK;
-                let end = std::cmp::min(start + MAX_ITEMS_PER_CHUNK, snapshot_storages.range().end);
+                let mut retval = vec![];
+                // calculate start, end
+                let (start, mut end) = if chunk == 0 {
+                    if slot0 == first_boundary {
+                        return after_func(retval); // if we evenly divide, nothing for special chunk 0 to do
+                    }
+                    // otherwise first chunk is not 'full'
+                    (slot0, first_boundary)
+                } else {
+                    // normal chunk in the middle or at the end
+                    let start = first_boundary + MAX_ITEMS_PER_CHUNK * (chunk - 1);
+                    let end = start + MAX_ITEMS_PER_CHUNK;
+                    (start, end)
+                };
+                end = std::cmp::min(end, range.end);
+                if start == end {
+                    return after_func(retval);
+                }
+
+                let mut file_name = String::default();
+                if accounts_cache_and_ancestors.is_none()
+                    && end.saturating_sub(start) == MAX_ITEMS_PER_CHUNK
+                {
+                    let mut load_from_cache = true;
+                    let mut hasher = std::collections::hash_map::DefaultHasher::new(); // wrong one?
+
+                    for slot in start..end {
+                        let sub_storages = snapshot_storages.get(slot);
+                        bin_range.start.hash(&mut hasher);
+                        bin_range.end.hash(&mut hasher);
+                        if let Some(sub_storages) = sub_storages {
+                            if sub_storages.len() > 1 {
+                                load_from_cache = false;
+                                break;
+                            }
+                            let storage_file = sub_storages.first().unwrap().accounts.get_path();
+                            slot.hash(&mut hasher);
+                            storage_file.hash(&mut hasher);
+                            // check alive_bytes, etc. here?
+                            let amod = std::fs::metadata(storage_file);
+                            if amod.is_err() {
+                                load_from_cache = false;
+                                break;
+                            }
+                            let amod = amod.unwrap().modified();
+                            if amod.is_err() {
+                                load_from_cache = false;
+                                break;
+                            }
+                            let amod = amod
+                                .unwrap()
+                                .duration_since(std::time::UNIX_EPOCH)
+                                .unwrap()
+                                .as_secs();
+                            amod.hash(&mut hasher);
+                        }
+                    }
+                    if load_from_cache {
+                        // we have a hash value for all the storages in this slot
+                        // so, build a file name:
+                        let hash = hasher.finish();
+                        file_name = format!(
+                            "{}.{}.{}.{}.{}",
+                            start, end, bin_range.start, bin_range.end, hash
+                        );
+                        if retval.is_empty() {
+                            let range = bin_range.end - bin_range.start;
+                            retval.append(&mut vec![Vec::new(); range]);
+                        }
+                        if cache_hash_data
+                            .load(
+                                &Path::new(&file_name),
+                                &mut retval,
+                                start_bin_index,
+                                bin_calculator,
+                            )
+                            .is_ok()
+                        {
+                            return retval;
+                        }
+
+                        // fall through and load normally - we failed to load
+                    }
+                }
+
                 for slot in start..end {
                     let sub_storages = snapshot_storages.get(slot);
-                    let mut valid_slot = false;
-                    if let Some(sub_storages) = sub_storages {
-                        valid_slot = true;
-                        Self::scan_multiple_account_storages_one_slot(
-                            sub_storages,
-                            &scan_func,
-                            slot,
-                            &mut retval,
-                        );
-                    }
+                    let valid_slot = sub_storages.is_some();
                     if let Some((cache, ancestors, accounts_index)) = accounts_cache_and_ancestors {
                         if let Some(slot_cache) = cache.slot_cache(slot) {
                             if valid_slot
@@ -4994,9 +5106,30 @@ impl AccountsDb {
                            }
                        }
                    }
+
+                    if let Some(sub_storages) = sub_storages {
+                        Self::scan_multiple_account_storages_one_slot(
+                            sub_storages,
+                            &scan_func,
+                            slot,
+                            &mut retval,
+                        );
+                    }
                 }
-                after_func(retval)
+                let r = after_func(retval);
+                if !file_name.is_empty() {
+                    let result = cache_hash_data.save(Path::new(&file_name), &r);
+
+                    if result.is_err() {
+                        info!(
+                            "FAILED_TO_SAVE: {}-{}, {}, first_boundary: {}, {:?}",
+                            range.start, range.end, width, first_boundary, file_name,
+                        );
+                    }
+                }
+                r
             })
+            .filter(|x| !x.is_empty())
             .collect()
     }
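The chunk arithmetic above is what makes caching possible: interior chunks are aligned to multiples of MAX_ITEMS_PER_CHUNK, so the same slot range keeps mapping to the same cache file across runs, while the unaligned head and tail chunks (which never satisfy the `end - start == MAX_ITEMS_PER_CHUNK` gate) are always rescanned. A self-contained sketch of that arithmetic with a worked example — the constant mirrors the patch; `chunk_bounds` is a name invented here:

    const MAX_ITEMS_PER_CHUNK: u64 = 2_500;

    fn chunk_bounds(chunk: u64, slot0: u64, range_end: u64) -> Option<(u64, u64)> {
        // First slot at or above slot0 that is a multiple of the chunk size.
        let first_boundary =
            ((slot0 + MAX_ITEMS_PER_CHUNK) / MAX_ITEMS_PER_CHUNK) * MAX_ITEMS_PER_CHUNK;
        let (start, end) = if chunk == 0 {
            if slot0 == first_boundary {
                return None; // evenly divides: no unaligned head chunk
            }
            (slot0, first_boundary)
        } else {
            let start = first_boundary + MAX_ITEMS_PER_CHUNK * (chunk - 1);
            (start, start + MAX_ITEMS_PER_CHUNK)
        };
        let end = end.min(range_end);
        (start < end).then(|| (start, end))
    }

    fn main() {
        // Slots 4_000..9_000: head chunk 4_000..5_000, one full (cacheable)
        // chunk 5_000..7_500, and a partial tail chunk 7_500..9_000.
        for chunk in 0..4 {
            println!("chunk {}: {:?}", chunk, chunk_bounds(chunk, 4_000, 9_000));
        }
    }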
@@ -5057,6 +5190,7 @@ impl AccountsDb {
         };

         Self::calculate_accounts_hash_without_index(
+            &self.accounts_hash_cache_path,
             &storages,
             Some(&self.thread_pool_clean),
             timings,
@@ -5136,6 +5270,7 @@ impl AccountsDb {
     }

     fn scan_snapshot_stores_with_cache(
+        cache_hash_data: &CacheHashData,
         storage: &SortedStorages,
         mut stats: &mut crate::accounts_hash::HashStats,
         bins: usize,
@@ -5156,6 +5291,7 @@ impl AccountsDb {
         let sort_time = AtomicU64::new(0);

         let result: Vec<BinnedHashData> = Self::scan_account_storage_no_bank(
+            cache_hash_data,
             accounts_cache_and_ancestors,
             storage,
             |loaded_account: LoadedAccount, accum: &mut BinnedHashData, slot: Slot| {
@@ -5189,10 +5325,8 @@ impl AccountsDb {
                         mismatch_found.fetch_add(1, Ordering::Relaxed);
                     }
                 }
-
-                let max = accum.len();
-                if max == 0 {
-                    accum.extend(vec![Vec::new(); range]);
+                if accum.is_empty() {
+                    accum.append(&mut vec![Vec::new(); range]);
                 }
                 accum[pubkey_to_bin_index].push(source_item);
             },
@@ -5201,6 +5335,8 @@ impl AccountsDb {
                 sort_time.fetch_add(timing, Ordering::Relaxed);
                 result
             },
+            bin_range,
+            &bin_calculator,
         );

         stats.sort_time_total_us += sort_time.load(Ordering::Relaxed);
@@ -5242,6 +5378,7 @@ impl AccountsDb {
     // modeled after get_accounts_delta_hash
     // intended to be faster than calculate_accounts_hash
     pub fn calculate_accounts_hash_without_index(
+        accounts_hash_cache_path: &Path,
         storages: &SortedStorages,
         thread_pool: Option<&ThreadPool>,
         mut stats: HashStats,
@@ -5260,6 +5397,8 @@ impl AccountsDb {
             let mut previous_pass = PreviousPass::default();
             let mut final_result = (Hash::default(), 0);

+            let cache_hash_data = CacheHashData::new(&accounts_hash_cache_path);
+
             for pass in 0..NUM_SCAN_PASSES {
                 let bounds = Range {
                     start: pass * BINS_PER_PASS,
@@ -5267,6 +5406,7 @@ impl AccountsDb {
                 };

                 let result = Self::scan_snapshot_stores_with_cache(
+                    &cache_hash_data,
                     storages,
                     &mut stats,
                     PUBKEY_BINS_FOR_CALCULATING_HASHES,
@@ -6753,7 +6893,17 @@ pub mod tests {
         bin_range: &Range<usize>,
         check_hash: bool,
     ) -> Result<Vec<BinnedHashData>, BankHashVerificationError> {
-        Self::scan_snapshot_stores_with_cache(storage, stats, bins, bin_range, check_hash, None)
+        let temp_dir = TempDir::new().unwrap();
+        let accounts_hash_cache_path = temp_dir.path();
+        Self::scan_snapshot_stores_with_cache(
+            &CacheHashData::new(&accounts_hash_cache_path),
+            storage,
+            stats,
+            bins,
+            bin_range,
+            check_hash,
+            None,
+        )
     }
 }
@@ -6960,7 +7110,8 @@ pub mod tests {
         )
         .unwrap();
         assert_eq!(result.len(), 2); // 2 chunks
-        assert_eq!(result[0].len(), 0); // nothing found in first slots
+        assert_eq!(result[0].len(), bins);
+        assert_eq!(0, result[0].iter().map(|x| x.len()).sum::<usize>()); // nothing found in bin 0
         assert_eq!(result[1].len(), bins);
         assert_eq!(result[1], vec![raw_expected]);
     }
@@ -7029,6 +7180,7 @@ pub mod tests {
         let bins = 256;
         let bin_locations = vec![0, 127, 128, 255];
+        let range = 1;
         for bin in 0..bins {
             let result = AccountsDb::scan_snapshot_stores(
                 &get_storage_refs(&storages),
@@ -7036,17 +7188,17 @@ pub mod tests {
                 &mut stats,
                 bins,
                 &Range {
                     start: bin,
-                    end: bin + 1,
+                    end: bin + range,
                 },
                 false,
             )
             .unwrap();
             let mut expected = vec![];
             if let Some(index) = bin_locations.iter().position(|&r| r == bin) {
-                expected = vec![Vec::new(); 1];
-                expected[0].push(raw_expected[index].clone());
+                expected = vec![vec![Vec::new(); range]];
+                expected[0][0].push(raw_expected[index].clone());
             }
-            assert_eq!(result, vec![expected]);
+            assert_eq!(result, expected);
         }
     }
@@ -7063,20 +7215,23 @@ pub mod tests {
         SortedStorages::new_debug(&storage_data[..], 0, MAX_ITEMS_PER_CHUNK as usize + 1);

         let mut stats = HashStats::default();
+        let range = 1;
+        let start = 127;
         let result = AccountsDb::scan_snapshot_stores(
             &sorted_storages,
             &mut stats,
             bins,
             &Range {
-                start: 127,
-                end: 128,
+                start,
+                end: start + range,
             },
             false,
         )
         .unwrap();
         assert_eq!(result.len(), 2); // 2 chunks
-        assert_eq!(result[0].len(), 0); // nothing found in first slots
-        let mut expected = vec![Vec::new(); 1];
+        assert_eq!(result[0].len(), range);
+        assert_eq!(0, result[0].iter().map(|x| x.len()).sum::<usize>()); // nothing found in bin 0
+        let mut expected = vec![Vec::new(); range];
         expected[0].push(raw_expected[1].clone());
         assert_eq!(result[1].len(), 1);
         assert_eq!(result[1], expected);
@@ -7088,6 +7243,7 @@ pub mod tests {
         let (storages, _size, _slot_expected) = sample_storage();
         let result = AccountsDb::calculate_accounts_hash_without_index(
+            TempDir::new().unwrap().path(),
             &get_storage_refs(&storages),
             None,
             HashStats::default(),
@@ -7110,6 +7266,7 @@ pub mod tests {
         });
         let sum = raw_expected.iter().map(|item| item.lamports).sum();
         let result = AccountsDb::calculate_accounts_hash_without_index(
+            TempDir::new().unwrap().path(),
             &get_storage_refs(&storages),
             None,
             HashStats::default(),
@@ -7161,19 +7318,35 @@ pub mod tests {
             .append_accounts(&[(sm, Some(&acc))], &[&Hash::default()]);

         let calls = AtomicU64::new(0);
+        let temp_dir = TempDir::new().unwrap();
+        let accounts_hash_cache_path = temp_dir.path();
         let result = AccountsDb::scan_account_storage_no_bank(
+            &CacheHashData::new(&accounts_hash_cache_path),
             None,
             &get_storage_refs(&storages),
-            |loaded_account: LoadedAccount, accum: &mut Vec<u64>, slot: Slot| {
+            |loaded_account: LoadedAccount, accum: &mut BinnedHashData, slot: Slot| {
                 calls.fetch_add(1, Ordering::Relaxed);
                 assert_eq!(loaded_account.pubkey(), &pubkey);
                 assert_eq!(slot_expected, slot);
-                accum.push(expected);
+                accum.push(vec![CalculateHashIntermediate::new(
+                    Hash::default(),
+                    expected,
+                    pubkey,
+                )]);
             },
             |a| a,
+            &Range { start: 0, end: 1 },
+            &PubkeyBinCalculator16::new(1),
         );
         assert_eq!(calls.load(Ordering::Relaxed), 1);
-        assert_eq!(result, vec![vec![expected]]);
+        assert_eq!(
+            result,
+            vec![vec![vec![CalculateHashIntermediate::new(
+                Hash::default(),
+                expected,
+                pubkey
+            )]]]
+        );
     }

     #[test]
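The new module below persists each scanned chunk as a flat memory-mapped file: a small header holding the entry count, followed by densely packed fixed-size cells. A sketch of that layout and the capacity rule it asserts — `Entry` is a reduced stand-in for `CalculateHashIntermediate` (whose real definition lives in accounts_hash.rs), so the byte counts here are illustrative only:

    #[repr(C)]
    struct Header {
        count: usize,
    }

    #[repr(C)]
    #[derive(Clone, Copy)]
    struct Entry {
        hash: [u8; 32],   // stand-in for Hash
        lamports: u64,
        pubkey: [u8; 32], // stand-in for Pubkey
    }

    fn expected_file_len(count: usize) -> u64 {
        // Same rule save_internal uses to size the mmap and load_internal
        // asserts on load: capacity = cell_size * entries + size_of::<Header>()
        (std::mem::size_of::<Entry>() * count + std::mem::size_of::<Header>()) as u64
    }

    fn main() {
        println!("10 entries -> {} bytes", expected_file_len(10));
    }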
diff --git a/runtime/src/cache_hash_data.rs b/runtime/src/cache_hash_data.rs
new file mode 100644
index 000000000..b4948dd63
--- /dev/null
+++ b/runtime/src/cache_hash_data.rs
@@ -0,0 +1,437 @@
+//! Cached data for hashing accounts
+use crate::accounts_hash::CalculateHashIntermediate;
+use crate::cache_hash_data_stats::CacheHashDataStats;
+use crate::pubkey_bins::PubkeyBinCalculator16;
+use log::*;
+use memmap2::MmapMut;
+use solana_measure::measure::Measure;
+use std::collections::HashSet;
+use std::fs::{self};
+use std::fs::{remove_file, OpenOptions};
+use std::io::Seek;
+use std::io::SeekFrom;
+use std::io::Write;
+use std::path::Path;
+use std::path::PathBuf;
+use std::sync::{Arc, Mutex};
+
+pub type EntryType = CalculateHashIntermediate;
+pub type SavedType = Vec<Vec<EntryType>>;
+pub type SavedTypeSlice = [Vec<EntryType>];
+
+#[repr(C)]
+pub struct Header {
+    count: usize,
+}
+
+struct CacheHashDataFile {
+    cell_size: u64,
+    mmap: MmapMut,
+    capacity: u64,
+}
+
+impl CacheHashDataFile {
+    fn get_mut<T>(&mut self, ix: u64) -> &mut T {
+        let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
+        let end = start + std::mem::size_of::<T>();
+        assert!(
+            end <= self.capacity as usize,
+            "end: {}, capacity: {}, ix: {}, cell size: {}",
+            end,
+            self.capacity,
+            ix,
+            self.cell_size
+        );
+        let item_slice: &[u8] = &self.mmap[start..end];
+        unsafe {
+            let item = item_slice.as_ptr() as *mut T;
+            &mut *item
+        }
+    }
+
+    fn get_header_mut(&mut self) -> &mut Header {
+        let start = 0_usize;
+        let end = start + std::mem::size_of::<Header>();
+        let item_slice: &[u8] = &self.mmap[start..end];
+        unsafe {
+            let item = item_slice.as_ptr() as *mut Header;
+            &mut *item
+        }
+    }
+
+    fn new_map(file: &Path, capacity: u64) -> Result<MmapMut, std::io::Error> {
+        let mut data = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .open(file)?;
+
+        // Theoretical performance optimization: write a zero to the end of
+        // the file so that we won't have to resize it later, which may be
+        // expensive.
+        data.seek(SeekFrom::Start(capacity - 1)).unwrap();
+        data.write_all(&[0]).unwrap();
+        data.seek(SeekFrom::Start(0)).unwrap();
+        data.flush().unwrap();
+        Ok(unsafe { MmapMut::map_mut(&data).unwrap() })
+    }
+
+    fn load_map(file: &Path) -> Result<MmapMut, std::io::Error> {
+        let data = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(false)
+            .open(file)?;
+
+        Ok(unsafe { MmapMut::map_mut(&data).unwrap() })
+    }
+}
+
+pub type PreExistingCacheFiles = HashSet<String>;
+pub struct CacheHashData {
+    cache_folder: PathBuf,
+    pre_existing_cache_files: Arc<Mutex<PreExistingCacheFiles>>,
+    pub stats: Arc<Mutex<CacheHashDataStats>>,
+}
+
+impl Drop for CacheHashData {
+    fn drop(&mut self) {
+        self.delete_old_cache_files();
+        self.stats.lock().unwrap().report();
+    }
+}
+
+impl CacheHashData {
+    pub fn new<P: AsRef<Path> + std::fmt::Debug>(parent_folder: &P) -> CacheHashData {
+        let cache_folder = Self::get_cache_root_path(parent_folder);
+
+        std::fs::create_dir_all(cache_folder.clone())
+            .unwrap_or_else(|_| panic!("error creating cache dir: {:?}", cache_folder));
+
+        let result = CacheHashData {
+            cache_folder,
+            pre_existing_cache_files: Arc::new(Mutex::new(PreExistingCacheFiles::default())),
+            stats: Arc::new(Mutex::new(CacheHashDataStats::default())),
+        };
+
+        result.get_cache_files();
+        result
+    }
+    fn delete_old_cache_files(&self) {
+        let pre_existing_cache_files = self.pre_existing_cache_files.lock().unwrap();
+        if !pre_existing_cache_files.is_empty() {
+            self.stats.lock().unwrap().unused_cache_files += pre_existing_cache_files.len();
+            for file_name in pre_existing_cache_files.iter() {
+                let result = self.cache_folder.join(file_name);
+                let _ = fs::remove_file(result);
+            }
+        }
+    }
+    fn get_cache_files(&self) {
+        if self.cache_folder.is_dir() {
+            let dir = fs::read_dir(self.cache_folder.clone());
+            if let Ok(dir) = dir {
+                let mut pre_existing = self.pre_existing_cache_files.lock().unwrap();
+                for entry in dir.flatten() {
+                    if let Some(name) = entry.path().file_name() {
+                        pre_existing.insert(name.to_str().unwrap().to_string());
+                    }
+                }
+                self.stats.lock().unwrap().cache_file_count += pre_existing.len();
+            }
+        }
+    }
+
+    fn get_cache_root_path<P: AsRef<Path>>(parent_folder: &P) -> PathBuf {
+        parent_folder.as_ref().join("calculate_accounts_hash_cache")
+    }
+
+    pub fn load<P: AsRef<Path> + std::fmt::Debug>(
+        &self,
+        file_name: &P,
+        accumulator: &mut SavedType,
+        start_bin_index: usize,
+        bin_calculator: &PubkeyBinCalculator16,
+    ) -> Result<(), std::io::Error> {
+        let mut stats = CacheHashDataStats::default();
+        let result = self.load_internal(
+            file_name,
+            accumulator,
+            start_bin_index,
+            bin_calculator,
+            &mut stats,
+        );
+        self.stats.lock().unwrap().merge(&stats);
+        result
+    }
+
+    fn load_internal<P: AsRef<Path> + std::fmt::Debug>(
+        &self,
+        file_name: &P,
+        accumulator: &mut SavedType,
+        start_bin_index: usize,
+        bin_calculator: &PubkeyBinCalculator16,
+        stats: &mut CacheHashDataStats,
+    ) -> Result<(), std::io::Error> {
+        let mut m = Measure::start("overall");
+        let path = self.cache_folder.join(file_name);
+        let file_len = std::fs::metadata(path.clone())?.len();
+        let mut m1 = Measure::start("read_file");
+        let mmap = CacheHashDataFile::load_map(&path)?;
+        m1.stop();
+        stats.read_us = m1.as_us();
+
+        let cell_size = std::mem::size_of::<EntryType>() as u64;
+        let mut cache_file = CacheHashDataFile {
+            mmap,
+            cell_size,
+            capacity: 0,
+        };
+        let header = cache_file.get_header_mut();
+        let entries = header.count;
+
+        let capacity = cell_size * (entries as u64) + std::mem::size_of::<Header>() as u64;
+        cache_file.capacity = capacity;
+        assert_eq!(
+            capacity, file_len,
+            "expected: {}, len on disk: {} {:?}, entries: {}, cell_size: {}",
+            capacity, file_len, path, entries, cell_size
+        );
+
+        stats.total_entries = entries;
+        stats.cache_file_size += capacity as usize;
+
+        let file_name_lookup = file_name.as_ref().to_str().unwrap().to_string();
+        let found = self
+            .pre_existing_cache_files
+            .lock()
+            .unwrap()
+            .remove(&file_name_lookup);
+        if !found {
+            info!(
+                "tried to mark {:?} as used, but it wasn't in the set, one example: {:?}",
+                file_name_lookup,
+                self.pre_existing_cache_files.lock().unwrap().iter().next()
+            );
+        }
+
+        stats.loaded_from_cache += 1;
+        stats.entries_loaded_from_cache += entries;
+        let mut m2 = Measure::start("decode");
+        for i in 0..entries {
+            let d = cache_file.get_mut::<EntryType>(i as u64);
+            let mut pubkey_to_bin_index = bin_calculator.bin_from_pubkey(&d.pubkey);
+            assert!(
+                pubkey_to_bin_index >= start_bin_index,
+                "{}, {}",
+                pubkey_to_bin_index,
+                start_bin_index
+            ); // this would indicate we put a pubkey in too high of a bin
+            pubkey_to_bin_index -= start_bin_index;
+            accumulator[pubkey_to_bin_index].push(d.clone()); // may want to avoid clone here
+        }
+
+        m2.stop();
+        stats.decode_us += m2.as_us();
+        m.stop();
+        stats.load_us += m.as_us();
+        Ok(())
+    }
+
+    pub fn save(&self, file_name: &Path, data: &SavedTypeSlice) -> Result<(), std::io::Error> {
+        let mut stats = CacheHashDataStats::default();
+        let result = self.save_internal(file_name, data, &mut stats);
+        self.stats.lock().unwrap().merge(&stats);
+        result
+    }
+
+    pub fn save_internal(
+        &self,
+        file_name: &Path,
+        data: &SavedTypeSlice,
+        stats: &mut CacheHashDataStats,
+    ) -> Result<(), std::io::Error> {
+        let mut m = Measure::start("save");
+        let cache_path = self.cache_folder.join(file_name);
+        let create = true;
+        if create {
+            let _ignored = remove_file(&cache_path);
+        }
+        let cell_size = std::mem::size_of::<EntryType>() as u64;
+        let mut m1 = Measure::start("create save");
+        let entries = data
+            .iter()
+            .map(|x: &Vec<EntryType>| x.len())
+            .collect::<Vec<_>>();
+        let entries = entries.iter().sum::<usize>();
+        let capacity = cell_size * (entries as u64) + std::mem::size_of::<Header>() as u64;
+
+        let mmap = CacheHashDataFile::new_map(&cache_path, capacity)?;
+        m1.stop();
+        stats.create_save_us += m1.as_us();
+        let mut cache_file = CacheHashDataFile {
+            mmap,
+            cell_size,
+            capacity,
+        };
+
+        let mut header = cache_file.get_header_mut();
+        header.count = entries;
+
+        stats.cache_file_size = capacity as usize;
+        stats.total_entries = entries;
+
+        let mut m2 = Measure::start("write_to_mmap");
+        let mut i = 0;
+        data.iter().for_each(|x| {
+            x.iter().for_each(|item| {
+                let d = cache_file.get_mut::<EntryType>(i as u64);
+                i += 1;
+                *d = item.clone();
+            })
+        });
+        assert_eq!(i, entries);
+        m2.stop();
+        stats.write_to_mmap_us += m2.as_us();
+        m.stop();
+        stats.save_us += m.as_us();
+        stats.saved_to_cache += 1;
+        Ok(())
+    }
+}
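As a usage sketch of the API above, written as if it were a test inside this module (not part of the patch; the `demo` file name is arbitrary — real callers derive the name from the slot range, bin range, and a hash of the storages), a save/load round trip looks like this; the test module that follows exercises the same path across bins and passes:

    #[test]
    fn round_trip_sketch() {
        use tempfile::TempDir;
        let dir = TempDir::new().unwrap();
        let cache = CacheHashData::new(&dir);
        // one bin's worth of entries; empty is enough to exercise the format
        let data: SavedType = vec![vec![]];
        cache.save(Path::new("demo"), &data).unwrap();

        let mut accum: SavedType = vec![vec![]];
        cache
            .load(&Path::new("demo"), &mut accum, 0, &PubkeyBinCalculator16::new(1))
            .unwrap();
        assert_eq!(accum, data);
    }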
+#[cfg(test)]
+pub mod tests {
+    use super::*;
+    use rand::Rng;
+
+    #[test]
+    fn test_read_write() {
+        // generate sample data
+        // write to file
+        // read
+        // compare
+        use tempfile::TempDir;
+        let tmpdir = TempDir::new().unwrap();
+        std::fs::create_dir_all(&tmpdir).unwrap();
+
+        for bins in [1, 2, 4] {
+            let bin_calculator = PubkeyBinCalculator16::new(bins);
+            let num_points = 5;
+            let (data, _total_points) = generate_test_data(num_points, bins, &bin_calculator);
+            for passes in [1, 2] {
+                let bins_per_pass = bins / passes;
+                if bins_per_pass == 0 {
+                    continue; // illegal test case
+                }
+                for pass in 0..passes {
+                    for flatten_data in [true, false] {
+                        let mut data_this_pass = if flatten_data {
+                            vec![vec![], vec![]]
+                        } else {
+                            vec![]
+                        };
+                        let start_bin_this_pass = pass * bins_per_pass;
+                        for bin in 0..bins_per_pass {
+                            let mut this_bin_data = data[bin + start_bin_this_pass].clone();
+                            if flatten_data {
+                                data_this_pass[0].append(&mut this_bin_data);
+                            } else {
+                                data_this_pass.push(this_bin_data);
+                            }
+                        }
+                        let cache = CacheHashData::new(&tmpdir);
+                        let file_name = "test";
+                        let file = Path::new(file_name).to_path_buf();
+                        cache.save(&file, &data_this_pass).unwrap();
+                        cache.get_cache_files();
+                        assert_eq!(
+                            cache
+                                .pre_existing_cache_files
+                                .lock()
+                                .unwrap()
+                                .iter()
+                                .collect::<Vec<_>>(),
+                            vec![file_name]
+                        );
+                        let mut accum = (0..bins_per_pass).into_iter().map(|_| vec![]).collect();
+                        cache
+                            .load(&file, &mut accum, start_bin_this_pass, &bin_calculator)
+                            .unwrap();
+                        if flatten_data {
+                            bin_data(
+                                &mut data_this_pass,
+                                &bin_calculator,
+                                bins_per_pass,
+                                start_bin_this_pass,
+                            );
+                        }
+                        assert_eq!(
+                            accum, data_this_pass,
+                            "bins: {}, start_bin_this_pass: {}, pass: {}, flatten: {}, passes: {}",
+                            bins, start_bin_this_pass, pass, flatten_data, passes
+                        );
+                    }
+                }
+            }
+        }
+    }
+
+    fn bin_data(
+        data: &mut SavedType,
+        bin_calculator: &PubkeyBinCalculator16,
+        bins: usize,
+        start_bin: usize,
+    ) {
+        let mut accum: SavedType = (0..bins).into_iter().map(|_| vec![]).collect();
+        data.drain(..).into_iter().for_each(|mut x| {
+            x.drain(..).into_iter().for_each(|item| {
+                let bin = bin_calculator.bin_from_pubkey(&item.pubkey);
+                accum[bin - start_bin].push(item);
+            })
+        });
+        *data = accum;
+    }
+
+    fn generate_test_data(
+        count: usize,
+        bins: usize,
+        binner: &PubkeyBinCalculator16,
+    ) -> (SavedType, usize) {
+        let mut rng = rand::thread_rng();
+        let mut ct = 0;
+        (
+            (0..bins)
+                .into_iter()
+                .map(|bin| {
+                    let rnd = rng.gen::<u64>() % (bins as u64);
+                    if rnd < count as u64 {
+                        (0..std::cmp::max(1, count / bins))
+                            .into_iter()
+                            .map(|_| {
+                                ct += 1;
+                                let mut pk;
+                                loop {
+                                    // expensive, but small numbers and for tests, so ok
+                                    pk = solana_sdk::pubkey::new_rand();
+                                    if binner.bin_from_pubkey(&pk) == bin {
+                                        break;
+                                    }
+                                }
+                                CalculateHashIntermediate::new(
+                                    solana_sdk::hash::new_rand(&mut rng),
+                                    ct as u64,
+                                    pk,
+                                )
+                            })
+                            .collect::<Vec<_>>()
+                    } else {
+                        vec![]
+                    }
+                })
+                .collect::<Vec<_>>(),
+            ct,
+        )
+    }
+}
diff --git a/runtime/src/cache_hash_data_stats.rs b/runtime/src/cache_hash_data_stats.rs
new file mode 100644
index 000000000..fe9867ad6
--- /dev/null
+++ b/runtime/src/cache_hash_data_stats.rs
@@ -0,0 +1,59 @@
+//! Cached data for hashing accounts
+#[derive(Default, Debug)]
+pub struct CacheHashDataStats {
+    pub cache_file_size: usize,
+    pub cache_file_count: usize,
+    pub total_entries: usize,
+    pub loaded_from_cache: usize,
+    pub entries_loaded_from_cache: usize,
+    pub save_us: u64,
+    pub saved_to_cache: usize,
+    pub write_to_mmap_us: u64,
+    pub create_save_us: u64,
+    pub load_us: u64,
+    pub read_us: u64,
+    pub decode_us: u64,
+    pub merge_us: u64,
+    pub unused_cache_files: usize,
+}
+
+impl CacheHashDataStats {
+    pub fn merge(&mut self, other: &CacheHashDataStats) {
+        self.cache_file_size += other.cache_file_size;
+        self.total_entries += other.total_entries;
+        self.loaded_from_cache += other.loaded_from_cache;
+        self.entries_loaded_from_cache += other.entries_loaded_from_cache;
+        self.load_us += other.load_us;
+        self.read_us += other.read_us;
+        self.decode_us += other.decode_us;
+        self.save_us += other.save_us;
+        self.saved_to_cache += other.saved_to_cache;
+        self.create_save_us += other.create_save_us;
+        self.cache_file_count += other.cache_file_count;
+        self.write_to_mmap_us += other.write_to_mmap_us;
+        self.unused_cache_files += other.unused_cache_files;
+    }
+
+    pub fn report(&self) {
+        datapoint_info!(
+            "cache_hash_data_stats",
+            ("cache_file_size", self.cache_file_size, i64),
+            ("cache_file_count", self.cache_file_count, i64),
+            ("total_entries", self.total_entries, i64),
+            ("loaded_from_cache", self.loaded_from_cache, i64),
+            ("saved_to_cache", self.saved_to_cache, i64),
+            (
+                "entries_loaded_from_cache",
+                self.entries_loaded_from_cache,
+                i64
+            ),
+            ("save_us", self.save_us, i64),
+            ("write_to_mmap_us", self.write_to_mmap_us, i64),
+            ("create_save_us", self.create_save_us, i64),
+            ("load_us", self.load_us, i64),
+            ("read_us", self.read_us, i64),
+            ("decode_us", self.decode_us, i64),
+            ("unused_cache_files", self.unused_cache_files, i64),
+        );
+    }
+}
diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs
index 11c70a229..4e1a464a2 100644
--- a/runtime/src/lib.rs
+++ b/runtime/src/lib.rs
@@ -18,6 +18,8 @@ pub mod bloom;
 pub mod bucket_map_holder;
 pub mod bucket_map_holder_stats;
 pub mod builtins;
+pub mod cache_hash_data;
+pub mod cache_hash_data_stats;
 pub mod commitment;
 pub mod contains;
 pub mod epoch_stakes;
diff --git a/validator/src/main.rs b/validator/src/main.rs
index ae880f69f..9cbe8c7fc 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -2488,7 +2488,11 @@ pub fn main() {
     let accounts_index_config = value_t!(matches, "accounts_index_bins", usize)
         .ok()
         .map(|bins| AccountsIndexConfig { bins: Some(bins) });
-    let accounts_db_config = accounts_index_config.map(|x| AccountsDbConfig { index: Some(x) });
+
+    let accounts_db_config = Some(AccountsDbConfig {
+        index: accounts_index_config,
+        accounts_hash_cache_path: Some(ledger_path.clone()),
+    });

     let accountsdb_repl_service_config = if matches.is_present("enable_accountsdb_repl") {
         let accountsdb_repl_bind_address = if matches.is_present("accountsdb_repl_bind_address") {
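Finally, a sketch of how a chunk's cache key is derived, mirroring the hashing loop in scan_account_storage_no_bank above: the bin range plus, for every slot in the chunk, the slot number, its storage file path, and that file's mtime in seconds all feed a DefaultHasher, so any change to a storage yields a different file name — the stale file is simply never matched again and is deleted when CacheHashData is dropped. `cache_file_name` and the `storages` shape are illustrative, not part of the patch:

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    use std::path::Path;
    use std::time::UNIX_EPOCH;

    fn cache_file_name(
        start: u64,
        end: u64,
        bin_start: usize,
        bin_end: usize,
        storages: &[(u64, &Path)], // (slot, storage file) pairs in the chunk
    ) -> Option<String> {
        let mut hasher = DefaultHasher::new();
        for (slot, file) in storages {
            // as in the patch, the bin range is folded in per slot
            bin_start.hash(&mut hasher);
            bin_end.hash(&mut hasher);
            slot.hash(&mut hasher);
            file.hash(&mut hasher);
            let mtime = std::fs::metadata(file).ok()?.modified().ok()?;
            mtime.duration_since(UNIX_EPOCH).ok()?.as_secs().hash(&mut hasher);
        }
        // same name format the patch uses: start.end.bin_start.bin_end.hash
        Some(format!("{}.{}.{}.{}.{}", start, end, bin_start, bin_end, hasher.finish()))
    }

    fn main() {
        // With no storages, only the fixed name parts contribute.
        println!("{:?}", cache_file_name(5_000, 7_500, 0, 32, &[]));
    }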