refactor CacheHashDataFile to use mmapped data (#28163)

refactor CacheHashDataFile to use mmapped data
This commit is contained in:
Jeff Washington (jwash) 2022-10-01 17:09:36 -07:00 committed by GitHub
parent 929a311155
commit 7fd8540b49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 51 additions and 86 deletions

View File

@ -4,7 +4,6 @@ use {
accounts_hash::CalculateHashIntermediate, cache_hash_data_stats::CacheHashDataStats,
pubkey_bins::PubkeyBinCalculator24,
},
log::*,
memmap2::MmapMut,
solana_measure::measure::Measure,
std::{
@ -25,13 +24,44 @@ pub struct Header {
count: usize,
}
struct CacheHashDataFile {
pub(crate) struct CacheHashDataFile {
cell_size: u64,
mmap: MmapMut,
capacity: u64,
}
impl CacheHashDataFile {
/// Return a slice referencing all cache hash data entries in the mmapped file.
///
/// Simply delegates to `get_slice(0)`, so the slice starts at the first entry
/// and covers everything the mapping holds.
pub(crate) fn get_cache_hash_data(&self) -> &[EntryType] {
self.get_slice(0)
}
/// Populate 'accumulator' from entire contents of the cache file.
///
/// Every entry in the mmapped file is binned via `bin_calculator`, and the
/// entry is pushed (cloned) into `accumulator[bin - start_bin_index]`.
/// Panics (via `assert!`) if any entry bins below `start_bin_index`.
/// Decode time is accumulated into `stats.decode_us`.
pub(crate) fn load_all(
&self,
accumulator: &mut SavedType,
start_bin_index: usize,
bin_calculator: &PubkeyBinCalculator24,
stats: &mut CacheHashDataStats,
) {
let mut m2 = Measure::start("decode");
// one pass over every entry in the mmapped file
let slices = self.get_cache_hash_data();
for d in slices {
let mut pubkey_to_bin_index = bin_calculator.bin_from_pubkey(&d.pubkey);
assert!(
pubkey_to_bin_index >= start_bin_index,
"{}, {}",
pubkey_to_bin_index,
start_bin_index
); // a failure here means a pubkey binned BELOW start_bin_index, i.e. this file holds an entry outside its expected bin range
// rebase the bin index so it indexes into 'accumulator', which only covers bins starting at start_bin_index
pubkey_to_bin_index -= start_bin_index;
accumulator[pubkey_to_bin_index].push(d.clone()); // may want to avoid clone here
}
m2.stop();
stats.decode_us += m2.as_us();
}
/// get '&mut EntryType' from cache file [ix]
fn get_mut(&mut self, ix: u64) -> &mut EntryType {
let item_slice = self.get_slice_internal(ix);
@ -41,14 +71,6 @@ impl CacheHashDataFile {
}
}
/// Return a reference to the single entry at index `ix` in the cache file.
fn get(&self, ix: u64) -> &EntryType {
// slice covering cache file entries [ix..]
let slice = self.get_slice(ix);
// first element of that slice is entry `ix`
&slice[0]
}
/// get '&[EntryType]' from cache file [ix..]
fn get_slice(&self, ix: u64) -> &[EntryType] {
let start = self.get_element_offset_byte(ix);
@ -126,42 +148,6 @@ impl CacheHashDataFile {
}
}
/// refer to a mmapped cache file and enable accessing the data held within
struct MappedCacheFile {
/// the cache file holding the mmapped entries
cache_file: CacheHashDataFile,
/// number of entries in the cache file (read when iterating; set at map time)
entries: usize,
}
impl MappedCacheFile {
/// Populate 'accumulator' from entire contents of the cache file.
///
/// Reads entries 0..self.entries one at a time via `cache_file.get`, bins
/// each pubkey with `bin_calculator`, and pushes a clone into
/// `accumulator[bin - start_bin_index]`. Panics (via `assert!`) if any
/// entry bins below `start_bin_index`. Decode time goes to `stats.decode_us`.
fn load_all(
&mut self,
accumulator: &mut SavedType,
start_bin_index: usize,
bin_calculator: &PubkeyBinCalculator24,
stats: &mut CacheHashDataStats,
) {
let mut m2 = Measure::start("decode");
for i in 0..self.entries {
// per-entry lookup into the mmapped file
let d = self.cache_file.get(i as u64);
let mut pubkey_to_bin_index = bin_calculator.bin_from_pubkey(&d.pubkey);
assert!(
pubkey_to_bin_index >= start_bin_index,
"{}, {}",
pubkey_to_bin_index,
start_bin_index
); // a failure here means a pubkey binned BELOW start_bin_index, i.e. this file holds an entry outside its expected bin range
// rebase so the index fits 'accumulator', which starts at start_bin_index
pubkey_to_bin_index -= start_bin_index;
accumulator[pubkey_to_bin_index].push(d.clone()); // may want to avoid clone here
}
m2.stop();
stats.decode_us += m2.as_us();
}
}
pub type PreExistingCacheFiles = HashSet<String>;
pub struct CacheHashData {
cache_folder: PathBuf,
@ -222,47 +208,39 @@ impl CacheHashData {
}
/// load from 'file_name' into 'accumulator'
pub fn load<P: AsRef<Path> + std::fmt::Debug>(
pub(crate) fn load<P: AsRef<Path> + std::fmt::Debug>(
&self,
file_name: &P,
accumulator: &mut SavedType,
start_bin_index: usize,
bin_calculator: &PubkeyBinCalculator24,
) -> Result<(), std::io::Error> {
let mut stats = CacheHashDataStats::default();
let result = self.load_internal(
file_name,
accumulator,
start_bin_index,
bin_calculator,
&mut stats,
);
self.stats.lock().unwrap().merge(&stats);
result
}
fn load_internal<P: AsRef<Path> + std::fmt::Debug>(
&self,
file_name: &P,
accumulator: &mut SavedType,
start_bin_index: usize,
bin_calculator: &PubkeyBinCalculator24,
stats: &mut CacheHashDataStats,
) -> Result<(), std::io::Error> {
let mut m = Measure::start("overall");
let mut cache_file = self.map(file_name, stats)?;
cache_file.load_all(accumulator, start_bin_index, bin_calculator, stats);
let cache_file = self.load_map(file_name)?;
let mut stats = CacheHashDataStats::default();
cache_file.load_all(accumulator, start_bin_index, bin_calculator, &mut stats);
m.stop();
stats.load_us += m.as_us();
self.stats.lock().unwrap().load_us += m.as_us();
Ok(())
}
/// map 'file_name' into memory
///
/// Thin public wrapper around `map`: collects per-call stats locally,
/// merges them into the shared `self.stats` (under its lock), and returns
/// the mapped file (or the underlying I/O error) to the caller.
pub(crate) fn load_map<P: AsRef<Path> + std::fmt::Debug>(
&self,
file_name: &P,
) -> Result<CacheHashDataFile, std::io::Error> {
let mut stats = CacheHashDataStats::default();
let result = self.map(file_name, &mut stats);
// merge local stats even when `map` failed, so partial work is counted
self.stats.lock().unwrap().merge(&stats);
result
}
/// create and return a MappedCacheFile for a cache file path
fn map<P: AsRef<Path> + std::fmt::Debug>(
&self,
file_name: &P,
stats: &mut CacheHashDataStats,
) -> Result<MappedCacheFile, std::io::Error> {
) -> Result<CacheHashDataFile, std::io::Error> {
let path = self.cache_folder.join(file_name);
let file_len = std::fs::metadata(path.clone())?.len();
let mut m1 = Measure::start("read_file");
@ -306,28 +284,15 @@ impl CacheHashData {
stats.cache_file_size += capacity as usize;
let file_name_lookup = file_name.as_ref().to_str().unwrap().to_string();
let found = self
.pre_existing_cache_files
self.pre_existing_cache_files
.lock()
.unwrap()
.remove(&file_name_lookup);
if !found {
info!(
"tried to mark {:?} as used, but it wasn't in the set, one example: {:?}",
file_name_lookup,
self.pre_existing_cache_files.lock().unwrap().iter().next()
);
}
stats.loaded_from_cache += 1;
stats.entries_loaded_from_cache += entries;
let mapped_file = MappedCacheFile {
cache_file,
entries,
};
Ok(mapped_file)
Ok(cache_file)
}
/// save 'data' to 'file_name'