AcctIdx: bucket map grows with read lock (#20397)

Jeff Washington (jwash) 2021-10-05 09:59:17 -05:00 committed by GitHub
parent 4e3818e5c1
commit dc47a56c22
3 changed files with 162 additions and 49 deletions
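
The change in one picture: before, growing a bucket required holding the bucket map's write lock for the entire (slow) copy; after, the grow runs under a read lock, the result is parked in a new `Reallocated` side structure, and whichever caller next takes the write lock swaps it in. Below is a minimal, self-contained sketch of that pattern with stand-in types (a plain `Vec<u8>` instead of the mmap-backed `BucketStorage`); it illustrates the idea, not the code in this diff:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Mutex, RwLock};

// Side channel for a grow computed under a read lock.
#[derive(Default)]
struct Pending {
    grown: Mutex<Option<Vec<u8>>>, // stand-in for the reallocated BucketStorage
    active: AtomicUsize,           // > 0 while a grow awaits application
}

struct Bucket {
    storage: Vec<u8>, // stand-in for the mmap-backed storage
    pending: Pending,
}

fn main() {
    let bucket = RwLock::new(Bucket {
        storage: vec![0u8; 4],
        pending: Pending::default(),
    });

    // Grow path: only a read lock, so concurrent readers keep running
    // while the expensive copy happens.
    {
        let b = bucket.read().unwrap();
        let mut grown = b.storage.clone(); // the expensive part
        grown.resize(b.storage.len() * 2, 0);
        *b.pending.grown.lock().unwrap() = Some(grown);
        assert_eq!(0, b.pending.active.fetch_add(1, Ordering::Relaxed));
    }

    // Apply path: the next write-lock holder swaps the grown storage in.
    {
        let mut b = bucket.write().unwrap();
        if b.pending
            .active
            .compare_exchange(1, 0, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            let grown = b.pending.grown.lock().unwrap().take().unwrap();
            b.storage = grown;
        }
        assert_eq!(8, b.storage.len());
    }
}
```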

bucket_map/src/bucket.rs

@@ -13,20 +13,61 @@ use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::ops::RangeBounds;
use std::path::PathBuf;
-use std::sync::atomic::Ordering;
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
#[derive(Default)]
pub struct ReallocatedItems {
// Some if the index was reallocated
// u64 is the random value associated with the new index
pub index: Option<(u64, BucketStorage)>,
// Some for a data bucket reallocation
// u64 is the data bucket index
pub data: Option<(u64, BucketStorage)>,
}
#[derive(Default)]
pub struct Reallocated {
/// > 0 if reallocations are encoded
pub active_reallocations: AtomicUsize,
/// actual reallocated bucket
/// mutex because bucket grow code runs with a read lock
pub items: Mutex<ReallocatedItems>,
}
impl Reallocated {
/// specify that a reallocation has occurred
pub fn add_reallocation(&self) {
assert_eq!(
0,
self.active_reallocations.fetch_add(1, Ordering::Relaxed),
"Only 1 reallocation can occur at a time"
);
}
/// Return true IFF a reallocation has occurred.
/// Calling this takes conceptual ownership of the reallocation encoded in the struct.
pub fn get_reallocated(&self) -> bool {
self.active_reallocations
.compare_exchange(1, 0, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
}
}
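
Taken together, these two methods form a single-slot handoff: `add_reallocation` asserts that no other grow is already pending, and the compare-and-swap in `get_reallocated` guarantees that exactly one write-lock holder claims the pending grow. A small usage illustration, assuming nothing beyond the types in this diff:

```rust
fn main() {
    let r = Reallocated::default();
    r.add_reallocation();          // a grow is now pending
    assert!(r.get_reallocated());  // exactly one caller wins the CAS...
    assert!(!r.get_reallocated()); // ...later callers see nothing to apply
}
```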
// >= 2 instances of BucketStorage per 'bucket' in the bucket map. 1 for index, >= 1 for data
pub struct Bucket<T> {
drives: Arc<Vec<PathBuf>>,
//index
-index: BucketStorage,
+pub index: BucketStorage,
//random offset for the index
random: u64,
//storage buckets to store SlotSlice up to a power of 2 in len
pub data: Vec<BucketStorage>,
_phantom: PhantomData<T>,
stats: Arc<BucketMapStats>,
pub reallocated: Reallocated,
}
impl<T: Clone + Copy> Bucket<T> {
@@ -49,6 +90,7 @@ impl<T: Clone + Copy> Bucket<T> {
data: vec![],
_phantom: PhantomData::default(),
stats,
reallocated: Reallocated::default(),
}
}
@@ -273,10 +315,10 @@ impl<T: Clone + Copy> Bucket<T> {
}
}
-pub fn grow_index(&mut self, sz: u8) {
-if self.index.capacity_pow2 == sz {
+pub fn grow_index(&self, current_capacity: u8) {
+if self.index.capacity_pow2 == current_capacity {
let mut m = Measure::start("grow_index");
//debug!("GROW_INDEX: {}", sz);
//debug!("GROW_INDEX: {}", current_capacity);
let increment = 1;
for i in increment.. {
//increasing the capacity by ^4 reduces the
@@ -286,6 +328,7 @@ impl<T: Clone + Copy> Bucket<T> {
Arc::clone(&self.drives),
1,
std::mem::size_of::<IndexEntry>() as u64,
// *2 causes rapid growth of index buckets
self.index.capacity_pow2 + i, // * 2,
self.index.max_search,
Arc::clone(&self.stats.index),
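
Because capacity is `1 << capacity_pow2`, each pass through the loop above requests the next power of two (`capacity_pow2 + i`), so every failed attempt doubles the requested size. A quick worked example with assumed numbers:

```rust
fn main() {
    let capacity_pow2: u8 = 10; // assumed starting capacity: 1024 cells
    for i in 1u8..=3 {
        // attempt 1 -> 2048 cells, attempt 2 -> 4096, attempt 3 -> 8192
        println!("attempt {}: {} cells", i, 1u64 << (capacity_pow2 + i));
    }
}
```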
@@ -316,17 +359,18 @@ impl<T: Clone + Copy> Bucket<T> {
}
}
if valid {
-self.index = index;
-self.random = random;
+let sz = index.capacity();
+{
+let mut max = self.stats.index.max_size.lock().unwrap();
+*max = std::cmp::max(*max, sz);
+}
+let mut items = self.reallocated.items.lock().unwrap();
+items.index = Some((random, index));
+self.reallocated.add_reallocation();
break;
}
}
m.stop();
-let sz = 1 << self.index.capacity_pow2;
-{
-let mut max = self.stats.index.max_size.lock().unwrap();
-*max = std::cmp::max(*max, sz);
-}
self.stats.index.resizes.fetch_add(1, Ordering::Relaxed);
self.stats
.index
@@ -335,22 +379,45 @@ impl<T: Clone + Copy> Bucket<T> {
}
}
-pub fn grow_data(&mut self, sz: (u64, u8)) {
-if self.data.get(sz.0 as usize).is_none() {
-for i in self.data.len() as u64..(sz.0 + 1) {
pub fn apply_grow_index(&mut self, random: u64, index: BucketStorage) {
self.random = random;
self.index = index;
}
fn elem_size() -> u64 {
std::mem::size_of::<T>() as u64
}
pub fn apply_grow_data(&mut self, ix: usize, bucket: BucketStorage) {
if self.data.get(ix).is_none() {
for i in self.data.len()..ix {
// insert empty data buckets
self.data.push(BucketStorage::new(
Arc::clone(&self.drives),
1 << i,
-std::mem::size_of::<T>() as u64,
+Self::elem_size(),
self.index.max_search,
Arc::clone(&self.stats.data),
))
}
self.data.push(bucket);
} else {
self.data[ix] = bucket;
}
-if self.data[sz.0 as usize].capacity_pow2 == sz.1 {
-//debug!("GROW_DATA: {} {}", sz.0, sz.1);
-self.data[sz.0 as usize].grow();
-}
}
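
`apply_grow_data` has to tolerate a data-bucket index past the end of `self.data`: it pads with fresh empty buckets up to `ix`, then pushes the grown bucket; if the slot already exists it replaces it in place. The same shape on a plain `Vec<String>`, as a stand-in sketch:

```rust
// Pad to `ix` with placeholder "empty buckets", then install the grown one.
fn apply_grow(data: &mut Vec<String>, ix: usize, bucket: String) {
    if data.get(ix).is_none() {
        for i in data.len()..ix {
            data.push(format!("empty[{i}]"));
        }
        data.push(bucket);
    } else {
        data[ix] = bucket; // the slot existed and was outgrown: replace it
    }
}

fn main() {
    let mut data = vec!["b0".to_string()];
    apply_grow(&mut data, 2, "grown-b2".to_string());
    assert_eq!(data, vec!["b0", "empty[1]", "grown-b2"]);
}
```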
/// grow a data bucket
/// The application of the new bucket is deferred until the next write lock.
pub fn grow_data(&self, data_index: u64, current_capacity: u8) {
let new_bucket = self.index.grow(
self.data.get(data_index as usize),
current_capacity + 1,
1 << data_index,
Self::elem_size(),
);
self.reallocated.add_reallocation();
let mut items = self.reallocated.items.lock().unwrap();
items.data = Some((data_index, new_bucket));
}
fn bucket_index_ix(index: &BucketStorage, key: &Pubkey, random: u64) -> u64 {
@@ -366,16 +433,34 @@ impl<T: Clone + Copy> Bucket<T> {
//debug!( "INDEX_IX: {:?} uid:{} loc: {} cap:{}", key, uid, location, index.capacity() );
}
-/// grow the appropriate piece
-pub fn grow(&mut self, err: BucketMapError) {
+/// grow the appropriate piece. Note this takes an immutable ref.
+/// The actual grow is set into self.reallocated and applied later on a write lock
+pub fn grow(&self, err: BucketMapError) {
match err {
-BucketMapError::DataNoSpace(sz) => {
-//debug!("GROWING SPACE {:?}", sz);
-self.grow_data(sz);
+BucketMapError::DataNoSpace((data_index, current_capacity)) => {
+//debug!("GROWING SPACE {:?}", (data_index, current_capacity));
+self.grow_data(data_index, current_capacity);
}
-BucketMapError::IndexNoSpace(sz) => {
+BucketMapError::IndexNoSpace(current_capacity) => {
//debug!("GROWING INDEX {}", sz);
-self.grow_index(sz);
+self.grow_index(current_capacity);
}
}
}
/// if a bucket was resized previously with a read lock, then apply that resize now
pub fn handle_delayed_grows(&mut self) {
if self.reallocated.get_reallocated() {
// swap out the bucket that was resized previously with a read lock
let mut items = ReallocatedItems::default();
std::mem::swap(&mut items, &mut self.reallocated.items.lock().unwrap());
if let Some((random, bucket)) = items.index.take() {
self.apply_grow_index(random, bucket);
} else {
// data bucket
let (i, new_bucket) = items.data.take().unwrap();
self.apply_grow_data(i as usize, new_bucket);
}
}
}
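
The `std::mem::swap` above empties the mutex-guarded `ReallocatedItems` in a single statement, so the temporary guard is dropped before the apply calls, which need `&mut self`, run. The idiom in isolation:

```rust
use std::sync::Mutex;

fn main() {
    let pending: Mutex<Option<u64>> = Mutex::new(Some(42));
    let mut taken = None;
    // The temporary MutexGuard lives only to the end of this statement.
    std::mem::swap(&mut taken, &mut pending.lock().unwrap());
    assert_eq!(taken, Some(42)); // apply at leisure, mutex already released
    assert!(pending.lock().unwrap().is_none());
}
```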
@@ -386,7 +471,10 @@ impl<T: Clone + Copy> Bucket<T> {
let rv = self.try_write(key, new, refct);
match rv {
Ok(_) => return,
-Err(err) => self.grow(err),
+Err(err) => {
+self.grow(err);
+self.handle_delayed_grows();
+}
}
}
}
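
Seen from the call site, the write path is now a retry loop: `try_write`, and on an out-of-space error, grow (stashing the new storage) and apply it, then try again. A sketch of that protocol with a hypothetical stand-in error type mirroring `BucketMapError`'s payloads:

```rust
// Stand-in for BucketMapError; payloads mirror the variants in this diff.
enum GrowErr {
    DataNoSpace((u64, u8)), // (data bucket index, failed capacity_pow2)
    IndexNoSpace(u8),       // failed capacity_pow2
}

// Pretend the first attempt finds the index full, the second succeeds.
fn try_write(attempt: u32) -> Result<(), GrowErr> {
    if attempt == 0 {
        Err(GrowErr::IndexNoSpace(10))
    } else {
        Ok(())
    }
}

fn main() {
    let mut attempt = 0;
    loop {
        match try_write(attempt) {
            Ok(()) => break, // the write fit
            Err(GrowErr::DataNoSpace((ix, cap))) => {
                println!("grow data bucket {ix} past 2^{cap}, apply, retry");
            }
            Err(GrowErr::IndexNoSpace(cap)) => {
                println!("grow index past 2^{cap}, apply, retry");
            }
        }
        attempt += 1;
    }
}
```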

bucket_map/src/bucket_api.rs

@@ -89,6 +89,8 @@ impl<T: Clone + Copy> BucketApi<T> {
self.max_search,
Arc::clone(&self.stats),
));
} else {
bucket.as_mut().unwrap().handle_delayed_grows();
}
bucket
}
@@ -111,8 +113,11 @@ impl<T: Clone + Copy> BucketApi<T> {
}
pub fn grow(&self, err: BucketMapError) {
-let mut bucket = self.get_write_bucket();
-bucket.as_mut().unwrap().grow(err)
+// grows are special - they get a read lock and modify 'reallocated'
+// the grow's changes are applied the next time a write lock is taken
+if let Some(bucket) = self.bucket.read().unwrap().as_ref() {
+bucket.grow(err)
+}
}
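
Taking only the read lock here is the point of the whole commit: readers, and writers of other buckets in the map, are not stalled behind the copy; only the brief swap in `handle_delayed_grows` needs the write lock. A self-contained sketch of that property (timings are illustrative only):

```rust
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

fn main() {
    let map = Arc::new(RwLock::new(vec![0u8; 4]));

    // "Grow" thread: holds only a read guard during the slow copy.
    let grower = {
        let map = Arc::clone(&map);
        thread::spawn(move || {
            let _r = map.read().unwrap();
            thread::sleep(Duration::from_millis(50)); // stands in for the copy
        })
    };

    // A concurrent reader is not blocked by the in-progress grow.
    assert_eq!(map.read().unwrap().len(), 4);
    grower.join().unwrap();
}
```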
pub fn update<F>(&self, key: &Pubkey, updatefn: F)

bucket_map/src/bucket_storage.rs

@@ -299,46 +299,66 @@ impl BucketStorage {
res
}
-pub fn grow(&mut self) {
+/// copy contents from 'old_bucket' to 'self'
+fn copy_contents(&mut self, old_bucket: &Self) {
let mut m = Measure::start("grow");
-let old_cap = self.capacity();
-let old_map = &self.mmap;
-let old_file = self.path.clone();
+let old_cap = old_bucket.capacity();
+let old_map = &old_bucket.mmap;
let increment = 1;
let index_grow = 1 << increment;
let (new_map, new_file) = Self::new_map(
-&self.drives,
-self.cell_size as usize,
-self.capacity_pow2 + increment,
-&self.stats,
+&old_bucket.drives,
+old_bucket.cell_size as usize,
+old_bucket.capacity_pow2 + increment,
+&old_bucket.stats,
);
(0..old_cap as usize).into_iter().for_each(|i| {
-let old_ix = i * self.cell_size as usize;
+let old_ix = i * old_bucket.cell_size as usize;
let new_ix = old_ix * index_grow;
-let dst_slice: &[u8] = &new_map[new_ix..new_ix + self.cell_size as usize];
-let src_slice: &[u8] = &old_map[old_ix..old_ix + self.cell_size as usize];
+let dst_slice: &[u8] = &new_map[new_ix..new_ix + old_bucket.cell_size as usize];
+let src_slice: &[u8] = &old_map[old_ix..old_ix + old_bucket.cell_size as usize];
unsafe {
let dst = dst_slice.as_ptr() as *mut u8;
let src = src_slice.as_ptr() as *const u8;
-std::ptr::copy_nonoverlapping(src, dst, self.cell_size as usize);
+std::ptr::copy_nonoverlapping(src, dst, old_bucket.cell_size as usize);
};
});
self.mmap = new_map;
self.path = new_file;
self.capacity_pow2 += increment;
-remove_file(old_file).unwrap();
m.stop();
-let sz = 1 << self.capacity_pow2;
-{
-let mut max = self.stats.max_size.lock().unwrap();
-*max = std::cmp::max(*max, sz);
-}
self.stats.resizes.fetch_add(1, Ordering::Relaxed);
self.stats.resize_us.fetch_add(m.as_us(), Ordering::Relaxed);
}
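
The copy loop's arithmetic spreads the old cells out rather than packing them: with `index_grow == 2`, cell `i`'s bytes move from offset `i * cell_size` to `2 * i * cell_size`, i.e. old cell `i` lands in new cell `2i` and the odd cells start empty. Worked through with assumed numbers:

```rust
fn main() {
    let cell_size = 16usize; // assumed bytes per cell
    let index_grow = 2usize; // capacity doubled (increment = 1)
    for i in 0..4usize {
        let old_ix = i * cell_size;
        let new_ix = old_ix * index_grow;
        // cell 0 -> cell 0, cell 1 -> cell 2, cell 2 -> cell 4, cell 3 -> cell 6
        println!("cell {i}: byte {old_ix} -> byte {new_ix} (new cell {})", new_ix / cell_size);
    }
}
```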
/// allocate a new bucket based on 'self', copying data from 'bucket' if it is provided
pub fn grow(
&self,
bucket: Option<&Self>,
capacity_pow_2: u8,
num_elems: u64,
elem_size: u64,
) -> Self {
let mut new_bucket = Self::new_with_capacity(
Arc::clone(&self.drives),
num_elems,
elem_size,
capacity_pow_2,
self.max_search,
Arc::clone(&self.stats),
);
if let Some(bucket) = bucket {
new_bucket.copy_contents(bucket);
}
let sz = new_bucket.capacity();
{
let mut max = new_bucket.stats.max_size.lock().unwrap();
*max = std::cmp::max(*max, sz);
}
new_bucket
}
/// Return the number of cells currently allocated
pub fn capacity(&self) -> u64 {
1 << self.capacity_pow2
}
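
`capacity` is just the power-of-two cell count implied by `capacity_pow2`; for example:

```rust
fn main() {
    let capacity_pow2: u8 = 10; // assumed
    assert_eq!(1u64 << capacity_pow2, 1024); // cells
}
```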