diff --git a/bucket_map/src/bucket.rs b/bucket_map/src/bucket.rs index c191a54a12..d3ff2e0e56 100644 --- a/bucket_map/src/bucket.rs +++ b/bucket_map/src/bucket.rs @@ -17,7 +17,7 @@ use { ops::RangeBounds, path::PathBuf, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, }, }, @@ -81,6 +81,7 @@ impl Bucket { drives: Arc>, max_search: MaxSearch, stats: Arc, + count: Arc, ) -> Self { let index = BucketStorage::new( Arc::clone(&drives), @@ -88,6 +89,7 @@ impl Bucket { std::mem::size_of::() as u64, max_search, Arc::clone(&stats.index), + count, ); Self { random: thread_rng().gen(), @@ -100,10 +102,6 @@ impl Bucket { } } - pub fn bucket_len(&self) -> u64 { - self.index.used.load(Ordering::Relaxed) - } - pub fn keys(&self) -> Vec { let mut rv = vec![]; for i in 0..self.index.capacity() { @@ -120,7 +118,7 @@ impl Bucket { where R: RangeBounds, { - let mut result = Vec::with_capacity(self.index.used.load(Ordering::Relaxed) as usize); + let mut result = Vec::with_capacity(self.index.count.load(Ordering::Relaxed) as usize); for i in 0..self.index.capacity() { let ii = i % self.index.capacity(); if self.index.uid(ii) == UID_UNLOCKED { @@ -191,6 +189,7 @@ impl Bucket { key: &Pubkey, elem_uid: Uid, random: u64, + is_resizing: bool, ) -> Result { let ix = Self::bucket_index_ix(index, key, random); for i in ix..ix + index.max_search() { @@ -198,7 +197,7 @@ impl Bucket { if index.uid(ii) != UID_UNLOCKED { continue; } - index.allocate(ii, elem_uid).unwrap(); + index.allocate(ii, elem_uid, is_resizing).unwrap(); let mut elem: &mut IndexEntry = index.get_mut(ii); elem.key = *key; // These will be overwritten after allocation by callers. @@ -226,7 +225,13 @@ impl Bucket { } fn create_key(&self, key: &Pubkey) -> Result { - Self::bucket_create_key(&self.index, key, IndexEntry::key_uid(key), self.random) + Self::bucket_create_key( + &self.index, + key, + IndexEntry::key_uid(key), + self.random, + false, + ) } pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> { @@ -285,7 +290,7 @@ impl Bucket { elem.num_slots = data.len() as u64; //debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid ); if elem.num_slots > 0 { - best_bucket.allocate(ix, elem_uid).unwrap(); + best_bucket.allocate(ix, elem_uid, false).unwrap(); let slice = best_bucket.get_mut_cell_slice(ix, data.len() as u64); slice.copy_from_slice(data); } @@ -327,6 +332,7 @@ impl Bucket { self.index.capacity_pow2 + i, // * 2, self.index.max_search, Arc::clone(&self.stats.index), + Arc::clone(&self.index.count), ); let random = thread_rng().gen(); let mut valid = true; @@ -334,7 +340,7 @@ impl Bucket { let uid = self.index.uid(ix); if UID_UNLOCKED != uid { let elem: &IndexEntry = self.index.get(ix); - let new_ix = Self::bucket_create_key(&index, &elem.key, uid, random); + let new_ix = Self::bucket_create_key(&index, &elem.key, uid, random, true); if new_ix.is_err() { valid = false; break; @@ -391,6 +397,7 @@ impl Bucket { Self::elem_size(), self.index.max_search, Arc::clone(&self.stats.data), + Arc::default(), )) } self.data.push(bucket); diff --git a/bucket_map/src/bucket_api.rs b/bucket_map/src/bucket_api.rs index aa0a6e3435..8ec89e2e5e 100644 --- a/bucket_map/src/bucket_api.rs +++ b/bucket_map/src/bucket_api.rs @@ -30,14 +30,13 @@ impl BucketApi { drives: Arc>, max_search: MaxSearch, stats: Arc, - count: Arc, ) -> Self { Self { drives, max_search, stats, bucket: RwLock::default(), - count, + count: Arc::default(), } } @@ -73,12 +72,7 @@ impl BucketApi { } pub fn bucket_len(&self) -> u64 { - self.bucket - .read() - .unwrap() - .as_ref() - .map(|bucket| bucket.bucket_len()) - .unwrap_or_default() + self.count.load(Ordering::Relaxed) } pub fn delete_key(&self, key: &Pubkey) { @@ -95,11 +89,11 @@ impl BucketApi { Arc::clone(&self.drives), self.max_search, Arc::clone(&self.stats), + Arc::clone(&self.count), )); } else { let write = bucket.as_mut().unwrap(); write.handle_delayed_grows(); - self.count.store(write.bucket_len(), Ordering::Relaxed); } bucket } diff --git a/bucket_map/src/bucket_map.rs b/bucket_map/src/bucket_map.rs index 7cc0dbdf8f..9df7651da7 100644 --- a/bucket_map/src/bucket_map.rs +++ b/bucket_map/src/bucket_map.rs @@ -79,21 +79,14 @@ impl BucketMap { }); let drives = Arc::new(drives); - let mut per_bucket_count = Vec::with_capacity(config.max_buckets); - per_bucket_count.resize_with(config.max_buckets, Arc::default); - let stats = Arc::new(BucketMapStats { - per_bucket_count, - ..BucketMapStats::default() - }); - let buckets = stats - .per_bucket_count - .iter() - .map(|per_bucket_count| { + let stats = Arc::default(); + let buckets = (0..config.max_buckets) + .into_iter() + .map(|_| { Arc::new(BucketApi::new( Arc::clone(&drives), max_search, Arc::clone(&stats), - Arc::clone(per_bucket_count), )) }) .collect(); diff --git a/bucket_map/src/bucket_stats.rs b/bucket_map/src/bucket_stats.rs index d71c7f7e16..d102f16275 100644 --- a/bucket_map/src/bucket_stats.rs +++ b/bucket_map/src/bucket_stats.rs @@ -14,5 +14,4 @@ pub struct BucketStats { pub struct BucketMapStats { pub index: Arc, pub data: Arc, - pub per_bucket_count: Vec>, } diff --git a/bucket_map/src/bucket_storage.rs b/bucket_map/src/bucket_storage.rs index d3415fa04f..3a54f111ba 100644 --- a/bucket_map/src/bucket_storage.rs +++ b/bucket_map/src/bucket_storage.rs @@ -64,7 +64,7 @@ pub struct BucketStorage { mmap: MmapMut, pub cell_size: u64, pub capacity_pow2: u8, - pub used: AtomicU64, + pub count: Arc, pub stats: Arc, pub max_search: MaxSearch, } @@ -88,6 +88,7 @@ impl BucketStorage { capacity_pow2: u8, max_search: MaxSearch, stats: Arc, + count: Arc, ) -> Self { let cell_size = elem_size * num_elems + std::mem::size_of::
() as u64; let (mmap, path) = Self::new_map(&drives, cell_size as usize, capacity_pow2, &stats); @@ -95,7 +96,7 @@ impl BucketStorage { path, mmap, cell_size, - used: AtomicU64::new(0), + count, capacity_pow2, stats, max_search, @@ -112,6 +113,7 @@ impl BucketStorage { elem_size: u64, max_search: MaxSearch, stats: Arc, + count: Arc, ) -> Self { Self::new_with_capacity( drives, @@ -120,6 +122,7 @@ impl BucketStorage { DEFAULT_CAPACITY_POW2, max_search, stats, + count, ) } @@ -133,20 +136,24 @@ impl BucketStorage { } } - pub fn allocate(&self, ix: u64, uid: Uid) -> Result<(), BucketStorageError> { + /// 'is_resizing' true if caller is resizing the index (so don't increment count) + /// 'is_resizing' false if caller is adding an item to the index (so increment count) + pub fn allocate(&self, ix: u64, uid: Uid, is_resizing: bool) -> Result<(), BucketStorageError> { assert!(ix < self.capacity(), "allocate: bad index size"); assert!(UID_UNLOCKED != uid, "allocate: bad uid"); let mut e = Err(BucketStorageError::AlreadyAllocated); let ix = (ix * self.cell_size) as usize; //debug!("ALLOC {} {}", ix, uid); let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::
()]; - unsafe { + if unsafe { let hdr = hdr_slice.as_ptr() as *const Header; - if hdr.as_ref().unwrap().try_lock(uid) { - e = Ok(()); - self.used.fetch_add(1, Ordering::Relaxed); + hdr.as_ref().unwrap().try_lock(uid) + } { + e = Ok(()); + if !is_resizing { + self.count.fetch_add(1, Ordering::Relaxed); } - }; + } e } @@ -165,7 +172,7 @@ impl BucketStorage { "free: unlocked a header with a differet uid: {}", previous_uid ); - self.used.fetch_sub(1, Ordering::Relaxed); + self.count.fetch_sub(1, Ordering::Relaxed); } } @@ -324,6 +331,9 @@ impl BucketStorage { capacity_pow_2, max_search, Arc::clone(stats), + bucket + .map(|bucket| Arc::clone(&bucket.count)) + .unwrap_or_default(), ); if let Some(bucket) = bucket { new_bucket.copy_contents(bucket); diff --git a/runtime/src/bucket_map_holder_stats.rs b/runtime/src/bucket_map_holder_stats.rs index 04082cce6f..2629e5724f 100644 --- a/runtime/src/bucket_map_holder_stats.rs +++ b/runtime/src/bucket_map_holder_stats.rs @@ -170,10 +170,9 @@ impl BucketMapHolderStats { let disk = storage.disk.as_ref(); let disk_per_bucket_counts = disk .map(|disk| { - disk.stats - .per_bucket_count - .iter() - .map(|count| count.load(Ordering::Relaxed) as usize) + (0..self.bins) + .into_iter() + .map(|i| disk.get_bucket_from_index(i as usize).bucket_len() as usize) .collect::>() }) .unwrap_or_default();