add bucket::load_on_restart (#33328)

This commit is contained in:
Jeff Washington (jwash) 2023-09-21 06:47:28 -07:00 committed by GitHub
parent 357eabd5f3
commit 1fc4264a1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 157 additions and 6 deletions

View File

@ -6,6 +6,7 @@ use {
std::{
fs::{remove_file, OpenOptions},
io::{Seek, SeekFrom, Write},
num::NonZeroU64,
path::{Path, PathBuf},
sync::{
atomic::{AtomicU64, Ordering},
@ -218,6 +219,37 @@ impl<O: BucketOccupied> BucketStorage<O> {
offset
}
// temporary tag
#[allow(dead_code)]
/// load and mmap the file that is this disk bucket if possible
pub(crate) fn load_on_restart(
path: PathBuf,
elem_size: NonZeroU64,
max_search: MaxSearch,
stats: Arc<BucketStats>,
count: Arc<AtomicU64>,
) -> Option<Self> {
let offset = Self::get_offset_to_first_data();
let num_elems = std::fs::metadata(&path)
.ok()
.map(|metadata| metadata.len().saturating_sub(offset) / elem_size)?;
if num_elems == 0 {
return None;
}
let mmap = Self::map_open_file(&path, false, 0, &stats)?;
Some(Self {
path,
mmap,
cell_size: elem_size.into(),
count,
stats,
max_search,
contents: O::new(Capacity::Actual(num_elems)),
// since we loaded it, it persisted from last time, so we obviously want to keep it present disk.
delete_file_on_drop: false,
})
}
pub(crate) fn copying_entry(&mut self, ix_new: u64, other: &Self, ix_old: u64) {
let start = self.get_start_offset_with_header(ix_new);
let start_old = other.get_start_offset_with_header(ix_old);
@ -523,13 +555,14 @@ mod test {
let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
assert!(!paths.is_empty());
let drives = Arc::new(paths);
let num_elems = 1;
let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
let max_search = 1;
let stats = Arc::default();
let count = Arc::default();
let mut storage = BucketStorage::<IndexBucket<u64>>::new(
Arc::new(paths),
1,
std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64,
1,
Arc::default(),
Arc::default(),
drives, num_elems, elem_size, max_search, stats, count,
)
.0;
let ix = 0;
@ -547,6 +580,124 @@ mod test {
assert!(storage.is_free(ix));
}
#[test]
fn test_load_on_restart_failures() {
let tmpdir = tempdir().unwrap();
let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
assert!(!paths.is_empty());
let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
let max_search = 1;
let stats = Arc::new(BucketStats::default());
let count = Arc::new(AtomicU64::default());
// file doesn't exist
assert!(BucketStorage::<IndexBucket<u64>>::load_on_restart(
PathBuf::from(tmpdir.path()),
NonZeroU64::new(elem_size).unwrap(),
max_search,
stats.clone(),
count.clone(),
)
.is_none());
solana_logger::setup();
for len in [0, 1, 47, 48, 49, 4097] {
// create a zero len file. That will fail to load since it is too small.
let path = tmpdir.path().join("small");
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path.clone())
.unwrap();
_ = file.write_all(&vec![1u8; len]);
drop(file);
assert_eq!(std::fs::metadata(&path).unwrap().len(), len as u64);
let result = BucketStorage::<IndexBucket<u64>>::load_on_restart(
path,
NonZeroU64::new(elem_size).unwrap(),
max_search,
stats.clone(),
count.clone(),
);
if let Some(result) = result.as_ref() {
assert_eq!(result.capacity() as usize, len / elem_size as usize);
assert_eq!(
result.capacity_bytes() as usize,
len / elem_size as usize * elem_size as usize
);
}
assert_eq!(result.is_none(), len < elem_size as usize, "{len}");
}
}
#[test]
fn test_load_on_restart() {
for request in [Some(7), None] {
let tmpdir = tempdir().unwrap();
let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
assert!(!paths.is_empty());
let drives = Arc::new(paths);
let num_elems = 1;
let elem_size = std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64;
let max_search = 1;
let stats = Arc::new(BucketStats::default());
let count = Arc::new(AtomicU64::default());
let mut storage = if let Some(actual_elems) = request {
BucketStorage::<IndexBucket<u64>>::new_with_capacity(
drives,
num_elems,
elem_size,
Capacity::Actual(actual_elems),
max_search,
stats.clone(),
count.clone(),
)
.0
} else {
BucketStorage::<IndexBucket<u64>>::new(
drives,
num_elems,
elem_size,
max_search,
stats.clone(),
count.clone(),
)
.0
};
let expected_capacity = storage.capacity();
(0..num_elems).for_each(|ix| {
assert!(storage.is_free(ix));
assert!(storage.occupy(ix, false).is_ok());
});
storage.delete_file_on_drop = false;
let len = storage.mmap.len();
(0..expected_capacity as usize).for_each(|i| {
storage.mmap[i] = (i % 256) as u8;
});
// close storage
let path = storage.path.clone();
drop(storage);
// re load and remap storage file
let storage = BucketStorage::<IndexBucket<u64>>::load_on_restart(
path,
NonZeroU64::new(elem_size).unwrap(),
max_search,
stats,
count,
)
.unwrap();
assert_eq!(storage.capacity(), expected_capacity);
assert_eq!(len, storage.mmap.len());
(0..expected_capacity as usize).for_each(|i| {
assert_eq!(storage.mmap[i], (i % 256) as u8);
});
(0..num_elems).for_each(|ix| {
// all should be marked as free
assert!(storage.is_free(ix));
});
}
}
#[test]
#[should_panic]
fn test_header_bad_size() {