diff --git a/accounts-db/src/accounts_hash.rs b/accounts-db/src/accounts_hash.rs index ac4134cf8..124e5b069 100644 --- a/accounts-db/src/accounts_hash.rs +++ b/accounts-db/src/accounts_hash.rs @@ -19,8 +19,7 @@ use { std::{ borrow::Borrow, convert::TryInto, - fs::File, - io::{BufWriter, Write}, + io::{Seek, SeekFrom, Write}, path::PathBuf, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, @@ -33,81 +32,96 @@ pub const MERKLE_FANOUT: usize = 16; /// 1 file containing account hashes sorted by pubkey, mapped into memory struct MmapAccountHashesFile { + /// raw slice of `Hash` values. Can be a larger slice than `count` mmap: MmapMut, + /// # of valid Hash entries in `mmap` + count: usize, } impl MmapAccountHashesFile { /// return a slice of account hashes starting at 'index' fn read(&self, index: usize) -> &[Hash] { let start = std::mem::size_of::<Hash>() * index; - let item_slice: &[u8] = &self.mmap[start..]; + let item_slice: &[u8] = &self.mmap[start..self.count * std::mem::size_of::<Hash>()]; let remaining_elements = item_slice.len() / std::mem::size_of::<Hash>(); unsafe { let item = item_slice.as_ptr() as *const Hash; std::slice::from_raw_parts(item, remaining_elements) } } + + /// write a hash to the end of mmap file. + fn write(&mut self, hash: &Hash) { + let start = self.count * std::mem::size_of::<Hash>(); + let end = start + std::mem::size_of::<Hash>(); + self.mmap[start..end].copy_from_slice(hash.as_ref()); + self.count += 1; + } } /// 1 file containing account hashes sorted by pubkey pub struct AccountHashesFile { /// # hashes and an open file that will be deleted on drop. None if there are zero hashes to represent, and thus, no file. 
- count_and_writer: Option<(usize, BufWriter<File>)>, + writer: Option<MmapAccountHashesFile>, /// The directory where temporary cache files are put dir_for_temp_cache_files: PathBuf, + /// # bytes allocated + capacity: usize, } impl AccountHashesFile { - /// map the file into memory and return a reader that can access it by slice - fn get_reader(&mut self) -> Option<(usize, MmapAccountHashesFile)> { - std::mem::take(&mut self.count_and_writer).map(|(count, writer)| { - let file = Some(writer.into_inner().unwrap()); - ( - count, - MmapAccountHashesFile { - mmap: unsafe { MmapMut::map_mut(file.as_ref().unwrap()).unwrap() }, - }, - ) - }) + /// return a mmap reader that can be accessed by slice + fn get_reader(&mut self) -> Option<MmapAccountHashesFile> { + std::mem::take(&mut self.writer) } /// # hashes stored in this file pub fn count(&self) -> usize { - self.count_and_writer + self.writer .as_ref() - .map(|(count, _)| *count) + .map(|writer| writer.count) .unwrap_or_default() } /// write 'hash' to the file /// If the file isn't open, create it first. 
pub fn write(&mut self, hash: &Hash) { - if self.count_and_writer.is_none() { + if self.writer.is_none() { // we have hashes to write but no file yet, so create a file that will auto-delete on drop - self.count_and_writer = Some(( - 0, - BufWriter::new( - tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| { - panic!( - "Unable to create file within {}: {err}", - self.dir_for_temp_cache_files.display() - ) - }), - ), - )); - } - let count_and_writer = self.count_and_writer.as_mut().unwrap(); - count_and_writer - .1 - .write_all(hash.as_ref()) - .unwrap_or_else(|err| { + + let mut data = tempfile_in(&self.dir_for_temp_cache_files).unwrap_or_else(|err| { panic!( - "Unable to write file within {}: {err}", + "Unable to create file within {}: {err}", self.dir_for_temp_cache_files.display() ) }); - count_and_writer.0 += 1; + // Theoretical performance optimization: write a zero to the end of + // the file so that we won't have to resize it later, which may be + // expensive. + data.seek(SeekFrom::Start((self.capacity - 1) as u64)) + .unwrap(); + data.write_all(&[0]).unwrap(); + data.rewind().unwrap(); + data.flush().unwrap(); + + //UNSAFE: Required to create a Mmap + let map = unsafe { MmapMut::map_mut(&data) }; + let map = map.unwrap_or_else(|e| { + error!( + "Failed to map the data file (size: {}): {}.\n + Please increase sysctl vm.max_map_count or equivalent for your platform.", + self.capacity, e + ); + std::process::exit(1); + }); + + self.writer = Some(MmapAccountHashesFile { + mmap: map, + count: 0, + }); + } + self.writer.as_mut().unwrap().write(hash); } } @@ -338,7 +352,8 @@ impl CumulativeHashesFromFiles { let mut readers = Vec::with_capacity(hashes.len()); let cumulative = CumulativeOffsets::new(hashes.into_iter().filter_map(|mut hash_file| { // ignores all hashfiles that have zero entries - hash_file.get_reader().map(|(count, reader)| { + hash_file.get_reader().map(|reader| { + let count = reader.count; readers.push(reader); count }) @@ -985,15 
+1000,12 @@ impl<'a> AccountsHasher<'a> { // map from index of an item in first_items[] to index of the corresponding item in sorted_data_by_pubkey[] // this will change as items in sorted_data_by_pubkey[] are exhausted let mut first_item_to_pubkey_division = Vec::with_capacity(len); - let mut hashes = AccountHashesFile { - count_and_writer: None, - dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(), - }; + // initialize 'first_items', which holds the current lowest item in each slot group - sorted_data_by_pubkey + let max_inclusive_num_pubkeys = sorted_data_by_pubkey .iter() .enumerate() - .for_each(|(i, hash_data)| { + .map(|(i, hash_data)| { let first_pubkey_in_bin = Self::find_first_pubkey_in_bin(hash_data, pubkey_bin, bins, &binner, stats); if let Some(first_pubkey_in_bin) = first_pubkey_in_bin { @@ -1001,8 +1013,27 @@ impl<'a> AccountsHasher<'a> { first_items.push(k); first_item_to_pubkey_division.push(i); indexes.push(first_pubkey_in_bin); + let mut first_pubkey_in_next_bin = first_pubkey_in_bin + 1; + while first_pubkey_in_next_bin < hash_data.len() { + if binner.bin_from_pubkey(&hash_data[first_pubkey_in_next_bin].pubkey) + != pubkey_bin + { + break; + } + first_pubkey_in_next_bin += 1; + } + first_pubkey_in_next_bin - first_pubkey_in_bin + } else { + 0 } - }); + }) + .sum::<usize>(); + let mut hashes = AccountHashesFile { + writer: None, + dir_for_temp_cache_files: self.dir_for_temp_cache_files.clone(), + capacity: max_inclusive_num_pubkeys * std::mem::size_of::<Hash>(), + }; + let mut overall_sum = 0; let mut duplicate_pubkey_indexes = Vec::with_capacity(len); let filler_accounts_enabled = self.filler_accounts_enabled(); @@ -1238,8 +1269,9 @@ pub mod tests { impl AccountHashesFile { fn new(dir_for_temp_cache_files: PathBuf) -> Self { Self { - count_and_writer: None, + writer: None, dir_for_temp_cache_files, + capacity: 1024, /* default 1k for tests */ } } } @@ -1308,16 +1340,16 @@ pub mod tests { // 1 hash file.write(&hashes[0]); let reader = 
file.get_reader().unwrap(); - assert_eq!(&[hashes[0]][..], reader.1.read(0)); - assert!(reader.1.read(1).is_empty()); + assert_eq!(&[hashes[0]][..], reader.read(0)); + assert!(reader.read(1).is_empty()); // multiple hashes let mut file = AccountHashesFile::new(dir_for_temp_cache_files.path().to_path_buf()); assert!(file.get_reader().is_none()); hashes.iter().for_each(|hash| file.write(hash)); let reader = file.get_reader().unwrap(); - (0..2).for_each(|i| assert_eq!(&hashes[i..], reader.1.read(i))); - assert!(reader.1.read(2).is_empty()); + (0..2).for_each(|i| assert_eq!(&hashes[i..], reader.read(i))); + assert!(reader.read(2).is_empty()); } #[test] @@ -1476,7 +1508,7 @@ pub mod tests { let accounts_hasher = AccountsHasher::new(dir_for_temp_cache_files.path().to_path_buf()); let (mut hashes, lamports) = accounts_hasher.de_dup_accounts_in_parallel(&slice, 0, 1, &HashStats::default()); - assert_eq!(&[Hash::default()], hashes.get_reader().unwrap().1.read(0)); + assert_eq!(&[Hash::default()], hashes.get_reader().unwrap().read(0)); assert_eq!(lamports, 1); } @@ -1486,7 +1518,7 @@ pub mod tests { fn get_vec(mut hashes: AccountHashesFile) -> Vec<Hash> { hashes .get_reader() - .map(|r| r.1.read(0).to_vec()) + .map(|r| r.read(0).to_vec()) .unwrap_or_default() }