diff --git a/bucket_map/src/bucket.rs b/bucket_map/src/bucket.rs index 505b1a79a..d6cdbd3ac 100644 --- a/bucket_map/src/bucket.rs +++ b/bucket_map/src/bucket.rs @@ -4,7 +4,7 @@ use { bucket_map::BucketMapError, bucket_stats::BucketMapStats, bucket_storage::{BucketOccupied, BucketStorage, DEFAULT_CAPACITY_POW2}, - index_entry::{DataBucket, IndexBucket, IndexEntry}, + index_entry::{DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket}, MaxSearch, RefCount, }, rand::{thread_rng, Rng}, @@ -142,13 +142,13 @@ impl<'b, T: Clone + Copy + 'static> Bucket { if self.index.is_free(ii) { continue; } - let ix: &IndexEntry = self.index.get(ii); - let key = ix.key; - if range.map(|r| r.contains(&key)).unwrap_or(true) { - let val = ix.read_value(self); + let ix = IndexEntryPlaceInBucket::new(ii); + let key = ix.key(&self.index); + if range.map(|r| r.contains(key)).unwrap_or(true) { + let val = ix.read_value(&self.index, &self.data); result.push(BucketItem { - pubkey: key, - ref_count: ix.ref_count(), + pubkey: *key, + ref_count: ix.ref_count(&self.index), slot_list: val.map(|(v, _ref_count)| v.to_vec()).unwrap_or_default(), }); } @@ -156,7 +156,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { result } - pub fn find_index_entry(&self, key: &Pubkey) -> Option<(&IndexEntry, u64)> { + pub fn find_index_entry(&self, key: &Pubkey) -> Option<(IndexEntryPlaceInBucket, u64)> { Self::bucket_find_index_entry(&self.index, key, self.random) } @@ -164,11 +164,11 @@ impl<'b, T: Clone + Copy + 'static> Bucket { /// if entry exists, return the entry along with the index of the existing entry /// if entry does not exist, return just the index of an empty entry appropriate for this key /// returns (existing entry, index of the found or empty entry) - fn find_index_entry_mut<'a>( - index: &'a mut BucketStorage, + fn find_index_entry_mut( + index: &mut BucketStorage, key: &Pubkey, random: u64, - ) -> Result<(Option<&'a mut IndexEntry>, u64), BucketMapError> { + ) -> Result<(Option, u64), BucketMapError> { let ix = Self::bucket_index_ix(index, key, random); let mut first_free = None; let mut m = Measure::start("bucket_find_index_entry_mut"); @@ -181,15 +181,15 @@ impl<'b, T: Clone + Copy + 'static> Bucket { } continue; } - let elem: &IndexEntry = index.get(ii); - if elem.key == *key { + let elem = IndexEntryPlaceInBucket::new(ii); + if elem.key(index) == key { m.stop(); index .stats .find_index_entry_mut_us .fetch_add(m.as_us(), Ordering::Relaxed); - return Ok((Some(index.get_mut(ii)), ii)); + return Ok((Some(elem), ii)); } } m.stop(); @@ -203,19 +203,19 @@ impl<'b, T: Clone + Copy + 'static> Bucket { } } - fn bucket_find_index_entry<'a>( - index: &'a BucketStorage, + fn bucket_find_index_entry( + index: &BucketStorage, key: &Pubkey, random: u64, - ) -> Option<(&'a IndexEntry, u64)> { + ) -> Option<(IndexEntryPlaceInBucket, u64)> { let ix = Self::bucket_index_ix(index, key, random); for i in ix..ix + index.max_search() { let ii = i % index.capacity(); if index.is_free(ii) { continue; } - let elem: &IndexEntry = index.get(ii); - if elem.key == *key { + let elem = IndexEntryPlaceInBucket::new(ii); + if elem.key(index) == key { return Some((elem, ii)); } } @@ -236,10 +236,9 @@ impl<'b, T: Clone + Copy + 'static> Bucket { continue; } index.occupy(ii, is_resizing).unwrap(); - let elem: &mut IndexEntry = index.get_mut(ii); // These fields will be overwritten after allocation by callers. // Since this part of the mmapped file could have previously been used by someone else, there can be garbage here. - elem.init(key); + IndexEntryPlaceInBucket::new(ii).init(index, key); //debug!( "INDEX ALLOC {:?} {} {} {}", key, ii, index.capacity, elem_uid ); m.stop(); index @@ -259,7 +258,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket { pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> { //debug!("READ_VALUE: {:?}", key); let (elem, _) = self.find_index_entry(key)?; - elem.read_value(self) + elem.read_value(&self.index, &self.data) } pub fn try_write( @@ -281,23 +280,23 @@ impl<'b, T: Clone + Copy + 'static> Bucket { } else { let is_resizing = false; self.index.occupy(elem_ix, is_resizing).unwrap(); + let elem_allocate = IndexEntryPlaceInBucket::new(elem_ix); // These fields will be overwritten after allocation by callers. // Since this part of the mmapped file could have previously been used by someone else, there can be garbage here. - let elem_allocate: &mut IndexEntry = self.index.get_mut(elem_ix); - elem_allocate.init(key); + elem_allocate.init(&mut self.index, key); elem_allocate }; - elem.ref_count = ref_count; - let bucket_ix = elem.data_bucket_ix(); + elem.set_ref_count(&mut self.index, ref_count); + let bucket_ix = elem.data_bucket_ix(&self.index); let current_bucket = &self.data[bucket_ix as usize]; let num_slots = data_len as u64; - if best_fit_bucket == bucket_ix && elem.num_slots > 0 { + if best_fit_bucket == bucket_ix && elem.num_slots(&self.index) > 0 { // in place update - let elem_loc = elem.data_loc(current_bucket); + let elem_loc = elem.data_loc(&self.index, current_bucket); let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64); assert!(!current_bucket.is_free(elem_loc)); - elem.num_slots = num_slots; + elem.set_num_slots(&mut self.index, num_slots); slice.iter_mut().zip(data).for_each(|(dest, src)| { *dest = *src; @@ -320,11 +319,14 @@ impl<'b, T: Clone + Copy + 'static> Bucket { for i in pos..pos + (max_search * 10).min(cap) { let ix = i % cap; if best_bucket.is_free(ix) { - let elem_loc = elem.data_loc(current_bucket); - let old_slots = elem.num_slots; - elem.set_storage_offset(ix); - elem.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2); - elem.num_slots = num_slots; + let elem_loc = elem.data_loc(&self.index, current_bucket); + let old_slots = elem.num_slots(&self.index); + elem.set_storage_offset(&mut self.index, ix); + elem.set_storage_capacity_when_created_pow2( + &mut self.index, + best_bucket.capacity_pow2, + ); + elem.set_num_slots(&mut self.index, num_slots); if old_slots > 0 { let current_bucket = &mut self.data[bucket_ix as usize]; current_bucket.free(elem_loc); @@ -347,10 +349,10 @@ impl<'b, T: Clone + Copy + 'static> Bucket { pub fn delete_key(&mut self, key: &Pubkey) { if let Some((elem, elem_ix)) = self.find_index_entry(key) { - if elem.num_slots > 0 { - let ix = elem.data_bucket_ix() as usize; + if elem.num_slots(&self.index) > 0 { + let ix = elem.data_bucket_ix(&self.index) as usize; let data_bucket = &self.data[ix]; - let loc = elem.data_loc(data_bucket); + let loc = elem.data_loc(&self.index, data_bucket); let data_bucket = &mut self.data[ix]; //debug!( "DATA FREE {:?} {} {} {}", key, elem.data_location, data_bucket.capacity, elem_uid ); data_bucket.free(loc); diff --git a/bucket_map/src/bucket_storage.rs b/bucket_map/src/bucket_storage.rs index 12373c8ce..76939cb19 100644 --- a/bucket_map/src/bucket_storage.rs +++ b/bucket_map/src/bucket_storage.rs @@ -179,12 +179,12 @@ impl BucketStorage { } } - fn get_start_offset_with_header(&self, ix: u64) -> usize { + pub(crate) fn get_start_offset_with_header(&self, ix: u64) -> usize { assert!(ix < self.capacity(), "bad index size"); (self.cell_size * ix) as usize } - fn get_start_offset_no_header(&self, ix: u64) -> usize { + pub(crate) fn get_start_offset_no_header(&self, ix: u64) -> usize { self.get_start_offset_with_header(ix) + O::offset_to_first_data() } diff --git a/bucket_map/src/index_entry.rs b/bucket_map/src/index_entry.rs index 4cfb14afe..8a5f360ef 100644 --- a/bucket_map/src/index_entry.rs +++ b/bucket_map/src/index_entry.rs @@ -2,7 +2,6 @@ use { crate::{ - bucket::Bucket, bucket_storage::{BucketOccupied, BucketStorage}, RefCount, }, @@ -56,16 +55,22 @@ impl BucketOccupied for BucketWithBitVec { pub type DataBucket = BucketWithBitVec; pub type IndexBucket = BucketWithBitVec; +/// contains the index of an entry in the index bucket. +/// This type allows us to call methods to interact with the index entry on this type. +pub struct IndexEntryPlaceInBucket { + pub ix: u64, +} + #[repr(C)] #[derive(Debug, Copy, Clone, PartialEq, Eq)] // one instance of this per item in the index // stored in the index bucket pub struct IndexEntry { pub key: Pubkey, // can this be smaller if we have reduced the keys into buckets already? - pub ref_count: RefCount, // can this be smaller? Do we ever need more than 4B refcounts? + ref_count: RefCount, // can this be smaller? Do we ever need more than 4B refcounts? storage_cap_and_offset: PackedStorage, // if the bucket doubled, the index can be recomputed using create_bucket_capacity_pow2 - pub num_slots: Slot, // can this be smaller? epoch size should ~ be the max len. this is the num elements in the slot list + num_slots: Slot, // can this be smaller? epoch size should ~ be the max len. this is the num elements in the slot list } /// Pack the storage offset and capacity-when-crated-pow2 fields into a single u64 @@ -78,27 +83,6 @@ struct PackedStorage { } impl IndexEntry { - pub fn init(&mut self, pubkey: &Pubkey) { - self.key = *pubkey; - self.ref_count = 0; - self.storage_cap_and_offset = PackedStorage::default(); - self.num_slots = 0; - } - - pub fn set_storage_capacity_when_created_pow2( - &mut self, - storage_capacity_when_created_pow2: u8, - ) { - self.storage_cap_and_offset - .set_capacity_when_created_pow2(storage_capacity_when_created_pow2) - } - - pub fn set_storage_offset(&mut self, storage_offset: u64) { - self.storage_cap_and_offset - .set_offset_checked(storage_offset) - .expect("New storage offset must fit into 7 bytes!") - } - /// return closest bucket index fit for the slot slice. /// Since bucket size is 2^index, the return value is /// min index, such that 2^index >= num_slots @@ -112,47 +96,131 @@ impl IndexEntry { (Slot::BITS - (num_slots - 1).leading_zeros()) as u64 } } +} - pub fn data_bucket_ix(&self) -> u64 { - Self::data_bucket_from_num_slots(self.num_slots) +impl IndexEntryPlaceInBucket { + pub fn init(&self, index_bucket: &mut BucketStorage, pubkey: &Pubkey) { + let index_entry = index_bucket.get_mut::(self.ix); + index_entry.key = *pubkey; + index_entry.ref_count = 0; + index_entry.storage_cap_and_offset = PackedStorage::default(); + index_entry.num_slots = 0; } - pub fn ref_count(&self) -> RefCount { - self.ref_count + pub fn set_storage_capacity_when_created_pow2( + &self, + index_bucket: &mut BucketStorage, + storage_capacity_when_created_pow2: u8, + ) { + index_bucket + .get_mut::(self.ix) + .storage_cap_and_offset + .set_capacity_when_created_pow2(storage_capacity_when_created_pow2) } - fn storage_capacity_when_created_pow2(&self) -> u8 { - self.storage_cap_and_offset.capacity_when_created_pow2() + pub fn set_storage_offset( + &self, + index_bucket: &mut BucketStorage, + storage_offset: u64, + ) { + index_bucket + .get_mut::(self.ix) + .storage_cap_and_offset + .set_offset_checked(storage_offset) + .expect("New storage offset must fit into 7 bytes!"); } - fn storage_offset(&self) -> u64 { - self.storage_cap_and_offset.offset() + pub fn data_bucket_ix(&self, index_bucket: &BucketStorage) -> u64 { + IndexEntry::data_bucket_from_num_slots(self.num_slots(index_bucket)) } - // This function maps the original data location into an index in the current bucket storage. - // This is coupled with how we resize bucket storages. - pub fn data_loc(&self, storage: &BucketStorage) -> u64 { - self.storage_offset() << (storage.capacity_pow2 - self.storage_capacity_when_created_pow2()) + pub fn ref_count(&self, index_bucket: &BucketStorage) -> RefCount { + let index_entry = index_bucket.get::(self.ix); + index_entry.ref_count } - pub fn read_value<'a, T: 'static>(&self, bucket: &'a Bucket) -> Option<(&'a [T], RefCount)> { - let slice = if self.num_slots > 0 { - let data_bucket_ix = self.data_bucket_ix(); - let data_bucket = &bucket.data[data_bucket_ix as usize]; - let loc = self.data_loc(data_bucket); + fn storage_capacity_when_created_pow2(&self, index_bucket: &BucketStorage) -> u8 { + let index_entry = index_bucket.get::(self.ix); + index_entry + .storage_cap_and_offset + .capacity_when_created_pow2() + } + + pub fn storage_offset(&self, index_bucket: &BucketStorage) -> u64 { + index_bucket + .get::(self.ix) + .storage_cap_and_offset + .offset() + } + + /// This function maps the original data location into an index in the current bucket storage. + /// This is coupled with how we resize bucket storages. + pub fn data_loc( + &self, + index_bucket: &BucketStorage, + storage: &BucketStorage, + ) -> u64 { + let index_entry = index_bucket.get::(self.ix); + self.storage_offset(index_bucket) + << (storage.capacity_pow2 + - index_entry + .storage_cap_and_offset + .capacity_when_created_pow2()) + } + + pub fn read_value<'a, T>( + &self, + index_bucket: &BucketStorage, + data_buckets: &'a [BucketStorage], + ) -> Option<(&'a [T], RefCount)> { + let num_slots = self.num_slots(index_bucket); + let slice = if num_slots > 0 { + let data_bucket_ix = self.data_bucket_ix(index_bucket); + let data_bucket = &data_buckets[data_bucket_ix as usize]; + let loc = self.data_loc(index_bucket, data_bucket); assert!(!data_bucket.is_free(loc)); - data_bucket.get_cell_slice(loc, self.num_slots) + data_bucket.get_cell_slice(loc, num_slots) } else { // num_slots is 0. This means we don't have an actual allocation. &[] }; - Some((slice, self.ref_count)) + Some((slice, self.ref_count(index_bucket))) + } + + pub fn new(ix: u64) -> Self { + Self { ix } + } + + pub fn key<'a>(&self, index_bucket: &'a BucketStorage) -> &'a Pubkey { + let entry: &IndexEntry = index_bucket.get(self.ix); + &entry.key + } + + pub fn set_ref_count( + &self, + index_bucket: &mut BucketStorage, + ref_count: RefCount, + ) { + let index_entry = index_bucket.get_mut::(self.ix); + index_entry.ref_count = ref_count; + } + + pub fn num_slots(&self, index_bucket: &BucketStorage) -> Slot { + index_bucket.get::(self.ix).num_slots + } + + pub fn set_num_slots(&self, index_bucket: &mut BucketStorage, num_slots: Slot) { + index_bucket.get_mut::(self.ix).num_slots = num_slots; } } #[cfg(test)] mod tests { - use super::*; + use { + super::*, + std::{path::PathBuf, sync::Arc}, + tempfile::tempdir, + }; impl IndexEntry { pub fn new(key: Pubkey) -> Self { @@ -170,16 +238,19 @@ mod tests { #[test] fn test_api() { for offset in [0, 1, u32::MAX as u64] { - let mut index = IndexEntry::new(solana_sdk::pubkey::new_rand()); + let (mut index_bucket, index) = index_entry_for_testing(); if offset != 0 { - index.set_storage_offset(offset); + index.set_storage_offset(&mut index_bucket, offset); } - assert_eq!(index.storage_offset(), offset); - assert_eq!(index.storage_capacity_when_created_pow2(), 0); + assert_eq!(index.storage_offset(&index_bucket,), offset); + assert_eq!(index.storage_capacity_when_created_pow2(&index_bucket,), 0); for pow in [1, 255, 0] { - index.set_storage_capacity_when_created_pow2(pow); - assert_eq!(index.storage_offset(), offset); - assert_eq!(index.storage_capacity_when_created_pow2(), pow); + index.set_storage_capacity_when_created_pow2(&mut index_bucket, pow); + assert_eq!(index.storage_offset(&index_bucket,), offset); + assert_eq!( + index.storage_capacity_when_created_pow2(&index_bucket,), + pow + ); } } } @@ -190,12 +261,32 @@ mod tests { assert_eq!(std::mem::size_of::(), 32 + 8 + 8 + 8); } + fn index_bucket_for_testing() -> BucketStorage { + let tmpdir = tempdir().unwrap(); + let paths: Vec = vec![tmpdir.path().to_path_buf()]; + assert!(!paths.is_empty()); + + // `new` here creates a file in `tmpdir`. Once the file is created, `tmpdir` can be dropped without issue. + BucketStorage::::new( + Arc::new(paths), + 1, + std::mem::size_of::() as u64, + 1, + Arc::default(), + Arc::default(), + ) + } + + fn index_entry_for_testing() -> (BucketStorage, IndexEntryPlaceInBucket) { + (index_bucket_for_testing(), IndexEntryPlaceInBucket::new(0)) + } + #[test] #[should_panic(expected = "New storage offset must fit into 7 bytes!")] fn test_set_storage_offset_value_too_large() { let too_big = 1 << 56; - let mut index = IndexEntry::new(Pubkey::new_unique()); - index.set_storage_offset(too_big); + let (mut index_bucket, index) = index_entry_for_testing(); + index.set_storage_offset(&mut index_bucket, too_big); } #[test]