add filter to AppendVecScan (#25664)

This commit is contained in:
Jeff Washington (jwash) 2022-06-01 11:55:18 -05:00 committed by GitHub
parent 17995c7e67
commit 905fef29cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 44 additions and 19 deletions

View File

@ -1716,6 +1716,8 @@ type GenerateIndexAccountsMap<'a> = HashMap<Pubkey, IndexAccountMapEntry<'a>>;
/// called on a struct while scanning append vecs
trait AppendVecScan: Send + Sync + Clone {
/// return true if this pubkey should be included
fn filter(&mut self, pubkey: &Pubkey) -> bool;
/// set current slot of the scan
fn set_slot(&mut self, slot: Slot);
/// found `account` in the append vec
@ -1749,12 +1751,17 @@ struct ScanState<'a, T: Fn(Slot) -> Option<Slot> + Sync + Send + Clone> {
filler_account_suffix: Option<&'a Pubkey>,
range: usize,
sort_time: Arc<AtomicU64>,
pubkey_to_bin_index: usize,
}
impl<'a, T: Fn(Slot) -> Option<Slot> + Sync + Send + Clone> AppendVecScan for ScanState<'a, T> {
fn set_slot(&mut self, slot: Slot) {
self.current_slot = slot;
}
fn filter(&mut self, pubkey: &Pubkey) -> bool {
self.pubkey_to_bin_index = self.bin_calculator.bin_from_pubkey(pubkey);
self.bin_range.contains(&self.pubkey_to_bin_index)
}
fn init_accum(&mut self, count: usize) {
if self.accum.is_empty() {
self.accum.append(&mut vec![Vec::new(); count]);
@ -1762,13 +1769,10 @@ impl<'a, T: Fn(Slot) -> Option<Slot> + Sync + Send + Clone> AppendVecScan for Sc
}
fn found_account(&mut self, loaded_account: &LoadedAccount) {
let pubkey = loaded_account.pubkey();
let mut pubkey_to_bin_index = self.bin_calculator.bin_from_pubkey(pubkey);
if !self.bin_range.contains(&pubkey_to_bin_index) {
return;
}
assert!(self.bin_range.contains(&self.pubkey_to_bin_index)); // get rid of this once we have confidence
// when we are scanning with bin ranges, we don't need to use exact bin numbers. Subtract to make first bin we care about at index 0.
pubkey_to_bin_index -= self.bin_range.start;
self.pubkey_to_bin_index -= self.bin_range.start;
let raw_lamports = loaded_account.lamports();
let zero_raw_lamports = raw_lamports == 0;
@ -1809,7 +1813,7 @@ impl<'a, T: Fn(Slot) -> Option<Slot> + Sync + Send + Clone> AppendVecScan for Sc
}
}
self.init_accum(self.range);
self.accum[pubkey_to_bin_index].push(source_item);
self.accum[self.pubkey_to_bin_index].push(source_item);
}
fn scanning_complete(self) -> BinnedHashData {
let (result, timing) = AccountsDb::sort_slot_storage_scan(self.accum);
@ -6069,8 +6073,11 @@ impl AccountsDb {
let mut len = storages.len();
if len == 1 {
// only 1 storage, so no need to interleave between multiple storages based on write_version
AppendVecAccountsIter::new(&storages[0].accounts)
.for_each(|account| scanner.found_account(&LoadedAccount::Stored(account)));
AppendVecAccountsIter::new(&storages[0].accounts).for_each(|account| {
if scanner.filter(&account.meta.pubkey) {
scanner.found_account(&LoadedAccount::Stored(account))
}
});
} else {
// we have to call the scan_func in order of write_version within a slot if there are multiple storages per slot
let mut progress = Vec::with_capacity(len);
@ -6095,10 +6102,19 @@ impl AccountsDb {
min = *item;
}
}
let mut account: (StoredMetaWriteVersion, Option<StoredAccountMeta<'_>>) =
(0, None);
std::mem::swap(&mut account, &mut current[min_index]);
scanner.found_account(&LoadedAccount::Stored(account.1.unwrap()));
let found_account = &mut current[min_index];
if scanner.filter(
&found_account
.1
.as_ref()
.map(|stored_account| stored_account.meta.pubkey)
.unwrap(), // will always be 'Some'
) {
let mut account: (StoredMetaWriteVersion, Option<StoredAccountMeta<'_>>) =
(0, None);
std::mem::swap(&mut account, found_account);
scanner.found_account(&LoadedAccount::Stored(account.1.unwrap()));
}
let next = progress[min_index].next().map(|stored_account| {
(stored_account.meta.write_version, Some(stored_account))
});
@ -6296,13 +6312,15 @@ impl AccountsDb {
{
let keys = slot_cache.get_all_pubkeys();
for key in keys {
if let Some(cached_account) = slot_cache.get_cloned(&key) {
let mut accessor = LoadedAccountAccessor::Cached(Some(
Cow::Owned(cached_account),
));
let account = accessor.get_loaded_account().unwrap();
scanner.found_account(&account);
};
if scanner.filter(&key) {
if let Some(cached_account) = slot_cache.get_cloned(&key) {
let mut accessor = LoadedAccountAccessor::Cached(Some(
Cow::Owned(cached_account),
));
let account = accessor.get_loaded_account().unwrap();
scanner.found_account(&account);
}
}
}
}
}
@ -6485,6 +6503,7 @@ impl AccountsDb {
bin_range,
stats,
sort_time: sort_time.clone(),
pubkey_to_bin_index: 0,
};
let result: Vec<BinnedHashData> = self.scan_account_storage_no_bank(
@ -8988,6 +9007,9 @@ pub mod tests {
}
impl AppendVecScan for TestScan {
fn filter(&mut self, _pubkey: &Pubkey) -> bool {
true
}
fn set_slot(&mut self, slot: Slot) {
self.current_slot = slot;
}
@ -9227,6 +9249,9 @@ pub mod tests {
fn set_slot(&mut self, slot: Slot) {
self.current_slot = slot;
}
fn filter(&mut self, _pubkey: &Pubkey) -> bool {
true
}
fn init_accum(&mut self, _count: usize) {}
fn found_account(&mut self, loaded_account: &LoadedAccount) {
self.calls.fetch_add(1, Ordering::Relaxed);