runtime: remove `Default` req on account scan interfaces (#28533)

This commit is contained in:
Trent Nelson 2022-10-21 16:53:06 -07:00 committed by GitHub
parent 434c8ed416
commit 1fbd818647
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 109 additions and 106 deletions

View File

@ -742,10 +742,11 @@ impl Accounts {
if num == 0 { if num == 0 {
return Ok(vec![]); return Ok(vec![]);
} }
let account_balances = self.accounts_db.scan_accounts( let mut account_balances = BinaryHeap::new();
self.accounts_db.scan_accounts(
ancestors, ancestors,
bank_id, bank_id,
|collector: &mut BinaryHeap<Reverse<(u64, Pubkey)>>, option| { |option| {
if let Some((pubkey, account, _slot)) = option { if let Some((pubkey, account, _slot)) = option {
if account.lamports() == 0 { if account.lamports() == 0 {
return; return;
@ -758,16 +759,16 @@ impl Accounts {
if !collect { if !collect {
return; return;
} }
if collector.len() == num { if account_balances.len() == num {
let Reverse(entry) = collector let Reverse(entry) = account_balances
.peek() .peek()
.expect("BinaryHeap::peek should succeed when len > 0"); .expect("BinaryHeap::peek should succeed when len > 0");
if *entry >= (account.lamports(), *pubkey) { if *entry >= (account.lamports(), *pubkey) {
return; return;
} }
collector.pop(); account_balances.pop();
} }
collector.push(Reverse((account.lamports(), *pubkey))); account_balances.push(Reverse((account.lamports(), *pubkey)));
} }
}, },
&ScanConfig::default(), &ScanConfig::default(),
@ -878,16 +879,19 @@ impl Accounts {
program_id: &Pubkey, program_id: &Pubkey,
config: &ScanConfig, config: &ScanConfig,
) -> ScanResult<Vec<TransactionAccount>> { ) -> ScanResult<Vec<TransactionAccount>> {
self.accounts_db.scan_accounts( let mut collector = Vec::new();
ancestors, self.accounts_db
bank_id, .scan_accounts(
|collector: &mut Vec<TransactionAccount>, some_account_tuple| { ancestors,
Self::load_while_filtering(collector, some_account_tuple, |account| { bank_id,
account.owner() == program_id |some_account_tuple| {
}) Self::load_while_filtering(&mut collector, some_account_tuple, |account| {
}, account.owner() == program_id
config, })
) },
config,
)
.map(|_| collector)
} }
pub fn load_by_program_with_filter<F: Fn(&AccountSharedData) -> bool>( pub fn load_by_program_with_filter<F: Fn(&AccountSharedData) -> bool>(
@ -898,16 +902,19 @@ impl Accounts {
filter: F, filter: F,
config: &ScanConfig, config: &ScanConfig,
) -> ScanResult<Vec<TransactionAccount>> { ) -> ScanResult<Vec<TransactionAccount>> {
self.accounts_db.scan_accounts( let mut collector = Vec::new();
ancestors, self.accounts_db
bank_id, .scan_accounts(
|collector: &mut Vec<TransactionAccount>, some_account_tuple| { ancestors,
Self::load_while_filtering(collector, some_account_tuple, |account| { bank_id,
account.owner() == program_id && filter(account) |some_account_tuple| {
}) Self::load_while_filtering(&mut collector, some_account_tuple, |account| {
}, account.owner() == program_id && filter(account)
config, })
) },
config,
)
.map(|_| collector)
} }
fn calc_scan_result_size(account: &AccountSharedData) -> usize { fn calc_scan_result_size(account: &AccountSharedData) -> usize {
@ -957,14 +964,15 @@ impl Accounts {
) -> ScanResult<Vec<TransactionAccount>> { ) -> ScanResult<Vec<TransactionAccount>> {
let sum = AtomicUsize::default(); let sum = AtomicUsize::default();
let config = config.recreate_with_abort(); let config = config.recreate_with_abort();
let mut collector = Vec::new();
let result = self let result = self
.accounts_db .accounts_db
.index_scan_accounts( .index_scan_accounts(
ancestors, ancestors,
bank_id, bank_id,
*index_key, *index_key,
|collector: &mut Vec<TransactionAccount>, some_account_tuple| { |some_account_tuple| {
Self::load_while_filtering(collector, some_account_tuple, |account| { Self::load_while_filtering(&mut collector, some_account_tuple, |account| {
let use_account = filter(account); let use_account = filter(account);
if use_account if use_account
&& Self::accumulate_and_check_scan_result_size( && Self::accumulate_and_check_scan_result_size(
@ -981,7 +989,7 @@ impl Accounts {
}, },
&config, &config,
) )
.map(|result| result.0); .map(|_| collector);
Self::maybe_abort_scan(result, &config) Self::maybe_abort_scan(result, &config)
} }
@ -994,18 +1002,21 @@ impl Accounts {
ancestors: &Ancestors, ancestors: &Ancestors,
bank_id: BankId, bank_id: BankId,
) -> ScanResult<Vec<PubkeyAccountSlot>> { ) -> ScanResult<Vec<PubkeyAccountSlot>> {
self.accounts_db.scan_accounts( let mut collector = Vec::new();
ancestors, self.accounts_db
bank_id, .scan_accounts(
|collector: &mut Vec<PubkeyAccountSlot>, some_account_tuple| { ancestors,
if let Some((pubkey, account, slot)) = some_account_tuple bank_id,
.filter(|(_, account, _)| Self::is_loadable(account.lamports())) |some_account_tuple| {
{ if let Some((pubkey, account, slot)) = some_account_tuple
collector.push((*pubkey, account, slot)) .filter(|(_, account, _)| Self::is_loadable(account.lamports()))
} {
}, collector.push((*pubkey, account, slot))
&ScanConfig::default(), }
) },
&ScanConfig::default(),
)
.map(|_| collector)
} }
pub fn hold_range_in_memory<R>( pub fn hold_range_in_memory<R>(
@ -1026,15 +1037,15 @@ impl Accounts {
ancestors: &Ancestors, ancestors: &Ancestors,
range: R, range: R,
) -> Vec<PubkeyAccountSlot> { ) -> Vec<PubkeyAccountSlot> {
let mut collector = Vec::new();
self.accounts_db.range_scan_accounts( self.accounts_db.range_scan_accounts(
"", // disable logging of this. We now parallelize it and this results in multiple parallel logs "", // disable logging of this. We now parallelize it and this results in multiple parallel logs
ancestors, ancestors,
range, range,
&ScanConfig::new(true), &ScanConfig::new(true),
|collector: &mut Vec<PubkeyAccountSlot>, option| { |option| Self::load_with_slot(&mut collector, option),
Self::load_with_slot(collector, option) );
}, collector
)
} }
/// Slow because lock is held for 1 operation instead of many. /// Slow because lock is held for 1 operation instead of many.

View File

@ -4607,19 +4607,16 @@ impl AccountsDb {
} }
} }
pub fn scan_accounts<F, A>( pub fn scan_accounts<F>(
&self, &self,
ancestors: &Ancestors, ancestors: &Ancestors,
bank_id: BankId, bank_id: BankId,
scan_func: F, mut scan_func: F,
config: &ScanConfig, config: &ScanConfig,
) -> ScanResult<A> ) -> ScanResult<()>
where where
F: Fn(&mut A, Option<(&Pubkey, AccountSharedData, Slot)>), F: FnMut(Option<(&Pubkey, AccountSharedData, Slot)>),
A: Default,
{ {
let mut collector = A::default();
// This can error out if the slots being scanned over are aborted // This can error out if the slots being scanned over are aborted
self.accounts_index.scan_accounts( self.accounts_index.scan_accounts(
ancestors, ancestors,
@ -4629,26 +4626,23 @@ impl AccountsDb {
.get_account_accessor(slot, pubkey, &account_info.storage_location()) .get_account_accessor(slot, pubkey, &account_info.storage_location())
.get_loaded_account() .get_loaded_account()
.map(|loaded_account| (pubkey, loaded_account.take_account(), slot)); .map(|loaded_account| (pubkey, loaded_account.take_account(), slot));
scan_func(&mut collector, account_slot) scan_func(account_slot)
}, },
config, config,
)?; )?;
Ok(collector) Ok(())
} }
pub fn unchecked_scan_accounts<F, A>( pub fn unchecked_scan_accounts<F>(
&self, &self,
metric_name: &'static str, metric_name: &'static str,
ancestors: &Ancestors, ancestors: &Ancestors,
scan_func: F, mut scan_func: F,
config: &ScanConfig, config: &ScanConfig,
) -> A ) where
where F: FnMut(&Pubkey, LoadedAccount, Slot),
F: Fn(&mut A, (&Pubkey, LoadedAccount, Slot)),
A: Default,
{ {
let mut collector = A::default();
self.accounts_index.unchecked_scan_accounts( self.accounts_index.unchecked_scan_accounts(
metric_name, metric_name,
ancestors, ancestors,
@ -4657,29 +4651,25 @@ impl AccountsDb {
.get_account_accessor(slot, pubkey, &account_info.storage_location()) .get_account_accessor(slot, pubkey, &account_info.storage_location())
.get_loaded_account() .get_loaded_account()
{ {
scan_func(&mut collector, (pubkey, loaded_account, slot)); scan_func(pubkey, loaded_account, slot);
} }
}, },
config, config,
); );
collector
} }
/// Only guaranteed to be safe when called from rent collection /// Only guaranteed to be safe when called from rent collection
pub fn range_scan_accounts<F, A, R>( pub fn range_scan_accounts<F, R>(
&self, &self,
metric_name: &'static str, metric_name: &'static str,
ancestors: &Ancestors, ancestors: &Ancestors,
range: R, range: R,
config: &ScanConfig, config: &ScanConfig,
scan_func: F, mut scan_func: F,
) -> A ) where
where F: FnMut(Option<(&Pubkey, AccountSharedData, Slot)>),
F: Fn(&mut A, Option<(&Pubkey, AccountSharedData, Slot)>),
A: Default,
R: RangeBounds<Pubkey> + std::fmt::Debug, R: RangeBounds<Pubkey> + std::fmt::Debug,
{ {
let mut collector = A::default();
self.accounts_index.range_scan_accounts( self.accounts_index.range_scan_accounts(
metric_name, metric_name,
ancestors, ancestors,
@ -4700,24 +4690,22 @@ impl AccountsDb {
.get_loaded_account() .get_loaded_account()
.map(|loaded_account| (pubkey, loaded_account.take_account(), slot)) .map(|loaded_account| (pubkey, loaded_account.take_account(), slot))
{ {
scan_func(&mut collector, Some(account_slot)) scan_func(Some(account_slot))
} }
}, },
); );
collector
} }
pub fn index_scan_accounts<F, A>( pub fn index_scan_accounts<F>(
&self, &self,
ancestors: &Ancestors, ancestors: &Ancestors,
bank_id: BankId, bank_id: BankId,
index_key: IndexKey, index_key: IndexKey,
scan_func: F, mut scan_func: F,
config: &ScanConfig, config: &ScanConfig,
) -> ScanResult<(A, bool)> ) -> ScanResult<bool>
where where
F: Fn(&mut A, Option<(&Pubkey, AccountSharedData, Slot)>), F: FnMut(Option<(&Pubkey, AccountSharedData, Slot)>),
A: Default,
{ {
let key = match &index_key { let key = match &index_key {
IndexKey::ProgramId(key) => key, IndexKey::ProgramId(key) => key,
@ -4727,11 +4715,10 @@ impl AccountsDb {
if !self.account_indexes.include_key(key) { if !self.account_indexes.include_key(key) {
// the requested key was not indexed in the secondary index, so do a normal scan // the requested key was not indexed in the secondary index, so do a normal scan
let used_index = false; let used_index = false;
let scan_result = self.scan_accounts(ancestors, bank_id, scan_func, config)?; self.scan_accounts(ancestors, bank_id, scan_func, config)?;
return Ok((scan_result, used_index)); return Ok(used_index);
} }
let mut collector = A::default();
self.accounts_index.index_scan_accounts( self.accounts_index.index_scan_accounts(
ancestors, ancestors,
bank_id, bank_id,
@ -4741,12 +4728,12 @@ impl AccountsDb {
.get_account_accessor(slot, pubkey, &account_info.storage_location()) .get_account_accessor(slot, pubkey, &account_info.storage_location())
.get_loaded_account() .get_loaded_account()
.map(|loaded_account| (pubkey, loaded_account.take_account(), slot)); .map(|loaded_account| (pubkey, loaded_account.take_account(), slot));
scan_func(&mut collector, account_slot) scan_func(account_slot)
}, },
config, config,
)?; )?;
let used_index = true; let used_index = true;
Ok((collector, used_index)) Ok(used_index)
} }
/// Scan a specific slot through all the account storage in parallel /// Scan a specific slot through all the account storage in parallel
@ -10551,11 +10538,12 @@ pub mod tests {
&account1 &account1
); );
let accounts: Vec<AccountSharedData> = db.unchecked_scan_accounts( let mut accounts = Vec::new();
db.unchecked_scan_accounts(
"", "",
&ancestors, &ancestors,
|accounts: &mut Vec<AccountSharedData>, option| { |_, account, _| {
accounts.push(option.1.take_account()); accounts.push(account.take_account());
}, },
&ScanConfig::default(), &ScanConfig::default(),
); );
@ -11482,40 +11470,42 @@ pub mod tests {
keys: [mint_key].iter().cloned().collect::<HashSet<Pubkey>>(), keys: [mint_key].iter().cloned().collect::<HashSet<Pubkey>>(),
}); });
// Secondary index can't be used - do normal scan: should still find both pubkeys // Secondary index can't be used - do normal scan: should still find both pubkeys
let found_accounts = accounts let mut found_accounts = HashSet::new();
let used_index = accounts
.index_scan_accounts( .index_scan_accounts(
&Ancestors::default(), &Ancestors::default(),
bank_id, bank_id,
index_key, index_key,
|collection: &mut HashSet<Pubkey>, account| { |account| {
collection.insert(*account.unwrap().0); found_accounts.insert(*account.unwrap().0);
}, },
&ScanConfig::default(), &ScanConfig::default(),
) )
.unwrap(); .unwrap();
assert!(!found_accounts.1); assert!(!used_index);
assert_eq!(found_accounts.0.len(), 2); assert_eq!(found_accounts.len(), 2);
assert!(found_accounts.0.contains(&pubkey1)); assert!(found_accounts.contains(&pubkey1));
assert!(found_accounts.0.contains(&pubkey2)); assert!(found_accounts.contains(&pubkey2));
accounts.account_indexes.keys = None; accounts.account_indexes.keys = None;
// Secondary index can now be used since it isn't marked as excluded // Secondary index can now be used since it isn't marked as excluded
let found_accounts = accounts let mut found_accounts = HashSet::new();
let used_index = accounts
.index_scan_accounts( .index_scan_accounts(
&Ancestors::default(), &Ancestors::default(),
bank_id, bank_id,
index_key, index_key,
|collection: &mut HashSet<Pubkey>, account| { |account| {
collection.insert(*account.unwrap().0); found_accounts.insert(*account.unwrap().0);
}, },
&ScanConfig::default(), &ScanConfig::default(),
) )
.unwrap(); .unwrap();
assert!(found_accounts.1); assert!(used_index);
assert_eq!(found_accounts.0.len(), 2); assert_eq!(found_accounts.len(), 2);
assert!(found_accounts.0.contains(&pubkey1)); assert!(found_accounts.contains(&pubkey1));
assert!(found_accounts.0.contains(&pubkey2)); assert!(found_accounts.contains(&pubkey2));
accounts.account_indexes.keys = None; accounts.account_indexes.keys = None;
} }
@ -12124,22 +12114,24 @@ pub mod tests {
db.store_uncached(1, &[(&key1, &account1)]); db.store_uncached(1, &[(&key1, &account1)]);
let ancestors = vec![(0, 0)].into_iter().collect(); let ancestors = vec![(0, 0)].into_iter().collect();
let accounts: Vec<AccountSharedData> = db.unchecked_scan_accounts( let mut accounts = Vec::new();
db.unchecked_scan_accounts(
"", "",
&ancestors, &ancestors,
|accounts: &mut Vec<AccountSharedData>, option| { |_, account, _| {
accounts.push(option.1.take_account()); accounts.push(account.take_account());
}, },
&ScanConfig::default(), &ScanConfig::default(),
); );
assert_eq!(accounts, vec![account0]); assert_eq!(accounts, vec![account0]);
let ancestors = vec![(1, 1), (0, 0)].into_iter().collect(); let ancestors = vec![(1, 1), (0, 0)].into_iter().collect();
let accounts: Vec<AccountSharedData> = db.unchecked_scan_accounts( let mut accounts = Vec::new();
db.unchecked_scan_accounts(
"", "",
&ancestors, &ancestors,
|accounts: &mut Vec<AccountSharedData>, option| { |_, account, _| {
accounts.push(option.1.take_account()); accounts.push(account.take_account());
}, },
&ScanConfig::default(), &ScanConfig::default(),
); );
@ -14389,7 +14381,7 @@ pub mod tests {
db.scan_accounts( db.scan_accounts(
&scan_ancestors, &scan_ancestors,
bank_id, bank_id,
|_collector: &mut Vec<(Pubkey, AccountSharedData)>, maybe_account| { |maybe_account| {
ready_.store(true, Ordering::Relaxed); ready_.store(true, Ordering::Relaxed);
if let Some((pubkey, _, _)) = maybe_account { if let Some((pubkey, _, _)) = maybe_account {
if *pubkey == stall_key { if *pubkey == stall_key {