diff --git a/bucket_map/src/bucket_storage.rs b/bucket_map/src/bucket_storage.rs index 4abf127bd..388f654dc 100644 --- a/bucket_map/src/bucket_storage.rs +++ b/bucket_map/src/bucket_storage.rs @@ -6,7 +6,7 @@ use { std::{ fs::{remove_file, OpenOptions}, io::{Seek, SeekFrom, Write}, - path::PathBuf, + path::{Path, PathBuf}, sync::{ atomic::{AtomicU64, Ordering}, Arc, @@ -342,57 +342,79 @@ impl BucketStorage { unsafe { std::slice::from_raw_parts_mut(ptr, len as usize) } } - /// allocate a new memory mapped file of size `bytes` on one of `drives` - fn new_map(drives: &[PathBuf], bytes: u64, stats: &BucketStats) -> (MmapMut, PathBuf, u128) { + /// open a disk bucket file and mmap it + /// optionally creates it. + fn map_open_file( + path: impl AsRef + std::fmt::Debug + Clone, + create: bool, + create_bytes: u64, + stats: &BucketStats, + ) -> Option { let mut measure_new_file = Measure::start("measure_new_file"); - let r = thread_rng().gen_range(0..drives.len()); - let drive = &drives[r]; - let file_random = thread_rng().gen_range(0..u128::MAX); - let pos = format!("{}", file_random,); - let file = drive.join(pos); - let mut data = OpenOptions::new() + let data = OpenOptions::new() .read(true) .write(true) - .create(true) - .open(file.clone()) - .map_err(|e| { - panic!( - "Unable to create data file {} in current dir({:?}): {:?}", - file.display(), - std::env::current_dir(), - e - ); - }) - .unwrap(); + .create(create) + .open(path.clone()); + if let Err(e) = data { + if !create { + // we can't load this file, so bail without error + return None; + } + panic!( + "Unable to create data file {:?} in current dir({:?}): {:?}", + path, + std::env::current_dir(), + e + ); + } + let mut data = data.unwrap(); - // Theoretical performance optimization: write a zero to the end of - // the file so that we won't have to resize it later, which may be - // expensive. - //debug!("GROWING file {}", capacity * cell_size as u64); - data.seek(SeekFrom::Start(bytes - 1)).unwrap(); - data.write_all(&[0]).unwrap(); - data.rewind().unwrap(); - measure_new_file.stop(); - let mut measure_flush = Measure::start("measure_flush"); - data.flush().unwrap(); // can we skip this? - measure_flush.stop(); + if create { + // Theoretical performance optimization: write a zero to the end of + // the file so that we won't have to resize it later, which may be + // expensive. + //debug!("GROWING file {}", capacity * cell_size as u64); + data.seek(SeekFrom::Start(create_bytes - 1)).unwrap(); + data.write_all(&[0]).unwrap(); + data.rewind().unwrap(); + measure_new_file.stop(); + let measure_flush = Measure::start("measure_flush"); + data.flush().unwrap(); // can we skip this? + stats + .flush_file_us + .fetch_add(measure_flush.end_as_us(), Ordering::Relaxed); + } let mut measure_mmap = Measure::start("measure_mmap"); - let res = ( - unsafe { MmapMut::map_mut(&data).unwrap() }, - file, - file_random, - ); + let res = unsafe { MmapMut::map_mut(&data) }; + if let Err(e) = res { + panic!( + "Unable to mmap file {:?} in current dir({:?}): {:?}", + path, + std::env::current_dir(), + e + ); + } measure_mmap.stop(); stats .new_file_us .fetch_add(measure_new_file.as_us(), Ordering::Relaxed); - stats - .flush_file_us - .fetch_add(measure_flush.as_us(), Ordering::Relaxed); stats .mmap_us .fetch_add(measure_mmap.as_us(), Ordering::Relaxed); - res + res.ok() + } + + /// allocate a new memory mapped file of size `bytes` on one of `drives` + fn new_map(drives: &[PathBuf], bytes: u64, stats: &BucketStats) -> (MmapMut, PathBuf, u128) { + let r = thread_rng().gen_range(0..drives.len()); + let drive = &drives[r]; + let file_random = thread_rng().gen_range(0..u128::MAX); + let pos = format!("{}", file_random,); + let file = drive.join(pos); + let res = Self::map_open_file(file.clone(), true, bytes, stats).unwrap(); + + (res, file, file_random) } /// copy contents from 'old_bucket' to 'self'