rework accounts hash calc dedup to avoid kernel file error (#33195)
* in hash calc, calculate max_inclusive_num_pubkeys
* in hash calc, dedup uses mmap files to avoid os panic
* as_mut_ptr
* remove unsafe code
* refactor count in hash files

---------

Co-authored-by: HaoranYi <haoran.yi@solana.com>
commit 4dfe62a2f0 (parent 7fc6fea8d8)
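For orientation before the diff: the core change replaces the BufWriter<File> append path with a temp file that is sized up front and memory-mapped, so hashes are appended by memcpy into the mapping instead of buffered write() calls. A minimal sketch of that pattern, assuming the tempfile and memmap2 crates this file already depends on; the type and method names are illustrative, not the commit's API:

// Cargo deps assumed: tempfile = "3", memmap2 = "0.5"
use {memmap2::MmapMut, std::path::Path, tempfile::tempfile_in};

const HASH_BYTES: usize = 32; // wire size of a Solana `Hash`

/// Append-only, mmap-backed hash store: the temp file is sized once up
/// front, so each append is a plain memcpy into the mapping.
struct MmapHashes {
    mmap: MmapMut,
    count: usize,
}

impl MmapHashes {
    fn new(dir: &Path, max_hashes: usize) -> std::io::Result<Self> {
        let file = tempfile_in(dir)?; // auto-deleted on drop
        file.set_len((max_hashes * HASH_BYTES) as u64)?; // pre-size the file
        // SAFETY: the file is a private temp file; nothing else mutates it
        // while it is mapped.
        let mmap = unsafe { MmapMut::map_mut(&file)? };
        Ok(Self { mmap, count: 0 })
    }

    fn push(&mut self, hash: &[u8; HASH_BYTES]) {
        let start = self.count * HASH_BYTES;
        self.mmap[start..start + HASH_BYTES].copy_from_slice(hash);
        self.count += 1;
    }
}

The commit itself pre-sizes the file with an explicit seek-and-write (see the second hunk below) rather than set_len; either way the point is to avoid growing the file, and therefore the mapping, after the fact.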
@@ -19,8 +19,7 @@ use {
     std::{
         borrow::Borrow,
         convert::TryInto,
-        fs::File,
-        io::{BufWriter, Write},
+        io::{Seek, SeekFrom, Write},
         path::PathBuf,
         sync::{
             atomic::{AtomicU64, AtomicUsize, Ordering},
@@ -33,81 +32,96 @@ pub const MERKLE_FANOUT: usize = 16;
 
 /// 1 file containing account hashes sorted by pubkey, mapped into memory
 struct MmapAccountHashesFile {
+    /// raw slice of `Hash` values. Can be a larger slice than `count`
     mmap: MmapMut,
+    /// # of valid Hash entries in `mmap`
+    count: usize,
 }
 
 impl MmapAccountHashesFile {
     /// return a slice of account hashes starting at 'index'
     fn read(&self, index: usize) -> &[Hash] {
         let start = std::mem::size_of::<Hash>() * index;
-        let item_slice: &[u8] = &self.mmap[start..];
+        let item_slice: &[u8] = &self.mmap[start..self.count * std::mem::size_of::<Hash>()];
         let remaining_elements = item_slice.len() / std::mem::size_of::<Hash>();
         unsafe {
             let item = item_slice.as_ptr() as *const Hash;
             std::slice::from_raw_parts(item, remaining_elements)
         }
     }
+
+    /// write a hash to the end of mmap file.
+    fn write(&mut self, hash: &Hash) {
+        let start = self.count * std::mem::size_of::<Hash>();
+        let end = start + std::mem::size_of::<Hash>();
+        self.mmap[start..end].copy_from_slice(hash.as_ref());
+        self.count += 1;
+    }
 }
 
 /// 1 file containing account hashes sorted by pubkey
 pub struct AccountHashesFile {
     /// # hashes and an open file that will be deleted on drop. None if there are zero hashes to represent, and thus, no file.
-    count_and_writer: Option<(usize, BufWriter<File>)>,
+    writer: Option<MmapAccountHashesFile>,
     /// The directory where temporary cache files are put
    dir_for_temp_cache_files: PathBuf,
+    /// # bytes allocated
+    capacity: usize,
 }
 
 impl AccountHashesFile {
-    /// map the file into memory and return a reader that can access it by slice
-    fn get_reader(&mut self) -> Option<(usize, MmapAccountHashesFile)> {
-        std::mem::take(&mut self.count_and_writer).map(|(count, writer)| {
-            let file = Some(writer.into_inner().unwrap());
-            (
-                count,
-                MmapAccountHashesFile {
-                    mmap: unsafe { MmapMut::map_mut(file.as_ref().unwrap()).unwrap() },
-                },
-            )
-        })
+    /// return a mmap reader that can be accessed by slice
+    fn get_reader(&mut self) -> Option<MmapAccountHashesFile> {
+        std::mem::take(&mut self.writer)
     }
 
     /// # hashes stored in this file
     pub fn count(&self) -> usize {
-        self.count_and_writer
+        self.writer
             .as_ref()
-            .map(|(count, _)| *count)
+            .map(|writer| writer.count)
             .unwrap_or_default()
     }
 
     /// write 'hash' to the file
     /// If the file isn't open, create it first.
     pub fn write(&mut self, hash: &Hash) {
-        if self.count_and_writer.is_none() {
+        if self.writer.is_none() {
             // we have hashes to write but no file yet, so create a file that will auto-delete on drop
-            self.count_and_writer = Some((
-                0,
-                BufWriter::new(
-                    tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| {
-                        panic!(
-                            "Unable to create file within {}: {err}",
-                            self.dir_for_temp_cache_files.display()
-                        )
-                    }),
-                ),
-            ));
-        }
-        let count_and_writer = self.count_and_writer.as_mut().unwrap();
-        count_and_writer
-            .1
-            .write_all(hash.as_ref())
-            .unwrap_or_else(|err| {
+            let mut data = tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| {
                 panic!(
-                    "Unable to write file within {}: {err}",
+                    "Unable to create file within {}: {err}",
                     self.dir_for_temp_cache_files.display()
                 )
             });
-        count_and_writer.0 += 1;
+
+            // Theoretical performance optimization: write a zero to the end of
+            // the file so that we won't have to resize it later, which may be
+            // expensive.
+            data.seek(SeekFrom::Start((self.capacity - 1) as u64))
+                .unwrap();
+            data.write_all(&[0]).unwrap();
+            data.rewind().unwrap();
+            data.flush().unwrap();
+
+            //UNSAFE: Required to create a Mmap
+            let map = unsafe { MmapMut::map_mut(&data) };
+            let map = map.unwrap_or_else(|e| {
+                error!(
+                    "Failed to map the data file (size: {}): {}.\n
+                        Please increase sysctl vm.max_map_count or equivalent for your platform.",
+                    self.capacity, e
+                );
+                std::process::exit(1);
+            });
+
+            self.writer = Some(MmapAccountHashesFile {
+                mmap: map,
+                count: 0,
+            });
+        }
+        self.writer.as_mut().unwrap().write(hash);
     }
 }
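A note on the pre-sizing dance in the hunk above: mapping a zero-length file fails, and resizing a file after it is mapped is expensive, so write() seeks one byte short of `capacity` and writes a single zero before creating the mapping. A std-only sketch of the same idiom, with a hypothetical helper name (File::set_len would be a reasonable alternative):

use std::{
    fs::File,
    io::{Result, Seek, SeekFrom, Write},
};

/// Hypothetical helper mirroring `AccountHashesFile::write` above: grow a
/// fresh file to `capacity` bytes by seeking one byte short of the target
/// and writing a single zero, so the OS allocates the range before mmap.
fn presize_for_mmap(file: &mut File, capacity: u64) -> Result<()> {
    assert!(capacity > 0, "cannot mmap a zero-length file");
    file.seek(SeekFrom::Start(capacity - 1))?;
    file.write_all(&[0])?;
    file.rewind()?; // leave the cursor at the start for any later reads
    file.flush()
}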
@@ -338,7 +352,8 @@ impl CumulativeHashesFromFiles {
         let mut readers = Vec::with_capacity(hashes.len());
         let cumulative = CumulativeOffsets::new(hashes.into_iter().filter_map(|mut hash_file| {
             // ignores all hashfiles that have zero entries
-            hash_file.get_reader().map(|(count, reader)| {
+            hash_file.get_reader().map(|reader| {
+                let count = reader.count;
                 readers.push(reader);
                 count
             })
@@ -985,15 +1000,12 @@ impl<'a> AccountsHasher<'a> {
         // map from index of an item in first_items[] to index of the corresponding item in sorted_data_by_pubkey[]
         // this will change as items in sorted_data_by_pubkey[] are exhausted
         let mut first_item_to_pubkey_division = Vec::with_capacity(len);
-        let mut hashes = AccountHashesFile {
-            count_and_writer: None,
-            dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(),
-        };
+
         // initialize 'first_items', which holds the current lowest item in each slot group
-        sorted_data_by_pubkey
+        let max_inclusive_num_pubkeys = sorted_data_by_pubkey
             .iter()
             .enumerate()
-            .for_each(|(i, hash_data)| {
+            .map(|(i, hash_data)| {
                 let first_pubkey_in_bin =
                     Self::find_first_pubkey_in_bin(hash_data, pubkey_bin, bins, &binner, stats);
                 if let Some(first_pubkey_in_bin) = first_pubkey_in_bin {
@@ -1001,8 +1013,27 @@ impl<'a> AccountsHasher<'a> {
                     first_items.push(k);
                     first_item_to_pubkey_division.push(i);
                     indexes.push(first_pubkey_in_bin);
+                    let mut first_pubkey_in_next_bin = first_pubkey_in_bin + 1;
+                    while first_pubkey_in_next_bin < hash_data.len() {
+                        if binner.bin_from_pubkey(&hash_data[first_pubkey_in_next_bin].pubkey)
+                            != pubkey_bin
+                        {
+                            break;
+                        }
+                        first_pubkey_in_next_bin += 1;
+                    }
+                    first_pubkey_in_next_bin - first_pubkey_in_bin
+                } else {
+                    0
                 }
-            });
+            })
+            .sum::<usize>();
+
+        let mut hashes = AccountHashesFile {
+            writer: None,
+            dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(),
+            capacity: max_inclusive_num_pubkeys * std::mem::size_of::<Hash>(),
+        };
+
         let mut overall_sum = 0;
         let mut duplicate_pubkey_indexes = Vec::with_capacity(len);
         let filler_accounts_enabled = self.filler_accounts_enabled();
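The capacity math above leans on the inputs being sorted by pubkey: within each slice, every entry belonging to pubkey_bin forms one contiguous run, so summing run lengths across slices bounds how many hashes the bin can emit (inclusive of duplicates, which dedup removes later), and that bound sizes the mmap. A simplified, self-contained model of that count; the function and types are hypothetical stand-ins for the real binner and hash data:

/// Simplified model of `max_inclusive_num_pubkeys` above: each slice is
/// sorted, so entries for `target_bin` form one contiguous run. Summing the
/// run lengths bounds how many hashes the bin can produce (before dedup).
fn max_inclusive_num_pubkeys(bins_per_slice: &[Vec<u32>], target_bin: u32) -> usize {
    bins_per_slice
        .iter()
        .map(|bins| match bins.iter().position(|&b| b == target_bin) {
            Some(start) => bins[start..]
                .iter()
                .take_while(|&&b| b == target_bin)
                .count(),
            None => 0,
        })
        .sum()
}

#[test]
fn runs_are_counted_per_slice() {
    // runs of bin 1: len 2, len 3, len 0 => upper bound of 5 hashes
    let slices = vec![vec![0, 1, 1, 2], vec![1, 1, 1], vec![2, 3]];
    assert_eq!(max_inclusive_num_pubkeys(&slices, 1), 5);
}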
@@ -1238,8 +1269,9 @@ pub mod tests {
     impl AccountHashesFile {
         fn new(dir_for_temp_cache_files: PathBuf) -> Self {
             Self {
-                count_and_writer: None,
+                writer: None,
                 dir_for_temp_cache_files,
+                capacity: 1024, /* default 1k for tests */
             }
         }
     }
@@ -1308,16 +1340,16 @@ pub mod tests {
         // 1 hash
         file.write(&hashes[0]);
         let reader = file.get_reader().unwrap();
-        assert_eq!(&[hashes[0]][..], reader.1.read(0));
-        assert!(reader.1.read(1).is_empty());
+        assert_eq!(&[hashes[0]][..], reader.read(0));
+        assert!(reader.read(1).is_empty());
 
         // multiple hashes
         let mut file = AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf());
         assert!(file.get_reader().is_none());
         hashes.iter().for_each(|hash| file.write(hash));
         let reader = file.get_reader().unwrap();
-        (0..2).for_each(|i| assert_eq!(&hashes[i..], reader.1.read(i)));
-        assert!(reader.1.read(2).is_empty());
+        (0..2).for_each(|i| assert_eq!(&hashes[i..], reader.read(i)));
+        assert!(reader.read(2).is_empty());
     }
 
     #[test]
@@ -1476,7 +1508,7 @@ pub mod tests {
         let accounts_hasher = AccountsHasher::new(dir_for_temp_cache_files.path().to_path_buf());
         let (mut hashes, lamports) =
             accounts_hasher.de_dup_accounts_in_parallel(&slice, 0, 1, &HashStats::default());
-        assert_eq!(&[Hash::default()], hashes.get_reader().unwrap().1.read(0));
+        assert_eq!(&[Hash::default()], hashes.get_reader().unwrap().read(0));
         assert_eq!(lamports, 1);
     }
 
@@ -1486,7 +1518,7 @@ pub mod tests {
     fn get_vec(mut hashes: AccountHashesFile) -> Vec<Hash> {
         hashes
             .get_reader()
-            .map(|r| r.1.read(0).to_vec())
+            .map(|r| r.read(0).to_vec())
             .unwrap_or_default()
     }