From 052677595c4314d2d6e9a258c2556393575cf70c Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)"
Date: Mon, 9 Oct 2023 11:47:39 -0700
Subject: [PATCH] in hash calc, delete old cache files that will not be used earlier (#33432)

* in hash calc, delete old cache files that will not be used earlier

* only delete if supposed to

* fmt
---
 accounts-db/src/accounts_db.rs     | 185 +++++++++++++++++------------
 accounts-db/src/cache_hash_data.rs |  28 +++--
 2 files changed, 124 insertions(+), 89 deletions(-)

diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs
index deb04fd920..4291cdfe9a 100644
--- a/accounts-db/src/accounts_db.rs
+++ b/accounts-db/src/accounts_db.rs
@@ -181,6 +181,13 @@ impl<'a> StoreTo<'a> {
     }
 }
 
+enum ScanAccountStorageResult {
+    /// this data has already been scanned and cached
+    CacheFileAlreadyExists(CacheHashDataFileReference),
+    /// this data needs to be scanned and cached
+    CacheFileNeedsToBeCreated((String, Range<Slot>)),
+}
+
 #[derive(Default, Debug)]
 /// hold alive accounts
 /// alive means in the accounts index
@@ -7222,90 +7229,114 @@ impl AccountsDb {
             .saturating_sub(slots_per_epoch);
 
         stats.scan_chunks = splitter.chunk_count;
 
-        (0..splitter.chunk_count)
-            .into_par_iter()
-            .map(|chunk| {
-                let mut scanner = scanner.clone();
+        let cache_files = (0..splitter.chunk_count)
+            .into_par_iter()
+            .filter_map(|chunk| {
                 let range_this_chunk = splitter.get_slot_range(chunk)?;
-                let file_name = {
-                    let mut load_from_cache = true;
-                    let mut hasher = hash_map::DefaultHasher::new();
-                    bin_range.start.hash(&mut hasher);
-                    bin_range.end.hash(&mut hasher);
-                    let is_first_scan_pass = bin_range.start == 0;
+                let mut load_from_cache = true;
+                let mut hasher = hash_map::DefaultHasher::new();
+                bin_range.start.hash(&mut hasher);
+                bin_range.end.hash(&mut hasher);
+                let is_first_scan_pass = bin_range.start == 0;
 
-                    // calculate hash representing all storages in this chunk
-                    for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
-                        if is_first_scan_pass && slot < one_epoch_old {
-                            self.update_old_slot_stats(stats, storage);
-                        }
-                        if !Self::hash_storage_info(&mut hasher, storage, slot) {
-                            load_from_cache = false;
-                            break;
-                        }
-                    }
-                    // we have a hash value for the storages in this chunk
-                    // so, build a file name:
-                    let hash = hasher.finish();
-                    let file_name = format!(
-                        "{}.{}.{}.{}.{:016x}",
-                        range_this_chunk.start,
-                        range_this_chunk.end,
-                        bin_range.start,
-                        bin_range.end,
-                        hash
-                    );
-                    if load_from_cache {
-                        if let Ok(mapped_file) =
-                            cache_hash_data.get_file_reference_to_map_later(&file_name)
-                        {
-                            return Some(mapped_file);
-                        }
-                    }
-
-                    // fall through and load normally - we failed to load from a cache file
-                    file_name
-                };
-
-                let mut init_accum = true;
-                // load from cache failed, so create the cache file for this chunk
+                // calculate hash representing all storages in this chunk
+                let mut empty = true;
                 for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
-                    let ancient = slot < oldest_non_ancient_slot;
-                    let (_, scan_us) = measure_us!(if let Some(storage) = storage {
-                        if init_accum {
-                            let range = bin_range.end - bin_range.start;
-                            scanner.init_accum(range);
-                            init_accum = false;
-                        }
-                        scanner.set_slot(slot);
-
-                        Self::scan_single_account_storage(storage, &mut scanner);
-                    });
-                    if ancient {
-                        stats
-                            .sum_ancient_scans_us
-                            .fetch_add(scan_us, Ordering::Relaxed);
-                        stats.count_ancient_scans.fetch_add(1, Ordering::Relaxed);
-                        stats
-                            .longest_ancient_scan_us
-                            .fetch_max(scan_us, Ordering::Relaxed);
+                    empty = false;
+                    if is_first_scan_pass && slot < one_epoch_old {
+                        self.update_old_slot_stats(stats, storage);
+                    }
+                    if !Self::hash_storage_info(&mut hasher, storage, slot) {
+                        load_from_cache = false;
+                        break;
+                    }
+                }
+                if empty {
+                    return None;
+                }
+                // we have a hash value for the storages in this chunk
+                // so, build a file name:
+                let hash = hasher.finish();
+                let file_name = format!(
+                    "{}.{}.{}.{}.{:016x}",
+                    range_this_chunk.start,
+                    range_this_chunk.end,
+                    bin_range.start,
+                    bin_range.end,
+                    hash
+                );
+                if load_from_cache {
+                    if let Ok(mapped_file) =
+                        cache_hash_data.get_file_reference_to_map_later(&file_name)
+                    {
+                        return Some(ScanAccountStorageResult::CacheFileAlreadyExists(
+                            mapped_file,
+                        ));
+                    }
+                }
+
+                // fall through and load normally - we failed to load from a cache file but there are storages present
+                Some(ScanAccountStorageResult::CacheFileNeedsToBeCreated((
+                    file_name,
+                    range_this_chunk,
+                )))
+            })
+            .collect::<Vec<_>>();
+
+        // deletes the old files that will not be used before creating new ones
+        cache_hash_data.delete_old_cache_files();
+
+        cache_files
+            .into_par_iter()
+            .map(|chunk| {
+                match chunk {
+                    ScanAccountStorageResult::CacheFileAlreadyExists(file) => Some(file),
+                    ScanAccountStorageResult::CacheFileNeedsToBeCreated((
+                        file_name,
+                        range_this_chunk,
+                    )) => {
+                        let mut scanner = scanner.clone();
+                        let mut init_accum = true;
+                        // load from cache failed, so create the cache file for this chunk
+                        for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
+                            let ancient = slot < oldest_non_ancient_slot;
+                            let (_, scan_us) = measure_us!(if let Some(storage) = storage {
+                                if init_accum {
+                                    let range = bin_range.end - bin_range.start;
+                                    scanner.init_accum(range);
+                                    init_accum = false;
+                                }
+                                scanner.set_slot(slot);
+
+                                Self::scan_single_account_storage(storage, &mut scanner);
+                            });
+                            if ancient {
+                                stats
+                                    .sum_ancient_scans_us
+                                    .fetch_add(scan_us, Ordering::Relaxed);
+                                stats.count_ancient_scans.fetch_add(1, Ordering::Relaxed);
+                                stats
+                                    .longest_ancient_scan_us
+                                    .fetch_max(scan_us, Ordering::Relaxed);
+                            }
+                        }
+                        (!init_accum)
+                            .then(|| {
+                                let r = scanner.scanning_complete();
+                                assert!(!file_name.is_empty());
+                                (!r.is_empty() && r.iter().any(|b| !b.is_empty())).then(|| {
+                                    // error if we can't write this
+                                    cache_hash_data.save(&file_name, &r).unwrap();
+                                    cache_hash_data
+                                        .get_file_reference_to_map_later(&file_name)
+                                        .unwrap()
+                                })
+                            })
+                            .flatten()
+                    }
                 }
-                (!init_accum)
-                    .then(|| {
-                        let r = scanner.scanning_complete();
-                        assert!(!file_name.is_empty());
-                        (!r.is_empty() && r.iter().any(|b| !b.is_empty())).then(|| {
-                            // error if we can't write this
-                            cache_hash_data.save(&file_name, &r).unwrap();
-                            cache_hash_data
-                                .get_file_reference_to_map_later(&file_name)
-                                .unwrap()
-                        })
-                    })
-                    .flatten()
             })
             .filter_map(|x| x)
             .collect()
diff --git a/accounts-db/src/cache_hash_data.rs b/accounts-db/src/cache_hash_data.rs
index 630d650b36..5ccb478620 100644
--- a/accounts-db/src/cache_hash_data.rs
+++ b/accounts-db/src/cache_hash_data.rs
@@ -198,9 +198,7 @@ pub(crate) struct CacheHashData {
 
 impl Drop for CacheHashData {
     fn drop(&mut self) {
-        if self.should_delete_old_cache_files_on_drop {
-            self.delete_old_cache_files();
-        }
+        self.delete_old_cache_files();
         self.stats.report();
     }
 }
@@ -224,18 +222,24 @@ impl CacheHashData {
         result.get_cache_files();
         result
     }
-    fn delete_old_cache_files(&self) {
-        let old_cache_files = std::mem::take(&mut *self.pre_existing_cache_files.lock().unwrap());
-        if !old_cache_files.is_empty() {
-            self.stats
-                .unused_cache_files
-                .fetch_add(old_cache_files.len(), Ordering::Relaxed);
-            for file_name in old_cache_files.iter() {
-                let result = self.cache_dir.join(file_name);
-                let _ = fs::remove_file(result);
+
+    /// delete all pre-existing files that will not be used
+    pub(crate) fn delete_old_cache_files(&self) {
+        if self.should_delete_old_cache_files_on_drop {
+            let old_cache_files =
+                std::mem::take(&mut *self.pre_existing_cache_files.lock().unwrap());
+            if !old_cache_files.is_empty() {
+                self.stats
+                    .unused_cache_files
+                    .fetch_add(old_cache_files.len(), Ordering::Relaxed);
+                for file_name in old_cache_files.iter() {
+                    let result = self.cache_dir.join(file_name);
+                    let _ = fs::remove_file(result);
+                }
             }
         }
     }
+
     fn get_cache_files(&self) {
         if self.cache_dir.is_dir() {
             let dir = fs::read_dir(&self.cache_dir);
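
Reviewer note: below is a minimal, std-only sketch of the two-pass shape this
patch introduces: classify every chunk first, delete the stale cache files,
and only then do the expensive scans for the cache misses. It is illustrative
only; the real code runs both passes in parallel with rayon's into_par_iter,
and every name in the sketch (Cache, CacheFileRef, ScanResult, try_reference)
is a hypothetical stand-in, not the actual accounts-db API.

use std::collections::HashSet;

/// Stand-in for CacheHashDataFileReference: a cache file to be mmapped later.
#[derive(Debug)]
struct CacheFileRef(String);

/// Mirrors the new ScanAccountStorageResult enum.
enum ScanResult {
    /// the chunk already has a usable cache file
    AlreadyExists(CacheFileRef),
    /// the chunk must be rescanned; remember the file name to create
    NeedsCreation(String),
}

/// Stand-in for CacheHashData: tracks pre-existing cache files so the ones
/// no chunk references can be deleted before any new files are written.
struct Cache {
    pre_existing: HashSet<String>,
}

impl Cache {
    /// Claim `name` if it already exists, removing it from the "unused" set
    /// (the role get_file_reference_to_map_later plays in the patch).
    fn try_reference(&mut self, name: &str) -> Option<CacheFileRef> {
        self.pre_existing.take(name).map(CacheFileRef)
    }

    /// Whatever is still in `pre_existing` was not referenced by any chunk,
    /// so it is safe to delete before the new files are created.
    fn delete_old_cache_files(&mut self) {
        for stale in self.pre_existing.drain() {
            println!("deleting stale cache file: {stale}");
        }
    }
}

fn main() {
    let mut cache = Cache {
        pre_existing: ["0.100.hashA".into(), "100.200.hashOLD".into()].into(),
    };

    // Pass 1: cheap classification of every chunk; no account scanning yet.
    let chunk_names = ["0.100.hashA", "100.200.hashB"];
    let classified: Vec<ScanResult> = chunk_names
        .iter()
        .map(|name| match cache.try_reference(name) {
            Some(file) => ScanResult::AlreadyExists(file),
            None => ScanResult::NeedsCreation(name.to_string()),
        })
        .collect();

    // The point of the patch: stale files are deleted here, before the
    // expensive second pass, instead of at drop time after it.
    cache.delete_old_cache_files();

    // Pass 2: expensive scanning only for the cache misses.
    for result in classified {
        match result {
            ScanResult::AlreadyExists(file) => println!("reuse {file:?}"),
            ScanResult::NeedsCreation(name) => println!("scan and create {name}"),
        }
    }
}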