disk index: store single slot list in index entry (#31002)

This commit is contained in:
Jeff Washington (jwash) 2023-04-03 08:51:59 -05:00 committed by GitHub
parent 07f4789257
commit 8051aea88e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 209 additions and 118 deletions

View File

@ -6,6 +6,7 @@ use {
bucket_storage::{BucketOccupied, BucketStorage, DEFAULT_CAPACITY_POW2}, bucket_storage::{BucketOccupied, BucketStorage, DEFAULT_CAPACITY_POW2},
index_entry::{ index_entry::{
DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket, MultipleSlots, DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket, MultipleSlots,
OccupiedEnum,
}, },
MaxSearch, RefCount, MaxSearch, RefCount,
}, },
@ -78,8 +79,14 @@ impl<I: BucketOccupied, D: BucketOccupied> Reallocated<I, D> {
} }
} }
/// when updating the index, this keeps track of the previous data entry which will need to be freed
/// The free is deferred: `try_write` only calls `self.data[bucket_ix].free(location)` after a
/// spot for the new data has been found, so a `DataNoSpace` error leaves the old entry intact.
struct DataFileEntryToFree {
/// index into `self.data` of the data bucket holding the previous entry
bucket_ix: usize,
/// location of the previous entry within that data bucket (the value passed to `free`)
location: u64,
}
// >= 2 instances of BucketStorage per 'bucket' in the bucket map. 1 for index, >= 1 for data // >= 2 instances of BucketStorage per 'bucket' in the bucket map. 1 for index, >= 1 for data
pub struct Bucket<T: 'static> { pub struct Bucket<T: Copy + 'static> {
drives: Arc<Vec<PathBuf>>, drives: Arc<Vec<PathBuf>>,
//index //index
pub index: BucketStorage<IndexBucket<T>>, pub index: BucketStorage<IndexBucket<T>>,
@ -263,7 +270,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
pub fn try_write( pub fn try_write(
&mut self, &mut self,
key: &Pubkey, key: &Pubkey,
data: impl Iterator<Item = &'b T>, mut data: impl Iterator<Item = &'b T>,
data_len: usize, data_len: usize,
ref_count: RefCount, ref_count: RefCount,
) -> Result<(), BucketMapError> { ) -> Result<(), BucketMapError> {
@ -287,29 +294,63 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}; };
elem.set_ref_count(&mut self.index, ref_count); elem.set_ref_count(&mut self.index, ref_count);
let current_multiple_slots = elem.get_multiple_slots(&self.index);
let bucket_ix = current_multiple_slots.data_bucket_ix();
let num_slots = data_len as u64; let num_slots = data_len as u64;
if best_fit_bucket == bucket_ix && current_multiple_slots.num_slots() > 0 { if num_slots <= 1 {
let current_bucket = &mut self.data[bucket_ix as usize]; // new data should be stored in IndexEntry and NOT in data file
// in place update // new data len is 0 or 1
let elem_loc = current_multiple_slots.data_loc(current_bucket); if let OccupiedEnum::MultipleSlots(multiple_slots) =
elem.get_slot_count_enum(&self.index)
{
let bucket_ix = multiple_slots.data_bucket_ix() as usize;
// free the entry in the data bucket the data was previously stored in
let loc = multiple_slots.data_loc(&self.data[bucket_ix]);
self.data[bucket_ix].free(loc);
}
elem.set_slot_count_enum_value(
&mut self.index,
if let Some(single_element) = data.next() {
OccupiedEnum::OneSlotInIndex(single_element)
} else {
OccupiedEnum::ZeroSlots
},
);
return Ok(());
}
// storing the slot list requires using the data file
let mut old_data_entry_to_free = None;
// see if old elements were in a data file
if let Some(multiple_slots) = elem.get_multiple_slots_mut(&mut self.index) {
let bucket_ix = multiple_slots.data_bucket_ix() as usize;
let current_bucket = &mut self.data[bucket_ix];
let elem_loc = multiple_slots.data_loc(current_bucket);
if best_fit_bucket == bucket_ix as u64 {
// in place update in same data file
assert!(!current_bucket.is_free(elem_loc)); assert!(!current_bucket.is_free(elem_loc));
let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64); let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64);
let current_multiple_slots = elem.get_multiple_slots_mut(&mut self.index); multiple_slots.set_num_slots(num_slots);
current_multiple_slots.set_num_slots(num_slots);
slice.iter_mut().zip(data).for_each(|(dest, src)| { slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src; *dest = *src;
}); });
Ok(()) return Ok(());
} else { }
// not updating in place, so remember old entry to free
// Wait to free until we make sure we don't have to resize the best_fit_bucket
old_data_entry_to_free = Some(DataFileEntryToFree {
bucket_ix,
location: elem_loc,
});
}
// need to move the allocation to a best fit spot // need to move the allocation to a best fit spot
let best_bucket = &self.data[best_fit_bucket as usize]; let best_bucket = &self.data[best_fit_bucket as usize];
let current_bucket = &self.data[bucket_ix as usize];
let cap_power = best_bucket.capacity_pow2; let cap_power = best_bucket.capacity_pow2;
let cap = best_bucket.capacity(); let cap = best_bucket.capacity();
let pos = thread_rng().gen_range(0, cap); let pos = thread_rng().gen_range(0, cap);
let mut success = false;
// max search is increased here by a lot for this search. The idea is that we just have to find an empty bucket somewhere. // max search is increased here by a lot for this search. The idea is that we just have to find an empty bucket somewhere.
// We don't mind waiting on a new write (by searching longer). Writing is done in the background only. // We don't mind waiting on a new write (by searching longer). Writing is done in the background only.
// Wasting space by doubling the bucket size is worse behavior. We expect more // Wasting space by doubling the bucket size is worse behavior. We expect more
@ -321,19 +362,17 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
for i in pos..pos + (max_search * 10).min(cap) { for i in pos..pos + (max_search * 10).min(cap) {
let ix = i % cap; let ix = i % cap;
if best_bucket.is_free(ix) { if best_bucket.is_free(ix) {
let elem_loc = current_multiple_slots.data_loc(current_bucket); let mut multiple_slots = MultipleSlots::default();
let old_slots = current_multiple_slots.num_slots();
let multiple_slots = elem.get_multiple_slots_mut(&mut self.index);
multiple_slots.set_storage_offset(ix); multiple_slots.set_storage_offset(ix);
multiple_slots multiple_slots.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2);
.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2);
multiple_slots.set_num_slots(num_slots); multiple_slots.set_num_slots(num_slots);
if old_slots > 0 { elem.set_slot_count_enum_value(
let current_bucket = &mut self.data[bucket_ix as usize]; &mut self.index,
current_bucket.free(elem_loc); OccupiedEnum::MultipleSlots(&multiple_slots),
} );
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid ); //debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
if num_slots > 0 { if num_slots > 0 {
// copy slotlist into the data bucket
let best_bucket = &mut self.data[best_fit_bucket as usize]; let best_bucket = &mut self.data[best_fit_bucket as usize];
best_bucket.occupy(ix, false).unwrap(); best_bucket.occupy(ix, false).unwrap();
let slice = best_bucket.get_mut_cell_slice(ix, num_slots); let slice = best_bucket.get_mut_cell_slice(ix, num_slots);
@ -341,17 +380,29 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
*dest = *src; *dest = *src;
}); });
} }
return Ok(()); success = true;
break;
} }
} }
Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power))) if !success {
return Err(BucketMapError::DataNoSpace((best_fit_bucket, cap_power)));
} }
if let Some(DataFileEntryToFree {
bucket_ix,
location,
}) = old_data_entry_to_free
{
// free the entry in the data bucket the data was previously stored in
self.data[bucket_ix].free(location);
}
Ok(())
} }
pub fn delete_key(&mut self, key: &Pubkey) { pub fn delete_key(&mut self, key: &Pubkey) {
if let Some((elem, elem_ix)) = self.find_index_entry(key) { if let Some((elem, elem_ix)) = self.find_index_entry(key) {
let multiple_slots = elem.get_multiple_slots_mut(&mut self.index); if let OccupiedEnum::MultipleSlots(multiple_slots) =
if multiple_slots.num_slots() > 0 { elem.get_slot_count_enum(&self.index)
{
let ix = multiple_slots.data_bucket_ix() as usize; let ix = multiple_slots.data_bucket_ix() as usize;
let data_bucket = &self.data[ix]; let data_bucket = &self.data[ix];
let loc = multiple_slots.data_loc(data_bucket); let loc = multiple_slots.data_loc(data_bucket);

View File

@ -190,13 +190,13 @@ impl<O: BucketOccupied> BucketStorage<O> {
unsafe { slice.get_unchecked_mut(0) } unsafe { slice.get_unchecked_mut(0) }
} }
pub(crate) fn get_mut_from_parts<T: Sized>(item_slice: &mut [u8]) -> &mut T { pub(crate) fn get_mut_from_parts<T>(item_slice: &mut [u8]) -> &mut T {
debug_assert!(std::mem::size_of::<T>() <= item_slice.len()); debug_assert!(std::mem::size_of::<T>() <= item_slice.len());
let item = item_slice.as_mut_ptr() as *mut T; let item = item_slice.as_mut_ptr() as *mut T;
unsafe { &mut *item } unsafe { &mut *item }
} }
pub(crate) fn get_from_parts<T: Sized>(item_slice: &[u8]) -> &T { pub(crate) fn get_from_parts<T>(item_slice: &[u8]) -> &T {
debug_assert!(std::mem::size_of::<T>() <= item_slice.len()); debug_assert!(std::mem::size_of::<T>() <= item_slice.len());
let item = item_slice.as_ptr() as *const T; let item = item_slice.as_ptr() as *const T;
unsafe { &*item } unsafe { &*item }

View File

@ -44,12 +44,12 @@ pub struct IndexBucketUsingRefCountBits<T: 'static> {
_phantom: PhantomData<&'static T>, _phantom: PhantomData<&'static T>,
} }
impl<T: 'static> BucketOccupied for IndexBucketUsingRefCountBits<T> { impl<T: Copy> BucketOccupied for IndexBucketUsingRefCountBits<T> {
fn occupy(&mut self, element: &mut [u8], ix: usize) { fn occupy(&mut self, element: &mut [u8], ix: usize) {
assert!(self.is_free(element, ix)); assert!(self.is_free(element, ix));
let entry: &mut IndexEntry<T> = let entry: &mut IndexEntry<T> =
BucketStorage::<IndexBucketUsingRefCountBits<T>>::get_mut_from_parts(element); BucketStorage::<IndexBucketUsingRefCountBits<T>>::get_mut_from_parts(element);
entry.set_slot_count_enum_value(OccupiedEnum::Occupied); entry.set_slot_count_enum_value(OccupiedEnum::ZeroSlots);
} }
fn free(&mut self, element: &mut [u8], ix: usize) { fn free(&mut self, element: &mut [u8], ix: usize) {
assert!(!self.is_free(element, ix)); assert!(!self.is_free(element, ix));
@ -86,11 +86,11 @@ pub struct IndexEntryPlaceInBucket<T: 'static> {
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
/// one instance of this per item in the index /// one instance of this per item in the index
/// stored in the index bucket /// stored in the index bucket
pub struct IndexEntry<T: 'static> { pub struct IndexEntry<T: Clone + Copy> {
pub(crate) key: Pubkey, // can this be smaller if we have reduced the keys into buckets already? pub(crate) key: Pubkey, // can this be smaller if we have reduced the keys into buckets already?
packed_ref_count: PackedRefCount, packed_ref_count: PackedRefCount,
multiple_slots: MultipleSlots, /// depends on the contents of ref_count.slot_count_enum
_phantom: PhantomData<&'static T>, contents: SingleElementOrMultipleSlots<T>,
} }
/// 62 bits available for ref count /// 62 bits available for ref count
@ -173,32 +173,66 @@ impl MultipleSlots {
} }
} }
#[repr(u8)] #[repr(C)]
#[derive(Debug, Eq, PartialEq)] #[derive(Copy, Clone)]
pub(crate) enum OccupiedEnum { pub(crate) union SingleElementOrMultipleSlots<T: Clone + Copy> {
/// this spot is free (ie. not occupied) /// the slot list contains a single element. No need for an entry in the data file.
Free = 0, /// The element itself is stored in place in the index entry
/// this spot is occupied pub(crate) single_element: T,
Occupied = 1, /// the slot list contains more than one element. This contains the reference to the data file.
pub(crate) multiple_slots: MultipleSlots,
} }
impl<T: 'static> IndexEntry<T> { #[repr(u8)]
/// enum value stored in 2 spare bits taken from ref_count #[derive(Debug, Eq, PartialEq)]
fn get_slot_count_enum(&self) -> OccupiedEnum { pub(crate) enum OccupiedEnum<'a, T> {
/// this spot is not occupied.
/// ALL other enum values ARE occupied.
Free = 0,
/// zero slots in the slot list
ZeroSlots = 1,
/// one slot in the slot list, it is stored in the index
OneSlotInIndex(&'a T) = 2,
/// data is stored in data file
MultipleSlots(&'a MultipleSlots) = 3,
}
impl<T: Copy> IndexEntry<T> {
pub(crate) fn get_slot_count_enum(&self) -> OccupiedEnum<'_, T> {
unsafe {
match self.packed_ref_count.slot_count_enum() { match self.packed_ref_count.slot_count_enum() {
0 => OccupiedEnum::Free, 0 => OccupiedEnum::Free,
1 => OccupiedEnum::Occupied, 1 => OccupiedEnum::ZeroSlots,
2 => OccupiedEnum::OneSlotInIndex(&self.contents.single_element),
3 => OccupiedEnum::MultipleSlots(&self.contents.multiple_slots),
_ => { _ => {
panic!("unexpected value"); panic!("unexpected value");
} }
} }
} }
}
/// enum value stored in 2 spare bits taken from ref_count pub(crate) fn get_multiple_slots_mut(&mut self) -> Option<&mut MultipleSlots> {
fn set_slot_count_enum_value(&mut self, value: OccupiedEnum) { unsafe {
match self.packed_ref_count.slot_count_enum() {
3 => Some(&mut self.contents.multiple_slots),
_ => None,
}
}
}
pub(crate) fn set_slot_count_enum_value<'a>(&'a mut self, value: OccupiedEnum<'a, T>) {
self.packed_ref_count.set_slot_count_enum(match value { self.packed_ref_count.set_slot_count_enum(match value {
OccupiedEnum::Free => 0, OccupiedEnum::Free => 0,
OccupiedEnum::Occupied => 1, OccupiedEnum::ZeroSlots => 1,
OccupiedEnum::OneSlotInIndex(single_element) => {
self.contents.single_element = *single_element;
2
}
OccupiedEnum::MultipleSlots(multiple_slots) => {
self.contents.multiple_slots = *multiple_slots;
3
}
}); });
} }
} }
@ -212,42 +246,36 @@ struct PackedStorage {
offset: B56, offset: B56,
} }
impl<T: 'static> IndexEntryPlaceInBucket<T> { impl<T: Copy> IndexEntryPlaceInBucket<T> {
pub fn init(&self, index_bucket: &mut BucketStorage<IndexBucket<T>>, pubkey: &Pubkey) { pub fn init(&self, index_bucket: &mut BucketStorage<IndexBucket<T>>, pubkey: &Pubkey) {
self.set_slot_count_enum_value(index_bucket, OccupiedEnum::ZeroSlots);
let index_entry = index_bucket.get_mut::<IndexEntry<T>>(self.ix); let index_entry = index_bucket.get_mut::<IndexEntry<T>>(self.ix);
index_entry.key = *pubkey; index_entry.key = *pubkey;
index_entry.packed_ref_count.set_ref_count(0); index_entry.packed_ref_count.set_ref_count(0);
index_entry.multiple_slots = MultipleSlots::default();
}
pub(crate) fn get_multiple_slots<'a>(
&self,
index_bucket: &'a BucketStorage<IndexBucket<T>>,
) -> &'a MultipleSlots {
&index_bucket.get::<IndexEntry<T>>(self.ix).multiple_slots
} }
/// return Some(MultipleSlots) if this item's data is stored in the data file
pub(crate) fn get_multiple_slots_mut<'a>( pub(crate) fn get_multiple_slots_mut<'a>(
&self, &self,
index_bucket: &'a mut BucketStorage<IndexBucket<T>>, index_bucket: &'a mut BucketStorage<IndexBucket<T>>,
) -> &'a mut MultipleSlots { ) -> Option<&'a mut MultipleSlots> {
&mut index_bucket let index_entry = index_bucket.get_mut::<IndexEntry<T>>(self.ix);
.get_mut::<IndexEntry<T>>(self.ix) index_entry.get_multiple_slots_mut()
.multiple_slots
} }
pub(crate) fn get_slot_count_enum( pub(crate) fn get_slot_count_enum<'a>(
&self, &self,
index_bucket: &BucketStorage<IndexBucket<T>>, index_bucket: &'a BucketStorage<IndexBucket<T>>,
) -> OccupiedEnum { ) -> OccupiedEnum<'a, T> {
let index_entry = index_bucket.get::<IndexEntry<T>>(self.ix); let index_entry = index_bucket.get::<IndexEntry<T>>(self.ix);
index_entry.get_slot_count_enum() index_entry.get_slot_count_enum()
} }
pub(crate) fn set_slot_count_enum_value( /// make this index entry reflect `value`
pub(crate) fn set_slot_count_enum_value<'a>(
&self, &self,
index_bucket: &mut BucketStorage<IndexBucket<T>>, index_bucket: &'a mut BucketStorage<IndexBucket<T>>,
value: OccupiedEnum, value: OccupiedEnum<'a, T>,
) { ) {
let index_entry = index_bucket.get_mut::<IndexEntry<T>>(self.ix); let index_entry = index_bucket.get_mut::<IndexEntry<T>>(self.ix);
index_entry.set_slot_count_enum_value(value); index_entry.set_slot_count_enum_value(value);
@ -260,22 +288,34 @@ impl<T: 'static> IndexEntryPlaceInBucket<T> {
pub fn read_value<'a>( pub fn read_value<'a>(
&self, &self,
index_bucket: &BucketStorage<IndexBucket<T>>, index_bucket: &'a BucketStorage<IndexBucket<T>>,
data_buckets: &'a [BucketStorage<DataBucket>], data_buckets: &'a [BucketStorage<DataBucket>],
) -> Option<(&'a [T], RefCount)> { ) -> Option<(&'a [T], RefCount)> {
let multiple_slots = self.get_multiple_slots(index_bucket); Some((
let num_slots = multiple_slots.num_slots(); match self.get_slot_count_enum(index_bucket) {
let slice = if num_slots > 0 { OccupiedEnum::ZeroSlots => {
// num_slots is 0. This means we don't have an actual allocation.
&[]
}
OccupiedEnum::OneSlotInIndex(single_element) => {
// only element is stored in the index entry
// Note that the lifetime comes from `index_bucket` here.
std::slice::from_ref(single_element)
}
OccupiedEnum::MultipleSlots(multiple_slots) => {
// data is in data file, so return a ref to that data
let data_bucket_ix = multiple_slots.data_bucket_ix(); let data_bucket_ix = multiple_slots.data_bucket_ix();
let data_bucket = &data_buckets[data_bucket_ix as usize]; let data_bucket = &data_buckets[data_bucket_ix as usize];
let loc = multiple_slots.data_loc(data_bucket); let loc = multiple_slots.data_loc(data_bucket);
assert!(!data_bucket.is_free(loc)); assert!(!data_bucket.is_free(loc));
data_bucket.get_cell_slice(loc, num_slots) data_bucket.get_cell_slice::<T>(loc, multiple_slots.num_slots)
} else { }
// num_slots is 0. This means we don't have an actual allocation. _ => {
&[] panic!("trying to read data from a free entry");
}; }
Some((slice, self.ref_count(index_bucket))) },
self.ref_count(index_bucket),
))
} }
pub fn new(ix: u64) -> Self { pub fn new(ix: u64) -> Self {