data bucket holds RestartableBucket (#33381)
This commit is contained in:
parent
456563b9e9
commit
c750ac5d38
|
@ -11,6 +11,7 @@ use {
|
||||||
DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket, MultipleSlots,
|
DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket, MultipleSlots,
|
||||||
OccupiedEnum,
|
OccupiedEnum,
|
||||||
},
|
},
|
||||||
|
restart::RestartableBucket,
|
||||||
MaxSearch, RefCount,
|
MaxSearch, RefCount,
|
||||||
},
|
},
|
||||||
rand::{thread_rng, Rng},
|
rand::{thread_rng, Rng},
|
||||||
|
@ -107,6 +108,9 @@ pub struct Bucket<T: Copy + PartialEq + 'static> {
|
||||||
/// set to true once any entries have been deleted from the index.
|
/// set to true once any entries have been deleted from the index.
|
||||||
/// Deletes indicate that there can be free slots and that the full search range must be searched for an entry.
|
/// Deletes indicate that there can be free slots and that the full search range must be searched for an entry.
|
||||||
at_least_one_entry_deleted: bool,
|
at_least_one_entry_deleted: bool,
|
||||||
|
|
||||||
|
/// keep track of which index file this bucket is using so on restart we can try to reuse it
|
||||||
|
restartable_bucket: RestartableBucket,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
|
impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
|
||||||
|
@ -115,6 +119,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
|
||||||
max_search: MaxSearch,
|
max_search: MaxSearch,
|
||||||
stats: Arc<BucketMapStats>,
|
stats: Arc<BucketMapStats>,
|
||||||
count: Arc<AtomicU64>,
|
count: Arc<AtomicU64>,
|
||||||
|
restartable_bucket: RestartableBucket,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let (index, _file_name) = BucketStorage::new(
|
let (index, _file_name) = BucketStorage::new(
|
||||||
Arc::clone(&drives),
|
Arc::clone(&drives),
|
||||||
|
@ -125,9 +130,10 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
|
||||||
count,
|
count,
|
||||||
);
|
);
|
||||||
stats.index.resize_grow(0, index.capacity_bytes());
|
stats.index.resize_grow(0, index.capacity_bytes());
|
||||||
|
let random = thread_rng().gen();
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
random: thread_rng().gen(),
|
random,
|
||||||
drives,
|
drives,
|
||||||
index,
|
index,
|
||||||
data: vec![],
|
data: vec![],
|
||||||
|
@ -135,6 +141,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
|
||||||
reallocated: Reallocated::default(),
|
reallocated: Reallocated::default(),
|
||||||
anticipated_size: 0,
|
anticipated_size: 0,
|
||||||
at_least_one_entry_deleted: false,
|
at_least_one_entry_deleted: false,
|
||||||
|
restartable_bucket,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -570,7 +577,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
|
||||||
count += 1;
|
count += 1;
|
||||||
// grow relative to the current capacity
|
// grow relative to the current capacity
|
||||||
let new_capacity = (current_capacity * 110 / 100).max(anticipated_size);
|
let new_capacity = (current_capacity * 110 / 100).max(anticipated_size);
|
||||||
let (mut index, _file_name) = BucketStorage::new_with_capacity(
|
let (mut index, file_name) = BucketStorage::new_with_capacity(
|
||||||
Arc::clone(&self.drives),
|
Arc::clone(&self.drives),
|
||||||
1,
|
1,
|
||||||
std::mem::size_of::<IndexEntry<T>>() as u64,
|
std::mem::size_of::<IndexEntry<T>>() as u64,
|
||||||
|
@ -596,13 +603,6 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
|
||||||
let new_elem: &mut IndexEntry<T> = index.get_mut(new_ix);
|
let new_elem: &mut IndexEntry<T> = index.get_mut(new_ix);
|
||||||
*new_elem = *elem;
|
*new_elem = *elem;
|
||||||
index.copying_entry(new_ix, &self.index, ix);
|
index.copying_entry(new_ix, &self.index, ix);
|
||||||
/*
|
|
||||||
let dbg_elem: IndexEntry = *new_elem;
|
|
||||||
assert_eq!(
|
|
||||||
Self::bucket_find_index_entry(&index, &elem.key, random).unwrap(),
|
|
||||||
(&dbg_elem, new_ix)
|
|
||||||
);
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if valid {
|
if valid {
|
||||||
|
@ -610,6 +610,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
|
||||||
let mut items = self.reallocated.items.lock().unwrap();
|
let mut items = self.reallocated.items.lock().unwrap();
|
||||||
items.index = Some(index);
|
items.index = Some(index);
|
||||||
self.reallocated.add_reallocation();
|
self.reallocated.add_reallocation();
|
||||||
|
self.restartable_bucket.set_file(file_name, self.random);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1075,7 +1076,13 @@ mod tests {
|
||||||
let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
|
let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
|
||||||
assert!(!paths.is_empty());
|
assert!(!paths.is_empty());
|
||||||
let max_search = 2;
|
let max_search = 2;
|
||||||
let mut bucket = Bucket::new(Arc::new(paths), max_search, Arc::default(), Arc::default());
|
let mut bucket = Bucket::new(
|
||||||
|
Arc::new(paths),
|
||||||
|
max_search,
|
||||||
|
Arc::default(),
|
||||||
|
Arc::default(),
|
||||||
|
RestartableBucket::default(),
|
||||||
|
);
|
||||||
|
|
||||||
let key = Pubkey::new_unique();
|
let key = Pubkey::new_unique();
|
||||||
assert_eq!(bucket.read_value(&key), None);
|
assert_eq!(bucket.read_value(&key), None);
|
||||||
|
|
|
@ -97,6 +97,7 @@ impl<T: Clone + Copy + PartialEq + std::fmt::Debug> BucketApi<T> {
|
||||||
self.max_search,
|
self.max_search,
|
||||||
Arc::clone(&self.stats),
|
Arc::clone(&self.stats),
|
||||||
Arc::clone(&self.count),
|
Arc::clone(&self.count),
|
||||||
|
self.restartable_bucket.clone(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue