Creates temporary accounts hash cache files inside accounts hash cache dir (#31776)

This commit is contained in:
Brooks 2023-05-23 17:09:16 -04:00 committed by GitHub
parent a3fc622550
commit a474cb24b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 32 deletions

View File

@ -7585,7 +7585,8 @@ impl AccountsDb {
let slot = storages.max_slot_inclusive(); let slot = storages.max_slot_inclusive();
let use_bg_thread_pool = config.use_bg_thread_pool; let use_bg_thread_pool = config.use_bg_thread_pool;
let scan_and_hash = || { let scan_and_hash = || {
let cache_hash_data = Self::get_cache_hash_data(accounts_hash_cache_path, config, slot); let cache_hash_data =
Self::get_cache_hash_data(accounts_hash_cache_path.clone(), config, slot);
let bounds = Range { let bounds = Range {
start: 0, start: 0,
@ -7599,6 +7600,7 @@ impl AccountsDb {
None None
}, },
zero_lamport_accounts: flavor.zero_lamport_accounts(), zero_lamport_accounts: flavor.zero_lamport_accounts(),
dir_for_temp_cache_files: accounts_hash_cache_path,
}; };
// get raw data by scanning // get raw data by scanning

View File

@ -20,12 +20,13 @@ use {
convert::TryInto, convert::TryInto,
fs::File, fs::File,
io::{BufWriter, Write}, io::{BufWriter, Write},
path::PathBuf,
sync::{ sync::{
atomic::{AtomicU64, AtomicUsize, Ordering}, atomic::{AtomicU64, AtomicUsize, Ordering},
Arc, Mutex, Arc, Mutex,
}, },
}, },
tempfile::tempfile, tempfile::tempfile_in,
}; };
pub const MERKLE_FANOUT: usize = 16; pub const MERKLE_FANOUT: usize = 16;
@ -51,10 +52,11 @@ impl MmapAccountHashesFile {
} }
/// 1 file containing account hashes sorted by pubkey /// 1 file containing account hashes sorted by pubkey
#[derive(Default)]
pub struct AccountHashesFile { pub struct AccountHashesFile {
/// # hashes and an open file that will be deleted on drop. None if there are zero hashes to represent, and thus, no file. /// # hashes and an open file that will be deleted on drop. None if there are zero hashes to represent, and thus, no file.
count_and_writer: Option<(usize, BufWriter<File>)>, count_and_writer: Option<(usize, BufWriter<File>)>,
/// The directory where temporary cache files are put
dir_for_temp_cache_files: PathBuf,
} }
impl AccountHashesFile { impl AccountHashesFile {
@ -84,7 +86,10 @@ impl AccountHashesFile {
pub fn write(&mut self, hash: &Hash) { pub fn write(&mut self, hash: &Hash) {
if self.count_and_writer.is_none() { if self.count_and_writer.is_none() {
// we have hashes to write but no file yet, so create a file that will auto-delete on drop // we have hashes to write but no file yet, so create a file that will auto-delete on drop
self.count_and_writer = Some((0, BufWriter::new(tempfile().unwrap()))); self.count_and_writer = Some((
0,
BufWriter::new(tempfile_in(&self.dir_for_temp_cache_files).unwrap()),
));
} }
let count_and_writer = self.count_and_writer.as_mut().unwrap(); let count_and_writer = self.count_and_writer.as_mut().unwrap();
assert_eq!( assert_eq!(
@ -455,15 +460,8 @@ impl CumulativeOffsets {
pub struct AccountsHasher { pub struct AccountsHasher {
pub filler_account_suffix: Option<Pubkey>, pub filler_account_suffix: Option<Pubkey>,
pub zero_lamport_accounts: ZeroLamportAccounts, pub zero_lamport_accounts: ZeroLamportAccounts,
} /// The directory where temporary cache files are put
pub dir_for_temp_cache_files: PathBuf,
impl Default for AccountsHasher {
fn default() -> Self {
Self {
filler_account_suffix: None,
zero_lamport_accounts: ZeroLamportAccounts::Excluded,
}
}
} }
impl AccountsHasher { impl AccountsHasher {
@ -931,7 +929,10 @@ impl AccountsHasher {
// map from index of an item in first_items[] to index of the corresponding item in pubkey_division[] // map from index of an item in first_items[] to index of the corresponding item in pubkey_division[]
// this will change as items in pubkey_division[] are exhausted // this will change as items in pubkey_division[] are exhausted
let mut first_item_to_pubkey_division = Vec::with_capacity(len); let mut first_item_to_pubkey_division = Vec::with_capacity(len);
let mut hashes = AccountHashesFile::default(); let mut hashes = AccountHashesFile {
count_and_writer: None,
dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(),
};
// initialize 'first_items', which holds the current lowest item in each slot group // initialize 'first_items', which holds the current lowest item in each slot group
pubkey_division.iter().enumerate().for_each(|(i, bins)| { pubkey_division.iter().enumerate().for_each(|(i, bins)| {
// check to make sure we can do bins[pubkey_bin] // check to make sure we can do bins[pubkey_bin]
@ -1111,12 +1112,32 @@ pub struct AccountsDeltaHash(pub Hash);
#[cfg(test)] #[cfg(test)]
pub mod tests { pub mod tests {
use {super::*, std::str::FromStr}; use {super::*, std::str::FromStr, tempfile::tempdir};
impl AccountsHasher {
    /// Constructor used by the tests in this module.
    ///
    /// Produces a hasher with no filler account suffix and with zero-lamport
    /// accounts excluded; `dir_for_temp_cache_files` is stored unchanged as the
    /// directory where temporary cache files are put.
    fn new(dir_for_temp_cache_files: PathBuf) -> Self {
        let filler_account_suffix = None;
        let zero_lamport_accounts = ZeroLamportAccounts::Excluded;
        Self {
            dir_for_temp_cache_files,
            zero_lamport_accounts,
            filler_account_suffix,
        }
    }
}
impl AccountHashesFile {
    /// Constructor used by the tests in this module.
    ///
    /// Starts with no backing file (`count_and_writer` is `None`) and records
    /// `dir_for_temp_cache_files` as the directory where temporary cache files
    /// are put.
    fn new(dir_for_temp_cache_files: PathBuf) -> Self {
        let count_and_writer = None;
        Self {
            dir_for_temp_cache_files,
            count_and_writer,
        }
    }
}
#[test] #[test]
fn test_account_hashes_file() { fn test_account_hashes_file() {
let dir_for_temp_cache_files = tempdir().unwrap();
// 0 hashes // 0 hashes
let mut file = AccountHashesFile::default(); let mut file = AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf());
assert!(file.get_reader().is_none()); assert!(file.get_reader().is_none());
let hashes = (0..2).map(|i| Hash::new(&[i; 32])).collect::<Vec<_>>(); let hashes = (0..2).map(|i| Hash::new(&[i; 32])).collect::<Vec<_>>();
@ -1127,7 +1148,7 @@ pub mod tests {
assert!(reader.1.read(1).is_empty()); assert!(reader.1.read(1).is_empty());
// multiple hashes // multiple hashes
let mut file = AccountHashesFile::default(); let mut file = AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf());
assert!(file.get_reader().is_none()); assert!(file.get_reader().is_none());
hashes.iter().for_each(|hash| file.write(hash)); hashes.iter().for_each(|hash| file.write(hash));
let reader = file.get_reader().unwrap(); let reader = file.get_reader().unwrap();
@ -1137,21 +1158,22 @@ pub mod tests {
#[test] #[test]
fn test_cumulative_hashes_from_files() { fn test_cumulative_hashes_from_files() {
let dir_for_temp_cache_files = tempdir().unwrap();
(0..4).for_each(|permutation| { (0..4).for_each(|permutation| {
let hashes = (0..2).map(|i| Hash::new(&[i + 1; 32])).collect::<Vec<_>>(); let hashes = (0..2).map(|i| Hash::new(&[i + 1; 32])).collect::<Vec<_>>();
let mut combined = Vec::default(); let mut combined = Vec::default();
// 0 hashes // 0 hashes
let file0 = AccountHashesFile::default(); let file0 = AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf());
// 1 hash // 1 hash
let mut file1 = AccountHashesFile::default(); let mut file1 = AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf());
file1.write(&hashes[0]); file1.write(&hashes[0]);
combined.push(hashes[0]); combined.push(hashes[0]);
// multiple hashes // multiple hashes
let mut file2 = AccountHashesFile::default(); let mut file2 = AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf());
hashes.iter().for_each(|hash| { hashes.iter().for_each(|hash| {
file2.write(hash); file2.write(hash);
combined.push(*hash); combined.push(*hash);
@ -1164,9 +1186,9 @@ pub mod tests {
vec![ vec![
file0, file0,
file1, file1,
AccountHashesFile::default(), AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf()),
file2, file2,
AccountHashesFile::default(), AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf()),
] ]
} else if permutation == 2 { } else if permutation == 2 {
vec![file1, file2] vec![file1, file2]
@ -1176,8 +1198,8 @@ pub mod tests {
combined.push(one); combined.push(one);
vec![ vec![
file2, file2,
AccountHashesFile::default(), AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf()),
AccountHashesFile::default(), AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf()),
file1, file1,
] ]
}; };
@ -1240,7 +1262,8 @@ pub mod tests {
let val = CalculateHashIntermediate::new(hash, 0, key); let val = CalculateHashIntermediate::new(hash, 0, key);
account_maps.push(val); account_maps.push(val);
let accounts_hash = AccountsHasher::default(); let dir_for_temp_cache_files = tempdir().unwrap();
let accounts_hash = AccountsHasher::new(dir_for_temp_cache_files.path().to_path_buf());
let result = accounts_hash let result = accounts_hash
.rest_of_hash_calculation(for_rest(&account_maps), &mut HashStats::default()); .rest_of_hash_calculation(for_rest(&account_maps), &mut HashStats::default());
let expected_hash = Hash::from_str("8j9ARGFv4W2GfML7d3sVJK2MePwrikqYnu6yqer28cCa").unwrap(); let expected_hash = Hash::from_str("8j9ARGFv4W2GfML7d3sVJK2MePwrikqYnu6yqer28cCa").unwrap();
@ -1285,8 +1308,9 @@ pub mod tests {
}]]]; }]]];
let temp_vec = vec.to_vec(); let temp_vec = vec.to_vec();
let slice = convert_to_slice2(&temp_vec); let slice = convert_to_slice2(&temp_vec);
let (mut hashes, lamports, _) = let dir_for_temp_cache_files = tempdir().unwrap();
AccountsHasher::default().de_dup_accounts_in_parallel(&slice, 0); let accounts_hasher = AccountsHasher::new(dir_for_temp_cache_files.path().to_path_buf());
let (mut hashes, lamports, _) = accounts_hasher.de_dup_accounts_in_parallel(&slice, 0);
assert_eq!(&[Hash::default()], hashes.get_reader().unwrap().1.read(0)); assert_eq!(&[Hash::default()], hashes.get_reader().unwrap().1.read(0));
assert_eq!(lamports, 1); assert_eq!(lamports, 1);
} }
@ -1304,7 +1328,8 @@ pub mod tests {
#[test] #[test]
fn test_accountsdb_de_dup_accounts_empty() { fn test_accountsdb_de_dup_accounts_empty() {
solana_logger::setup(); solana_logger::setup();
let accounts_hash = AccountsHasher::default(); let dir_for_temp_cache_files = tempdir().unwrap();
let accounts_hash = AccountsHasher::new(dir_for_temp_cache_files.path().to_path_buf());
let vec = vec![vec![], vec![]]; let vec = vec![vec![], vec![]];
let (hashes, lamports) = let (hashes, lamports) =
@ -1399,7 +1424,8 @@ pub mod tests {
result result
}).collect(); }).collect();
let hash = AccountsHasher::default(); let dir_for_temp_cache_files = tempdir().unwrap();
let hash = AccountsHasher::new(dir_for_temp_cache_files.path().to_path_buf());
let mut expected_index = 0; let mut expected_index = 0;
for last_slice in 0..2 { for last_slice in 0..2 {
for start in 0..COUNT { for start in 0..COUNT {
@ -1532,7 +1558,9 @@ pub mod tests {
fn test_de_dup_accounts_in_parallel<'a>( fn test_de_dup_accounts_in_parallel<'a>(
account_maps: &'a [SortedDataByPubkey<'a>], account_maps: &'a [SortedDataByPubkey<'a>],
) -> (AccountHashesFile, u64, usize) { ) -> (AccountHashesFile, u64, usize) {
AccountsHasher::default().de_dup_accounts_in_parallel(account_maps, 0) let dir_for_temp_cache_files = tempdir().unwrap();
let accounts_hasher = AccountsHasher::new(dir_for_temp_cache_files.path().to_path_buf());
accounts_hasher.de_dup_accounts_in_parallel(account_maps, 0)
} }
#[test] #[test]
@ -1908,7 +1936,9 @@ pub mod tests {
), ),
CalculateHashIntermediate::new(Hash::new(&[2u8; 32]), offset + 1, Pubkey::new_unique()), CalculateHashIntermediate::new(Hash::new(&[2u8; 32]), offset + 1, Pubkey::new_unique()),
]; ];
AccountsHasher::default().de_dup_accounts_in_parallel(&[convert_to_slice(&[input])], 0); let dir_for_temp_cache_files = tempdir().unwrap();
let accounts_hasher = AccountsHasher::new(dir_for_temp_cache_files.path().to_path_buf());
accounts_hasher.de_dup_accounts_in_parallel(&[convert_to_slice(&[input])], 0);
} }
fn convert_to_slice( fn convert_to_slice(
@ -1944,7 +1974,9 @@ pub mod tests {
Pubkey::new_unique(), Pubkey::new_unique(),
)], )],
]; ];
AccountsHasher::default().de_dup_accounts( let dir_for_temp_cache_files = tempdir().unwrap();
let accounts_hasher = AccountsHasher::new(dir_for_temp_cache_files.path().to_path_buf());
accounts_hasher.de_dup_accounts(
&[convert_to_slice(&input)], &[convert_to_slice(&input)],
&mut HashStats::default(), &mut HashStats::default(),
2, // accounts above are in 2 groups 2, // accounts above are in 2 groups