disk index: add `Capacity` (#31031)

* disk index: add `Capacity`

* pr feedback
Jeff Washington (jwash) 2023-04-04 08:57:09 -05:00 committed by GitHub
parent bc343a431c
commit b0540ff5ad
4 changed files with 104 additions and 32 deletions


@@ -3,7 +3,10 @@ use {
     bucket_item::BucketItem,
     bucket_map::BucketMapError,
     bucket_stats::BucketMapStats,
-    bucket_storage::{BucketOccupied, BucketStorage, IncludeHeader, DEFAULT_CAPACITY_POW2},
+    bucket_storage::{
+        BucketCapacity, BucketOccupied, BucketStorage, Capacity, IncludeHeader,
+        DEFAULT_CAPACITY_POW2,
+    },
     index_entry::{
         DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket, MultipleSlots,
         OccupiedEnum,
@@ -88,11 +91,11 @@ struct DataFileEntryToFree {
 // >= 2 instances of BucketStorage per 'bucket' in the bucket map. 1 for index, >= 1 for data
 pub struct Bucket<T: Copy + 'static> {
     drives: Arc<Vec<PathBuf>>,
-    //index
+    /// index
     pub index: BucketStorage<IndexBucket<T>>,
-    //random offset for the index
+    /// random offset for the index
     random: u64,
-    //storage buckets to store SlotSlice up to a power of 2 in len
+    /// storage buckets to store SlotSlice up to a power of 2 in len
     pub data: Vec<BucketStorage<DataBucket>>,
     stats: Arc<BucketMapStats>,
@@ -210,7 +213,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
             .fetch_add(m.as_us(), Ordering::Relaxed);
         match first_free {
             Some(ii) => Ok((None, ii)),
-            None => Err(BucketMapError::IndexNoSpace(index.capacity_pow2)),
+            None => Err(BucketMapError::IndexNoSpace(index.capacity.capacity_pow2())),
         }
     }
@@ -263,7 +266,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
             .stats
             .find_index_entry_mut_us
             .fetch_add(m.as_us(), Ordering::Relaxed);
-        Err(BucketMapError::IndexNoSpace(index.capacity_pow2))
+        Err(BucketMapError::IndexNoSpace(index.capacity.capacity_pow2()))
     }

     pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> {
@@ -356,7 +359,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
             // need to move the allocation to a best fit spot
             let best_bucket = &self.data[best_fit_bucket as usize];
-            let cap_power = best_bucket.capacity_pow2;
+            let cap_power = best_bucket.capacity.capacity_pow2();
             let cap = best_bucket.capacity();
             let pos = thread_rng().gen_range(0, cap);
             let mut success = false;
@@ -373,7 +376,8 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
                 if best_bucket.is_free(ix) {
                     let mut multiple_slots = MultipleSlots::default();
                     multiple_slots.set_storage_offset(ix);
-                    multiple_slots.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2);
+                    multiple_slots
+                        .set_storage_capacity_when_created_pow2(best_bucket.capacity.capacity_pow2());
                     multiple_slots.set_num_slots(num_slots);
                     elem.set_slot_count_enum_value(
                         &mut self.index,
@@ -430,8 +434,8 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
     }

     pub fn grow_index(&self, current_capacity_pow2: u8) {
-        if self.index.capacity_pow2 == current_capacity_pow2 {
-            let mut starting_size_pow2 = self.index.capacity_pow2;
+        if self.index.capacity.capacity_pow2() == current_capacity_pow2 {
+            let mut starting_size_pow2 = self.index.capacity.capacity_pow2();
             if self.anticipated_size > 0 {
                 // start the growth at the next pow2 larger than what would be required to hold `anticipated_size`.
                 // This will prevent unnecessary repeated grows at startup.
@@ -446,7 +450,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
                     1,
                     std::mem::size_of::<IndexEntry<T>>() as u64,
                     // the subtle `+ i` here causes us to grow from the starting size by a power of 2 on each iteration of the for loop
-                    starting_size_pow2 + i,
+                    Capacity::Pow2(starting_size_pow2 + i),
                     self.index.max_search,
                     Arc::clone(&self.stats.index),
                     Arc::clone(&self.index.count),
@@ -540,7 +544,10 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
                 &self.drives,
                 self.index.max_search,
                 self.data.get(data_index as usize),
-                std::cmp::max(current_capacity_pow2 + 1, DEFAULT_CAPACITY_POW2),
+                Capacity::Pow2(std::cmp::max(
+                    current_capacity_pow2 + 1,
+                    DEFAULT_CAPACITY_POW2,
+                )),
                 1 << data_index,
                 Self::elem_size(),
                 &self.stats.data,
@@ -564,7 +571,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
     /// grow the appropriate piece. Note this takes an immutable ref.
     /// The actual grow is set into self.reallocated and applied later on a write lock
-    pub fn grow(&self, err: BucketMapError) {
+    pub(crate) fn grow(&self, err: BucketMapError) {
         match err {
             BucketMapError::DataNoSpace((data_index, current_capacity_pow2)) => {
                 //debug!("GROWING SPACE {:?}", (data_index, current_capacity_pow2));


@@ -47,11 +47,15 @@ impl<T: Clone + Copy + Debug> std::fmt::Debug for BucketMap<T> {
     }
 }

+/// used to communicate resize necessary and current size.
 #[derive(Debug)]
 pub enum BucketMapError {
     /// (bucket_index, current_capacity_pow2)
+    /// Note that this is specific to data buckets
     DataNoSpace((u64, u8)),
     /// current_capacity_pow2
+    /// Note that this is specific to index buckets
     IndexNoSpace(u8),
 }
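Both variants carry the current pow2 capacity of the storage that ran out of room, so a caller can grow exactly the piece that failed and retry. A minimal sketch of that retry loop, assuming the enum above is in scope; `try_write` and `grow` here are illustrative stand-ins for the crate's write and grow paths, not its exact API:

// Sketch only: retry a write, growing whichever storage reported "no space".
fn write_with_grow(
    mut try_write: impl FnMut() -> Result<(), BucketMapError>,
    mut grow: impl FnMut(BucketMapError),
) {
    loop {
        match try_write() {
            Ok(()) => return,
            // DataNoSpace / IndexNoSpace identify the storage to resize and
            // the pow2 capacity it had when the write failed.
            Err(err) => grow(err),
        }
    }
}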


@@ -49,8 +49,8 @@ pub trait BucketOccupied {
     /// This must be a multiple of sizeof(u64)
     fn offset_to_first_data() -> usize;
     /// initialize this struct
-    /// `num_elements` is the number of elements allocated in the bucket
-    fn new(num_elements: usize) -> Self;
+    /// `capacity` is the number of elements allocated in the bucket
+    fn new(capacity: Capacity) -> Self;
     /// copying entry. Any in-memory (per-bucket) data structures may need to be copied for this `ix_old`.
     /// no-op by default
     fn copying_entry(
@@ -64,11 +64,19 @@ pub trait BucketOccupied {
     }
 }

+pub trait BucketCapacity {
+    fn capacity(&self) -> u64;
+    fn capacity_pow2(&self) -> u8 {
+        unimplemented!();
+    }
+}
+
 pub struct BucketStorage<O: BucketOccupied> {
     path: PathBuf,
     mmap: MmapMut,
     pub cell_size: u64,
-    pub capacity_pow2: u8,
+    /// number of cells this bucket can hold
+    pub capacity: Capacity,
     pub count: Arc<AtomicU64>,
     pub stats: Arc<BucketStats>,
     pub max_search: MaxSearch,
@@ -94,12 +102,40 @@ pub(crate) enum IncludeHeader {
     NoHeader,
 }

+/// 2 common ways of specifying capacity
+#[derive(Debug, PartialEq, Copy, Clone)]
+pub enum Capacity {
+    /// 1 << Pow2 produces # elements
+    Pow2(u8),
+    /// Actual # elements
+    Actual(u64),
+}
+
+impl BucketCapacity for Capacity {
+    fn capacity(&self) -> u64 {
+        match self {
+            Capacity::Pow2(pow2) => 1 << *pow2,
+            Capacity::Actual(elements) => *elements,
+        }
+    }
+    fn capacity_pow2(&self) -> u8 {
+        match self {
+            Capacity::Pow2(pow2) => *pow2,
+            Capacity::Actual(_elements) => {
+                panic!("illegal to ask for pow2 from random capacity");
+            }
+        }
+    }
+}
+
 impl<O: BucketOccupied> BucketStorage<O> {
+    /// allocate a bucket of at least `capacity` elements.
+    /// if capacity can be random, more may be allocated to fill the last page.
     pub fn new_with_capacity(
         drives: Arc<Vec<PathBuf>>,
         num_elems: u64,
         elem_size: u64,
-        capacity_pow2: u8,
+        mut capacity: Capacity,
         max_search: MaxSearch,
         stats: Arc<BucketStats>,
         count: Arc<AtomicU64>,
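The hunk above replaces the old `capacity_pow2: u8` field with a `Capacity` value. For a concrete sense of the two variants, a small usage sketch (the `capacity_examples` function is illustrative; it only assumes the enum and trait introduced above):

use crate::bucket_storage::{BucketCapacity, Capacity};

fn capacity_examples() {
    let by_pow2 = Capacity::Pow2(8);
    assert_eq!(by_pow2.capacity(), 256); // 1 << 8 elements
    assert_eq!(by_pow2.capacity_pow2(), 8);

    let exact = Capacity::Actual(300);
    assert_eq!(exact.capacity(), 300); // exactly 300 elements
    // `exact.capacity_pow2()` would panic: an arbitrary element count has no
    // power-of-two exponent, which is why only the pow2-sized growth paths
    // ever ask for it.
}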
@@ -112,20 +148,37 @@ impl<O: BucketOccupied> BucketStorage<O> {
             "header size must be a multiple of u64"
         );
         let cell_size = elem_size * num_elems + offset as u64;
-        let bytes = (1u64 << capacity_pow2) * cell_size;
+        let bytes = Self::allocate_to_fill_page(&mut capacity, cell_size);
         let (mmap, path) = Self::new_map(&drives, bytes, &stats);
         Self {
             path,
             mmap,
             cell_size,
             count,
-            capacity_pow2,
+            capacity,
             stats,
             max_search,
-            contents: O::new(1 << capacity_pow2),
+            contents: O::new(capacity),
         }
     }

+    fn allocate_to_fill_page(capacity: &mut Capacity, cell_size: u64) -> u64 {
+        let mut bytes = capacity.capacity() * cell_size;
+        if let Capacity::Actual(_) = capacity {
+            // maybe bump up allocation to fit a page size
+            const PAGE_SIZE: u64 = 4 * 1024;
+            let full_page_bytes = bytes / PAGE_SIZE * PAGE_SIZE / cell_size * cell_size;
+            if full_page_bytes < bytes {
+                let bytes_new = ((bytes / PAGE_SIZE) + 1) * PAGE_SIZE / cell_size * cell_size;
+                assert!(bytes_new >= bytes, "allocating less than requested, capacity: {}, bytes: {}, bytes_new: {}, full_page_bytes: {}", capacity.capacity(), bytes, bytes_new, full_page_bytes);
+                assert_eq!(bytes_new % cell_size, 0);
+                bytes = bytes_new;
+                *capacity = Capacity::Actual(bytes / cell_size);
+            }
+        }
+        bytes
+    }
+
     pub fn max_search(&self) -> u64 {
         self.max_search as u64
     }
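The page-fill bump only applies to `Capacity::Actual`; pow2-sized buckets are mapped at exactly `(1 << pow2) * cell_size` bytes. As a worked example with assumed numbers (a `cell_size` of 24 bytes and a request for 100 elements): the raw 2400-byte allocation is rounded up to 4080 bytes, the largest multiple of `cell_size` that still fits within the next 4 KiB page, so the stored capacity becomes `Capacity::Actual(170)`. A standalone sketch of that arithmetic:

// Sketch of the rounding arithmetic with example values (not from the PR).
const PAGE_SIZE: u64 = 4 * 1024;

fn page_fill_example() {
    let cell_size: u64 = 24;
    let requested_elements: u64 = 100;

    let bytes = requested_elements * cell_size; // 2400, not page aligned
    // round up to the next page boundary, then back down to a multiple of cell_size
    let bytes_new = ((bytes / PAGE_SIZE) + 1) * PAGE_SIZE / cell_size * cell_size;
    assert_eq!(bytes_new, 4080);
    assert_eq!(bytes_new / cell_size, 170); // stored as Capacity::Actual(170)
}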
@@ -142,7 +195,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
             drives,
             num_elems,
             elem_size,
-            DEFAULT_CAPACITY_POW2,
+            Capacity::Pow2(DEFAULT_CAPACITY_POW2),
             max_search,
             stats,
             count,
@@ -330,7 +383,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
         let old_cap = old_bucket.capacity();
         let old_map = &old_bucket.mmap;

-        let increment = self.capacity_pow2 - old_bucket.capacity_pow2;
+        let increment = self.capacity.capacity_pow2() - old_bucket.capacity.capacity_pow2();
         let index_grow = 1 << increment;
         (0..old_cap as usize).for_each(|i| {
             if !old_bucket.is_free(i as u64) {
@@ -370,7 +423,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
         drives: &Arc<Vec<PathBuf>>,
         max_search: MaxSearch,
         bucket: Option<&Self>,
-        capacity_pow_2: u8,
+        capacity: Capacity,
         num_elems: u64,
         elem_size: u64,
         stats: &Arc<BucketStats>,
@@ -379,7 +432,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
             Arc::clone(drives),
             num_elems,
             elem_size,
-            capacity_pow_2,
+            capacity,
             max_search,
             Arc::clone(stats),
             bucket
@@ -400,7 +453,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
     /// Return the number of cells currently allocated
     pub fn capacity(&self) -> u64 {
-        1 << self.capacity_pow2
+        self.capacity.capacity()
     }
 }
@@ -458,11 +511,17 @@ mod test {
             std::mem::size_of::<u64>() - 1
         }
         /// initialize this struct
-        fn new(_num_elements: usize) -> Self {
+        fn new(_num_elements: Capacity) -> Self {
             Self {}
         }
     }

+    impl BucketCapacity for BucketBadHeader {
+        fn capacity(&self) -> u64 {
+            unimplemented!();
+        }
+    }
+
     #[test]
     #[should_panic(expected = "assertion failed: `(left == right)`")]
     fn test_header_size() {
@@ -470,7 +529,7 @@ mod test {
             Arc::default(),
             0,
             0,
-            0,
+            Capacity::Pow2(0),
             0,
             Arc::default(),
             Arc::default(),


@@ -2,7 +2,7 @@
 use {
     crate::{
-        bucket_storage::{BucketOccupied, BucketStorage, IncludeHeader},
+        bucket_storage::{BucketCapacity, BucketOccupied, BucketStorage, Capacity, IncludeHeader},
         RefCount,
     },
     bv::BitVec,
@@ -33,9 +33,9 @@ impl BucketOccupied for BucketWithBitVec {
         // no header, nothing stored in data stream
         0
     }
-    fn new(num_elements: usize) -> Self {
+    fn new(capacity: Capacity) -> Self {
         Self {
-            occupied: BitVec::new_fill(false, num_elements as u64),
+            occupied: BitVec::new_fill(false, capacity.capacity()),
         }
     }
 }
@@ -64,9 +64,10 @@ impl<T: Copy> BucketOccupied for IndexBucketUsingRefCountBits<T> {
         matches!(entry.get_slot_count_enum(), OccupiedEnum::Free)
     }
     fn offset_to_first_data() -> usize {
+        // no header, nothing stored in data stream
         0
     }
-    fn new(_num_elements: usize) -> Self {
+    fn new(_capacity: Capacity) -> Self {
         Self {
             _phantom: PhantomData,
         }
@@ -170,7 +171,8 @@ impl MultipleSlots {
     /// This function maps the original data location into an index in the current bucket storage.
     /// This is coupled with how we resize bucket storages.
     pub(crate) fn data_loc(&self, storage: &BucketStorage<DataBucket>) -> u64 {
-        self.storage_offset() << (storage.capacity_pow2 - self.storage_capacity_when_created_pow2())
+        self.storage_offset()
+            << (storage.capacity.capacity_pow2() - self.storage_capacity_when_created_pow2())
     }
 }
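`data_loc` works because `MultipleSlots` records the data bucket's pow2 capacity at the time the entry was written, and the grow path above copies entry `i` of the old bucket to `i << increment` in the new one. A small worked sketch with assumed numbers:

// Sketch: an entry was written at offset 5 when the data bucket held 1 << 10
// cells; the bucket has since grown to 1 << 12 cells. Growth copies entry i
// to i << increment, so the entry is now found at 5 << (12 - 10) = 20.
fn data_loc_example() {
    let storage_offset: u64 = 5;
    let capacity_when_created_pow2: u8 = 10;
    let current_capacity_pow2: u8 = 12;

    let data_loc = storage_offset << (current_capacity_pow2 - capacity_when_created_pow2);
    assert_eq!(data_loc, 20);
}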