rework sorting during generate_index (#33076)

* drag data len around during generate_index

* fix tests

* update comments
Jeff Washington (jwash) 2023-08-31 12:51:56 -07:00 committed by GitHub
parent 7275caf708
commit 9a7d503fe4
4 changed files with 53 additions and 71 deletions
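The diff below reworks batch insertion so that the data being indexed stays in place while only small `(hash, index)` pairs are built, sorted, and moved. A minimal standalone sketch of that idea, with made-up types and `key[0]` standing in for the real bucket hash (an illustration only, not the accounts-db code):

```rust
// Toy illustration only: `items`, the fake hash, and the sort are stand-ins,
// not the real Bucket/accounts-db code.
fn main() {
    let items: Vec<([u8; 32], u64)> = vec![([9u8; 32], 900), ([1u8; 32], 100), ([5u8; 32], 500)];

    // one small (hash, index) pair per item; the payload in `items` never moves
    let mut order: Vec<(u64, usize)> = items
        .iter()
        .enumerate()
        .map(|(i, (key, _v))| (key[0] as u64, i)) // key[0] stands in for the bucket hash
        .collect();
    order.sort_unstable();

    // consumers look the data up by index in the untouched slice
    let sorted_values: Vec<u64> = order.iter().map(|&(_, i)| items[i].1).collect();
    assert_eq!(sorted_values, vec![100, 500, 900]);
}
```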


@ -103,7 +103,7 @@ pub struct InMemAccountsIndex<T: IndexValue, U: DiskIndexValue + From<T> + Into<
flushing_active: AtomicBool,
/// info to streamline initial index generation
-startup_info: StartupInfo<T>,
+startup_info: StartupInfo<T, U>,
/// possible evictions for next few slots coming up
possible_evictions: RwLock<PossibleEvictions<T>>,
@ -140,9 +140,9 @@ struct StartupInfoDuplicates<T: IndexValue> {
}
#[derive(Default, Debug)]
-struct StartupInfo<T: IndexValue> {
+struct StartupInfo<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> {
/// entries to add next time we are flushing to disk
-insert: Mutex<Vec<(Pubkey, (Slot, T))>>,
+insert: Mutex<Vec<(Pubkey, (Slot, U))>>,
/// pubkeys with more than 1 entry
duplicates: Mutex<StartupInfoDuplicates<T>>,
}
@ -681,7 +681,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
// todo: avoid reallocs and just allocate another vec instead of likely resizing this one over and over
items
.into_iter()
-.for_each(|(k, (slot, v))| insert.push((k, (slot, v))));
+.for_each(|(k, (slot, v))| insert.push((k, (slot, v.into()))));
}
pub fn insert_new_entry_if_missing_with_lock(
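A minimal sketch of what the hunks above change, using hypothetical `MemValue`/`DiskValue` stand-ins rather than the real `IndexValue`/`DiskIndexValue` traits: the staged `insert` buffer now holds the on-disk representation `U`, so the `T -> U` conversion happens once when an entry is queued instead of later when the batch is written to the disk index.

```rust
use std::sync::Mutex;

// hypothetical stand-ins for the in-memory (T) and on-disk (U) value types
#[derive(Clone, Copy, Debug)]
struct MemValue(u64);
#[derive(Clone, Copy, Debug, PartialEq, Default)]
struct DiskValue(u64);

impl From<MemValue> for DiskValue {
    fn from(v: MemValue) -> Self {
        DiskValue(v.0)
    }
}

type Slot = u64;
type Pubkey = [u8; 32];

/// queued work for initial index generation; entries are stored in disk form
#[derive(Default, Debug)]
struct StartupInfo<U> {
    insert: Mutex<Vec<(Pubkey, (Slot, U))>>,
}

impl<U> StartupInfo<U> {
    /// convert each in-memory value to its disk form as it is queued
    fn queue<T: Into<U>>(&self, items: impl Iterator<Item = (Pubkey, (Slot, T))>) {
        let mut insert = self.insert.lock().unwrap();
        items.for_each(|(k, (slot, v))| insert.push((k, (slot, v.into()))));
    }
}

fn main() {
    let startup: StartupInfo<DiskValue> = StartupInfo::default();
    startup.queue([([7u8; 32], (42, MemValue(5)))].into_iter());
    let queued = startup.insert.lock().unwrap();
    assert_eq!((queued[0].1).1, DiskValue(5));
}
```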
@ -1070,19 +1070,15 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
// merge all items into the disk index now
let disk = self.bucket.as_ref().unwrap();
let mut count = insert.len() as u64;
-for (k, entry, duplicate_entry) in disk.batch_insert_non_duplicates(
-insert
-.into_iter()
-.map(|(k, (slot, v))| (k, (slot, v.into()))),
-count as usize,
-) {
-duplicates.duplicates.push((entry.0, k, entry.1.into()));
+for (i, duplicate_entry) in disk.batch_insert_non_duplicates(&insert) {
+let (k, entry) = &insert[i];
+duplicates.duplicates.push((entry.0, *k, entry.1.into()));
// accurately account for there being a duplicate for the first entry that was previously added to the disk index.
// That entry could not have known yet that it was a duplicate.
// It is important to capture each slot with a duplicate because of slot limits applied to clean.
duplicates
.duplicates_put_on_disk
-.insert((duplicate_entry.0, k));
+.insert((duplicate_entry.0, *k));
count -= 1;
}
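The rewritten loop above only receives `(index, existing_entry)` pairs, so the pubkey and slot of a failed insert are read back out of `insert` by index. A rough sketch of that shape, with a `HashMap` and the toy `batch_insert_non_duplicates` below standing in for the real disk bucket, and `dup_slots`/`dup_on_disk` mirroring the `duplicates`/`duplicates_put_on_disk` bookkeeping:

```rust
use std::collections::{HashMap, HashSet};

type Pubkey = [u8; 32];
type Slot = u64;

/// toy stand-in for the disk bucket: returns (index in `items`, previously stored entry)
/// for every item whose pubkey is already present
fn batch_insert_non_duplicates(
    disk: &mut HashMap<Pubkey, (Slot, u64)>,
    items: &[(Pubkey, (Slot, u64))],
) -> Vec<(usize, (Slot, u64))> {
    let mut duplicates = Vec::new();
    for (i, (k, entry)) in items.iter().enumerate() {
        if let Some(existing) = disk.get(k) {
            duplicates.push((i, *existing));
        } else {
            disk.insert(*k, *entry);
        }
    }
    duplicates
}

fn main() {
    let insert = vec![
        ([1u8; 32], (10, 111)),
        ([2u8; 32], (11, 222)),
        ([1u8; 32], (12, 333)), // same pubkey as the first entry
    ];
    let mut disk = HashMap::new();
    let mut dup_slots = Vec::new();
    let mut dup_on_disk = HashSet::new();
    let mut count = insert.len() as u64;
    // only indices come back, so pubkey and slot are read back out of `insert`
    for (i, duplicate_entry) in batch_insert_non_duplicates(&mut disk, &insert) {
        let (k, entry) = &insert[i];
        dup_slots.push((entry.0, *k, entry.1));
        // also remember the slot of the entry that reached the disk index first
        dup_on_disk.insert((duplicate_entry.0, *k));
        count -= 1;
    }
    assert_eq!(count, 2);
    assert_eq!(dup_slots, vec![(12, [1u8; 32], 333)]);
    assert!(dup_on_disk.contains(&(10, [1u8; 32])));
}
```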


@ -281,35 +281,27 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
/// for each item in `items`, get the hash value when hashed with `random`.
/// Return a vec of tuples:
-/// (hash_value, key, value)
-fn index_entries(
-items: impl Iterator<Item = (Pubkey, T)>,
-count: usize,
-random: u64,
-) -> Vec<(u64, Pubkey, T)> {
-let mut inserts = Vec::with_capacity(count);
-items.for_each(|(key, v)| {
-let ix = Self::bucket_index_ix(&key, random);
-inserts.push((ix, key, v));
+/// (hash_value, index in `items`)
+fn index_entries(items: &[(Pubkey, T)], random: u64) -> Vec<(u64, usize)> {
+let mut inserts = Vec::with_capacity(items.len());
+items.iter().enumerate().for_each(|(i, (key, _v))| {
+let ix = Self::bucket_index_ix(key, random);
+inserts.push((ix, i));
});
inserts
}
-/// insert all of `items` into the index.
-/// return duplicates
-pub(crate) fn batch_insert_non_duplicates(
-&mut self,
-items: impl Iterator<Item = (Pubkey, T)>,
-count: usize,
-) -> Vec<(Pubkey, T, T)> {
+/// batch insert of `items`. Assumption is a single slot list element and ref_count == 1.
+/// For any pubkeys that already exist, the index in `items` of the failed insertion and the existing data (previously put in the index) are returned.
+pub(crate) fn batch_insert_non_duplicates(&mut self, items: &[(Pubkey, T)]) -> Vec<(usize, T)> {
assert!(
!self.at_least_one_entry_deleted,
"efficient batch insertion can only occur prior to any deletes"
);
let current_len = self.index.count.load(Ordering::Relaxed);
-let anticipated = count as u64;
+let anticipated = items.len() as u64;
self.set_anticipated_count((anticipated).saturating_add(current_len));
-let mut entries = Self::index_entries(items, count, self.random);
+let mut entries = Self::index_entries(items, self.random);
let mut duplicates = Vec::default();
// insert, but resizes may be necessary
loop {
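A hedged sketch of the new `index_entries` contract described by the doc comment above: one `(hash_value, index)` pair per item, emitted in the same order as `items`, with the values never copied. The `bucket_index_ix` function here is a made-up stand-in for the real hash:

```rust
type Pubkey = [u8; 32];

// made-up stand-in for Bucket::bucket_index_ix
fn bucket_index_ix(key: &Pubkey, random: u64) -> u64 {
    key.iter()
        .fold(random, |h, b| h.wrapping_mul(31).wrapping_add(*b as u64))
}

/// sketch of the reworked helper: the payload stays in `items`,
/// only (hash_value, index in `items`) pairs are produced
fn index_entries<T>(items: &[(Pubkey, T)], random: u64) -> Vec<(u64, usize)> {
    items
        .iter()
        .enumerate()
        .map(|(i, (key, _v))| (bucket_index_ix(key, random), i))
        .collect()
}

fn main() {
    let items = vec![([3u8; 32], "a"), ([9u8; 32], "b")];
    let entries = index_entries(&items, 42);
    assert_eq!(entries.len(), items.len());
    // mirrors the updated test: entry i refers back to items[i]
    assert_eq!(entries[0].1, 0);
    assert_eq!(entries[1].1, 1);
    assert_eq!(entries[0].0, bucket_index_ix(&items[0].0, 42));
}
```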
@ -322,6 +314,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
let result = Self::batch_insert_non_duplicates_internal(
&mut self.index,
&self.data,
+items,
&mut entries,
&mut duplicates,
);
@ -330,7 +323,7 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
// everything added
self.set_anticipated_count(0);
self.index.count.fetch_add(
-count.saturating_sub(duplicates.len()) as u64,
+items.len().saturating_sub(duplicates.len()) as u64,
Ordering::Relaxed,
);
return duplicates;
@ -352,15 +345,17 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
pub fn batch_insert_non_duplicates_internal(
index: &mut BucketStorage<IndexBucket<T>>,
data_buckets: &[BucketStorage<DataBucket>],
-reverse_sorted_entries: &mut Vec<(u64, Pubkey, T)>,
-duplicates: &mut Vec<(Pubkey, T, T)>,
+items: &[(Pubkey, T)],
+reverse_sorted_entries: &mut Vec<(u64, usize)>,
+duplicates: &mut Vec<(usize, T)>,
) -> Result<(), BucketMapError> {
let max_search = index.max_search();
let cap = index.capacity();
let search_end = max_search.min(cap);
// pop one entry at a time to insert
-'outer: while let Some((ix_entry_raw, k, v)) = reverse_sorted_entries.pop() {
+'outer: while let Some((ix_entry_raw, i)) = reverse_sorted_entries.pop() {
+let (k, v) = &items[i];
let ix_entry = ix_entry_raw % cap;
// search for an empty spot starting at `ix_entry`
for search in 0..search_end {
@ -370,25 +365,25 @@ impl<'b, T: Clone + Copy + PartialEq + std::fmt::Debug + 'static> Bucket<T> {
// found free element and occupied it
// These fields will be overwritten after allocation by callers.
// Since this part of the mmapped file could have previously been used by someone else, there can be garbage here.
-elem.init(index, &k);
+elem.init(index, k);
// new data stored should be stored in IndexEntry and NOT in data file
// new data len is 1
-elem.set_slot_count_enum_value(index, OccupiedEnum::OneSlotInIndex(&v));
+elem.set_slot_count_enum_value(index, OccupiedEnum::OneSlotInIndex(v));
continue 'outer; // this 'insertion' is completed: inserted successfully
} else {
// occupied, see if the key already exists here
-if elem.key(index) == &k {
+if elem.key(index) == k {
let (v_existing, _ref_count_existing) =
elem.read_value(index, data_buckets);
-duplicates.push((k, v, *v_existing.first().unwrap()));
+duplicates.push((i, *v_existing.first().unwrap()));
continue 'outer; // this 'insertion' is completed: found a duplicate entry
}
}
}
// search loop ended without finding a spot to insert this key
// so, remember the item we were trying to insert for next time after resizing
-reverse_sorted_entries.push((ix_entry_raw, k, v));
+reverse_sorted_entries.push((ix_entry_raw, i));
return Err(BucketMapError::IndexNoSpace(cap));
}
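A simplified sketch of the control flow in the reworked `batch_insert_non_duplicates_internal` above; `insert_batch`, `InsertError`, and the `Vec<Option<..>>` table are stand-ins, not the real `BucketStorage` types. Entries are popped as `(hash, index)`, the key and value are borrowed from `items`, duplicates are reported by index, and when `max_search` is exhausted the entry is pushed back so the caller can resize and retry with the same vectors:

```rust
#[derive(Debug, PartialEq)]
enum InsertError {
    NoSpace(u64),
}

fn insert_batch(
    table: &mut Vec<Option<([u8; 32], u64)>>,
    items: &[([u8; 32], u64)],
    reverse_sorted_entries: &mut Vec<(u64, usize)>,
    duplicates: &mut Vec<(usize, u64)>,
    max_search: u64,
) -> Result<(), InsertError> {
    let cap = table.len() as u64;
    'outer: while let Some((hash, i)) = reverse_sorted_entries.pop() {
        let (k, v) = &items[i];
        // probe at most `max_search` slots starting at the hashed position
        for search in 0..max_search.min(cap) {
            let ix = (hash.wrapping_add(search) % cap) as usize;
            let slot = table[ix]; // the slot type is Copy in this toy version
            match slot {
                None => {
                    table[ix] = Some((*k, *v));
                    continue 'outer; // inserted into a free slot
                }
                Some((existing_k, existing_v)) if existing_k == *k => {
                    // already present: report the index into `items` plus the existing value
                    duplicates.push((i, existing_v));
                    continue 'outer;
                }
                Some(_) => {} // occupied by another key, keep probing
            }
        }
        // no free slot within max_search: push the entry back for the retry after a resize
        reverse_sorted_entries.push((hash, i));
        return Err(InsertError::NoSpace(cap));
    }
    Ok(())
}

fn main() {
    let items = vec![([1u8; 32], 10), ([2u8; 32], 20), ([1u8; 32], 11)];
    // force every entry to the same starting slot, like the `common_ix` test below
    let mut entries: Vec<(u64, usize)> = (0..items.len()).map(|i| (0, i)).collect();
    let mut duplicates = Vec::new();
    let mut table = vec![None; 4];
    assert!(insert_batch(&mut table, &items, &mut entries, &mut duplicates, 4).is_ok());
    // the repeated pubkey is reported as a duplicate of the entry inserted first
    assert_eq!(duplicates, vec![(0, 11)]);
}
```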
@ -774,14 +769,13 @@ mod tests {
(k, v + (l as u64))
})
.collect::<Vec<_>>();
-let hashed = Bucket::index_entries(raw.clone().into_iter(), len, random);
+let hashed = Bucket::index_entries(&raw, random);
assert_eq!(hashed.len(), len);
(0..len).for_each(|i| {
let raw = raw[i];
let hashed = hashed[i];
assert_eq!(Bucket::<u64>::bucket_index_ix(&raw.0, random), hashed.0);
-assert_eq!(raw.0, hashed.1);
-assert_eq!(raw.1, hashed.2);
+assert_eq!(i, hashed.1);
});
}
}
@ -815,7 +809,7 @@ mod tests {
for v in 10..12u64 {
for len in 1..4 {
let raw = (0..len).map(|l| (k, v + (l as u64))).collect::<Vec<_>>();
-let mut hashed = Bucket::index_entries(raw.clone().into_iter(), len, random);
+let mut hashed = Bucket::index_entries(&raw, random);
let hashed_raw = hashed.clone();
let mut index = create_test_index(None);
@ -824,24 +818,25 @@ mod tests {
assert!(Bucket::<u64>::batch_insert_non_duplicates_internal(
&mut index,
&Vec::default(),
+&raw,
&mut hashed,
&mut duplicates,
)
.is_ok());
-assert_eq!(duplicates.len(), len - 1);
+assert_eq!(duplicates.len(), len as usize - 1);
assert_eq!(hashed.len(), 0);
let single_hashed_raw_inserted = hashed_raw.last().unwrap();
let elem =
IndexEntryPlaceInBucket::new(single_hashed_raw_inserted.0 % index.capacity());
let (value, ref_count) = elem.read_value(&index, &data_buckets);
assert_eq!(ref_count, 1);
-assert_eq!(value, &[single_hashed_raw_inserted.2]);
+assert_eq!(value, &[raw[single_hashed_raw_inserted.1].1]);
let expected_duplicates = hashed_raw
.iter()
.rev()
.skip(1)
-.map(|(_hash, k, v)| (*k, *v, single_hashed_raw_inserted.2))
+.map(|(_hash, i)| (*i, raw[single_hashed_raw_inserted.1].1))
.collect::<Vec<_>>();
assert_eq!(expected_duplicates, duplicates);
}
@ -862,7 +857,7 @@ mod tests {
(k, v + (l as u64))
})
.collect::<Vec<_>>();
-let mut hashed = Bucket::index_entries(raw.clone().into_iter(), len, random);
+let mut hashed = Bucket::index_entries(&raw, random);
let hashed_raw = hashed.clone();
let mut index = create_test_index(None);
@ -871,6 +866,7 @@ mod tests {
assert!(Bucket::<u64>::batch_insert_non_duplicates_internal(
&mut index,
&Vec::default(),
+&raw,
&mut hashed,
&mut duplicates,
)
@ -878,11 +874,11 @@ mod tests {
assert_eq!(hashed.len(), 0);
(0..len).for_each(|i| {
-let raw = hashed_raw[i];
-let elem = IndexEntryPlaceInBucket::new(raw.0 % index.capacity());
+let raw2 = hashed_raw[i];
+let elem = IndexEntryPlaceInBucket::new(raw2.0 % index.capacity());
let (value, ref_count) = elem.read_value(&index, &data_buckets);
assert_eq!(ref_count, 1);
-assert_eq!(value, &[hashed_raw[i].2]);
+assert_eq!(value, &[raw[hashed_raw[i].1].1]);
});
}
}
@ -905,7 +901,7 @@ mod tests {
(k, v + (l as u64))
})
.collect::<Vec<_>>();
-let mut hashed = Bucket::index_entries(raw.clone().into_iter(), len, random);
+let mut hashed = Bucket::index_entries(&raw, random);
let common_ix = 2; // both are put at same ix
hashed.iter_mut().for_each(|v| {
v.0 = common_ix;
@ -918,6 +914,7 @@ mod tests {
let result = Bucket::<u64>::batch_insert_non_duplicates_internal(
&mut index,
&Vec::default(),
+&raw,
&mut hashed,
&mut duplicates,
);
@ -933,7 +930,7 @@ mod tests {
} else {
result.is_ok()
});
-let raw = hashed_raw[i];
+let raw2 = hashed_raw[i];
if i == 0 && len > max_search {
// max search was exceeded and the first entry was unable to be inserted, so it remained in `hashed`
assert_eq!(hashed[0], hashed_raw[0]);
@ -941,11 +938,11 @@ mod tests {
// we insert in reverse order when ix values are equal, so we expect to find item[1] in item[1]'s expected ix and item[0] will be 1 search distance away from expected ix
let search_required = (len - i - 1) as u64;
let elem = IndexEntryPlaceInBucket::new(
-(raw.0 + search_required) % index.capacity(),
+(raw2.0 + search_required) % index.capacity(),
);
let (value, ref_count) = elem.read_value(&index, &data_buckets);
assert_eq!(ref_count, 1);
-assert_eq!(value, &[hashed_raw[i].2]);
+assert_eq!(value, &[raw[hashed_raw[i].1].1]);
}
});
}
@ -970,6 +967,6 @@ mod tests {
bucket.update(&key, |_| Some((vec![0], 0)));
bucket.delete_key(&key);
-bucket.batch_insert_non_duplicates(std::iter::empty(), 0);
+bucket.batch_insert_non_duplicates(&[]);
}
}


@ -125,17 +125,10 @@ impl<T: Clone + Copy + PartialEq + std::fmt::Debug> BucketApi<T> {
}
/// batch insert of `items`. Assumption is a single slot list element and ref_count == 1.
-/// For any pubkeys that already exist, the failed insertion data and the existing data are returned.
-pub fn batch_insert_non_duplicates(
-&self,
-items: impl Iterator<Item = (Pubkey, T)>,
-count: usize,
-) -> Vec<(Pubkey, T, T)> {
+/// For any pubkeys that already exist, the index in `items` of the failed insertion and the existing data (previously put in the index) are returned.
+pub fn batch_insert_non_duplicates(&self, items: &[(Pubkey, T)]) -> Vec<(usize, T)> {
let mut bucket = self.get_write_bucket();
-bucket
-.as_mut()
-.unwrap()
-.batch_insert_non_duplicates(items, count)
+bucket.as_mut().unwrap().batch_insert_non_duplicates(items)
}
pub fn update<F>(&self, key: &Pubkey, updatefn: F)


@ -507,13 +507,9 @@ mod tests {
);
duplicates += 1;
}
-let count = batch_additions.len();
assert_eq!(
map.get_bucket_from_index(0)
-.batch_insert_non_duplicates(
-batch_additions.into_iter(),
-count,
-)
+.batch_insert_non_duplicates(&batch_additions,)
.len(),
duplicates
);
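For call sites like the test above, the migration is mechanical: drop the separate `count`, pass the batch by reference, and treat the returned duplicates as indices into it. A tiny sketch with a do-nothing stand-in (not the real `BucketApi` method):

```rust
type Pubkey = [u8; 32];

// do-nothing stand-in with the new slice-based signature
fn batch_insert_non_duplicates(items: &[(Pubkey, u64)]) -> Vec<(usize, u64)> {
    let _ = items; // a real implementation would insert into the bucket here
    Vec::new()
}

fn main() {
    let batch_additions = vec![([1u8; 32], 1u64), ([2u8; 32], 2u64)];

    // before: let count = batch_additions.len();
    //         batch_insert_non_duplicates(batch_additions.into_iter(), count)
    // after: borrow the slice; the length travels with it
    let duplicates = batch_insert_non_duplicates(&batch_additions);
    assert!(duplicates.is_empty());

    // the batch is still owned by the caller and can be inspected afterwards
    assert_eq!(batch_additions.len(), 2);
}
```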