disk index: store ref_count in data file (#30974)
This commit is contained in:
parent
c4648f5a26
commit
9fb22bc0be
|
@ -300,10 +300,8 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
|
|||
elem_allocate.init(&mut self.index, key);
|
||||
elem_allocate
|
||||
};
|
||||
|
||||
elem.set_ref_count(&mut self.index, ref_count);
|
||||
let num_slots = data_len as u64;
|
||||
if num_slots <= 1 {
|
||||
if num_slots <= 1 && ref_count == 1 {
|
||||
// new data stored should be stored in IndexEntry and NOT in data file
|
||||
// new data len is 0 or 1
|
||||
if let OccupiedEnum::MultipleSlots(multiple_slots) =
|
||||
|
@ -335,6 +333,9 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
|
|||
|
||||
if best_fit_bucket == bucket_ix as u64 {
|
||||
// in place update in same data file
|
||||
MultipleSlots::set_ref_count(current_bucket, elem_loc, ref_count);
|
||||
|
||||
// write data
|
||||
assert!(!current_bucket.is_free(elem_loc));
|
||||
let slice: &mut [T] = current_bucket.get_mut_cell_slice(
|
||||
elem_loc,
|
||||
|
@ -358,7 +359,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
|
|||
}
|
||||
|
||||
// need to move the allocation to a best fit spot
|
||||
let best_bucket = &self.data[best_fit_bucket as usize];
|
||||
let best_bucket = &mut self.data[best_fit_bucket as usize];
|
||||
let cap_power = best_bucket.contents.capacity_pow2();
|
||||
let cap = best_bucket.capacity();
|
||||
let pos = thread_rng().gen_range(0, cap);
|
||||
|
@ -379,15 +380,17 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
|
|||
multiple_slots
|
||||
.set_storage_capacity_when_created_pow2(best_bucket.contents.capacity_pow2());
|
||||
multiple_slots.set_num_slots(num_slots);
|
||||
MultipleSlots::set_ref_count(best_bucket, ix, ref_count);
|
||||
|
||||
elem.set_slot_count_enum_value(
|
||||
&mut self.index,
|
||||
OccupiedEnum::MultipleSlots(&multiple_slots),
|
||||
);
|
||||
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
|
||||
let best_bucket = &mut self.data[best_fit_bucket as usize];
|
||||
best_bucket.occupy(ix, false).unwrap();
|
||||
if num_slots > 0 {
|
||||
// copy slotlist into the data bucket
|
||||
let best_bucket = &mut self.data[best_fit_bucket as usize];
|
||||
best_bucket.occupy(ix, false).unwrap();
|
||||
let slice =
|
||||
best_bucket.get_mut_cell_slice(ix, num_slots, IncludeHeader::NoHeader);
|
||||
slice.iter_mut().zip(data).for_each(|(dest, src)| {
|
||||
|
|
|
@ -264,6 +264,18 @@ impl<O: BucketOccupied> BucketStorage<O> {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_header<T>(&self, ix: u64) -> &T {
|
||||
let slice = self.get_cell_slice::<T>(ix, 1, IncludeHeader::Header);
|
||||
// SAFETY: `get_cell_slice` ensures there's at least one element in the slice
|
||||
unsafe { slice.get_unchecked(0) }
|
||||
}
|
||||
|
||||
pub(crate) fn get_header_mut<T>(&mut self, ix: u64) -> &mut T {
|
||||
let slice = self.get_mut_cell_slice::<T>(ix, 1, IncludeHeader::Header);
|
||||
// SAFETY: `get_mut_cell_slice` ensures there's at least one element in the slice
|
||||
unsafe { slice.get_unchecked_mut(0) }
|
||||
}
|
||||
|
||||
pub(crate) fn get<T>(&self, ix: u64) -> &T {
|
||||
let slice = self.get_cell_slice::<T>(ix, 1, IncludeHeader::NoHeader);
|
||||
// SAFETY: `get_cell_slice` ensures there's at least one element in the slice
|
||||
|
@ -276,14 +288,12 @@ impl<O: BucketOccupied> BucketStorage<O> {
|
|||
unsafe { slice.get_unchecked_mut(0) }
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn get_mut_from_parts<T>(item_slice: &mut [u8]) -> &mut T {
|
||||
debug_assert!(std::mem::size_of::<T>() <= item_slice.len());
|
||||
let item = item_slice.as_mut_ptr() as *mut T;
|
||||
unsafe { &mut *item }
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn get_from_parts<T>(item_slice: &[u8]) -> &T {
|
||||
debug_assert!(std::mem::size_of::<T>() <= item_slice.len());
|
||||
let item = item_slice.as_ptr() as *const T;
|
||||
|
|
|
@ -12,13 +12,12 @@ use {
|
|||
std::{fmt::Debug, marker::PhantomData},
|
||||
};
|
||||
|
||||
/// allocated in `contents` in a BucketStorage
|
||||
pub struct BucketWithBitVec {
|
||||
occupied: BitVec,
|
||||
capacity_pow2: Capacity,
|
||||
}
|
||||
/// in use/occupied
|
||||
const OCCUPIED_OCCUPIED: u8 = 1;
|
||||
/// free, ie. not occupied
|
||||
const OCCUPIED_FREE: u8 = 0;
|
||||
|
||||
impl BucketCapacity for BucketWithBitVec {
|
||||
impl BucketCapacity for BucketWithHeader {
|
||||
fn capacity(&self) -> u64 {
|
||||
self.capacity_pow2.capacity()
|
||||
}
|
||||
|
@ -27,26 +26,47 @@ impl BucketCapacity for BucketWithBitVec {
|
|||
}
|
||||
}
|
||||
|
||||
impl BucketOccupied for BucketWithBitVec {
|
||||
/// header for elements in a bucket
|
||||
/// needs to be multiple of size_of::<u64>()
|
||||
#[derive(Copy, Clone)]
|
||||
#[repr(C)]
|
||||
struct DataBucketRefCountOccupiedHeader {
|
||||
/// stores `ref_count` and
|
||||
/// occupied = OCCUPIED_OCCUPIED or OCCUPIED_FREE
|
||||
packed_ref_count: PackedRefCount,
|
||||
}
|
||||
|
||||
/// allocated in `contents` in a BucketStorage
|
||||
#[derive(Copy, Clone)]
|
||||
#[repr(C)]
|
||||
pub struct BucketWithHeader {
|
||||
capacity_pow2: Capacity,
|
||||
}
|
||||
|
||||
impl BucketOccupied for BucketWithHeader {
|
||||
fn occupy(&mut self, element: &mut [u8], ix: usize) {
|
||||
assert!(self.is_free(element, ix));
|
||||
self.occupied.set(ix as u64, true);
|
||||
let entry: &mut DataBucketRefCountOccupiedHeader =
|
||||
BucketStorage::<BucketWithHeader>::get_mut_from_parts(element);
|
||||
entry.packed_ref_count.set_occupied(OCCUPIED_OCCUPIED);
|
||||
}
|
||||
fn free(&mut self, element: &mut [u8], ix: usize) {
|
||||
assert!(!self.is_free(element, ix));
|
||||
self.occupied.set(ix as u64, false);
|
||||
let entry: &mut DataBucketRefCountOccupiedHeader =
|
||||
BucketStorage::<BucketWithHeader>::get_mut_from_parts(element);
|
||||
entry.packed_ref_count.set_occupied(OCCUPIED_FREE);
|
||||
}
|
||||
fn is_free(&self, _element: &[u8], ix: usize) -> bool {
|
||||
!self.occupied.get(ix as u64)
|
||||
fn is_free(&self, element: &[u8], _ix: usize) -> bool {
|
||||
let entry: &DataBucketRefCountOccupiedHeader =
|
||||
BucketStorage::<BucketWithHeader>::get_from_parts(element);
|
||||
entry.packed_ref_count.occupied() == OCCUPIED_FREE
|
||||
}
|
||||
fn offset_to_first_data() -> usize {
|
||||
// no header, nothing stored in data stream
|
||||
0
|
||||
std::mem::size_of::<DataBucketRefCountOccupiedHeader>()
|
||||
}
|
||||
fn new(capacity: Capacity) -> Self {
|
||||
assert!(matches!(capacity, Capacity::Pow2(_)));
|
||||
Self {
|
||||
occupied: BitVec::new_fill(false, capacity.capacity()),
|
||||
capacity_pow2: capacity,
|
||||
}
|
||||
}
|
||||
|
@ -131,7 +151,7 @@ impl<T> BucketCapacity for IndexBucketUsingBitVecBits<T> {
|
|||
}
|
||||
}
|
||||
|
||||
pub type DataBucket = BucketWithBitVec;
|
||||
pub type DataBucket = BucketWithHeader;
|
||||
pub type IndexBucket<T> = IndexBucketUsingBitVecBits<T>;
|
||||
|
||||
/// contains the index of an entry in the index bucket.
|
||||
|
@ -147,7 +167,6 @@ pub struct IndexEntryPlaceInBucket<T: 'static> {
|
|||
/// stored in the index bucket
|
||||
pub struct IndexEntry<T: Clone + Copy> {
|
||||
pub(crate) key: Pubkey, // can this be smaller if we have reduced the keys into buckets already?
|
||||
packed_ref_count: PackedRefCount,
|
||||
/// depends on the contents of ref_count.slot_count_enum
|
||||
contents: SingleElementOrMultipleSlots<T>,
|
||||
}
|
||||
|
@ -160,8 +179,8 @@ pub(crate) const MAX_LEGAL_REFCOUNT: RefCount = RefCount::MAX >> 1;
|
|||
#[repr(C)]
|
||||
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)]
|
||||
pub(crate) struct PackedRefCount {
|
||||
/// currently unused
|
||||
pub(crate) unused: B1,
|
||||
/// whether this entry in the data file is occupied or not
|
||||
pub(crate) occupied: B1,
|
||||
/// ref_count of this entry. We don't need any where near 63 bits for this value
|
||||
pub(crate) ref_count: B63,
|
||||
}
|
||||
|
@ -231,6 +250,26 @@ impl MultipleSlots {
|
|||
self.storage_offset()
|
||||
<< (storage.contents.capacity_pow2() - self.storage_capacity_when_created_pow2())
|
||||
}
|
||||
|
||||
/// ref_count is stored in the header per cell, in `packed_ref_count`
|
||||
pub fn set_ref_count(
|
||||
data_bucket: &mut BucketStorage<DataBucket>,
|
||||
data_ix: u64,
|
||||
ref_count: RefCount,
|
||||
) {
|
||||
data_bucket
|
||||
.get_header_mut::<DataBucketRefCountOccupiedHeader>(data_ix)
|
||||
.packed_ref_count
|
||||
.set_ref_count(ref_count);
|
||||
}
|
||||
|
||||
/// ref_count is stored in the header per cell, in `packed_ref_count`
|
||||
pub fn ref_count(data_bucket: &BucketStorage<DataBucket>, data_ix: u64) -> RefCount {
|
||||
data_bucket
|
||||
.get_header::<DataBucketRefCountOccupiedHeader>(data_ix)
|
||||
.packed_ref_count
|
||||
.ref_count()
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
|
@ -343,45 +382,41 @@ impl<T: Copy + 'static> IndexEntryPlaceInBucket<T> {
|
|||
index_entry.key = *pubkey;
|
||||
}
|
||||
|
||||
fn ref_count(&self, index_bucket: &BucketStorage<IndexBucket<T>>) -> RefCount {
|
||||
let index_entry = index_bucket.get::<IndexEntry<T>>(self.ix);
|
||||
index_entry.packed_ref_count.ref_count()
|
||||
}
|
||||
|
||||
pub(crate) fn read_value<'a>(
|
||||
&self,
|
||||
index_bucket: &'a BucketStorage<IndexBucket<T>>,
|
||||
data_buckets: &'a [BucketStorage<DataBucket>],
|
||||
) -> (&'a [T], RefCount) {
|
||||
(
|
||||
match self.get_slot_count_enum(index_bucket) {
|
||||
OccupiedEnum::ZeroSlots => {
|
||||
// num_slots is 0. This means we don't have an actual allocation.
|
||||
&[]
|
||||
}
|
||||
OccupiedEnum::OneSlotInIndex(single_element) => {
|
||||
// only element is stored in the index entry
|
||||
// Note that the lifetime comes from `index_bucket` here.
|
||||
std::slice::from_ref(single_element)
|
||||
}
|
||||
OccupiedEnum::MultipleSlots(multiple_slots) => {
|
||||
// data is in data file, so return a ref to that data
|
||||
let data_bucket_ix = multiple_slots.data_bucket_ix();
|
||||
let data_bucket = &data_buckets[data_bucket_ix as usize];
|
||||
let loc = multiple_slots.data_loc(data_bucket);
|
||||
assert!(!data_bucket.is_free(loc));
|
||||
data_bucket.get_cell_slice::<T>(
|
||||
loc,
|
||||
multiple_slots.num_slots,
|
||||
IncludeHeader::NoHeader,
|
||||
)
|
||||
}
|
||||
_ => {
|
||||
panic!("trying to read data from a free entry");
|
||||
}
|
||||
},
|
||||
self.ref_count(index_bucket),
|
||||
)
|
||||
let mut ref_count = 1;
|
||||
let slot_list = match self.get_slot_count_enum(index_bucket) {
|
||||
OccupiedEnum::ZeroSlots => {
|
||||
// num_slots is 0. This means empty slot list and ref_count=1
|
||||
&[]
|
||||
}
|
||||
OccupiedEnum::OneSlotInIndex(single_element) => {
|
||||
// only element is stored in the index entry
|
||||
std::slice::from_ref(single_element)
|
||||
}
|
||||
OccupiedEnum::MultipleSlots(multiple_slots) => {
|
||||
// slot list and ref_count are in data file
|
||||
let data_bucket_ix =
|
||||
MultipleSlots::data_bucket_from_num_slots(multiple_slots.num_slots);
|
||||
let data_bucket = &data_buckets[data_bucket_ix as usize];
|
||||
let loc = multiple_slots.data_loc(data_bucket);
|
||||
assert!(!data_bucket.is_free(loc));
|
||||
|
||||
ref_count = MultipleSlots::ref_count(data_bucket, loc);
|
||||
data_bucket.get_cell_slice::<T>(
|
||||
loc,
|
||||
multiple_slots.num_slots,
|
||||
IncludeHeader::NoHeader,
|
||||
)
|
||||
}
|
||||
_ => {
|
||||
panic!("trying to read data from a free entry");
|
||||
}
|
||||
};
|
||||
(slot_list, ref_count)
|
||||
}
|
||||
|
||||
pub fn new(ix: u64) -> Self {
|
||||
|
@ -395,18 +430,6 @@ impl<T: Copy + 'static> IndexEntryPlaceInBucket<T> {
|
|||
let entry: &IndexEntry<T> = index_bucket.get(self.ix);
|
||||
&entry.key
|
||||
}
|
||||
|
||||
pub fn set_ref_count(
|
||||
&self,
|
||||
index_bucket: &mut BucketStorage<IndexBucket<T>>,
|
||||
ref_count: RefCount,
|
||||
) {
|
||||
let index_entry = index_bucket.get_mut::<IndexEntry<T>>(self.ix);
|
||||
index_entry
|
||||
.packed_ref_count
|
||||
.set_ref_count_checked(ref_count)
|
||||
.expect("ref count must fit into 62 bits!");
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -436,7 +459,7 @@ mod tests {
|
|||
#[test]
|
||||
fn test_size() {
|
||||
assert_eq!(std::mem::size_of::<PackedStorage>(), 1 + 7);
|
||||
assert_eq!(std::mem::size_of::<IndexEntry<u64>>(), 32 + 8 + 8 + 8);
|
||||
assert_eq!(std::mem::size_of::<IndexEntry<u64>>(), 32 + 8 + 8);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
Loading…
Reference in New Issue