From dc47a56c22c2d447916cdbfc240e47fcac40a317 Mon Sep 17 00:00:00 2001
From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com>
Date: Tue, 5 Oct 2021 09:59:17 -0500
Subject: [PATCH] AcctIdx: bucket map grows with read lock (#20397)

---
 bucket_map/src/bucket.rs         | 144 +++++++++++++++++++++++++------
 bucket_map/src/bucket_api.rs     |   9 +-
 bucket_map/src/bucket_storage.rs |  58 +++++++++----
 3 files changed, 162 insertions(+), 49 deletions(-)

diff --git a/bucket_map/src/bucket.rs b/bucket_map/src/bucket.rs
index edc0fe874..61e227537 100644
--- a/bucket_map/src/bucket.rs
+++ b/bucket_map/src/bucket.rs
@@ -13,20 +13,61 @@ use std::hash::{Hash, Hasher};
 use std::marker::PhantomData;
 use std::ops::RangeBounds;
 use std::path::PathBuf;
-use std::sync::atomic::Ordering;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
+use std::sync::Mutex;
+
+#[derive(Default)]
+pub struct ReallocatedItems {
+    // Some if the index was reallocated
+    // u64 is random associated with the new index
+    pub index: Option<(u64, BucketStorage)>,
+    // Some for a data bucket reallocation
+    // u64 is data bucket index
+    pub data: Option<(u64, BucketStorage)>,
+}
+
+#[derive(Default)]
+pub struct Reallocated {
+    /// > 0 if reallocations are encoded
+    pub active_reallocations: AtomicUsize,
+
+    /// actual reallocated bucket
+    /// mutex because bucket grow code runs with a read lock
+    pub items: Mutex<ReallocatedItems>,
+}
+
+impl Reallocated {
+    /// specify that a reallocation has occurred
+    pub fn add_reallocation(&self) {
+        assert_eq!(
+            0,
+            self.active_reallocations.fetch_add(1, Ordering::Relaxed),
+            "Only 1 reallocation can occur at a time"
+        );
+    }
+    /// Return true IFF a reallocation has occurred.
+    /// Calling this takes conceptual ownership of the reallocation encoded in the struct.
+    pub fn get_reallocated(&self) -> bool {
+        self.active_reallocations
+            .compare_exchange(1, 0, Ordering::Acquire, Ordering::Relaxed)
+            .is_ok()
+    }
+}
 
 // >= 2 instances of BucketStorage per 'bucket' in the bucket map. 1 for index, >= 1 for data
 pub struct Bucket<T> {
     drives: Arc<Vec<PathBuf>>,
     //index
-    index: BucketStorage,
+    pub index: BucketStorage,
     //random offset for the index
     random: u64,
     //storage buckets to store SlotSlice up to a power of 2 in len
     pub data: Vec<BucketStorage>,
     _phantom: PhantomData<T>,
     stats: Arc<BucketMapStats>,
+
+    pub reallocated: Reallocated,
 }
 
 impl<T: Clone + Copy> Bucket<T> {
@@ -49,6 +90,7 @@ impl<T: Clone + Copy> Bucket<T> {
             data: vec![],
             _phantom: PhantomData::default(),
             stats,
+            reallocated: Reallocated::default(),
         }
     }
 
@@ -273,10 +315,10 @@ impl<T: Clone + Copy> Bucket<T> {
         }
     }
 
-    pub fn grow_index(&mut self, sz: u8) {
-        if self.index.capacity_pow2 == sz {
+    pub fn grow_index(&self, current_capacity: u8) {
+        if self.index.capacity_pow2 == current_capacity {
            let mut m = Measure::start("grow_index");
-            //debug!("GROW_INDEX: {}", sz);
+            //debug!("GROW_INDEX: {}", current_capacity);
            let increment = 1;
            for i in increment.. {
                //increasing the capacity by ^4 reduces the
@@ -286,6 +328,7 @@ impl<T: Clone + Copy> Bucket<T> {
                    Arc::clone(&self.drives),
                    1,
                    std::mem::size_of::<IndexEntry>() as u64,
+                    // *2 causes rapid growth of index buckets
                    self.index.capacity_pow2 + i, // * 2,
                    self.index.max_search,
                    Arc::clone(&self.stats.index),
@@ -316,17 +359,18 @@ impl<T: Clone + Copy> Bucket<T> {
                    }
                }
                if valid {
-                    self.index = index;
-                    self.random = random;
+                    let sz = index.capacity();
+                    {
+                        let mut max = self.stats.index.max_size.lock().unwrap();
+                        *max = std::cmp::max(*max, sz);
+                    }
+                    let mut items = self.reallocated.items.lock().unwrap();
+                    items.index = Some((random, index));
+                    self.reallocated.add_reallocation();
                    break;
                }
            }
            m.stop();
-            let sz = 1 << self.index.capacity_pow2;
-            {
-                let mut max = self.stats.index.max_size.lock().unwrap();
-                *max = std::cmp::max(*max, sz);
-            }
            self.stats.index.resizes.fetch_add(1, Ordering::Relaxed);
            self.stats
                .index
@@ -335,22 +379,45 @@ impl<T: Clone + Copy> Bucket<T> {
        }
    }
 
-    pub fn grow_data(&mut self, sz: (u64, u8)) {
-        if self.data.get(sz.0 as usize).is_none() {
-            for i in self.data.len() as u64..(sz.0 + 1) {
+    pub fn apply_grow_index(&mut self, random: u64, index: BucketStorage) {
+        self.random = random;
+        self.index = index;
+    }
+
+    fn elem_size() -> u64 {
+        std::mem::size_of::<T>() as u64
+    }
+
+    pub fn apply_grow_data(&mut self, ix: usize, bucket: BucketStorage) {
+        if self.data.get(ix).is_none() {
+            for i in self.data.len()..ix {
+                // insert empty data buckets
                self.data.push(BucketStorage::new(
                    Arc::clone(&self.drives),
                    1 << i,
-                    std::mem::size_of::<T>() as u64,
+                    Self::elem_size(),
                    self.index.max_search,
                    Arc::clone(&self.stats.data),
                ))
            }
+            self.data.push(bucket);
+        } else {
+            self.data[ix] = bucket;
        }
-        if self.data[sz.0 as usize].capacity_pow2 == sz.1 {
-            //debug!("GROW_DATA: {} {}", sz.0, sz.1);
-            self.data[sz.0 as usize].grow();
-        }
+    }
+
+    /// grow a data bucket
+    /// The application of the new bucket is deferred until the next write lock.
+    pub fn grow_data(&self, data_index: u64, current_capacity: u8) {
+        let new_bucket = self.index.grow(
+            self.data.get(data_index as usize),
+            current_capacity + 1,
+            1 << data_index,
+            Self::elem_size(),
+        );
+        self.reallocated.add_reallocation();
+        let mut items = self.reallocated.items.lock().unwrap();
+        items.data = Some((data_index, new_bucket));
    }
 
    fn bucket_index_ix(index: &BucketStorage, key: &Pubkey, random: u64) -> u64 {
@@ -366,16 +433,34 @@ impl<T: Clone + Copy> Bucket<T> {
        //debug!( "INDEX_IX: {:?} uid:{} loc: {} cap:{}", key, uid, location, index.capacity() );
    }
 
-    /// grow the appropriate piece
-    pub fn grow(&mut self, err: BucketMapError) {
+    /// grow the appropriate piece. Note this takes an immutable ref.
+    /// The actual grow is set into self.reallocated and applied later on a write lock
+    pub fn grow(&self, err: BucketMapError) {
        match err {
-            BucketMapError::DataNoSpace(sz) => {
-                //debug!("GROWING SPACE {:?}", sz);
-                self.grow_data(sz);
+            BucketMapError::DataNoSpace((data_index, current_capacity)) => {
+                //debug!("GROWING SPACE {:?}", (data_index, current_capacity));
+                self.grow_data(data_index, current_capacity);
            }
-            BucketMapError::IndexNoSpace(sz) => {
+            BucketMapError::IndexNoSpace(current_capacity) => {
                //debug!("GROWING INDEX {}", sz);
-                self.grow_index(sz);
+                self.grow_index(current_capacity);
+            }
+        }
+    }
+
+    /// if a bucket was resized previously with a read lock, then apply that resize now
+    pub fn handle_delayed_grows(&mut self) {
+        if self.reallocated.get_reallocated() {
+            // swap out the bucket that was resized previously with a read lock
+            let mut items = ReallocatedItems::default();
+            std::mem::swap(&mut items, &mut self.reallocated.items.lock().unwrap());
+
+            if let Some((random, bucket)) = items.index.take() {
+                self.apply_grow_index(random, bucket);
+            } else {
+                // data bucket
+                let (i, new_bucket) = items.data.take().unwrap();
+                self.apply_grow_data(i as usize, new_bucket);
            }
        }
    }
@@ -386,7 +471,10 @@ impl<T: Clone + Copy> Bucket<T> {
            let rv = self.try_write(key, new, refct);
            match rv {
                Ok(_) => return,
-                Err(err) => self.grow(err),
+                Err(err) => {
+                    self.grow(err);
+                    self.handle_delayed_grows();
+                }
            }
        }
    }
diff --git a/bucket_map/src/bucket_api.rs b/bucket_map/src/bucket_api.rs
index 0fa079dd0..5cf4a40e9 100644
--- a/bucket_map/src/bucket_api.rs
+++ b/bucket_map/src/bucket_api.rs
@@ -89,6 +89,8 @@ impl<T: Clone + Copy> BucketApi<T> {
                self.max_search,
                Arc::clone(&self.stats),
            ));
+        } else {
+            bucket.as_mut().unwrap().handle_delayed_grows();
        }
        bucket
    }
@@ -111,8 +113,11 @@ impl<T: Clone + Copy> BucketApi<T> {
    }
 
    pub fn grow(&self, err: BucketMapError) {
-        let mut bucket = self.get_write_bucket();
-        bucket.as_mut().unwrap().grow(err)
+        // grows are special - they get a read lock and modify 'reallocated'
+        // the grown changes are applied the next time there is a write lock taken
+        if let Some(bucket) = self.bucket.read().unwrap().as_ref() {
+            bucket.grow(err)
+        }
    }
 
    pub fn update<F>(&self, key: &Pubkey, updatefn: F)
diff --git a/bucket_map/src/bucket_storage.rs b/bucket_map/src/bucket_storage.rs
index 0a007cc58..06f7078c9 100644
--- a/bucket_map/src/bucket_storage.rs
+++ b/bucket_map/src/bucket_storage.rs
@@ -299,46 +299,66 @@ impl BucketStorage {
        res
    }
 
-    pub fn grow(&mut self) {
+    /// copy contents from 'old_bucket' to 'self'
+    fn copy_contents(&mut self, old_bucket: &Self) {
        let mut m = Measure::start("grow");
-        let old_cap = self.capacity();
-        let old_map = &self.mmap;
-        let old_file = self.path.clone();
+        let old_cap = old_bucket.capacity();
+        let old_map = &old_bucket.mmap;
 
        let increment = 1;
        let index_grow = 1 << increment;
        let (new_map, new_file) = Self::new_map(
-            &self.drives,
-            self.cell_size as usize,
-            self.capacity_pow2 + increment,
-            &self.stats,
+            &old_bucket.drives,
+            old_bucket.cell_size as usize,
+            old_bucket.capacity_pow2 + increment,
+            &old_bucket.stats,
        );
        (0..old_cap as usize).into_iter().for_each(|i| {
-            let old_ix = i * self.cell_size as usize;
+            let old_ix = i * old_bucket.cell_size as usize;
            let new_ix = old_ix * index_grow;
-            let dst_slice: &[u8] = &new_map[new_ix..new_ix + self.cell_size as usize];
-            let src_slice: &[u8] = &old_map[old_ix..old_ix + self.cell_size as usize];
+            let dst_slice: &[u8] = &new_map[new_ix..new_ix + old_bucket.cell_size as usize];
+            let src_slice: &[u8] = &old_map[old_ix..old_ix + old_bucket.cell_size as usize];
 
            unsafe {
                let dst = dst_slice.as_ptr() as *mut u8;
                let src = src_slice.as_ptr() as *const u8;
-                std::ptr::copy_nonoverlapping(src, dst, self.cell_size as usize);
+                std::ptr::copy_nonoverlapping(src, dst, old_bucket.cell_size as usize);
            };
        });
        self.mmap = new_map;
        self.path = new_file;
-        self.capacity_pow2 += increment;
-        remove_file(old_file).unwrap();
        m.stop();
-        let sz = 1 << self.capacity_pow2;
-        {
-            let mut max = self.stats.max_size.lock().unwrap();
-            *max = std::cmp::max(*max, sz);
-        }
        self.stats.resizes.fetch_add(1, Ordering::Relaxed);
        self.stats.resize_us.fetch_add(m.as_us(), Ordering::Relaxed);
    }
 
+    /// allocate a new bucket based on 'self', but copying data from 'bucket'
+    pub fn grow(
+        &self,
+        bucket: Option<&Self>,
+        capacity_pow_2: u8,
+        num_elems: u64,
+        elem_size: u64,
+    ) -> Self {
+        let mut new_bucket = Self::new_with_capacity(
+            Arc::clone(&self.drives),
+            num_elems,
+            elem_size,
+            capacity_pow_2,
+            self.max_search,
+            Arc::clone(&self.stats),
+        );
+        if let Some(bucket) = bucket {
+            new_bucket.copy_contents(bucket);
+        }
+        let sz = new_bucket.capacity();
+        {
+            let mut max = new_bucket.stats.max_size.lock().unwrap();
+            *max = std::cmp::max(*max, sz);
+        }
+        new_bucket
+    }
+
    /// Return the number of cells currently allocated
    pub fn capacity(&self) -> u64 {
        1 << self.capacity_pow2
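
Note, not part of the patch: the snippet below is a minimal, self-contained sketch of the handoff this change introduces. The expensive allocate-and-copy runs while only a read lock is held, the grown storage is parked in a Mutex next to an atomic flag, and the next holder of the write lock swaps it in. The names (DeferredGrow, Store) and the Vec<u8>-backed storage are illustrative stand-ins, not the bucket_map API.

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Mutex, RwLock};

    #[derive(Default)]
    struct DeferredGrow {
        // > 0 while a freshly allocated, larger buffer is parked and awaiting a writer
        pending: AtomicUsize,
        // the parked buffer; a Mutex because it is filled in while only a read lock is held
        parked: Mutex<Option<Vec<u8>>>,
    }

    impl DeferredGrow {
        // park a grown buffer; mirrors Reallocated::add_reallocation
        fn publish(&self, bigger: Vec<u8>) {
            *self.parked.lock().unwrap() = Some(bigger);
            // Relaxed is enough for the flag here because the parked data is Mutex-protected
            assert_eq!(
                0,
                self.pending.fetch_add(1, Ordering::Relaxed),
                "only one grow may be pending at a time"
            );
        }

        // consume the pending flag exactly once; mirrors Reallocated::get_reallocated
        fn take_if_pending(&self) -> Option<Vec<u8>> {
            self.pending
                .compare_exchange(1, 0, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
                .then(|| self.parked.lock().unwrap().take().unwrap())
        }
    }

    struct Store {
        buf: RwLock<Vec<u8>>,
        grow: DeferredGrow,
    }

    impl Store {
        // called with only the read lock held: allocate and copy, but do not swap in
        fn grow_with_read_lock(&self) {
            let cur = self.buf.read().unwrap();
            let mut bigger = Vec::with_capacity(cur.len().max(1) * 2);
            bigger.extend_from_slice(cur.as_slice());
            drop(cur);
            self.grow.publish(bigger);
        }

        // called on the write path: apply the parked reallocation, if any
        fn handle_delayed_grow(&self) {
            if let Some(bigger) = self.grow.take_if_pending() {
                *self.buf.write().unwrap() = bigger;
            }
        }
    }

    fn main() {
        let store = Store {
            buf: RwLock::new(vec![1, 2, 3]),
            grow: DeferredGrow::default(),
        };
        store.grow_with_read_lock(); // readers stay unblocked during the copy
        store.handle_delayed_grow(); // the next write path swaps the new buffer in
        assert_eq!(store.buf.read().unwrap().len(), 3);
        assert!(store.buf.read().unwrap().capacity() >= 6);
    }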
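A second detail that is easy to misread is the line "let new_ix = old_ix * index_grow;" in copy_contents(): with increment = 1 the capacity doubles and the cell at old index i is copied to new index 2 * i, leaving an empty cell between occupied ones rather than packing the old cells at the front. A tiny standalone sketch of that layout, using plain u64 cells in place of the mmap'd bytes:

    // old cell i lands at new cell i * index_grow (index_grow = 2 when capacity doubles)
    fn spread_copy(old: &[u64]) -> Vec<u64> {
        let index_grow = 2;
        let mut grown = vec![0u64; old.len() * index_grow];
        for (i, cell) in old.iter().enumerate() {
            grown[i * index_grow] = *cell;
        }
        grown
    }

    fn main() {
        assert_eq!(spread_copy(&[7, 8, 9]), vec![7, 0, 8, 0, 9, 0]);
    }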
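Two design points in the patch itself are worth keeping in mind. First, only one reallocation may be pending at a time: add_reallocation() asserts that the previous counter value was 0, and get_reallocated() consumes the flag with a compare_exchange, so a parked resize cannot be overwritten or applied twice. Second, BucketApi::grow deliberately takes only the read lock ("grows are special"), so the expensive allocate-and-copy does not block concurrent readers; the cheap swap is deferred to handle_delayed_grows(), which get_write_bucket() now calls before handing out an already-initialized bucket.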