parent
fbfcc3febf
commit
4d15e774ee
|
@ -142,38 +142,43 @@ impl<T: Clone + Copy> Bucket<T> {
|
||||||
Self::bucket_find_entry(&self.index, key, self.random)
|
Self::bucket_find_entry(&self.index, key, self.random)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn find_entry_mut(&self, key: &Pubkey) -> Option<(&mut IndexEntry, u64)> {
|
fn find_entry_mut<'a>(
|
||||||
Self::bucket_find_entry_mut(&self.index, key, self.random)
|
&'a self,
|
||||||
}
|
|
||||||
|
|
||||||
fn bucket_find_entry_mut<'a>(
|
|
||||||
index: &'a BucketStorage,
|
|
||||||
key: &Pubkey,
|
key: &Pubkey,
|
||||||
random: u64,
|
) -> Result<(bool, &'a mut IndexEntry, u64), BucketMapError> {
|
||||||
) -> Option<(&'a mut IndexEntry, u64)> {
|
let ix = Self::bucket_index_ix(&self.index, key, self.random);
|
||||||
|
let mut first_free = None;
|
||||||
let mut m = Measure::start("bucket_find_entry_mut");
|
let mut m = Measure::start("bucket_find_entry_mut");
|
||||||
let ix = Self::bucket_index_ix(index, key, random);
|
for i in ix..ix + self.index.max_search() {
|
||||||
for i in ix..ix + index.max_search() {
|
let ii = i % self.index.capacity();
|
||||||
let ii = i % index.capacity();
|
if self.index.is_free(ii) {
|
||||||
if index.is_free(ii) {
|
if first_free.is_none() {
|
||||||
|
first_free = Some(ii);
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let elem: &mut IndexEntry = index.get_mut(ii);
|
let elem: &mut IndexEntry = self.index.get_mut(ii);
|
||||||
if elem.key == *key {
|
if elem.key == *key {
|
||||||
m.stop();
|
m.stop();
|
||||||
index
|
self.stats
|
||||||
.stats
|
.index
|
||||||
.find_entry_mut_us
|
.find_entry_mut_us
|
||||||
.fetch_add(m.as_us(), Ordering::Relaxed);
|
.fetch_add(m.as_us(), Ordering::Relaxed);
|
||||||
return Some((elem, ii));
|
return Ok((true, elem, ii));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
m.stop();
|
m.stop();
|
||||||
index
|
self.stats
|
||||||
.stats
|
.index
|
||||||
.find_entry_mut_us
|
.find_entry_mut_us
|
||||||
.fetch_add(m.as_us(), Ordering::Relaxed);
|
.fetch_add(m.as_us(), Ordering::Relaxed);
|
||||||
None
|
match first_free {
|
||||||
|
Some(ii) => {
|
||||||
|
let elem: &mut IndexEntry = self.index.get_mut(ii);
|
||||||
|
Ok((false, elem, ii))
|
||||||
|
}
|
||||||
|
None => Err(self.index_no_space()),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn bucket_find_entry<'a>(
|
fn bucket_find_entry<'a>(
|
||||||
|
@ -231,25 +236,23 @@ impl<T: Clone + Copy> Bucket<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn addref(&mut self, key: &Pubkey) -> Option<RefCount> {
|
pub fn addref(&mut self, key: &Pubkey) -> Option<RefCount> {
|
||||||
let (elem, _) = self.find_entry_mut(key)?;
|
if let Ok((found, elem, _)) = self.find_entry_mut(key) {
|
||||||
|
if found {
|
||||||
elem.ref_count += 1;
|
elem.ref_count += 1;
|
||||||
Some(elem.ref_count)
|
return Some(elem.ref_count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn unref(&mut self, key: &Pubkey) -> Option<RefCount> {
|
pub fn unref(&mut self, key: &Pubkey) -> Option<RefCount> {
|
||||||
let (elem, _) = self.find_entry_mut(key)?;
|
if let Ok((found, elem, _)) = self.find_entry_mut(key) {
|
||||||
|
if found {
|
||||||
elem.ref_count -= 1;
|
elem.ref_count -= 1;
|
||||||
Some(elem.ref_count)
|
return Some(elem.ref_count);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
fn create_key(&mut self, key: &Pubkey) -> Result<u64, BucketMapError> {
|
None
|
||||||
Self::bucket_create_key(
|
|
||||||
&mut self.index,
|
|
||||||
key,
|
|
||||||
IndexEntry::key_uid(key),
|
|
||||||
self.random,
|
|
||||||
false,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> {
|
pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> {
|
||||||
|
@ -258,6 +261,10 @@ impl<T: Clone + Copy> Bucket<T> {
|
||||||
elem.read_value(self)
|
elem.read_value(self)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn index_no_space(&self) -> BucketMapError {
|
||||||
|
BucketMapError::IndexNoSpace(self.index.capacity_pow2)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn try_write(
|
pub fn try_write(
|
||||||
&mut self,
|
&mut self,
|
||||||
key: &Pubkey,
|
key: &Pubkey,
|
||||||
|
@ -269,15 +276,15 @@ impl<T: Clone + Copy> Bucket<T> {
|
||||||
// fail early if the data bucket we need doesn't exist - we don't want the index entry partially allocated
|
// fail early if the data bucket we need doesn't exist - we don't want the index entry partially allocated
|
||||||
return Err(BucketMapError::DataNoSpace((best_fit_bucket, 0)));
|
return Err(BucketMapError::DataNoSpace((best_fit_bucket, 0)));
|
||||||
}
|
}
|
||||||
let index_entry = self.find_entry_mut(key);
|
let (found, elem, elem_ix) = self.find_entry_mut(key)?;
|
||||||
let (elem, elem_ix) = match index_entry {
|
if !found {
|
||||||
None => {
|
let is_resizing = false;
|
||||||
let ii = self.create_key(key)?;
|
let elem_uid = IndexEntry::key_uid(key);
|
||||||
let elem: &mut IndexEntry = self.index.get_mut(ii);
|
self.index.allocate(elem_ix, elem_uid, is_resizing).unwrap();
|
||||||
(elem, ii)
|
// These fields will be overwritten after allocation by callers.
|
||||||
|
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
|
||||||
|
elem.init(key);
|
||||||
}
|
}
|
||||||
Some(res) => res,
|
|
||||||
};
|
|
||||||
elem.ref_count = ref_count;
|
elem.ref_count = ref_count;
|
||||||
let elem_uid = self.index.uid_unchecked(elem_ix);
|
let elem_uid = self.index.uid_unchecked(elem_ix);
|
||||||
let bucket_ix = elem.data_bucket_ix();
|
let bucket_ix = elem.data_bucket_ix();
|
||||||
|
|
|
@ -152,11 +152,12 @@ impl BucketStorage {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// return ref to header of item 'ix' in mmapped file
|
/// return ref to header of item 'ix' in mmapped file
|
||||||
fn header_mut_ptr(&mut self, ix: u64) -> &mut Header {
|
#[allow(clippy::mut_from_ref)]
|
||||||
|
fn header_mut_ptr(&self, ix: u64) -> &mut Header {
|
||||||
let ix = (ix * self.cell_size) as usize;
|
let ix = (ix * self.cell_size) as usize;
|
||||||
let hdr_slice: &mut [u8] = &mut self.mmap[ix..ix + std::mem::size_of::<Header>()];
|
let hdr_slice: &[u8] = &self.mmap[ix..ix + std::mem::size_of::<Header>()];
|
||||||
unsafe {
|
unsafe {
|
||||||
let hdr = hdr_slice.as_mut_ptr() as *mut Header;
|
let hdr = hdr_slice.as_ptr() as *mut Header;
|
||||||
hdr.as_mut().unwrap()
|
hdr.as_mut().unwrap()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -181,12 +182,7 @@ impl BucketStorage {
|
||||||
|
|
||||||
/// 'is_resizing' true if caller is resizing the index (so don't increment count)
|
/// 'is_resizing' true if caller is resizing the index (so don't increment count)
|
||||||
/// 'is_resizing' false if caller is adding an item to the index (so increment count)
|
/// 'is_resizing' false if caller is adding an item to the index (so increment count)
|
||||||
pub fn allocate(
|
pub fn allocate(&self, ix: u64, uid: Uid, is_resizing: bool) -> Result<(), BucketStorageError> {
|
||||||
&mut self,
|
|
||||||
ix: u64,
|
|
||||||
uid: Uid,
|
|
||||||
is_resizing: bool,
|
|
||||||
) -> Result<(), BucketStorageError> {
|
|
||||||
assert!(ix < self.capacity(), "allocate: bad index size");
|
assert!(ix < self.capacity(), "allocate: bad index size");
|
||||||
assert!(UID_UNLOCKED != uid, "allocate: bad uid");
|
assert!(UID_UNLOCKED != uid, "allocate: bad uid");
|
||||||
let mut e = Err(BucketStorageError::AlreadyAllocated);
|
let mut e = Err(BucketStorageError::AlreadyAllocated);
|
||||||
|
|
Loading…
Reference in New Issue