diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index b0cbe4e543..7e051ce7eb 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -22,6 +22,8 @@ macro_rules! u64_align { }; } +const MAXIMUM_APPEND_VEC_FILE_SIZE: usize = 16 * 1024 * 1024 * 1024; // 16 GiB + /// Meta contains enough context to recover the index from storage itself /// This struct will be backed by mmaped and snapshotted data files. /// So the data layout must be stable and consistent across the entire cluster! @@ -108,6 +110,9 @@ impl Drop for AppendVec { impl AppendVec { #[allow(clippy::mutex_atomic)] pub fn new(file: &Path, create: bool, size: usize) -> Self { + let initial_len = 0; + AppendVec::sanitize_len_and_size(initial_len, size).unwrap(); + if create { let _ignored = remove_file(file); } @@ -148,12 +153,46 @@ impl AppendVec { map, // This mutex forces append to be single threaded, but concurrent with reads // See UNSAFE usage in `append_ptr` - append_offset: Mutex::new(0), - current_len: AtomicUsize::new(0), + append_offset: Mutex::new(initial_len), + current_len: AtomicUsize::new(initial_len), file_size: size as u64, } } + #[allow(clippy::mutex_atomic)] + fn new_empty_map(current_len: usize) -> Self { + let map = MmapMut::map_anon(1).expect("failed to map the data file"); + + AppendVec { + path: PathBuf::from(String::default()), + map, + append_offset: Mutex::new(current_len), + current_len: AtomicUsize::new(current_len), + file_size: 0, // will be filled by set_file() + } + } + + fn sanitize_len_and_size(current_len: usize, file_size: usize) -> io::Result<()> { + if file_size == 0 { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("too small file size {} for AppendVec", file_size), + )) + } else if file_size > MAXIMUM_APPEND_VEC_FILE_SIZE { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("too large file size {} for AppendVec", file_size), + )) + } else if current_len > file_size { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("current_len is larger than file size ({})", file_size), + )) + } else { + Ok(()) + } + } + pub fn flush(&self) -> io::Result<()> { self.map.flush() } @@ -190,14 +229,25 @@ impl AppendVec { #[allow(clippy::mutex_atomic)] pub fn set_file>(&mut self, path: P) -> io::Result<()> { - self.path = path.as_ref().to_path_buf(); + // this AppendVec must not hold actual file; + assert_eq!(self.file_size, 0); + let data = OpenOptions::new() .read(true) .write(true) .create(false) .open(&path)?; + let current_len = self.current_len.load(Ordering::Relaxed); + assert_eq!(current_len, *self.append_offset.lock().unwrap()); + + let file_size = std::fs::metadata(&path)?.len(); + AppendVec::sanitize_len_and_size(current_len, file_size as usize)?; + let map = unsafe { MmapMut::map_mut(&data)? }; + + self.file_size = file_size; + self.path = path.as_ref().to_path_buf(); self.map = map; if !self.sanitize_layout_and_length() { @@ -441,9 +491,6 @@ impl<'a> serde::de::Visitor<'a> for AppendVecVisitor { formatter.write_str("Expecting AppendVec") } - #[allow(clippy::mutex_atomic)] - // Note this does not initialize a valid Mmap in the AppendVec, needs to be done - // externally fn visit_bytes(self, data: &[u8]) -> std::result::Result where E: serde::de::Error, @@ -451,14 +498,9 @@ impl<'a> serde::de::Visitor<'a> for AppendVecVisitor { use serde::de::Error; let mut rd = Cursor::new(&data[..]); let current_len: usize = deserialize_from(&mut rd).map_err(Error::custom)?; - let map = MmapMut::map_anon(1).map_err(|e| Error::custom(e.to_string()))?; - Ok(AppendVec { - path: PathBuf::from(String::default()), - map, - append_offset: Mutex::new(current_len), - current_len: AtomicUsize::new(current_len), - file_size: current_len as u64, - }) + // Note this does not initialize a valid Mmap in the AppendVec, needs to be done + // externally + Ok(AppendVec::new_empty_map(current_len)) } } @@ -513,6 +555,60 @@ pub mod tests { } } + #[test] + #[should_panic(expected = "too small file size 0 for AppendVec")] + fn test_append_vec_new_bad_size() { + let path = get_append_vec_path("test_append_vec_new_bad_size"); + let _av = AppendVec::new(&path.path, true, 0); + } + + #[test] + fn test_append_vec_set_file_bad_size() { + let file = get_append_vec_path("test_append_vec_set_file_bad_size"); + let path = &file.path; + let mut av = AppendVec::new_empty_map(0); + + let _data = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&path) + .expect("create a test file for mmap"); + + let result = av.set_file(path); + assert_matches!(result, Err(ref message) if message.to_string() == *"too small file size 0 for AppendVec"); + } + + #[test] + fn test_append_vec_sanitize_len_and_size_too_small() { + let result = AppendVec::sanitize_len_and_size(0, 0); + assert_matches!(result, Err(ref message) if message.to_string() == *"too small file size 0 for AppendVec"); + } + + #[test] + fn test_append_vec_sanitize_len_and_size_maximum() { + let result = AppendVec::sanitize_len_and_size(0, 16 * 1024 * 1024 * 1024); + assert_matches!(result, Ok(_)); + } + + #[test] + fn test_append_vec_sanitize_len_and_size_too_large() { + let result = AppendVec::sanitize_len_and_size(0, 16 * 1024 * 1024 * 1024 + 1); + assert_matches!(result, Err(ref message) if message.to_string() == *"too large file size 17179869185 for AppendVec"); + } + + #[test] + fn test_append_vec_sanitize_len_and_size_full_and_same_as_current_len() { + let result = AppendVec::sanitize_len_and_size(1 * 1024 * 1024, 1 * 1024 * 1024); + assert_matches!(result, Ok(_)); + } + + #[test] + fn test_append_vec_sanitize_len_and_size_larger_current_len() { + let result = AppendVec::sanitize_len_and_size(1 * 1024 * 1024 + 1, 1 * 1024 * 1024); + assert_matches!(result, Err(ref message) if message.to_string() == *"current_len is larger than file size (1048576)"); + } + #[test] fn test_append_vec_one() { let path = get_append_vec_path("test_append");