return file name as u128 for disk buckets (#33044)

This commit is contained in:
Jeff Washington (jwash) 2023-08-29 08:16:54 -07:00 committed by GitHub
parent 9bc09c9610
commit 551317dd31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 30 deletions

View File

@ -116,7 +116,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
stats: Arc<BucketMapStats>, stats: Arc<BucketMapStats>,
count: Arc<AtomicU64>, count: Arc<AtomicU64>,
) -> Self { ) -> Self {
let index = BucketStorage::new( let (index, _file_name) = BucketStorage::new(
Arc::clone(&drives), Arc::clone(&drives),
1, 1,
std::mem::size_of::<IndexEntry<T>>() as u64, std::mem::size_of::<IndexEntry<T>>() as u64,
@ -569,7 +569,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
count += 1; count += 1;
// grow relative to the current capacity // grow relative to the current capacity
let new_capacity = (current_capacity * 110 / 100).max(anticipated_size); let new_capacity = (current_capacity * 110 / 100).max(anticipated_size);
let mut index = BucketStorage::new_with_capacity( let (mut index, _file_name) = BucketStorage::new_with_capacity(
Arc::clone(&self.drives), Arc::clone(&self.drives),
1, 1,
std::mem::size_of::<IndexEntry<T>>() as u64, std::mem::size_of::<IndexEntry<T>>() as u64,
@ -649,14 +649,17 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
if self.data.get(ix).is_none() { if self.data.get(ix).is_none() {
for i in self.data.len()..ix { for i in self.data.len()..ix {
// insert empty data buckets // insert empty data buckets
self.add_data_bucket(BucketStorage::new( self.add_data_bucket(
Arc::clone(&self.drives), BucketStorage::new(
1 << i, Arc::clone(&self.drives),
Self::elem_size(), 1 << i,
self.index.max_search, Self::elem_size(),
Arc::clone(&self.stats.data), self.index.max_search,
Arc::default(), Arc::clone(&self.stats.data),
)); Arc::default(),
)
.0,
);
} }
self.add_data_bucket(bucket); self.add_data_bucket(bucket);
} else { } else {
@ -671,7 +674,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
/// grow a data bucket /// grow a data bucket
/// The application of the new bucket is deferred until the next write lock. /// The application of the new bucket is deferred until the next write lock.
pub fn grow_data(&self, data_index: u64, current_capacity_pow2: u8) { pub fn grow_data(&self, data_index: u64, current_capacity_pow2: u8) {
let new_bucket = BucketStorage::new_resized( let (new_bucket, _file_name) = BucketStorage::new_resized(
&self.drives, &self.drives,
self.index.max_search, self.index.max_search,
self.data.get(data_index as usize), self.data.get(data_index as usize),
@ -798,6 +801,7 @@ mod tests {
Arc::default(), Arc::default(),
Arc::default(), Arc::default(),
) )
.0
} }
#[test] #[test]

View File

@ -137,7 +137,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
max_search: MaxSearch, max_search: MaxSearch,
stats: Arc<BucketStats>, stats: Arc<BucketStats>,
count: Arc<AtomicU64>, count: Arc<AtomicU64>,
) -> Self { ) -> (Self, u128) {
let offset = O::offset_to_first_data(); let offset = O::offset_to_first_data();
let size_of_u64 = std::mem::size_of::<u64>(); let size_of_u64 = std::mem::size_of::<u64>();
assert_eq!( assert_eq!(
@ -147,16 +147,19 @@ impl<O: BucketOccupied> BucketStorage<O> {
); );
let cell_size = elem_size * num_elems + offset as u64; let cell_size = elem_size * num_elems + offset as u64;
let bytes = Self::allocate_to_fill_page(&mut capacity, cell_size); let bytes = Self::allocate_to_fill_page(&mut capacity, cell_size);
let (mmap, path) = Self::new_map(&drives, bytes, &stats); let (mmap, path, file_name) = Self::new_map(&drives, bytes, &stats);
Self { (
path, Self {
mmap, path,
cell_size, mmap,
count, cell_size,
stats, count,
max_search, stats,
contents: O::new(capacity), max_search,
} contents: O::new(capacity),
},
file_name,
)
} }
fn allocate_to_fill_page(capacity: &mut Capacity, cell_size: u64) -> u64 { fn allocate_to_fill_page(capacity: &mut Capacity, cell_size: u64) -> u64 {
@ -187,7 +190,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
max_search: MaxSearch, max_search: MaxSearch,
stats: Arc<BucketStats>, stats: Arc<BucketStats>,
count: Arc<AtomicU64>, count: Arc<AtomicU64>,
) -> Self { ) -> (Self, u128) {
Self::new_with_capacity( Self::new_with_capacity(
drives, drives,
num_elems, num_elems,
@ -335,11 +338,12 @@ impl<O: BucketOccupied> BucketStorage<O> {
} }
/// allocate a new memory mapped file of size `bytes` on one of `drives` /// allocate a new memory mapped file of size `bytes` on one of `drives`
fn new_map(drives: &[PathBuf], bytes: u64, stats: &BucketStats) -> (MmapMut, PathBuf) { fn new_map(drives: &[PathBuf], bytes: u64, stats: &BucketStats) -> (MmapMut, PathBuf, u128) {
let mut measure_new_file = Measure::start("measure_new_file"); let mut measure_new_file = Measure::start("measure_new_file");
let r = thread_rng().gen_range(0..drives.len()); let r = thread_rng().gen_range(0..drives.len());
let drive = &drives[r]; let drive = &drives[r];
let pos = format!("{}", thread_rng().gen_range(0..u128::MAX),); let file_random = thread_rng().gen_range(0..u128::MAX);
let pos = format!("{}", file_random,);
let file = drive.join(pos); let file = drive.join(pos);
let mut data = OpenOptions::new() let mut data = OpenOptions::new()
.read(true) .read(true)
@ -368,7 +372,11 @@ impl<O: BucketOccupied> BucketStorage<O> {
data.flush().unwrap(); // can we skip this? data.flush().unwrap(); // can we skip this?
measure_flush.stop(); measure_flush.stop();
let mut measure_mmap = Measure::start("measure_mmap"); let mut measure_mmap = Measure::start("measure_mmap");
let res = (unsafe { MmapMut::map_mut(&data).unwrap() }, file); let res = (
unsafe { MmapMut::map_mut(&data).unwrap() },
file,
file_random,
);
measure_mmap.stop(); measure_mmap.stop();
stats stats
.new_file_us .new_file_us
@ -433,8 +441,8 @@ impl<O: BucketOccupied> BucketStorage<O> {
num_elems: u64, num_elems: u64,
elem_size: u64, elem_size: u64,
stats: &Arc<BucketStats>, stats: &Arc<BucketStats>,
) -> Self { ) -> (Self, u128) {
let mut new_bucket = Self::new_with_capacity( let (mut new_bucket, file_name) = Self::new_with_capacity(
Arc::clone(drives), Arc::clone(drives),
num_elems, num_elems,
elem_size, elem_size,
@ -449,7 +457,7 @@ impl<O: BucketOccupied> BucketStorage<O> {
new_bucket.copy_contents(bucket); new_bucket.copy_contents(bucket);
} }
new_bucket.update_max_size(); new_bucket.update_max_size();
new_bucket (new_bucket, file_name)
} }
/// Return the number of bytes currently allocated /// Return the number of bytes currently allocated
@ -484,7 +492,8 @@ mod test {
1, 1,
Arc::default(), Arc::default(),
Arc::default(), Arc::default(),
); )
.0;
let ix = 0; let ix = 0;
assert!(storage.is_free(ix)); assert!(storage.is_free(ix));
assert!(storage.occupy(ix, false).is_ok()); assert!(storage.occupy(ix, false).is_ok());