DiskBuckets: remove unnecessary atomic on uid (#22039)
This commit is contained in:
parent
b36f7151fc
commit
1c0cd2cbb4
|
@ -185,7 +185,7 @@ impl<T: Clone + Copy> Bucket<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn bucket_create_key(
|
fn bucket_create_key(
|
||||||
index: &BucketStorage,
|
index: &mut BucketStorage,
|
||||||
key: &Pubkey,
|
key: &Pubkey,
|
||||||
elem_uid: Uid,
|
elem_uid: Uid,
|
||||||
random: u64,
|
random: u64,
|
||||||
|
@ -220,9 +220,9 @@ impl<T: Clone + Copy> Bucket<T> {
|
||||||
Some(elem.ref_count)
|
Some(elem.ref_count)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_key(&self, key: &Pubkey) -> Result<u64, BucketMapError> {
|
fn create_key(&mut self, key: &Pubkey) -> Result<u64, BucketMapError> {
|
||||||
Self::bucket_create_key(
|
Self::bucket_create_key(
|
||||||
&self.index,
|
&mut self.index,
|
||||||
key,
|
key,
|
||||||
IndexEntry::key_uid(key),
|
IndexEntry::key_uid(key),
|
||||||
self.random,
|
self.random,
|
||||||
|
@ -260,12 +260,13 @@ impl<T: Clone + Copy> Bucket<T> {
|
||||||
let elem_uid = self.index.uid_unchecked(elem_ix);
|
let elem_uid = self.index.uid_unchecked(elem_ix);
|
||||||
let bucket_ix = elem.data_bucket_ix();
|
let bucket_ix = elem.data_bucket_ix();
|
||||||
let current_bucket = &self.data[bucket_ix as usize];
|
let current_bucket = &self.data[bucket_ix as usize];
|
||||||
|
let num_slots = data.len() as u64;
|
||||||
if best_fit_bucket == bucket_ix && elem.num_slots > 0 {
|
if best_fit_bucket == bucket_ix && elem.num_slots > 0 {
|
||||||
// in place update
|
// in place update
|
||||||
let elem_loc = elem.data_loc(current_bucket);
|
let elem_loc = elem.data_loc(current_bucket);
|
||||||
let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data.len() as u64);
|
let slice: &mut [T] = current_bucket.get_mut_cell_slice(elem_loc, data.len() as u64);
|
||||||
assert!(current_bucket.uid(elem_loc) == Some(elem_uid));
|
assert_eq!(current_bucket.uid(elem_loc), Some(elem_uid));
|
||||||
elem.num_slots = data.len() as u64;
|
elem.num_slots = num_slots;
|
||||||
slice.copy_from_slice(data);
|
slice.copy_from_slice(data);
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
|
@ -278,16 +279,19 @@ impl<T: Clone + Copy> Bucket<T> {
|
||||||
let ix = i % cap;
|
let ix = i % cap;
|
||||||
if best_bucket.uid(ix).is_none() {
|
if best_bucket.uid(ix).is_none() {
|
||||||
let elem_loc = elem.data_loc(current_bucket);
|
let elem_loc = elem.data_loc(current_bucket);
|
||||||
if elem.num_slots > 0 {
|
let old_slots = elem.num_slots;
|
||||||
current_bucket.free(elem_loc, elem_uid);
|
|
||||||
}
|
|
||||||
elem.set_storage_offset(ix);
|
elem.set_storage_offset(ix);
|
||||||
elem.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2);
|
elem.set_storage_capacity_when_created_pow2(best_bucket.capacity_pow2);
|
||||||
elem.num_slots = data.len() as u64;
|
elem.num_slots = num_slots;
|
||||||
|
if old_slots > 0 {
|
||||||
|
let current_bucket = &mut self.data[bucket_ix as usize];
|
||||||
|
current_bucket.free(elem_loc, elem_uid);
|
||||||
|
}
|
||||||
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
|
//debug!( "DATA ALLOC {:?} {} {} {}", key, elem.data_location, best_bucket.capacity, elem_uid );
|
||||||
if elem.num_slots > 0 {
|
if num_slots > 0 {
|
||||||
|
let best_bucket = &mut self.data[best_fit_bucket as usize];
|
||||||
best_bucket.allocate(ix, elem_uid, false).unwrap();
|
best_bucket.allocate(ix, elem_uid, false).unwrap();
|
||||||
let slice = best_bucket.get_mut_cell_slice(ix, data.len() as u64);
|
let slice = best_bucket.get_mut_cell_slice(ix, num_slots);
|
||||||
slice.copy_from_slice(data);
|
slice.copy_from_slice(data);
|
||||||
}
|
}
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
@ -301,8 +305,10 @@ impl<T: Clone + Copy> Bucket<T> {
|
||||||
if let Some((elem, elem_ix)) = self.find_entry(key) {
|
if let Some((elem, elem_ix)) = self.find_entry(key) {
|
||||||
let elem_uid = self.index.uid_unchecked(elem_ix);
|
let elem_uid = self.index.uid_unchecked(elem_ix);
|
||||||
if elem.num_slots > 0 {
|
if elem.num_slots > 0 {
|
||||||
let data_bucket = &self.data[elem.data_bucket_ix() as usize];
|
let ix = elem.data_bucket_ix() as usize;
|
||||||
|
let data_bucket = &self.data[ix];
|
||||||
let loc = elem.data_loc(data_bucket);
|
let loc = elem.data_loc(data_bucket);
|
||||||
|
let data_bucket = &mut self.data[ix];
|
||||||
//debug!( "DATA FREE {:?} {} {} {}", key, elem.data_location, data_bucket.capacity, elem_uid );
|
//debug!( "DATA FREE {:?} {} {} {}", key, elem.data_location, data_bucket.capacity, elem_uid );
|
||||||
data_bucket.free(loc, elem_uid);
|
data_bucket.free(loc, elem_uid);
|
||||||
}
|
}
|
||||||
|
@ -320,7 +326,7 @@ impl<T: Clone + Copy> Bucket<T> {
|
||||||
//increasing the capacity by ^4 reduces the
|
//increasing the capacity by ^4 reduces the
|
||||||
//likelyhood of a re-index collision of 2^(max_search)^2
|
//likelyhood of a re-index collision of 2^(max_search)^2
|
||||||
//1 in 2^32
|
//1 in 2^32
|
||||||
let index = BucketStorage::new_with_capacity(
|
let mut index = BucketStorage::new_with_capacity(
|
||||||
Arc::clone(&self.drives),
|
Arc::clone(&self.drives),
|
||||||
1,
|
1,
|
||||||
std::mem::size_of::<IndexEntry>() as u64,
|
std::mem::size_of::<IndexEntry>() as u64,
|
||||||
|
@ -336,7 +342,8 @@ impl<T: Clone + Copy> Bucket<T> {
|
||||||
let uid = self.index.uid(ix);
|
let uid = self.index.uid(ix);
|
||||||
if let Some(uid) = uid {
|
if let Some(uid) = uid {
|
||||||
let elem: &IndexEntry = self.index.get(ix);
|
let elem: &IndexEntry = self.index.get(ix);
|
||||||
let new_ix = Self::bucket_create_key(&index, &elem.key, uid, random, true);
|
let new_ix =
|
||||||
|
Self::bucket_create_key(&mut index, &elem.key, uid, random, true);
|
||||||
if new_ix.is_err() {
|
if new_ix.is_err() {
|
||||||
valid = false;
|
valid = false;
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -41,25 +41,28 @@ pub(crate) type Uid = u64;
|
||||||
|
|
||||||
#[repr(C)]
|
#[repr(C)]
|
||||||
struct Header {
|
struct Header {
|
||||||
lock: AtomicU64,
|
lock: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Header {
|
impl Header {
|
||||||
fn try_lock(&self, uid: Uid) -> bool {
|
fn try_lock(&mut self, uid: Uid) -> bool {
|
||||||
Ok(UID_UNLOCKED)
|
if self.lock == UID_UNLOCKED {
|
||||||
== self
|
self.lock = uid;
|
||||||
.lock
|
true
|
||||||
.compare_exchange(UID_UNLOCKED, uid, Ordering::AcqRel, Ordering::Relaxed)
|
} else {
|
||||||
|
false
|
||||||
}
|
}
|
||||||
fn unlock(&self) -> Uid {
|
}
|
||||||
self.lock.swap(UID_UNLOCKED, Ordering::Release)
|
fn unlock(&mut self) -> Uid {
|
||||||
|
let old = self.lock;
|
||||||
|
self.lock = UID_UNLOCKED;
|
||||||
|
old
|
||||||
}
|
}
|
||||||
fn uid(&self) -> Option<Uid> {
|
fn uid(&self) -> Option<Uid> {
|
||||||
let result = self.lock.load(Ordering::Acquire);
|
if self.lock == UID_UNLOCKED {
|
||||||
if result == UID_UNLOCKED {
|
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
Some(result)
|
Some(self.lock)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -141,6 +144,16 @@ impl BucketStorage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// return ref to header of item 'ix' in mmapped file
|
||||||
|
fn header_mut_ptr(&mut self, ix: u64) -> &mut Header {
|
||||||
|
let ix = (ix * self.cell_size) as usize;
|
||||||
|
let hdr_slice: &mut [u8] = &mut self.mmap[ix..ix + std::mem::size_of::<Header>()];
|
||||||
|
unsafe {
|
||||||
|
let hdr = hdr_slice.as_mut_ptr() as *mut Header;
|
||||||
|
hdr.as_mut().unwrap()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn uid(&self, ix: u64) -> Option<Uid> {
|
pub fn uid(&self, ix: u64) -> Option<Uid> {
|
||||||
assert!(ix < self.capacity(), "bad index size");
|
assert!(ix < self.capacity(), "bad index size");
|
||||||
self.header_ptr(ix).uid()
|
self.header_ptr(ix).uid()
|
||||||
|
@ -153,12 +166,17 @@ impl BucketStorage {
|
||||||
|
|
||||||
/// 'is_resizing' true if caller is resizing the index (so don't increment count)
|
/// 'is_resizing' true if caller is resizing the index (so don't increment count)
|
||||||
/// 'is_resizing' false if caller is adding an item to the index (so increment count)
|
/// 'is_resizing' false if caller is adding an item to the index (so increment count)
|
||||||
pub fn allocate(&self, ix: u64, uid: Uid, is_resizing: bool) -> Result<(), BucketStorageError> {
|
pub fn allocate(
|
||||||
|
&mut self,
|
||||||
|
ix: u64,
|
||||||
|
uid: Uid,
|
||||||
|
is_resizing: bool,
|
||||||
|
) -> Result<(), BucketStorageError> {
|
||||||
assert!(ix < self.capacity(), "allocate: bad index size");
|
assert!(ix < self.capacity(), "allocate: bad index size");
|
||||||
assert!(UID_UNLOCKED != uid, "allocate: bad uid");
|
assert!(UID_UNLOCKED != uid, "allocate: bad uid");
|
||||||
let mut e = Err(BucketStorageError::AlreadyAllocated);
|
let mut e = Err(BucketStorageError::AlreadyAllocated);
|
||||||
//debug!("ALLOC {} {}", ix, uid);
|
//debug!("ALLOC {} {}", ix, uid);
|
||||||
if self.header_ptr(ix).try_lock(uid) {
|
if self.header_mut_ptr(ix).try_lock(uid) {
|
||||||
e = Ok(());
|
e = Ok(());
|
||||||
if !is_resizing {
|
if !is_resizing {
|
||||||
self.count.fetch_add(1, Ordering::Relaxed);
|
self.count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
@ -167,10 +185,10 @@ impl BucketStorage {
|
||||||
e
|
e
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn free(&self, ix: u64, uid: Uid) {
|
pub fn free(&mut self, ix: u64, uid: Uid) {
|
||||||
assert!(ix < self.capacity(), "bad index size");
|
assert!(ix < self.capacity(), "bad index size");
|
||||||
assert!(UID_UNLOCKED != uid, "free: bad uid");
|
assert!(UID_UNLOCKED != uid, "free: bad uid");
|
||||||
let previous_uid = self.header_ptr(ix).unlock();
|
let previous_uid = self.header_mut_ptr(ix).unlock();
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
previous_uid, uid,
|
previous_uid, uid,
|
||||||
"free: unlocked a header with a differet uid: {}",
|
"free: unlocked a header with a differet uid: {}",
|
||||||
|
|
Loading…
Reference in New Issue