SortedStoragesIter to remove random access requirement (#22713)

This commit is contained in:
Jeff Washington (jwash) 2022-01-27 10:25:13 -06:00 committed by GitHub
parent 75658e2a96
commit 18d69c8d2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 191 additions and 13 deletions

View File

@ -5336,8 +5336,7 @@ impl AccountsDb {
let mut load_from_cache = true;
let mut hasher = std::collections::hash_map::DefaultHasher::new(); // wrong one?
for slot in start..end {
let sub_storages = snapshot_storages.get(slot);
for (slot, sub_storages) in snapshot_storages.iter_range(start..end) {
bin_range.start.hash(&mut hasher);
bin_range.end.hash(&mut hasher);
if let Some(sub_storages) = sub_storages {
@ -5395,8 +5394,7 @@ impl AccountsDb {
}
}
for slot in start..end {
let sub_storages = snapshot_storages.get(slot);
for (slot, sub_storages) in snapshot_storages.iter_range(start..end) {
let valid_slot = sub_storages.is_some();
if let Some((cache, ancestors, accounts_index)) = accounts_cache_and_ancestors {
if let Some(slot_cache) = cache.slot_cache(slot) {
@ -5452,8 +5450,8 @@ impl AccountsDb {
let acceptable_straggler_slot_count = 100; // do nothing special for these old stores which will likely get cleaned up shortly
let sub = slots_per_epoch + acceptable_straggler_slot_count;
let in_epoch_range_start = max.saturating_sub(sub);
for slot in storages.range().start..in_epoch_range_start {
if let Some(storages) = storages.get(slot) {
for (slot, storages) in storages.iter_range(..in_epoch_range_start) {
if let Some(storages) = storages {
storages.iter().for_each(|store| {
self.dirty_stores
.insert((slot, store.append_vec_id()), store.clone());

View File

@ -1,17 +1,32 @@
use {
crate::accounts_db::SnapshotStorage, log::*, solana_measure::measure::Measure,
solana_sdk::clock::Slot, std::ops::Range,
crate::accounts_db::SnapshotStorage,
log::*,
solana_measure::measure::Measure,
solana_sdk::clock::Slot,
std::ops::Range,
std::ops::{Bound, RangeBounds},
};
/// Provide access to SnapshotStorages sorted by slot
pub struct SortedStorages<'a> {
/// range of slots where storages exist (likely sparse)
range: Range<Slot>,
/// the actual storages. index is (slot - range.start)
storages: Vec<Option<&'a SnapshotStorage>>,
slot_count: usize,
storage_count: usize,
}
impl<'a> SortedStorages<'a> {
pub fn get(&self, slot: Slot) -> Option<&SnapshotStorage> {
/// primary method of retreiving (Slot, SnapshotStorage)
pub fn iter_range<R>(&'a self, range: R) -> SortedStoragesIter<'a>
where
R: RangeBounds<Slot>,
{
SortedStoragesIter::new(self, range)
}
fn get(&self, slot: Slot) -> Option<&SnapshotStorage> {
if !self.range.contains(&slot) {
None
} else {
@ -52,10 +67,9 @@ impl<'a> SortedStorages<'a> {
Self::new_with_slots(source.iter().zip(slots.iter()), None, None)
}
// source[i] is in slot slots[i]
// assumptions:
// 1. slots vector contains unique slot #s.
// 2. slots and source are the same len
/// create `SortedStorages` from 'source' iterator.
/// 'source' contains a SnapshotStorage and its associated slot
/// 'source' does not have to be sorted in any way, but is assumed to not have duplicate slot #s
pub fn new_with_slots<'b>(
source: impl Iterator<Item = (&'a SnapshotStorage, &'b Slot)> + Clone,
// A slot used as a lower bound, but potentially smaller than the smallest slot in the given 'source' iterator
@ -120,6 +134,66 @@ impl<'a> SortedStorages<'a> {
}
}
/// Iterator over successive slots in 'storages' within 'range'.
/// This enforces sequential access so that random access does not have to be implemented.
/// Random access could be expensive with large sparse sets.
pub struct SortedStoragesIter<'a> {
/// range for the iterator to iterate over (start_inclusive..end_exclusive)
range: Range<Slot>,
/// the data to return per slot
storages: &'a SortedStorages<'a>,
/// the slot to be returned the next time 'Next' is called
next_slot: Slot,
}
impl<'a> Iterator for SortedStoragesIter<'a> {
type Item = (Slot, Option<&'a SnapshotStorage>);
fn next(&mut self) -> Option<Self::Item> {
let slot = self.next_slot;
if slot < self.range.end {
// iterator is still in range. Storage may or may not exist at this slot, but the iterator still needs to return the slot
self.next_slot += 1;
Some((slot, self.storages.get(slot)))
} else {
// iterator passed the end of the range, so from now on it returns None
None
}
}
}
impl<'a> SortedStoragesIter<'a> {
pub fn new<R: RangeBounds<Slot>>(
storages: &'a SortedStorages<'a>,
range: R,
) -> SortedStoragesIter<'a> {
let storage_range = storages.range();
let next_slot = match range.start_bound() {
Bound::Unbounded => {
storage_range.start // unbounded beginning starts with the min known slot (which is inclusive)
}
Bound::Included(x) => *x,
Bound::Excluded(x) => *x + 1, // make inclusive
};
let end_exclusive_slot = match range.end_bound() {
Bound::Unbounded => {
storage_range.end // unbounded end ends with the max known slot (which is exclusive)
}
Bound::Included(x) => *x + 1, // make exclusive
Bound::Excluded(x) => *x,
};
// Note that the range can be outside the range of known storages.
// This is because the storages may not be the only source of valid slots.
// The write cache is another source of slots that 'storages' knows nothing about.
let range = next_slot..end_exclusive_slot;
SortedStoragesIter {
range,
storages,
next_slot,
}
}
}
#[cfg(test)]
pub mod tests {
use super::*;
@ -144,6 +218,112 @@ pub mod tests {
}
}
#[test]
fn test_sorted_storages_range_iter() {
let storages = SortedStorages::new_with_slots([].iter().zip([].iter()), None, None);
let check = |(slot, storages): (Slot, Option<&SnapshotStorage>)| {
assert!(storages.is_none());
slot
};
assert_eq!(
(0..5).into_iter().collect::<Vec<_>>(),
storages.iter_range(..5).map(check).collect::<Vec<_>>()
);
assert_eq!(
(1..5).into_iter().collect::<Vec<_>>(),
storages.iter_range(1..5).map(check).collect::<Vec<_>>()
);
assert_eq!(
(0..0).into_iter().collect::<Vec<_>>(),
storages.iter_range(..).map(check).collect::<Vec<_>>()
);
assert_eq!(
(0..0).into_iter().collect::<Vec<_>>(),
storages.iter_range(1..).map(check).collect::<Vec<_>>()
);
// only item is slot 3
let s1 = Vec::new();
let storages =
SortedStorages::new_with_slots([&s1].into_iter().zip([&3].into_iter()), None, None);
let check = |(slot, storages): (Slot, Option<&SnapshotStorage>)| {
assert!(
(slot != 3) ^ storages.is_some(),
"slot: {}, storages: {:?}",
slot,
storages
);
slot
};
for start in 0..5 {
for end in 0..5 {
assert_eq!(
(start..end).into_iter().collect::<Vec<_>>(),
storages
.iter_range(start..end)
.map(check)
.collect::<Vec<_>>()
);
}
}
assert_eq!(
(3..5).into_iter().collect::<Vec<_>>(),
storages.iter_range(..5).map(check).collect::<Vec<_>>()
);
assert_eq!(
(1..=3).into_iter().collect::<Vec<_>>(),
storages.iter_range(1..).map(check).collect::<Vec<_>>()
);
assert_eq!(
(3..=3).into_iter().collect::<Vec<_>>(),
storages.iter_range(..).map(check).collect::<Vec<_>>()
);
// items in slots 2 and 4
let s2 = Vec::with_capacity(2);
let s4 = Vec::with_capacity(4);
let storages = SortedStorages::new_with_slots(
[&s2, &s4].into_iter().zip([&2, &4].into_iter()),
None,
None,
);
let check = |(slot, storages): (Slot, Option<&SnapshotStorage>)| {
assert!(
(slot != 2 && slot != 4)
^ storages
.map(|storages| storages.capacity() == (slot as usize))
.unwrap_or(false),
"slot: {}, storages: {:?}",
slot,
storages
);
slot
};
for start in 0..5 {
for end in 0..5 {
assert_eq!(
(start..end).into_iter().collect::<Vec<_>>(),
storages
.iter_range(start..end)
.map(check)
.collect::<Vec<_>>()
);
}
}
assert_eq!(
(2..5).into_iter().collect::<Vec<_>>(),
storages.iter_range(..5).map(check).collect::<Vec<_>>()
);
assert_eq!(
(1..=4).into_iter().collect::<Vec<_>>(),
storages.iter_range(1..).map(check).collect::<Vec<_>>()
);
assert_eq!(
(2..=4).into_iter().collect::<Vec<_>>(),
storages.iter_range(..).map(check).collect::<Vec<_>>()
);
}
#[test]
#[should_panic(expected = "SnapshotStorage.is_empty()")]
fn test_sorted_storages_empty() {