disk index: introduce IndexEntryPlaceInBucket (#30944)

* refactor: move data_bucket_from_num_slots

* disk index: introduce IndexEntryPlaceInBucket

* remove <T> from IndexEntryPlaceInBucket

* fix and comment index_bucket_for_testing
This commit is contained in:
Jeff Washington (jwash) 2023-03-29 15:41:10 -05:00 committed by GitHub
parent d542496d10
commit 4b7cfa23c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 185 additions and 92 deletions

View File

@ -4,7 +4,7 @@ use {
bucket_map::BucketMapError,
bucket_stats::BucketMapStats,
bucket_storage::{BucketOccupied, BucketStorage, DEFAULT_CAPACITY_POW2},
index_entry::{DataBucket, IndexBucket, IndexEntry},
index_entry::{DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket},
MaxSearch, RefCount,
},
rand::{thread_rng, Rng},
@ -142,13 +142,13 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
if self.index.is_free(ii) {
continue;
}
let ix: &IndexEntry = self.index.get(ii);
let key = ix.key;
if range.map(|r| r.contains(&key)).unwrap_or(true) {
let val = ix.read_value(self);
let ix = IndexEntryPlaceInBucket::new(ii);
let key = ix.key(&self.index);
if range.map(|r| r.contains(key)).unwrap_or(true) {
let val = ix.read_value(&self.index, &self.data);
result.push(BucketItem {
pubkey: key,
ref_count: ix.ref_count(),
pubkey: *key,
ref_count: ix.ref_count(&self.index),
slot_list: val.map(|(v, _ref_count)| v.to_vec()).unwrap_or_default(),
});
}
@ -156,7 +156,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
result
}
pub fn find_index_entry(&self, key: &Pubkey) -> Option<(&IndexEntry, u64)> {
pub fn find_index_entry(&self, key: &Pubkey) -> Option<(IndexEntryPlaceInBucket, u64)> {
Self::bucket_find_index_entry(&self.index, key, self.random)
}
@ -164,11 +164,11 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
/// if entry exists, return the entry along with the index of the existing entry
/// if entry does not exist, return just the index of an empty entry appropriate for this key
/// returns (existing entry, index of the found or empty entry)
fn find_index_entry_mut<'a>(
index: &'a mut BucketStorage<IndexBucket>,
fn find_index_entry_mut(
index: &mut BucketStorage<IndexBucket>,
key: &Pubkey,
random: u64,
) -> Result<(Option<&'a mut IndexEntry>, u64), BucketMapError> {
) -> Result<(Option<IndexEntryPlaceInBucket>, u64), BucketMapError> {
let ix = Self::bucket_index_ix(index, key, random);
let mut first_free = None;
let mut m = Measure::start("bucket_find_index_entry_mut");
@ -181,15 +181,15 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}
continue;
}
let elem: &IndexEntry = index.get(ii);
if elem.key == *key {
let elem = IndexEntryPlaceInBucket::new(ii);
if elem.key(index) == key {
m.stop();
index
.stats
.find_index_entry_mut_us
.fetch_add(m.as_us(), Ordering::Relaxed);
return Ok((Some(index.get_mut(ii)), ii));
return Ok((Some(elem), ii));
}
}
m.stop();
@ -203,19 +203,19 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
}
}
fn bucket_find_index_entry<'a>(
index: &'a BucketStorage<IndexBucket>,
fn bucket_find_index_entry(
index: &BucketStorage<IndexBucket>,
key: &Pubkey,
random: u64,
) -> Option<(&'a IndexEntry, u64)> {
) -> Option<(IndexEntryPlaceInBucket, u64)> {
let ix = Self::bucket_index_ix(index, key, random);
for i in ix..ix + index.max_search() {
let ii = i % index.capacity();
if index.is_free(ii) {
continue;
}
let elem: &IndexEntry = index.get(ii);
if elem.key == *key {
let elem = IndexEntryPlaceInBucket::new(ii);
if elem.key(index) == key {
return Some((elem, ii));
}
}
@ -236,10 +236,9 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
continue;
}
index.occupy(ii, is_resizing).unwrap();
let elem: &mut IndexEntry = index.get_mut(ii);
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
elem.init(key);
IndexEntryPlaceInBucket::new(ii).init(index, key);
//debug!( "INDEX ALLOC {:?} {} {} {}", key, ii, index.capacity, elem_uid );
m.stop();
index
@ -259,7 +258,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> {
//debug!("READ_VALUE: {:?}", key);
let (elem, _) = self.find_index_entry(key)?;
elem.read_value(self)
elem.read_value(&self.index, &self.data)
}
pub fn try_write(
@ -281,23 +280,23 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
} else {
let is_resizing = false;
self.index.occupy(elem_ix, is_resizing).unwrap();
let elem_allocate = IndexEntryPlaceInBucket::new(elem_ix);
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
let elem_allocate: &mut IndexEntry = self.index.get_mut(elem_ix);
elem_allocate.init(key);
elem_allocate.init(&mut self.index, key);
elem_allocate
};
elem.ref_count = ref_count;
let bucket_ix = elem.data_bucket_ix();
elem.set_ref_count(&mut self.index, ref_count);
let bucket_ix = elem.data_bucket_ix(&self.index);
let current_bucket = &self.data[bucket_ix as usize];
let num_slots = data_len as u64;
if best_fit_bucket == bucket_ix && elem.num_slots > 0 {
if best_fit_bucket == bucket_ix && elem.num_slots(&self.index) > 0 {
// in place update
let elem_loc = elem.data_loc(current_bucket);
let elem_loc = elem.data_loc(&self.index, current_bucket);
let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data_len as u64);
assert!(!current_bucket.is_free(elem_loc));
elem.num_slots = num_slots;
elem.set_num_slots(&mut self.index, num_slots);
slice.iter_mut().zip(data).for_each(|(dest, src)| {
*dest = *src;
@ -320,11 +319,14 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
for i in pos..pos + (max_search * 10).min(cap) {
let ix = i % cap;
if best_bucket.is_free(ix) {
let elem_loc = elem.data_loc(current_bucket);
let old_slots = elem.num_slots;
elem.set_storage_offset(ix);
elem.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2);
elem.num_slots = num_slots;
let elem_loc = elem.data_loc(&self.index, current_bucket);
let old_slots = elem.num_slots(&self.index);
elem.set_storage_offset(&mut self.index, ix);
elem.set_storage_capacity_when_created_pow2(
&mut self.index,
best_bucket.capacity_pow2,
);
elem.set_num_slots(&mut self.index, num_slots);
if old_slots > 0 {
let current_bucket = &mut self.data[bucket_ix as usize];
current_bucket.free(elem_loc);
@ -347,10 +349,10 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
pub fn delete_key(&mut self, key: &Pubkey) {
if let Some((elem, elem_ix)) = self.find_index_entry(key) {
if elem.num_slots > 0 {
let ix = elem.data_bucket_ix() as usize;
if elem.num_slots(&self.index) > 0 {
let ix = elem.data_bucket_ix(&self.index) as usize;
let data_bucket = &self.data[ix];
let loc = elem.data_loc(data_bucket);
let loc = elem.data_loc(&self.index, data_bucket);
let data_bucket = &mut self.data[ix];
//debug!( "DATA FREE {:?} {} {} {}", key, elem.data_location, data_bucket.capacity, elem_uid );
data_bucket.free(loc);

View File

@ -179,12 +179,12 @@ impl<O: BucketOccupied> BucketStorage<O> {
}
}
fn get_start_offset_with_header(&self, ix: u64) -> usize {
pub(crate) fn get_start_offset_with_header(&self, ix: u64) -> usize {
assert!(ix < self.capacity(), "bad index size");
(self.cell_size * ix) as usize
}
fn get_start_offset_no_header(&self, ix: u64) -> usize {
pub(crate) fn get_start_offset_no_header(&self, ix: u64) -> usize {
self.get_start_offset_with_header(ix) + O::offset_to_first_data()
}

View File

@ -2,7 +2,6 @@
use {
crate::{
bucket::Bucket,
bucket_storage::{BucketOccupied, BucketStorage},
RefCount,
},
@ -56,16 +55,22 @@ impl BucketOccupied for BucketWithBitVec {
pub type DataBucket = BucketWithBitVec;
pub type IndexBucket = BucketWithBitVec;
/// contains the index of an entry in the index bucket.
/// This type allows us to call methods to interact with the index entry on this type.
pub struct IndexEntryPlaceInBucket {
pub ix: u64,
}
#[repr(C)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
// one instance of this per item in the index
// stored in the index bucket
pub struct IndexEntry {
pub key: Pubkey, // can this be smaller if we have reduced the keys into buckets already?
pub ref_count: RefCount, // can this be smaller? Do we ever need more than 4B refcounts?
ref_count: RefCount, // can this be smaller? Do we ever need more than 4B refcounts?
storage_cap_and_offset: PackedStorage,
// if the bucket doubled, the index can be recomputed using create_bucket_capacity_pow2
pub num_slots: Slot, // can this be smaller? epoch size should ~ be the max len. this is the num elements in the slot list
num_slots: Slot, // can this be smaller? epoch size should ~ be the max len. this is the num elements in the slot list
}
/// Pack the storage offset and capacity-when-crated-pow2 fields into a single u64
@ -78,27 +83,6 @@ struct PackedStorage {
}
impl IndexEntry {
pub fn init(&mut self, pubkey: &Pubkey) {
self.key = *pubkey;
self.ref_count = 0;
self.storage_cap_and_offset = PackedStorage::default();
self.num_slots = 0;
}
pub fn set_storage_capacity_when_created_pow2(
&mut self,
storage_capacity_when_created_pow2: u8,
) {
self.storage_cap_and_offset
.set_capacity_when_created_pow2(storage_capacity_when_created_pow2)
}
pub fn set_storage_offset(&mut self, storage_offset: u64) {
self.storage_cap_and_offset
.set_offset_checked(storage_offset)
.expect("New storage offset must fit into 7 bytes!")
}
/// return closest bucket index fit for the slot slice.
/// Since bucket size is 2^index, the return value is
/// min index, such that 2^index >= num_slots
@ -112,47 +96,131 @@ impl IndexEntry {
(Slot::BITS - (num_slots - 1).leading_zeros()) as u64
}
}
}
pub fn data_bucket_ix(&self) -> u64 {
Self::data_bucket_from_num_slots(self.num_slots)
impl IndexEntryPlaceInBucket {
pub fn init(&self, index_bucket: &mut BucketStorage<IndexBucket>, pubkey: &Pubkey) {
let index_entry = index_bucket.get_mut::<IndexEntry>(self.ix);
index_entry.key = *pubkey;
index_entry.ref_count = 0;
index_entry.storage_cap_and_offset = PackedStorage::default();
index_entry.num_slots = 0;
}
pub fn ref_count(&self) -> RefCount {
self.ref_count
pub fn set_storage_capacity_when_created_pow2(
&self,
index_bucket: &mut BucketStorage<IndexBucket>,
storage_capacity_when_created_pow2: u8,
) {
index_bucket
.get_mut::<IndexEntry>(self.ix)
.storage_cap_and_offset
.set_capacity_when_created_pow2(storage_capacity_when_created_pow2)
}
fn storage_capacity_when_created_pow2(&self) -> u8 {
self.storage_cap_and_offset.capacity_when_created_pow2()
pub fn set_storage_offset(
&self,
index_bucket: &mut BucketStorage<IndexBucket>,
storage_offset: u64,
) {
index_bucket
.get_mut::<IndexEntry>(self.ix)
.storage_cap_and_offset
.set_offset_checked(storage_offset)
.expect("New storage offset must fit into 7 bytes!");
}
fn storage_offset(&self) -> u64 {
self.storage_cap_and_offset.offset()
pub fn data_bucket_ix(&self, index_bucket: &BucketStorage<IndexBucket>) -> u64 {
IndexEntry::data_bucket_from_num_slots(self.num_slots(index_bucket))
}
// This function maps the original data location into an index in the current bucket storage.
// This is coupled with how we resize bucket storages.
pub fn data_loc(&self, storage: &BucketStorage<DataBucket>) -> u64 {
self.storage_offset() << (storage.capacity_pow2 - self.storage_capacity_when_created_pow2())
pub fn ref_count(&self, index_bucket: &BucketStorage<IndexBucket>) -> RefCount {
let index_entry = index_bucket.get::<IndexEntry>(self.ix);
index_entry.ref_count
}
pub fn read_value<'a, T: 'static>(&self, bucket: &'a Bucket<T>) -> Option<(&'a [T], RefCount)> {
let slice = if self.num_slots > 0 {
let data_bucket_ix = self.data_bucket_ix();
let data_bucket = &bucket.data[data_bucket_ix as usize];
let loc = self.data_loc(data_bucket);
fn storage_capacity_when_created_pow2(&self, index_bucket: &BucketStorage<IndexBucket>) -> u8 {
let index_entry = index_bucket.get::<IndexEntry>(self.ix);
index_entry
.storage_cap_and_offset
.capacity_when_created_pow2()
}
pub fn storage_offset(&self, index_bucket: &BucketStorage<IndexBucket>) -> u64 {
index_bucket
.get::<IndexEntry>(self.ix)
.storage_cap_and_offset
.offset()
}
/// This function maps the original data location into an index in the current bucket storage.
/// This is coupled with how we resize bucket storages.
pub fn data_loc(
&self,
index_bucket: &BucketStorage<IndexBucket>,
storage: &BucketStorage<DataBucket>,
) -> u64 {
let index_entry = index_bucket.get::<IndexEntry>(self.ix);
self.storage_offset(index_bucket)
<< (storage.capacity_pow2
- index_entry
.storage_cap_and_offset
.capacity_when_created_pow2())
}
pub fn read_value<'a, T>(
&self,
index_bucket: &BucketStorage<IndexBucket>,
data_buckets: &'a [BucketStorage<DataBucket>],
) -> Option<(&'a [T], RefCount)> {
let num_slots = self.num_slots(index_bucket);
let slice = if num_slots > 0 {
let data_bucket_ix = self.data_bucket_ix(index_bucket);
let data_bucket = &data_buckets[data_bucket_ix as usize];
let loc = self.data_loc(index_bucket, data_bucket);
assert!(!data_bucket.is_free(loc));
data_bucket.get_cell_slice(loc, self.num_slots)
data_bucket.get_cell_slice(loc, num_slots)
} else {
// num_slots is 0. This means we don't have an actual allocation.
&[]
};
Some((slice, self.ref_count))
Some((slice, self.ref_count(index_bucket)))
}
pub fn new(ix: u64) -> Self {
Self { ix }
}
pub fn key<'a>(&self, index_bucket: &'a BucketStorage<IndexBucket>) -> &'a Pubkey {
let entry: &IndexEntry = index_bucket.get(self.ix);
&entry.key
}
pub fn set_ref_count(
&self,
index_bucket: &mut BucketStorage<IndexBucket>,
ref_count: RefCount,
) {
let index_entry = index_bucket.get_mut::<IndexEntry>(self.ix);
index_entry.ref_count = ref_count;
}
pub fn num_slots(&self, index_bucket: &BucketStorage<IndexBucket>) -> Slot {
index_bucket.get::<IndexEntry>(self.ix).num_slots
}
pub fn set_num_slots(&self, index_bucket: &mut BucketStorage<IndexBucket>, num_slots: Slot) {
index_bucket.get_mut::<IndexEntry>(self.ix).num_slots = num_slots;
}
}
#[cfg(test)]
mod tests {
use super::*;
use {
super::*,
std::{path::PathBuf, sync::Arc},
tempfile::tempdir,
};
impl IndexEntry {
pub fn new(key: Pubkey) -> Self {
@ -170,16 +238,19 @@ mod tests {
#[test]
fn test_api() {
for offset in [0, 1, u32::MAX as u64] {
let mut index = IndexEntry::new(solana_sdk::pubkey::new_rand());
let (mut index_bucket, index) = index_entry_for_testing();
if offset != 0 {
index.set_storage_offset(offset);
index.set_storage_offset(&mut index_bucket, offset);
}
assert_eq!(index.storage_offset(), offset);
assert_eq!(index.storage_capacity_when_created_pow2(), 0);
assert_eq!(index.storage_offset(&index_bucket,), offset);
assert_eq!(index.storage_capacity_when_created_pow2(&index_bucket,), 0);
for pow in [1, 255, 0] {
index.set_storage_capacity_when_created_pow2(pow);
assert_eq!(index.storage_offset(), offset);
assert_eq!(index.storage_capacity_when_created_pow2(), pow);
index.set_storage_capacity_when_created_pow2(&mut index_bucket, pow);
assert_eq!(index.storage_offset(&index_bucket,), offset);
assert_eq!(
index.storage_capacity_when_created_pow2(&index_bucket,),
pow
);
}
}
}
@ -190,12 +261,32 @@ mod tests {
assert_eq!(std::mem::size_of::<IndexEntry>(), 32 + 8 + 8 + 8);
}
fn index_bucket_for_testing() -> BucketStorage<IndexBucket> {
let tmpdir = tempdir().unwrap();
let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
assert!(!paths.is_empty());
// `new` here creates a file in `tmpdir`. Once the file is created, `tmpdir` can be dropped without issue.
BucketStorage::<IndexBucket>::new(
Arc::new(paths),
1,
std::mem::size_of::<IndexEntry>() as u64,
1,
Arc::default(),
Arc::default(),
)
}
fn index_entry_for_testing() -> (BucketStorage<IndexBucket>, IndexEntryPlaceInBucket) {
(index_bucket_for_testing(), IndexEntryPlaceInBucket::new(0))
}
#[test]
#[should_panic(expected = "New storage offset must fit into 7 bytes!")]
fn test_set_storage_offset_value_too_large() {
let too_big = 1 << 56;
let mut index = IndexEntry::new(Pubkey::new_unique());
index.set_storage_offset(too_big);
let (mut index_bucket, index) = index_entry_for_testing();
index.set_storage_offset(&mut index_bucket, too_big);
}
#[test]