refactor cache hash data stats (#32884)

This commit is contained in:
Jeff Washington (jwash) 2023-08-18 09:24:59 -07:00 committed by GitHub
parent 280bb53802
commit 48e51134d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 105 additions and 98 deletions

View File

@ -9908,7 +9908,6 @@ pub mod tests {
AccountSecondaryIndexesIncludeExclude, ReadAccountMapEntry, RefCount,
},
append_vec::{test_utils::TempFile, AppendVecStoredAccountMeta},
cache_hash_data_stats::CacheHashDataStats,
inline_spl_token,
secondary_index::MAX_NUM_LARGEST_INDEX_KEYS_RETURNED,
},
@ -10436,7 +10435,6 @@ pub mod tests {
&mut result2,
start_bin_index,
&PubkeyBinCalculator24::new(bins),
&mut CacheHashDataStats::default(),
);
assert_eq!(
convert_to_slice(&[result2]),
@ -10821,12 +10819,7 @@ pub mod tests {
}
let mut result2 = (0..range).map(|_| Vec::default()).collect::<Vec<_>>();
if let Some(m) = result.get(0) {
m.load_all(
&mut result2,
bin,
&PubkeyBinCalculator24::new(bins),
&mut CacheHashDataStats::default(),
);
m.load_all(&mut result2, bin, &PubkeyBinCalculator24::new(bins));
} else {
result2 = vec![];
}
@ -10869,12 +10862,7 @@ pub mod tests {
let mut expected = vec![Vec::new(); range];
expected[0].push(raw_expected[1].clone());
let mut result2 = (0..range).map(|_| Vec::default()).collect::<Vec<_>>();
result[0].load_all(
&mut result2,
0,
&PubkeyBinCalculator24::new(range),
&mut CacheHashDataStats::default(),
);
result[0].load_all(&mut result2, 0, &PubkeyBinCalculator24::new(range));
assert_eq!(result2.len(), 1);
assert_eq!(result2, expected);
}

View File

@ -10,7 +10,7 @@ use {
fs::{self, remove_file, File, OpenOptions},
io::{Seek, SeekFrom, Write},
path::{Path, PathBuf},
sync::{Arc, Mutex},
sync::{atomic::Ordering, Arc, Mutex},
},
};
@ -28,6 +28,7 @@ pub(crate) struct CacheHashDataFileReference {
file: File,
file_len: u64,
path: PathBuf,
stats: Arc<CacheHashDataStats>,
}
/// mmapped cache hash data file
@ -39,15 +40,12 @@ pub(crate) struct CacheHashDataFile {
impl CacheHashDataFileReference {
/// convert the open file reference to a mmapped file that can be returned as a slice
pub(crate) fn map(
&self,
stats: &mut CacheHashDataStats,
) -> Result<CacheHashDataFile, std::io::Error> {
pub(crate) fn map(&self) -> Result<CacheHashDataFile, std::io::Error> {
let file_len = self.file_len;
let mut m1 = Measure::start("read_file");
let mmap = CacheHashDataFileReference::load_map(&self.file)?;
m1.stop();
stats.read_us = m1.as_us();
self.stats.read_us.fetch_add(m1.as_us(), Ordering::Relaxed);
let header_size = std::mem::size_of::<Header>() as u64;
if file_len < header_size {
return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof));
@ -80,12 +78,17 @@ impl CacheHashDataFileReference {
"expected: {capacity}, len on disk: {file_len} {}, entries: {entries}, cell_size: {cell_size}", self.path.display(),
);
stats.total_entries = entries;
stats.cache_file_size += capacity as usize;
stats.loaded_from_cache += 1;
stats.entries_loaded_from_cache += entries;
self.stats
.total_entries
.fetch_add(entries, Ordering::Relaxed);
self.stats
.cache_file_size
.fetch_add(capacity as usize, Ordering::Relaxed);
self.stats.loaded_from_cache.fetch_add(1, Ordering::Relaxed);
self.stats
.entries_loaded_from_cache
.fetch_add(entries, Ordering::Relaxed);
Ok(cache_file)
}
@ -107,7 +110,6 @@ impl CacheHashDataFile {
accumulator: &mut SavedType,
start_bin_index: usize,
bin_calculator: &PubkeyBinCalculator24,
stats: &mut CacheHashDataStats,
) {
let mut m2 = Measure::start("decode");
let slices = self.get_cache_hash_data();
@ -122,7 +124,6 @@ impl CacheHashDataFile {
}
m2.stop();
stats.decode_us += m2.as_us();
}
/// get '&mut EntryType' from cache file [ix]
@ -199,13 +200,13 @@ pub type PreExistingCacheFiles = HashSet<PathBuf>;
pub struct CacheHashData {
cache_dir: PathBuf,
pre_existing_cache_files: Arc<Mutex<PreExistingCacheFiles>>,
pub stats: Arc<Mutex<CacheHashDataStats>>,
pub stats: Arc<CacheHashDataStats>,
}
impl Drop for CacheHashData {
    /// On teardown, delete cache files that were never reused this run,
    /// then emit the accumulated stats datapoint.
    fn drop(&mut self) {
        self.delete_old_cache_files();
        // stats are atomic counters behind an Arc; report() takes &self
        self.stats.report();
    }
}
@ -218,7 +219,7 @@ impl CacheHashData {
let result = CacheHashData {
cache_dir,
pre_existing_cache_files: Arc::new(Mutex::new(PreExistingCacheFiles::default())),
stats: Arc::new(Mutex::new(CacheHashDataStats::default())),
stats: Arc::default(),
};
result.get_cache_files();
@ -227,7 +228,9 @@ impl CacheHashData {
fn delete_old_cache_files(&self) {
let pre_existing_cache_files = self.pre_existing_cache_files.lock().unwrap();
if !pre_existing_cache_files.is_empty() {
self.stats.lock().unwrap().unused_cache_files += pre_existing_cache_files.len();
self.stats
.unused_cache_files
.fetch_add(pre_existing_cache_files.len(), Ordering::Relaxed);
for file_name in pre_existing_cache_files.iter() {
let result = self.cache_dir.join(file_name);
let _ = fs::remove_file(result);
@ -244,7 +247,9 @@ impl CacheHashData {
pre_existing.insert(PathBuf::from(name));
}
}
self.stats.lock().unwrap().cache_file_count += pre_existing.len();
self.stats
.cache_file_count
.fetch_add(pre_existing.len(), Ordering::Relaxed);
}
}
}
@ -260,10 +265,9 @@ impl CacheHashData {
) -> Result<(), std::io::Error> {
let mut m = Measure::start("overall");
let cache_file = self.load_map(file_name)?;
let mut stats = CacheHashDataStats::default();
cache_file.load_all(accumulator, start_bin_index, bin_calculator, &mut stats);
cache_file.load_all(accumulator, start_bin_index, bin_calculator);
m.stop();
self.stats.lock().unwrap().load_us += m.as_us();
self.stats.load_us.fetch_add(m.as_us(), Ordering::Relaxed);
Ok(())
}
@ -273,7 +277,6 @@ impl CacheHashData {
&self,
file_name: impl AsRef<Path>,
) -> Result<CacheHashDataFileReference, std::io::Error> {
let mut stats = CacheHashDataStats::default();
let path = self.cache_dir.join(&file_name);
let file_len = std::fs::metadata(&path)?.len();
let mut m1 = Measure::start("read_file");
@ -284,15 +287,14 @@ impl CacheHashData {
.create(false)
.open(&path)?;
m1.stop();
stats.read_us = m1.as_us();
self.stats.lock().unwrap().accumulate(&stats);
self.stats.read_us.fetch_add(m1.as_us(), Ordering::Relaxed);
self.pre_existing_cache_file_will_be_used(file_name);
Ok(CacheHashDataFileReference {
file,
file_len,
path,
stats: Arc::clone(&self.stats),
})
}
@ -301,11 +303,8 @@ impl CacheHashData {
&self,
file_name: impl AsRef<Path>,
) -> Result<CacheHashDataFile, std::io::Error> {
let mut stats = CacheHashDataStats::default();
let reference = self.get_file_reference_to_map_later(file_name)?;
let result = reference.map(&mut stats);
self.stats.lock().unwrap().accumulate(&stats);
result
reference.map()
}
pub(crate) fn pre_existing_cache_file_will_be_used(&self, file_name: impl AsRef<Path>) {
@ -321,17 +320,13 @@ impl CacheHashData {
file_name: impl AsRef<Path>,
data: &SavedTypeSlice,
) -> Result<(), std::io::Error> {
let mut stats = CacheHashDataStats::default();
let result = self.save_internal(file_name, data, &mut stats);
self.stats.lock().unwrap().accumulate(&stats);
result
self.save_internal(file_name, data)
}
fn save_internal(
&self,
file_name: impl AsRef<Path>,
data: &SavedTypeSlice,
stats: &mut CacheHashDataStats,
) -> Result<(), std::io::Error> {
let mut m = Measure::start("save");
let cache_path = self.cache_dir.join(file_name);
@ -348,7 +343,9 @@ impl CacheHashData {
let mmap = CacheHashDataFile::new_map(&cache_path, capacity)?;
m1.stop();
stats.create_save_us += m1.as_us();
self.stats
.create_save_us
.fetch_add(m1.as_us(), Ordering::Relaxed);
let mut cache_file = CacheHashDataFile {
mmap,
cell_size,
@ -358,8 +355,12 @@ impl CacheHashData {
let header = cache_file.get_header_mut();
header.count = entries;
stats.cache_file_size = capacity as usize;
stats.total_entries = entries;
self.stats
.cache_file_size
.fetch_add(capacity as usize, Ordering::Relaxed);
self.stats
.total_entries
.fetch_add(entries, Ordering::Relaxed);
let mut m2 = Measure::start("write_to_mmap");
let mut i = 0;
@ -372,10 +373,12 @@ impl CacheHashData {
});
assert_eq!(i, entries);
m2.stop();
stats.write_to_mmap_us += m2.as_us();
self.stats
.write_to_mmap_us
.fetch_add(m2.as_us(), Ordering::Relaxed);
m.stop();
stats.save_us += m.as_us();
stats.saved_to_cache += 1;
self.stats.save_us.fetch_add(m.as_us(), Ordering::Relaxed);
self.stats.saved_to_cache.fetch_add(1, Ordering::Relaxed);
Ok(())
}
}

View File

@ -1,59 +1,75 @@
//! Cached data for hashing accounts
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
/// Counters for cache-hash-data activity.
///
/// All fields are atomics so a single `Arc<CacheHashDataStats>` can be shared
/// across threads and file references; callers accumulate with
/// `fetch_add(…, Ordering::Relaxed)` instead of locking a `Mutex`.
#[derive(Default, Debug)]
pub struct CacheHashDataStats {
    /// total bytes of cache files mapped or created
    pub cache_file_size: AtomicUsize,
    /// number of pre-existing cache files discovered on startup
    pub cache_file_count: AtomicUsize,
    /// total entries read from or written to cache files
    pub total_entries: AtomicUsize,
    /// number of cache files successfully loaded
    pub loaded_from_cache: AtomicUsize,
    /// entries contained in the loaded cache files
    pub entries_loaded_from_cache: AtomicUsize,
    /// time spent in save(), microseconds
    pub save_us: AtomicU64,
    /// number of cache files written
    pub saved_to_cache: AtomicUsize,
    /// time spent writing entries into the mmap, microseconds
    pub write_to_mmap_us: AtomicU64,
    /// time spent creating the mmap for saving, microseconds
    pub create_save_us: AtomicU64,
    /// time spent in load(), microseconds
    pub load_us: AtomicU64,
    /// time spent opening/mapping cache files, microseconds
    pub read_us: AtomicU64,
    /// time spent merging results, microseconds (not currently reported)
    pub merge_us: AtomicU64,
    /// pre-existing cache files that were never reused and got deleted
    pub unused_cache_files: AtomicUsize,
}
impl CacheHashDataStats {
    /// Emit all counters as a single `cache_hash_data_stats` metrics datapoint.
    ///
    /// Takes `&self` because every field is an atomic; values are read with
    /// `Ordering::Relaxed` since the counters are independent and only need
    /// best-effort consistency for reporting.
    /// NOTE: `merge_us` is intentionally not reported, matching the visible
    /// datapoint list.
    pub fn report(&self) {
        datapoint_info!(
            "cache_hash_data_stats",
            (
                "cache_file_size",
                self.cache_file_size.load(Ordering::Relaxed),
                i64
            ),
            (
                "cache_file_count",
                self.cache_file_count.load(Ordering::Relaxed),
                i64
            ),
            (
                "total_entries",
                self.total_entries.load(Ordering::Relaxed),
                i64
            ),
            (
                "loaded_from_cache",
                self.loaded_from_cache.load(Ordering::Relaxed),
                i64
            ),
            (
                "saved_to_cache",
                self.saved_to_cache.load(Ordering::Relaxed),
                i64
            ),
            (
                "entries_loaded_from_cache",
                self.entries_loaded_from_cache.load(Ordering::Relaxed),
                i64
            ),
            ("save_us", self.save_us.load(Ordering::Relaxed), i64),
            (
                "write_to_mmap_us",
                self.write_to_mmap_us.load(Ordering::Relaxed),
                i64
            ),
            (
                "create_save_us",
                self.create_save_us.load(Ordering::Relaxed),
                i64
            ),
            ("load_us", self.load_us.load(Ordering::Relaxed), i64),
            ("read_us", self.read_us.load(Ordering::Relaxed), i64),
            (
                "unused_cache_files",
                self.unused_cache_files.load(Ordering::Relaxed),
                i64
            ),
        );
    }
}