From 1fc4264a1c3e513e6fb15c44214cc399ac553e37 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Thu, 21 Sep 2023 06:47:28 -0700 Subject: [PATCH] add bucket::load_on_restart (#33328) --- bucket_map/src/bucket_storage.rs | 163 +++++++++++++++++++++++++++++-- 1 file changed, 157 insertions(+), 6 deletions(-) diff --git a/bucket_map/src/bucket_storage.rs b/bucket_map/src/bucket_storage.rs index c331f6b78..c81c6a1a7 100644 --- a/bucket_map/src/bucket_storage.rs +++ b/bucket_map/src/bucket_storage.rs @@ -6,6 +6,7 @@ use { std::{ fs::{remove_file, OpenOptions}, io::{Seek, SeekFrom, Write}, + num::NonZeroU64, path::{Path, PathBuf}, sync::{ atomic::{AtomicU64, Ordering}, @@ -218,6 +219,37 @@ impl BucketStorage { offset } + // temporary tag + #[allow(dead_code)] + /// Load and mmap the existing file at `path` as this disk bucket. Returns `None` if the file's metadata cannot be read, if the file holds no complete `elem_size` element past the offset to the first data, or if `map_open_file` returns `None`. + pub(crate) fn load_on_restart( + path: PathBuf, + elem_size: NonZeroU64, + max_search: MaxSearch, + stats: Arc, + count: Arc, + ) -> Option { + let offset = Self::get_offset_to_first_data(); + let num_elems = std::fs::metadata(&path) + .ok() + .map(|metadata| metadata.len().saturating_sub(offset) / elem_size)?; + if num_elems == 0 { + return None; + } + let mmap = Self::map_open_file(&path, false, 0, &stats)?; + Some(Self { + path, + mmap, + cell_size: elem_size.into(), + count, + stats, + max_search, + contents: O::new(Capacity::Actual(num_elems)), + // since we loaded it, it persisted from last time, so we obviously want to keep it present on disk.
+ delete_file_on_drop: false, + }) + } + pub(crate) fn copying_entry(&mut self, ix_new: u64, other: &Self, ix_old: u64) { let start = self.get_start_offset_with_header(ix_new); let start_old = other.get_start_offset_with_header(ix_old); @@ -523,13 +555,14 @@ mod test { let paths: Vec = vec![tmpdir.path().to_path_buf()]; assert!(!paths.is_empty()); + let drives = Arc::new(paths); + let num_elems = 1; + let elem_size = std::mem::size_of::>() as u64; + let max_search = 1; + let stats = Arc::default(); + let count = Arc::default(); let mut storage = BucketStorage::>::new( - Arc::new(paths), - 1, - std::mem::size_of::>() as u64, - 1, - Arc::default(), - Arc::default(), + drives, num_elems, elem_size, max_search, stats, count, ) .0; let ix = 0; @@ -547,6 +580,124 @@ mod test { assert!(storage.is_free(ix)); } + #[test] + fn test_load_on_restart_failures() { + let tmpdir = tempdir().unwrap(); + let paths: Vec = vec![tmpdir.path().to_path_buf()]; + assert!(!paths.is_empty()); + let elem_size = std::mem::size_of::>() as u64; + let max_search = 1; + let stats = Arc::new(BucketStats::default()); + let count = Arc::new(AtomicU64::default()); + // the path is the temp directory itself, so there is no bucket file to load + assert!(BucketStorage::>::load_on_restart( + PathBuf::from(tmpdir.path()), + NonZeroU64::new(elem_size).unwrap(), + max_search, + stats.clone(), + count.clone(), + ) + .is_none()); + solana_logger::setup(); + for len in [0, 1, 47, 48, 49, 4097] { + // create a file of `len` bytes. It must fail to load when it is too small to hold one element.
+ let path = tmpdir.path().join("small"); + let mut file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(path.clone()) + .unwrap(); + _ = file.write_all(&vec![1u8; len]); + drop(file); + assert_eq!(std::fs::metadata(&path).unwrap().len(), len as u64); + let result = BucketStorage::>::load_on_restart( + path, + NonZeroU64::new(elem_size).unwrap(), + max_search, + stats.clone(), + count.clone(), + ); + if let Some(result) = result.as_ref() { + assert_eq!(result.capacity() as usize, len / elem_size as usize); + assert_eq!( + result.capacity_bytes() as usize, + len / elem_size as usize * elem_size as usize + ); + } + assert_eq!(result.is_none(), len < elem_size as usize, "{len}"); + } + } + + #[test] + fn test_load_on_restart() { + for request in [Some(7), None] { + let tmpdir = tempdir().unwrap(); + let paths: Vec = vec![tmpdir.path().to_path_buf()]; + assert!(!paths.is_empty()); + let drives = Arc::new(paths); + let num_elems = 1; + let elem_size = std::mem::size_of::>() as u64; + let max_search = 1; + let stats = Arc::new(BucketStats::default()); + let count = Arc::new(AtomicU64::default()); + let mut storage = if let Some(actual_elems) = request { + BucketStorage::>::new_with_capacity( + drives, + num_elems, + elem_size, + Capacity::Actual(actual_elems), + max_search, + stats.clone(), + count.clone(), + ) + .0 + } else { + BucketStorage::>::new( + drives, + num_elems, + elem_size, + max_search, + stats.clone(), + count.clone(), + ) + .0 + }; + let expected_capacity = storage.capacity(); + (0..num_elems).for_each(|ix| { + assert!(storage.is_free(ix)); + assert!(storage.occupy(ix, false).is_ok()); + }); + storage.delete_file_on_drop = false; + let len = storage.mmap.len(); + (0..expected_capacity as usize).for_each(|i| { + storage.mmap[i] = (i % 256) as u8; + }); + // close storage + let path = storage.path.clone(); + drop(storage); + + // re load and remap storage file + let storage = BucketStorage::>::load_on_restart( + path, + 
NonZeroU64::new(elem_size).unwrap(), + max_search, + stats, + count, + ) + .unwrap(); + assert_eq!(storage.capacity(), expected_capacity); + assert_eq!(len, storage.mmap.len()); + (0..expected_capacity as usize).for_each(|i| { + assert_eq!(storage.mmap[i], (i % 256) as u8); + }); + (0..num_elems).for_each(|ix| { + // all should be marked as free + assert!(storage.is_free(ix)); + }); + } + } + #[test] #[should_panic] fn test_header_bad_size() {