in hash calc, delete old cache files that will not be used earlier (#33432)
* in hash calc, delete old cache files that will not be used earlier
* only delete if supposed to
* fmt
parent c924719040
commit 052677595c
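The point of the change is ordering: before this commit, pre-existing cache files that would not be reused were only deleted when `CacheHashData` was dropped, at the end of the hash calculation. Now the per-chunk work is planned first, the unused old files are deleted, and only then are the new cache files scanned and written. Below is a minimal, self-contained sketch of that ordering; `PreExisting`, `mark_reusable`, and `delete_unused` are made-up names standing in for `CacheHashData`, `get_file_reference_to_map_later`, and `delete_old_cache_files`:

```rust
use std::collections::HashSet;

/// Stand-in for CacheHashData's bookkeeping: cache files that already
/// existed on disk when the calculation started.
struct PreExisting {
    files: HashSet<String>,
}

impl PreExisting {
    /// Pass 1: record, per chunk, whether its cache file can be reused.
    /// Reused names are taken out of the pre-existing set.
    fn mark_reusable(&mut self, wanted: &[String]) -> Vec<(String, bool)> {
        wanted
            .iter()
            .map(|name| (name.clone(), self.files.remove(name)))
            .collect()
    }

    /// Delete whatever is left: files that no chunk will reuse.
    fn delete_unused(&mut self) {
        for stale in std::mem::take(&mut self.files) {
            println!("would remove stale cache file: {stale}");
        }
    }
}

fn main() {
    let mut pre = PreExisting {
        files: ["0.100.x", "100.200.x"].map(String::from).into_iter().collect(),
    };
    // plan first ...
    let plan = pre.mark_reusable(&["0.100.x".to_string(), "200.300.x".to_string()]);
    // ... then delete unused old files *before* writing any new ones ...
    pre.delete_unused();
    // ... and only then do the expensive scan/write (pass 2, not shown).
    println!("{plan:?}");
}
```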
@@ -181,6 +181,13 @@ impl<'a> StoreTo<'a> {
     }
 }
 
+enum ScanAccountStorageResult {
+    /// this data has already been scanned and cached
+    CacheFileAlreadyExists(CacheHashDataFileReference),
+    /// this data needs to be scanned and cached
+    CacheFileNeedsToBeCreated((String, Range<Slot>)),
+}
+
 #[derive(Default, Debug)]
 /// hold alive accounts
 /// alive means in the accounts index
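The new `ScanAccountStorageResult` enum is the work item handed from the first pass to the second: either a reference to a cache file that already exists, or the file name plus slot range needed to build one. A hedged sketch of how such a value is consumed, with `String` standing in for `CacheHashDataFileReference` and `u64` for `Slot` (both stand-ins, not the real types):

```rust
use std::ops::Range;

// Simplified mirror of ScanAccountStorageResult.
enum ScanResult {
    AlreadyCached(String),           // stand-in for CacheFileAlreadyExists(CacheHashDataFileReference)
    NeedsScan((String, Range<u64>)), // stand-in for CacheFileNeedsToBeCreated((String, Range<Slot>))
}

fn resolve(item: ScanResult) -> String {
    match item {
        // nothing to do: the existing cache file is used as-is
        ScanResult::AlreadyCached(file) => file,
        // the slot range travels with the file name so the second pass can
        // rescan exactly the storages this chunk covers
        ScanResult::NeedsScan((file_name, slots)) => {
            format!("scan slots {slots:?}, then write {file_name}")
        }
    }
}

fn main() {
    println!("{}", resolve(ScanResult::AlreadyCached("0.100.cache".to_string())));
    println!("{}", resolve(ScanResult::NeedsScan(("100.200.cache".to_string(), 100..200))));
}
```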
@@ -7222,90 +7229,114 @@ impl AccountsDb
             .saturating_sub(slots_per_epoch);
 
         stats.scan_chunks = splitter.chunk_count;
-        (0..splitter.chunk_count)
-            .into_par_iter()
-            .map(|chunk| {
-                let mut scanner = scanner.clone();
-
-                let range_this_chunk = splitter.get_slot_range(chunk)?;
-
-                let file_name = {
-                    let mut load_from_cache = true;
-                    let mut hasher = hash_map::DefaultHasher::new();
-                    bin_range.start.hash(&mut hasher);
-                    bin_range.end.hash(&mut hasher);
-                    let is_first_scan_pass = bin_range.start == 0;
-
-                    // calculate hash representing all storages in this chunk
-                    for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
-                        if is_first_scan_pass && slot < one_epoch_old {
-                            self.update_old_slot_stats(stats, storage);
-                        }
-                        if !Self::hash_storage_info(&mut hasher, storage, slot) {
-                            load_from_cache = false;
-                            break;
-                        }
-                    }
-                    // we have a hash value for the storages in this chunk
-                    // so, build a file name:
-                    let hash = hasher.finish();
-                    let file_name = format!(
-                        "{}.{}.{}.{}.{:016x}",
-                        range_this_chunk.start,
-                        range_this_chunk.end,
-                        bin_range.start,
-                        bin_range.end,
-                        hash
-                    );
-                    if load_from_cache {
-                        if let Ok(mapped_file) =
-                            cache_hash_data.get_file_reference_to_map_later(&file_name)
-                        {
-                            return Some(mapped_file);
-                        }
-                    }
-
-                    // fall through and load normally - we failed to load from a cache file
-                    file_name
-                };
-
-                let mut init_accum = true;
-                // load from cache failed, so create the cache file for this chunk
-                for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
-                    let ancient = slot < oldest_non_ancient_slot;
-                    let (_, scan_us) = measure_us!(if let Some(storage) = storage {
-                        if init_accum {
-                            let range = bin_range.end - bin_range.start;
-                            scanner.init_accum(range);
-                            init_accum = false;
-                        }
-                        scanner.set_slot(slot);
-
-                        Self::scan_single_account_storage(storage, &mut scanner);
-                    });
-                    if ancient {
-                        stats
-                            .sum_ancient_scans_us
-                            .fetch_add(scan_us, Ordering::Relaxed);
-                        stats.count_ancient_scans.fetch_add(1, Ordering::Relaxed);
-                        stats
-                            .longest_ancient_scan_us
-                            .fetch_max(scan_us, Ordering::Relaxed);
-                    }
-                }
-                (!init_accum)
-                    .then(|| {
-                        let r = scanner.scanning_complete();
-                        assert!(!file_name.is_empty());
-                        (!r.is_empty() && r.iter().any(|b| !b.is_empty())).then(|| {
-                            // error if we can't write this
-                            cache_hash_data.save(&file_name, &r).unwrap();
-                            cache_hash_data
-                                .get_file_reference_to_map_later(&file_name)
-                                .unwrap()
-                        })
-                    })
-                    .flatten()
+        let cache_files = (0..splitter.chunk_count)
+            .into_par_iter()
+            .filter_map(|chunk| {
+                let range_this_chunk = splitter.get_slot_range(chunk)?;
+
+                let mut load_from_cache = true;
+                let mut hasher = hash_map::DefaultHasher::new();
+                bin_range.start.hash(&mut hasher);
+                bin_range.end.hash(&mut hasher);
+                let is_first_scan_pass = bin_range.start == 0;
+
+                // calculate hash representing all storages in this chunk
+                let mut empty = true;
+                for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
+                    empty = false;
+                    if is_first_scan_pass && slot < one_epoch_old {
+                        self.update_old_slot_stats(stats, storage);
+                    }
+                    if !Self::hash_storage_info(&mut hasher, storage, slot) {
+                        load_from_cache = false;
+                        break;
+                    }
+                }
+                if empty {
+                    return None;
+                }
+                // we have a hash value for the storages in this chunk
+                // so, build a file name:
+                let hash = hasher.finish();
+                let file_name = format!(
+                    "{}.{}.{}.{}.{:016x}",
+                    range_this_chunk.start,
+                    range_this_chunk.end,
+                    bin_range.start,
+                    bin_range.end,
+                    hash
+                );
+                if load_from_cache {
+                    if let Ok(mapped_file) =
+                        cache_hash_data.get_file_reference_to_map_later(&file_name)
+                    {
+                        return Some(ScanAccountStorageResult::CacheFileAlreadyExists(
+                            mapped_file,
+                        ));
+                    }
+                }
+
+                // fall through and load normally - we failed to load from a cache file but there are storages present
+                Some(ScanAccountStorageResult::CacheFileNeedsToBeCreated((
+                    file_name,
+                    range_this_chunk,
+                )))
+            })
+            .collect::<Vec<_>>();
+
+        // deletes the old files that will not be used before creating new ones
+        cache_hash_data.delete_old_cache_files();
+
+        cache_files
+            .into_par_iter()
+            .map(|chunk| {
+                match chunk {
+                    ScanAccountStorageResult::CacheFileAlreadyExists(file) => Some(file),
+                    ScanAccountStorageResult::CacheFileNeedsToBeCreated((
+                        file_name,
+                        range_this_chunk,
+                    )) => {
+                        let mut scanner = scanner.clone();
+                        let mut init_accum = true;
+                        // load from cache failed, so create the cache file for this chunk
+                        for (slot, storage) in snapshot_storages.iter_range(&range_this_chunk) {
+                            let ancient = slot < oldest_non_ancient_slot;
+                            let (_, scan_us) = measure_us!(if let Some(storage) = storage {
+                                if init_accum {
+                                    let range = bin_range.end - bin_range.start;
+                                    scanner.init_accum(range);
+                                    init_accum = false;
+                                }
+                                scanner.set_slot(slot);
+
+                                Self::scan_single_account_storage(storage, &mut scanner);
+                            });
+                            if ancient {
+                                stats
+                                    .sum_ancient_scans_us
+                                    .fetch_add(scan_us, Ordering::Relaxed);
+                                stats.count_ancient_scans.fetch_add(1, Ordering::Relaxed);
+                                stats
+                                    .longest_ancient_scan_us
+                                    .fetch_max(scan_us, Ordering::Relaxed);
+                            }
+                        }
+                        (!init_accum)
+                            .then(|| {
+                                let r = scanner.scanning_complete();
+                                assert!(!file_name.is_empty());
+                                (!r.is_empty() && r.iter().any(|b| !b.is_empty())).then(|| {
+                                    // error if we can't write this
+                                    cache_hash_data.save(&file_name, &r).unwrap();
+                                    cache_hash_data
+                                        .get_file_reference_to_map_later(&file_name)
+                                        .unwrap()
+                                })
+                            })
+                            .flatten()
+                    }
+                }
             })
             .filter_map(|x| x)
             .collect()
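Both the old code and the new second pass end with the same idiom: `init_accum` stays `true` until the first storage is actually scanned, so `(!init_accum).then(..)` produces a result only if any work was done, and the inner non-empty check yields a nested `Option` that `flatten` collapses. A small, generic illustration of that idiom (the `finish` function and the `Vec<Vec<u64>>` result type are stand-ins for the scanner's output, not the real API):

```rust
fn finish(init_accum: bool, results: Vec<Vec<u64>>) -> Option<Vec<Vec<u64>>> {
    (!init_accum)
        .then(|| {
            // keep the result only if at least one bin is non-empty
            (!results.is_empty() && results.iter().any(|b| !b.is_empty())).then(|| results)
        })
        // bool::then gives Option<Option<_>>; flatten collapses it
        .flatten()
}

fn main() {
    assert_eq!(finish(true, vec![vec![1]]), None); // nothing was scanned
    assert_eq!(finish(false, vec![vec![]]), None); // scanned, but all bins empty
    assert_eq!(finish(false, vec![vec![7]]), Some(vec![vec![7]]));
    println!("ok");
}
```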
@@ -198,9 +198,7 @@ pub(crate) struct CacheHashData
 
 impl Drop for CacheHashData {
     fn drop(&mut self) {
-        if self.should_delete_old_cache_files_on_drop {
-            self.delete_old_cache_files();
-        }
+        self.delete_old_cache_files();
         self.stats.report();
     }
 }
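The `Drop` impl no longer checks `should_delete_old_cache_files_on_drop` itself; the check moves into `delete_old_cache_files` (next hunk), so the same guarded cleanup can also be invoked in the middle of the hash calculation. A minimal sketch of that shape, with simplified stand-in types (a `Vec<String>` instead of the real mutex-guarded set):

```rust
struct Cache {
    should_delete_old_cache_files_on_drop: bool,
    pre_existing: Vec<String>, // stand-in for pre_existing_cache_files
}

impl Cache {
    /// The flag is checked here, so every caller gets the same guard.
    fn delete_old_cache_files(&mut self) {
        if self.should_delete_old_cache_files_on_drop {
            for file in self.pre_existing.drain(..) {
                println!("delete {file}");
            }
        }
    }
}

impl Drop for Cache {
    fn drop(&mut self) {
        // unconditional call; the method decides whether to actually delete
        self.delete_old_cache_files();
    }
}

fn main() {
    let mut cache = Cache {
        should_delete_old_cache_files_on_drop: true,
        pre_existing: vec!["old.cache".to_string()],
    };
    cache.delete_old_cache_files(); // can now also run before drop
}
```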
@@ -224,18 +222,24 @@ impl CacheHashData
         result.get_cache_files();
         result
     }
-    fn delete_old_cache_files(&self) {
-        let old_cache_files = std::mem::take(&mut *self.pre_existing_cache_files.lock().unwrap());
-        if !old_cache_files.is_empty() {
-            self.stats
-                .unused_cache_files
-                .fetch_add(old_cache_files.len(), Ordering::Relaxed);
-            for file_name in old_cache_files.iter() {
-                let result = self.cache_dir.join(file_name);
-                let _ = fs::remove_file(result);
+
+    /// delete all pre-existing files that will not be used
+    pub(crate) fn delete_old_cache_files(&self) {
+        if self.should_delete_old_cache_files_on_drop {
+            let old_cache_files =
+                std::mem::take(&mut *self.pre_existing_cache_files.lock().unwrap());
+            if !old_cache_files.is_empty() {
+                self.stats
+                    .unused_cache_files
+                    .fetch_add(old_cache_files.len(), Ordering::Relaxed);
+                for file_name in old_cache_files.iter() {
+                    let result = self.cache_dir.join(file_name);
+                    let _ = fs::remove_file(result);
+                }
             }
         }
     }
 
     fn get_cache_files(&self) {
         if self.cache_dir.is_dir() {
             let dir = fs::read_dir(&self.cache_dir);
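`delete_old_cache_files` drains the shared set with `std::mem::take`, so the mutex is held only long enough to swap in an empty set, and the file removals happen afterwards with errors ignored. A self-contained sketch of that pattern; `CacheDir` and its fields are simplified stand-ins for the real struct:

```rust
use std::{collections::HashSet, fs, path::PathBuf, sync::Mutex};

struct CacheDir {
    cache_dir: PathBuf,
    pre_existing_cache_files: Mutex<HashSet<PathBuf>>,
}

impl CacheDir {
    fn delete_old_cache_files(&self) {
        // take the whole set while the lock is held, then release it
        let old_cache_files =
            std::mem::take(&mut *self.pre_existing_cache_files.lock().unwrap());
        for file_name in old_cache_files {
            // removal errors (e.g. file already gone) are deliberately ignored
            let _ = fs::remove_file(self.cache_dir.join(file_name));
        }
    }
}

fn main() {
    let cache = CacheDir {
        cache_dir: PathBuf::from("/tmp/example-cache-dir"),
        pre_existing_cache_files: Mutex::new(
            ["a.cache", "b.cache"].map(PathBuf::from).into_iter().collect(),
        ),
    };
    cache.delete_old_cache_files(); // missing files are simply skipped
}
```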