disk index: set_anticipated_count to optimally grow disk buckets at startup (#31033)

* disk index: set_anticipated_count to optimally grow disk buckets at startup

* remove atomic
This commit is contained in:
Jeff Washington (jwash) 2023-04-03 22:28:50 -05:00 committed by GitHub
parent 3442f184f7
commit bc343a431c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 5 deletions

View File

@ -96,6 +96,10 @@ pub struct Bucket<T: Copy + 'static> {
pub data: Vec<BucketStorage<DataBucket>>,
stats: Arc<BucketMapStats>,
/// # entries caller expects the map to need to contain.
/// Used as a hint for the next time we need to grow.
anticipated_size: u64,
pub reallocated: Reallocated<IndexBucket<T>, DataBucket>,
}
@ -123,6 +127,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
data: vec![],
stats,
reallocated: Reallocated::default(),
anticipated_size: 0,
}
}
@ -420,21 +425,28 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}
}
/// Remember how many entries the caller expects this map to need to hold soon.
///
/// The value is only a hint: it is read by the index-growth path to pick a
/// starting capacity, so a good hint avoids several small incremental grows.
/// Passing `0` clears the hint.
pub(crate) fn set_anticipated_count(&mut self, anticipated: u64) {
    self.anticipated_size = anticipated;
}
pub fn grow_index(&self, current_capacity_pow2: u8) {
if self.index.capacity_pow2 == current_capacity_pow2 {
let mut starting_size_pow2 = self.index.capacity_pow2;
if self.anticipated_size > 0 {
// start the growth at the next pow2 larger than what would be required to hold `anticipated_size`.
// This will prevent unnecessary repeated grows at startup.
starting_size_pow2 = starting_size_pow2.max(self.anticipated_size.ilog2() as u8);
}
let mut m = Measure::start("grow_index");
//debug!("GROW_INDEX: {}", current_capacity_pow2);
let increment = 1;
for i in increment.. {
//increasing the capacity by ^4 reduces the
//likelihood of a re-index collision of 2^(max_search)^2
//1 in 2^32
let mut index = BucketStorage::new_with_capacity(
Arc::clone(&self.drives),
1,
std::mem::size_of::<IndexEntry<T>>() as u64,
// *2 causes rapid growth of index buckets
self.index.capacity_pow2 + i, // * 2,
// the subtle `+ i` here causes us to grow from the starting size by a power of 2 on each iteration of the for loop
starting_size_pow2 + i,
self.index.max_search,
Arc::clone(&self.stats.index),
Arc::clone(&self.index.count),

View File

@ -111,6 +111,13 @@ impl<T: Clone + Copy> BucketApi<T> {
}
}
/// caller can specify that the index needs to hold approximately `count` entries soon.
/// This gives a hint to the resizing algorithm and prevents repeated incremental resizes.
/// Caller can specify that the index needs to hold approximately `count`
/// entries soon.
///
/// The hint is forwarded to the underlying bucket, whose resizing logic can
/// then grow once to an adequate size instead of repeatedly in small steps.
pub fn set_anticipated_count(&self, count: u64) {
    // Acquire the write guard and forward the hint in one expression; the
    // guard temporary lives until the end of the statement.
    self.get_write_bucket()
        .as_mut()
        .unwrap()
        .set_anticipated_count(count);
}
pub fn update<F>(&self, key: &Pubkey, updatefn: F)
where
F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,

View File

@ -1061,6 +1061,9 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
// merge all items into the disk index now
let disk = self.bucket.as_ref().unwrap();
let mut count = 0;
let current_len = disk.bucket_len();
let anticipated = insert.len();
disk.set_anticipated_count((anticipated as u64).saturating_add(current_len));
insert.into_iter().for_each(|(slot, k, v)| {
let entry = (slot, v);
let new_ref_count = u64::from(!v.is_cached());
@ -1085,6 +1088,8 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
}
});
});
// remove the guidance for how many entries the bucket will eventually contain since we have added all we knew about
disk.set_anticipated_count(0);
self.stats().inc_insert_count(count);
}