disk idx: try to reuse disk index's existing data on startup (#33388)
* disk idx: try to reuse disk index's existing data on startup
* add tests
* fix test and add test
* update test comments
* update comments
parent 85cc6ace05
commit 027f3dc6de
bucket_map/src/bucket.rs

@@ -9,7 +9,7 @@ use {
     },
     index_entry::{
         DataBucket, IndexBucket, IndexEntry, IndexEntryPlaceInBucket, MultipleSlots,
-        OccupiedEnum,
+        OccupiedEnum, OccupyIfMatches,
     },
     restart::RestartableBucket,
     MaxSearch, RefCount,
@@ -360,6 +360,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
                 &mut entries,
                 &mut entries_created_on_disk,
                 &mut duplicates,
+                self.reused_file_at_startup,
             );
             match result {
                 Ok(_result) => {
@@ -392,7 +393,56 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
             }
         }
 
+    /// sort `entries` by hash value
+    /// insert every entry in `reverse_sorted_entries` into the index as long as we can find a location where the data in the index
+    /// file already matches the data we want to insert for the pubkey.
+    /// for every entry that already exists in `index`, add it (and the value already in the index) to `duplicates`
+    /// `reverse_sorted_entries` is (raw index (range = U64::MAX) in hash map, index in `items`)
+    /// Any entries where the disk couldn't be updated are returned in `reverse_sorted_entries` or `duplicates`.
+    /// The remaining items in `reverse_sorted_entries` can be inserted by overwriting non-matching data in the index file.
+    pub fn batch_insert_non_duplicates_reusing_file(
+        index: &mut BucketStorage<IndexBucket<T>>,
+        data_buckets: &[BucketStorage<DataBucket>],
+        items: &[(Pubkey, T)],
+        reverse_sorted_entries: &mut Vec<(u64, usize)>,
+        duplicates: &mut Vec<(usize, T)>,
+    ) {
+        let max_search = index.max_search();
+        let cap = index.capacity();
+        let search_end = max_search.min(cap);
+        let mut not_found = Vec::default();
+        // pop one entry at a time to insert
+        'outer: while let Some((ix_entry_raw, ix)) = reverse_sorted_entries.pop() {
+            let (k, v) = &items[ix];
+            // search for an empty spot starting at `ix_entry`
+            for search in 0..search_end {
+                let ix_index = (ix_entry_raw + search) % cap;
+                let elem = IndexEntryPlaceInBucket::new(ix_index);
+                match elem.occupy_if_matches(index, v, k) {
+                    OccupyIfMatches::SuccessfulInit => {}
+                    OccupyIfMatches::FoundDuplicate => {
+                        // the pubkey is the same and the entry is occupied, so we found a duplicate
+                        let (v_existing, _ref_count_existing) =
+                            elem.read_value(index, data_buckets);
+                        // record it together with the value that is already in the index
+                        duplicates.push((ix, *v_existing.first().unwrap()));
+                    }
+                    OccupyIfMatches::PubkeyMismatch => {
+                        // fall through and look at the next search value
+                        continue;
+                    }
+                }
+                continue 'outer; // this 'insertion' is completed: either we found a duplicate or we occupied an entry in the file
+            }
+            // this pubkey did not already exist in the file and we exhausted the search space, so we have to try the old way
+            not_found.push((ix_entry_raw, ix));
+        }
+        // Now add back all entries that were not found. They were pushed in
+        // order as we popped off the input, so to keep them 'reversed' we need
+        // to reverse them here. This isn't required for correctness, but it
+        // fits the optimal iteration order.
+        *reverse_sorted_entries = not_found.into_iter().rev().collect();
+    }
+
     /// insert as much of `entries` as possible into `index`.
     /// return an error if the index needs to resize.
     /// for every entry that already exists in `index`, add it (and the value already in the index) to `duplicates`
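As a sketch of how this new reuse path gets driven at startup (assembled from the tests added below, so `raw`, `random`, `index`, and `data_buckets` stand in for state the caller already holds), the caller hashes the recovered entries, orders them so that pop() walks the file in ascending slot order, and only falls back to normal inserts for whatever the reuse pass leaves behind:

    // Hedged sketch, mirroring the tests below; not part of the diff itself.
    let mut hashed = Bucket::index_entries(&raw, random);
    let cap = index.capacity();
    // Highest slot first, so pop() visits slots in ascending order.
    hashed.sort_unstable_by(|a, b| (a.0 % cap).cmp(&(b.0 % cap)).reverse());
    let mut duplicates = Vec::default();
    Bucket::<u64>::batch_insert_non_duplicates_reusing_file(
        &mut index,
        &data_buckets,
        &raw,
        &mut hashed,
        &mut duplicates,
    );
    // Entries still in `hashed` found no matching file contents and fall
    // through to the normal insert path.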
@@ -404,15 +454,26 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
         reverse_sorted_entries: &mut Vec<(u64, usize)>,
         entries_created_on_disk: &mut usize,
         duplicates: &mut Vec<(usize, T)>,
+        try_to_reuse_disk_data: bool,
     ) -> Result<(), BucketMapError> {
         if reverse_sorted_entries.is_empty() {
             return Ok(());
         }
+        if try_to_reuse_disk_data {
+            // First, insert everything we can into disk contents that already have the right pubkey and hopefully the right data.
+            // Ideally this results in no disk updates to insert these entries.
+            // Any entries that could not be inserted remain in `reverse_sorted_entries`, and we fall through and insert those
+            // in any free slot we find.
+            Self::batch_insert_non_duplicates_reusing_file(
+                index,
+                data_buckets,
+                items,
+                reverse_sorted_entries,
+                duplicates,
+            );
+        }
         let max_search = index.max_search();
         let cap = index.capacity();
         let search_end = max_search.min(cap);
 
-        // pop one entry at a time to insert
+        // pop one entry at a time to insert in the first free location we find
         'outer: while let Some((ix_entry_raw, i)) = reverse_sorted_entries.pop() {
             let (k, v) = &items[i];
             let ix_entry = ix_entry_raw % cap;
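Both the reuse pass and this fallback loop probe linearly from the hashed slot and wrap at the file's capacity. A tiny illustration of that probe order, with made-up numbers:

    // Illustration only, with assumed values: probe order for cap = 8 and
    // max_search = 3, starting from a raw hash value of 6.
    let cap: u64 = 8;
    let ix_entry_raw: u64 = 6;
    let probes: Vec<u64> = (0..3).map(|search| (ix_entry_raw + search) % cap).collect();
    assert_eq!(probes, vec![6, 7, 0]); // wraps past the end of the file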
@@ -422,7 +483,11 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
             let elem = IndexEntryPlaceInBucket::new(ix_index);
             if index.try_lock(ix_index) {
                 *entries_created_on_disk += 1;
-                // found free element and occupied it
+                // found free element and occupied it.
+                // Note that since we are in the startup phase where we only add and do not remove, it is NOT possible to find this same pubkey AFTER
+                // the index we started searching at, or we would have found it as occupied BEFORE we were able to lock it here.
+                // This precondition is not true once we are able to delete entries.
+
                 // These fields will be overwritten after allocation by callers.
                 // Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
                 elem.init(index, k);
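The reuse pass hinges on `occupy_if_matches`, whose implementation lives in index_entry.rs and is not part of this diff. The following is only a sketch of the semantics implied by the three match arms and the tests below; every detail here is assumed:

    // Assumed sketch, not the real index_entry.rs code.
    fn occupy_if_matches_sketch(
        slot_pubkey: &Pubkey, // pubkey bytes already present in the mmapped slot
        slot_is_free: bool,   // derived from the slot's occupancy enum
        k: &Pubkey,
    ) -> OccupyIfMatches {
        if slot_pubkey != k {
            // a different pubkey (or leftover garbage) lives here: keep probing
            OccupyIfMatches::PubkeyMismatch
        } else if slot_is_free {
            // same pubkey in a free slot: mark it occupied, reusing the bytes
            // already on disk (ideally with no data write at all)
            OccupyIfMatches::SuccessfulInit
        } else {
            // same pubkey, already occupied: the caller records a duplicate.
            // Per the `insert_twice` test, the real code also asserts if asked
            // to insert the exact same data twice.
            OccupyIfMatches::FoundDuplicate
        }
    }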
@@ -823,6 +888,358 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
 mod tests {
+    use {super::*, crate::index_entry::OccupyIfMatches, tempfile::tempdir};
 
+    #[test]
+    fn test_batch_insert_non_duplicates_reusing_file_many_entries() {
+        // 3 variations of reuse
+        for reuse_type in 0..3 {
+            let data_buckets = Vec::default();
+            let v = 12u64;
+            let random = 1;
+            // with random=1, 6 entries is the most that don't collide on a single hash % cap value.
+            for len in 0..7 {
+                log::error!("testing with {len}");
+                // cannot use pubkey [0,0,...] because that matches the zeroed-out default file contents.
+                let raw = (0..len)
+                    .map(|l| (Pubkey::from([(l + 1) as u8; 32]), v + (l as u64)))
+                    .collect::<Vec<_>>();
+
+                let mut hashed = Bucket::index_entries(&raw, random);
+                let hashed_raw = hashed.clone();
+
+                let tmpdir = tempdir().unwrap();
+                let paths: Arc<Vec<PathBuf>> = Arc::new(vec![tmpdir.path().to_path_buf()]);
+                assert!(!paths.is_empty());
+                let max_search = 2;
+                let (mut index, file_name) = BucketStorage::<IndexBucket<u64>>::new(
+                    paths.clone(),
+                    1,
+                    std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64,
+                    max_search,
+                    Arc::default(),
+                    Arc::default(),
+                );
+                index.delete_file_on_drop = false;
+                let cap = index.capacity();
+
+                hashed.sort_unstable_by(|a, b| (a.0 % cap).cmp(&(b.0 % cap)).reverse());
+                hashed.windows(2).for_each(|two| {
+                    assert_ne!(two[0].0 % cap, two[1].0 % cap, "{two:?}, cap: {cap}");
+                });
+
+                // file is blank, so nothing matches and everything is returned in `hashed` to retry.
+                let mut duplicates = Vec::default();
+                let mut entries_created = 0;
+                // insert normally
+                Bucket::<u64>::batch_insert_non_duplicates_internal(
+                    &mut index,
+                    &data_buckets,
+                    &raw,
+                    &mut hashed,
+                    &mut entries_created,
+                    &mut duplicates,
+                    false,
+                )
+                .unwrap();
+                assert!(hashed.is_empty());
+                assert!(duplicates.is_empty());
+
+                hashed_raw.iter().for_each(|(hash, i)| {
+                    let (k, v) = raw[*i];
+                    let ix = *hash % cap;
+                    let entry = IndexEntryPlaceInBucket::new(ix);
+                    assert_eq!(entry.key(&index), &k);
+                    assert_eq!(
+                        entry.get_slot_count_enum(&index),
+                        OccupiedEnum::OneSlotInIndex(&v)
+                    );
+                });
+
+                drop(index);
+                let path = paths.first().unwrap().join(file_name.to_string());
+                let mut index = BucketStorage::<IndexBucket<u64>>::load_on_restart(
+                    path,
+                    NonZeroU64::new(
+                        std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64
+                    )
+                    .unwrap(),
+                    max_search,
+                    Arc::default(),
+                    Arc::default(),
+                )
+                .unwrap();
+
+                // verify the index file is unoccupied, but that its contents match
+                hashed_raw.iter().for_each(|(hash, i)| {
+                    let (k, _v) = raw[*i];
+                    let ix = *hash % cap;
+                    let entry = IndexEntryPlaceInBucket::new(ix);
+                    assert_eq!(entry.key(&index), &k);
+                    assert_eq!(entry.get_slot_count_enum(&index), OccupiedEnum::Free);
+                });
+
+                // this was wiped out by the last call to batch_insert..., so recreate it.
+                hashed = hashed_raw.clone();
+                let mut duplicates = Vec::default();
+                if reuse_type == 0 {
+                    Bucket::<u64>::batch_insert_non_duplicates_reusing_file(
+                        &mut index,
+                        &data_buckets,
+                        &raw,
+                        &mut hashed,
+                        &mut duplicates,
+                    );
+                } else if reuse_type == 1 {
+                    // just overwrite all data instead of trying to reuse it
+                    let mut entries_created = 0;
+                    _ = Bucket::<u64>::batch_insert_non_duplicates_internal(
+                        &mut index,
+                        &data_buckets,
+                        &raw,
+                        &mut hashed,
+                        &mut entries_created,
+                        &mut duplicates,
+                        false,
+                    );
+                    assert_eq!(entries_created, hashed_raw.len());
+                } else if reuse_type == 2 {
+                    // call the higher level fn.
+                    // That fn will call batch_insert_non_duplicates_reusing_file.
+                    // The inner fn should insert everything, reusing data, so there should be no entries created.
+                    let mut entries_created = 0;
+                    _ = Bucket::<u64>::batch_insert_non_duplicates_internal(
+                        &mut index,
+                        &data_buckets,
+                        &raw,
+                        &mut hashed,
+                        &mut entries_created,
+                        &mut duplicates,
+                        // call re-use code first
+                        true,
+                    );
+                    assert_eq!(entries_created, 0);
+                }
+                assert!(hashed.is_empty());
+                assert!(duplicates.is_empty());
+
+                hashed_raw.iter().for_each(|(hash, i)| {
+                    let (k, v) = raw[*i];
+                    let ix = *hash % cap;
+                    let entry = IndexEntryPlaceInBucket::new(ix);
+                    assert_eq!(entry.key(&index), &k);
+                    assert_eq!(
+                        entry.get_slot_count_enum(&index),
+                        OccupiedEnum::OneSlotInIndex(&v),
+                        "i: {i}"
+                    );
+                });
+            }
+        }
+    }
 
+    #[test]
+    fn test_batch_insert_non_duplicates_reusing_file_blank_file() {
+        let data_buckets = Vec::default();
+        let v = 12u64;
+        let random = 1;
+        for len in 1..4 {
+            // cannot use pubkey [0,0,...] because that matches the zeroed-out default file contents.
+            let raw = (0..len)
+                .map(|l| (Pubkey::from([(l + 1) as u8; 32]), v + (l as u64)))
+                .collect::<Vec<_>>();
+
+            let mut hashed = Bucket::index_entries(&raw, random);
+            let hashed_raw = hashed.clone();
+
+            let mut index = create_test_index(None);
+
+            let cap = index.capacity();
+            let ix = hashed[0].0 % cap;
+
+            let entry = IndexEntryPlaceInBucket::new(ix);
+
+            // file is blank, so nothing matches and everything is returned in `hashed` to retry.
+            let mut duplicates = Vec::default();
+            Bucket::<u64>::batch_insert_non_duplicates_reusing_file(
+                &mut index,
+                &data_buckets,
+                &raw,
+                &mut hashed,
+                &mut duplicates,
+            );
+
+            assert_eq!(entry.get_slot_count_enum(&index), OccupiedEnum::Free);
+            assert_eq!(entry.key(&index), &Pubkey::default());
+            assert_eq!(hashed, hashed_raw, "len: {len}");
+            assert!(duplicates.is_empty());
+        }
+    }
 
+    #[should_panic(expected = "index asked to insert the same data twice")]
+    #[test]
+    fn test_batch_insert_non_duplicates_reusing_file_insert_twice() {
+        let data_buckets = Vec::default();
+        let v = 12u64;
+        let random = 1;
+        // cannot use pubkey [0,0,...] because that matches the zeroed-out default file contents.
+        let len = 1;
+        let raw = (0..len)
+            .map(|l| (Pubkey::from([(l + 1) as u8; 32]), v + (l as u64)))
+            .collect::<Vec<_>>();
+
+        let mut hashed = Bucket::index_entries(&raw, random);
+
+        let mut index = create_test_index(None);
+        let cap = index.capacity();
+        let ix = hashed[0].0 % cap;
+        let entry = IndexEntryPlaceInBucket::new(ix);
+        entry.init(&mut index, &raw[0].0);
+        entry.set_slot_count_enum_value(&mut index, OccupiedEnum::OneSlotInIndex(&raw[0].1));
+
+        let mut duplicates = Vec::default();
+        // this will assert because the same k,v pair is already occupied in the index.
+        Bucket::<u64>::batch_insert_non_duplicates_reusing_file(
+            &mut index,
+            &data_buckets,
+            &raw,
+            &mut hashed,
+            &mut duplicates,
+        );
+    }
 
+    #[test]
+    fn test_batch_insert_non_duplicates_reusing_file_insert_duplicate() {
+        let data_buckets = Vec::default();
+        let v = 12u64;
+        let random = 1;
+        // cannot use pubkey [0,0,...] because that matches the zeroed-out default file contents.
+        let len = 1;
+        let raw = (0..len)
+            .map(|l| (Pubkey::from([(l + 1) as u8; 32]), v + (l as u64)))
+            .collect::<Vec<_>>();
+
+        let mut hashed = Bucket::index_entries(&raw, random);
+
+        let mut index = create_test_index(None);
+        let cap = index.capacity();
+        let ix = hashed[0].0 % cap;
+
+        // occupy the index data entry with same pubkey, different value.
+        // This causes it to be treated as a duplicate.
+        let entry = IndexEntryPlaceInBucket::new(ix);
+        entry.init(&mut index, &(raw[0].0));
+        let non_matching_v = raw[0].1 + 1;
+        entry.set_slot_count_enum_value(&mut index, OccupiedEnum::OneSlotInIndex(&non_matching_v));
+
+        // since the same key is already in use with a different value, it is a duplicate
+        let mut duplicates = Vec::default();
+        Bucket::<u64>::batch_insert_non_duplicates_reusing_file(
+            &mut index,
+            &data_buckets,
+            &raw,
+            &mut hashed,
+            &mut duplicates,
+        );
+        assert_eq!(
+            entry.get_slot_count_enum(&index),
+            OccupiedEnum::OneSlotInIndex(&non_matching_v)
+        );
+
+        assert!(hashed.is_empty());
+        assert_eq!(duplicates, vec![(0, non_matching_v)], "len: {len}");
+    }
 
+    #[test]
+    fn test_batch_insert_non_duplicates_reusing_file_skip_one() {
+        let data_buckets = Vec::default();
+        let v = 12u64;
+        let random = 1;
+        // cannot use pubkey [0,0,...] because that matches the zeroed-out default file contents.
+        let len = 1;
+        let mut raw = (0..len + 1)
+            .map(|l| (Pubkey::from([(l + 1) as u8; 32]), v + (l as u64)))
+            .collect::<Vec<_>>();
+
+        let other = raw.pop().unwrap();
+        let mut hashed = Bucket::index_entries(&raw, random);
+
+        let mut index = create_test_index(None);
+        let cap = index.capacity();
+        let ix = hashed[0].0 % cap;
+
+        // occupy the index data entry with a different pubkey.
+        // This causes it to be skipped.
+        let entry = IndexEntryPlaceInBucket::new(ix);
+        entry.init(&mut index, &(other.0));
+        let entry = IndexEntryPlaceInBucket::new(ix + 1);
+        // sets the pubkey value and an enum value of ZeroSlots. Leaving it at zero causes issues.
+        entry.init(&mut index, &(raw[0].0));
+        // marks as free but does not clear out the pubkey data in the file. This simulates finding the correct pubkey in the data file in a free entry and occupying it.
+        entry.set_slot_count_enum_value(&mut index, OccupiedEnum::Free);
+
+        // the first slot holds a different pubkey, so the insert skips it and reuses the next slot
+        let mut duplicates = Vec::default();
+        Bucket::<u64>::batch_insert_non_duplicates_reusing_file(
+            &mut index,
+            &data_buckets,
+            &raw,
+            &mut hashed,
+            &mut duplicates,
+        );
+
+        assert_eq!(
+            entry.get_slot_count_enum(&index),
+            OccupiedEnum::OneSlotInIndex(&raw[0].1)
+        );
+
+        assert!(hashed.is_empty());
+        assert!(duplicates.is_empty());
+
+        let entry = IndexEntryPlaceInBucket::new(ix);
+        assert_eq!(entry.key(&index), &other.0);
+        let entry = IndexEntryPlaceInBucket::new(ix + 1);
+        assert_eq!(entry.key(&index), &raw[0].0);
+    }
 
+    #[should_panic(expected = "called `Option::unwrap()` on a `None` value")]
+    #[test]
+    fn test_batch_insert_non_duplicates_reusing_file_existing_zero() {
+        let data_buckets = Vec::default();
+        let v = 12u64;
+        let random = 1;
+        // cannot use pubkey [0,0,...] because that matches the zeroed-out default file contents.
+        let len = 1;
+        let mut raw = (0..len + 1)
+            .map(|l| (Pubkey::from([(l + 1) as u8; 32]), v + (l as u64)))
+            .collect::<Vec<_>>();
+
+        let other = raw.pop().unwrap();
+        let mut hashed = Bucket::index_entries(&raw, random);
+
+        let mut index = create_test_index(None);
+        let cap = index.capacity();
+        let ix = hashed[0].0 % cap;
+
+        // occupy the index data entry with a different pubkey.
+        // This causes it to be skipped.
+        let entry = IndexEntryPlaceInBucket::new(ix);
+        entry.init(&mut index, &(other.0));
+        let entry = IndexEntryPlaceInBucket::new(ix + 1);
+        // sets the pubkey value and an enum value of ZeroSlots. Leaving it at zero is illegal at startup, so we'll assert when we find this duplicate.
+        entry.init(&mut index, &(raw[0].0));
+
+        // since the same key is already in use, it is a duplicate.
+        // But, it is a zero length entry. This is not supported at startup. Startup would never have generated a zero length occupied entry.
+        // So, it is ok for this to assert.
+        let mut duplicates = Vec::default();
+        Bucket::<u64>::batch_insert_non_duplicates_reusing_file(
+            &mut index,
+            &data_buckets,
+            &raw,
+            &mut hashed,
+            &mut duplicates,
+        );
+    }
 
     #[test]
     fn test_index_entries() {
         for v in 10..12u64 {
@@ -865,105 +1282,62 @@ mod tests {
 
     #[test]
     fn batch_insert_duplicates_internal_simple() {
         solana_logger::setup();
-        // add the same duplicate key several times.
-        // make sure the resulting index and returned `duplicates` is correct.
-        let random = 1;
-        let data_buckets = Vec::default();
-        let k = Pubkey::from([1u8; 32]);
-        for v in 10..12u64 {
-            for len in 1..4 {
-                let raw = (0..len).map(|l| (k, v + (l as u64))).collect::<Vec<_>>();
-                let mut hashed = Bucket::index_entries(&raw, random);
-                let hashed_raw = hashed.clone();
+        for try_to_reuse_disk_data in [false, true] {
+            // add the same duplicate key several times.
+            // make sure the resulting index and returned `duplicates` are correct.
+            let random = 1;
+            let data_buckets = Vec::default();
+            let k = Pubkey::from([1u8; 32]);
+            for v in 10..12u64 {
+                for len in 1..4 {
+                    let raw = (0..len).map(|l| (k, v + (l as u64))).collect::<Vec<_>>();
+                    let mut hashed = Bucket::index_entries(&raw, random);
+                    let hashed_raw = hashed.clone();
 
-                let mut index = create_test_index(None);
+                    let mut index = create_test_index(None);
 
-                let mut entries_created = 0;
-                let mut duplicates = Vec::default();
-                assert!(Bucket::<u64>::batch_insert_non_duplicates_internal(
-                    &mut index,
-                    &Vec::default(),
-                    &raw,
-                    &mut hashed,
-                    &mut entries_created,
-                    &mut duplicates,
-                )
-                .is_ok());
+                    let mut entries_created = 0;
+                    let mut duplicates = Vec::default();
+                    assert!(Bucket::<u64>::batch_insert_non_duplicates_internal(
+                        &mut index,
+                        &Vec::default(),
+                        &raw,
+                        &mut hashed,
+                        &mut entries_created,
+                        &mut duplicates,
+                        try_to_reuse_disk_data,
+                    )
+                    .is_ok());
 
-                assert_eq!(duplicates.len(), len as usize - 1);
-                assert_eq!(hashed.len(), 0);
-                let single_hashed_raw_inserted = hashed_raw.last().unwrap();
-                let elem =
-                    IndexEntryPlaceInBucket::new(single_hashed_raw_inserted.0 % index.capacity());
-                let (value, ref_count) = elem.read_value(&index, &data_buckets);
-                assert_eq!(ref_count, 1);
-                assert_eq!(value, &[raw[single_hashed_raw_inserted.1].1]);
-                let expected_duplicates = hashed_raw
-                    .iter()
-                    .rev()
-                    .skip(1)
-                    .map(|(_hash, i)| (*i, raw[single_hashed_raw_inserted.1].1))
-                    .collect::<Vec<_>>();
-                assert_eq!(expected_duplicates, duplicates);
+                    assert_eq!(duplicates.len(), len as usize - 1);
+                    assert_eq!(hashed.len(), 0);
+                    let single_hashed_raw_inserted = hashed_raw.last().unwrap();
+                    let elem = IndexEntryPlaceInBucket::new(
+                        single_hashed_raw_inserted.0 % index.capacity(),
+                    );
+                    let (value, ref_count) = elem.read_value(&index, &data_buckets);
+                    assert_eq!(ref_count, 1);
+                    assert_eq!(value, &[raw[single_hashed_raw_inserted.1].1]);
+                    let expected_duplicates = hashed_raw
+                        .iter()
+                        .rev()
+                        .skip(1)
+                        .map(|(_hash, i)| (*i, raw[single_hashed_raw_inserted.1].1))
+                        .collect::<Vec<_>>();
+                    assert_eq!(expected_duplicates, duplicates);
+                }
             }
         }
     }
 
     #[test]
     fn batch_insert_non_duplicates_internal_simple() {
         solana_logger::setup();
-        // add 2 entries, make sure they are added in the buckets we expect
-        let random = 1;
-        let data_buckets = Vec::default();
-        for v in 10..12u64 {
-            for len in 1..3 {
-                let raw = (0..len)
-                    .map(|l| {
-                        let k = Pubkey::from([l as u8; 32]);
-                        (k, v + (l as u64))
-                    })
-                    .collect::<Vec<_>>();
-                let mut hashed = Bucket::index_entries(&raw, random);
-                let hashed_raw = hashed.clone();
-
-                let mut index = create_test_index(None);
-
-                let mut duplicates = Vec::default();
-                let mut entries_created = 0;
-                assert!(Bucket::<u64>::batch_insert_non_duplicates_internal(
-                    &mut index,
-                    &Vec::default(),
-                    &raw,
-                    &mut hashed,
-                    &mut entries_created,
-                    &mut duplicates,
-                )
-                .is_ok());
-
-                assert_eq!(hashed.len(), 0);
-                (0..len).for_each(|i| {
-                    let raw2 = hashed_raw[i];
-                    let elem = IndexEntryPlaceInBucket::new(raw2.0 % index.capacity());
-                    let (value, ref_count) = elem.read_value(&index, &data_buckets);
-                    assert_eq!(ref_count, 1);
-                    assert_eq!(value, &[raw[hashed_raw[i].1].1]);
-                });
-            }
-        }
-    }
-
-    #[test]
-    fn batch_insert_non_duplicates_internal_same_ix_exceeds_max_search() {
-        solana_logger::setup();
-        // add `len` entries with the same ix, make sure they are added in subsequent buckets.
-        // adjust `max_search`. If we try to add an entry that causes us to exceed `max_search`, then assert that the adding fails with an error and
-        // the colliding item remains in `entries`
-        let random = 1;
-        let data_buckets = Vec::default();
-        for max_search in [2usize, 3] {
+        for try_to_reuse_disk_data in [false, true] {
+            // add 2 entries, make sure they are added in the buckets we expect
+            let random = 1;
+            let data_buckets = Vec::default();
+            for v in 10..12u64 {
-            for len in 1..(max_search + 1) {
+                for len in 1..3 {
                     let raw = (0..len)
                         .map(|l| {
                             let k = Pubkey::from([l as u8; 32]);
@@ -971,56 +1345,107 @@ mod tests {
                         })
                         .collect::<Vec<_>>();
                     let mut hashed = Bucket::index_entries(&raw, random);
-                let common_ix = 2; // both are put at same ix
-                hashed.iter_mut().for_each(|v| {
-                    v.0 = common_ix;
-                });
                     let hashed_raw = hashed.clone();
 
-                let mut index = create_test_index(Some(max_search as u8));
+                    let mut index = create_test_index(None);
 
                     let mut duplicates = Vec::default();
                     let mut entries_created = 0;
-                let result = Bucket::<u64>::batch_insert_non_duplicates_internal(
+                    assert!(Bucket::<u64>::batch_insert_non_duplicates_internal(
                         &mut index,
                         &Vec::default(),
                         &raw,
                         &mut hashed,
                         &mut entries_created,
                         &mut duplicates,
-                );
+                        try_to_reuse_disk_data,
+                    )
+                    .is_ok());
 
-                assert_eq!(
-                    hashed.len(),
-                    if len > max_search { 1 } else { 0 },
-                    "len: {len}"
-                );
+                    assert_eq!(hashed.len(), 0);
                     (0..len).for_each(|i| {
-                    assert!(if len > max_search {
-                        result.is_err()
-                    } else {
-                        result.is_ok()
-                    });
                         let raw2 = hashed_raw[i];
-                    if i == 0 && len > max_search {
-                        // max search was exceeded and the first entry was unable to be inserted, so it remained in `hashed`
-                        assert_eq!(hashed[0], hashed_raw[0]);
-                    } else {
-                        // we insert in reverse order when ix values are equal, so we expect to find item[1] in item[1]'s expected ix and item[0] will be 1 search distance away from expected ix
-                        let search_required = (len - i - 1) as u64;
-                        let elem = IndexEntryPlaceInBucket::new(
-                            (raw2.0 + search_required) % index.capacity(),
-                        );
-                        let (value, ref_count) = elem.read_value(&index, &data_buckets);
-                        assert_eq!(ref_count, 1);
-                        assert_eq!(value, &[raw[hashed_raw[i].1].1]);
-                    }
+                        let elem = IndexEntryPlaceInBucket::new(raw2.0 % index.capacity());
+                        let (value, ref_count) = elem.read_value(&index, &data_buckets);
+                        assert_eq!(ref_count, 1);
+                        assert_eq!(value, &[raw[hashed_raw[i].1].1]);
                     });
                 }
             }
         }
     }
 
+    #[test]
+    fn batch_insert_non_duplicates_internal_same_ix_exceeds_max_search() {
+        for try_to_reuse_disk_data in [false, true] {
+            // add `len` entries with the same ix, make sure they are added in subsequent buckets.
+            // adjust `max_search`. If we try to add an entry that causes us to exceed `max_search`, then assert that the adding fails with an error and
+            // the colliding item remains in `entries`
+            let random = 1;
+            let data_buckets = Vec::default();
+            for max_search in [2usize, 3] {
+                for v in 10..12u64 {
+                    for len in 1..(max_search + 1) {
+                        let raw = (0..len)
+                            .map(|l| {
+                                // +1 because pubkey[0,0,...] matches default contents of the index file
+                                let k = Pubkey::from([(l + 1) as u8; 32]);
+                                (k, v + (l as u64))
+                            })
+                            .collect::<Vec<_>>();
+                        let mut hashed = Bucket::index_entries(&raw, random);
+                        let common_ix = 2; // both are put at same ix
+                        hashed.iter_mut().for_each(|v| {
+                            v.0 = common_ix;
+                        });
+                        let hashed_raw = hashed.clone();
+
+                        let mut index = create_test_index(Some(max_search as u8));
+
+                        let mut duplicates = Vec::default();
+                        let mut entries_created = 0;
+                        let result = Bucket::<u64>::batch_insert_non_duplicates_internal(
+                            &mut index,
+                            &Vec::default(),
+                            &raw,
+                            &mut hashed,
+                            &mut entries_created,
+                            &mut duplicates,
+                            try_to_reuse_disk_data,
+                        );
+
+                        assert_eq!(
+                            hashed.len(),
+                            if len > max_search { 1 } else { 0 },
+                            "len: {len}"
+                        );
+                        (0..len).for_each(|i| {
+                            assert!(if len > max_search {
+                                result.is_err()
+                            } else {
+                                result.is_ok()
+                            });
+                            let raw2 = hashed_raw[i];
+                            if i == 0 && len > max_search {
+                                // max search was exceeded and the first entry was unable to be inserted, so it remained in `hashed`
+                                assert_eq!(hashed[0], hashed_raw[0]);
+                            } else {
+                                // we insert in reverse order when ix values are equal, so we expect to find item[1] in item[1]'s expected ix and item[0] will be 1 search distance away from expected ix
+                                let search_required = (len - i - 1) as u64;
+                                let elem = IndexEntryPlaceInBucket::new(
+                                    (raw2.0 + search_required) % index.capacity(),
+                                );
+                                let (value, ref_count) = elem.read_value(&index, &data_buckets);
+                                assert_eq!(ref_count, 1);
+                                assert_eq!(value, &[raw[hashed_raw[i].1].1]);
+                            }
+                        });
+                    }
+                }
+            }
+        }
+    }
 
     #[test]
     fn test_occupy_if_matches() {
         let random = 1;
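One placement detail in the tests above deserves a worked example: entries are popped in reverse, so when several items hash to the same slot, the last item takes the hashed slot and earlier items land further along the probe sequence, which is exactly what `search_required` computes. A tiny illustration with assumed numbers:

    // Illustration only: len = 2 entries that both hash to slot 2.
    let len = 2usize;
    let common_ix = 2u64;
    for i in 0..len {
        let search_required = (len - i - 1) as u64;
        // item 1 (inserted first) sits at slot 2; item 0 ends up at slot 3
        assert_eq!(common_ix + search_required, if i == 0 { 3 } else { 2 });
    }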
bucket_map/src/restart.rs

@@ -1,5 +1,4 @@
 //! Persistent info of disk index files to allow files to be reused on restart.
-#![allow(dead_code)]
 use {
     crate::bucket_map::{BucketMapConfig, MAX_SEARCH_DEFAULT},
     bytemuck::{Pod, Zeroable},