refactor bucket storage file open (#33314)

This commit is contained in:
Jeff Washington (jwash) 2023-09-19 19:26:43 -07:00 committed by GitHub
parent 288e8a682a
commit 58f980a19b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 62 additions and 40 deletions

View File

@ -6,7 +6,7 @@ use {
std::{ std::{
fs::{remove_file, OpenOptions}, fs::{remove_file, OpenOptions},
io::{Seek, SeekFrom, Write}, io::{Seek, SeekFrom, Write},
path::PathBuf, path::{Path, PathBuf},
sync::{ sync::{
atomic::{AtomicU64, Ordering}, atomic::{AtomicU64, Ordering},
Arc, Arc,
@ -342,57 +342,79 @@ impl<O: BucketOccupied> BucketStorage<O> {
unsafe { std::slice::from_raw_parts_mut(ptr, len as usize) } unsafe { std::slice::from_raw_parts_mut(ptr, len as usize) }
} }
/// allocate a new memory mapped file of size `bytes` on one of `drives` /// open a disk bucket file and mmap it
fn new_map(drives: &[PathBuf], bytes: u64, stats: &BucketStats) -> (MmapMut, PathBuf, u128) { /// optionally creates it.
fn map_open_file(
path: impl AsRef<Path> + std::fmt::Debug + Clone,
create: bool,
create_bytes: u64,
stats: &BucketStats,
) -> Option<MmapMut> {
let mut measure_new_file = Measure::start("measure_new_file"); let mut measure_new_file = Measure::start("measure_new_file");
let r = thread_rng().gen_range(0..drives.len()); let data = OpenOptions::new()
let drive = &drives[r];
let file_random = thread_rng().gen_range(0..u128::MAX);
let pos = format!("{}", file_random,);
let file = drive.join(pos);
let mut data = OpenOptions::new()
.read(true) .read(true)
.write(true) .write(true)
.create(true) .create(create)
.open(file.clone()) .open(path.clone());
.map_err(|e| { if let Err(e) = data {
panic!( if !create {
"Unable to create data file {} in current dir({:?}): {:?}", // we can't load this file, so bail without error
file.display(), return None;
std::env::current_dir(), }
e panic!(
); "Unable to create data file {:?} in current dir({:?}): {:?}",
}) path,
.unwrap(); std::env::current_dir(),
e
);
}
let mut data = data.unwrap();
// Theoretical performance optimization: write a zero to the end of if create {
// the file so that we won't have to resize it later, which may be // Theoretical performance optimization: write a zero to the end of
// expensive. // the file so that we won't have to resize it later, which may be
//debug!("GROWING file {}", capacity * cell_size as u64); // expensive.
data.seek(SeekFrom::Start(bytes - 1)).unwrap(); //debug!("GROWING file {}", capacity * cell_size as u64);
data.write_all(&[0]).unwrap(); data.seek(SeekFrom::Start(create_bytes - 1)).unwrap();
data.rewind().unwrap(); data.write_all(&[0]).unwrap();
measure_new_file.stop(); data.rewind().unwrap();
let mut measure_flush = Measure::start("measure_flush"); measure_new_file.stop();
data.flush().unwrap(); // can we skip this? let measure_flush = Measure::start("measure_flush");
measure_flush.stop(); data.flush().unwrap(); // can we skip this?
stats
.flush_file_us
.fetch_add(measure_flush.end_as_us(), Ordering::Relaxed);
}
let mut measure_mmap = Measure::start("measure_mmap"); let mut measure_mmap = Measure::start("measure_mmap");
let res = ( let res = unsafe { MmapMut::map_mut(&data) };
unsafe { MmapMut::map_mut(&data).unwrap() }, if let Err(e) = res {
file, panic!(
file_random, "Unable to mmap file {:?} in current dir({:?}): {:?}",
); path,
std::env::current_dir(),
e
);
}
measure_mmap.stop(); measure_mmap.stop();
stats stats
.new_file_us .new_file_us
.fetch_add(measure_new_file.as_us(), Ordering::Relaxed); .fetch_add(measure_new_file.as_us(), Ordering::Relaxed);
stats
.flush_file_us
.fetch_add(measure_flush.as_us(), Ordering::Relaxed);
stats stats
.mmap_us .mmap_us
.fetch_add(measure_mmap.as_us(), Ordering::Relaxed); .fetch_add(measure_mmap.as_us(), Ordering::Relaxed);
res res.ok()
}
/// allocate a new memory mapped file of size `bytes` on one of `drives`
fn new_map(drives: &[PathBuf], bytes: u64, stats: &BucketStats) -> (MmapMut, PathBuf, u128) {
let r = thread_rng().gen_range(0..drives.len());
let drive = &drives[r];
let file_random = thread_rng().gen_range(0..u128::MAX);
let pos = format!("{}", file_random,);
let file = drive.join(pos);
let res = Self::map_open_file(file.clone(), true, bytes, stats).unwrap();
(res, file, file_random)
} }
/// copy contents from 'old_bucket' to 'self' /// copy contents from 'old_bucket' to 'self'