refactor and test scan abort code (#21390)

This commit is contained in:
Jeff Washington (jwash) 2021-12-08 14:09:34 -06:00 committed by GitHub
parent 923720f529
commit 8d1e5ac294
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 144 additions and 25 deletions

View File

@ -783,6 +783,42 @@ impl Accounts {
)
}
fn calc_scan_result_size(account: &AccountSharedData) -> usize {
account.data().len()
+ std::mem::size_of::<AccountSharedData>()
+ std::mem::size_of::<Pubkey>()
}
/// Accumulate size of (pubkey + account) into sum.
/// Return true iff sum > 'byte_limit_for_scan'
fn accumulate_and_check_scan_result_size(
sum: &AtomicUsize,
account: &AccountSharedData,
byte_limit_for_scan: &Option<usize>,
) -> bool {
if let Some(byte_limit_for_scan) = byte_limit_for_scan.as_ref() {
let added = Self::calc_scan_result_size(account);
sum.fetch_add(added, Ordering::Relaxed)
.saturating_add(added)
> *byte_limit_for_scan
} else {
false
}
}
fn maybe_abort_scan(
result: ScanResult<Vec<(Pubkey, AccountSharedData)>>,
config: &ScanConfig,
) -> ScanResult<Vec<(Pubkey, AccountSharedData)>> {
if config.is_aborted() {
ScanResult::Err(ScanError::Aborted(
"The accumulated scan results exceeded the limit".to_string(),
))
} else {
result
}
}
pub fn load_by_index_key_with_filter<F: Fn(&AccountSharedData) -> bool>(
&self,
ancestors: &Ancestors,
@ -793,10 +829,7 @@ impl Accounts {
byte_limit_for_scan: Option<usize>,
) -> ScanResult<Vec<(Pubkey, AccountSharedData)>> {
let sum = AtomicUsize::default();
let config = ScanConfig {
abort: Some(config.abort.as_ref().map(Arc::clone).unwrap_or_default()),
collect_all_unsorted: config.collect_all_unsorted,
};
let config = config.recreate_with_abort();
let result = self
.accounts_db
.index_scan_accounts(
@ -806,20 +839,15 @@ impl Accounts {
|collector: &mut Vec<(Pubkey, AccountSharedData)>, some_account_tuple| {
Self::load_while_filtering(collector, some_account_tuple, |account| {
let use_account = filter(account);
if use_account {
if let Some(byte_limit_for_scan) = byte_limit_for_scan.as_ref() {
let added = account.data().len()
+ std::mem::size_of::<AccountSharedData>()
+ std::mem::size_of::<Pubkey>();
if sum
.fetch_add(added, Ordering::Relaxed)
.saturating_add(added)
> *byte_limit_for_scan
{
// total size of results exceeds size limit, so abort scan
config.abort();
}
}
if use_account
&& Self::accumulate_and_check_scan_result_size(
&sum,
account,
&byte_limit_for_scan,
)
{
// total size of results exceeds size limit, so abort scan
config.abort();
}
use_account
});
@ -827,13 +855,7 @@ impl Accounts {
&config,
)
.map(|result| result.0);
if config.is_aborted() {
ScanResult::Err(ScanError::Aborted(
"The accumulated scan results exceeded the limit".to_string(),
))
} else {
result
}
Self::maybe_abort_scan(result, &config)
}
pub fn account_indexes_include_key(&self, key: &Pubkey) -> bool {
@ -3329,4 +3351,67 @@ mod tests {
vec![(pubkey1, 42), (pubkey2, 41)]
);
}
fn zero_len_account_size() -> usize {
std::mem::size_of::<AccountSharedData>() + std::mem::size_of::<Pubkey>()
}
#[test]
fn test_calc_scan_result_size() {
for len in 0..3 {
assert_eq!(
Accounts::calc_scan_result_size(&AccountSharedData::new(
0,
len,
&Pubkey::default()
)),
zero_len_account_size() + len
);
}
}
#[test]
fn test_maybe_abort_scan() {
assert!(Accounts::maybe_abort_scan(ScanResult::Ok(vec![]), &ScanConfig::default()).is_ok());
let config = ScanConfig::default().recreate_with_abort();
assert!(Accounts::maybe_abort_scan(ScanResult::Ok(vec![]), &config).is_ok());
config.abort();
assert!(Accounts::maybe_abort_scan(ScanResult::Ok(vec![]), &config).is_err());
}
#[test]
fn test_accumulate_and_check_scan_result_size() {
for (account, byte_limit_for_scan, result) in [
(AccountSharedData::default(), zero_len_account_size(), false),
(
AccountSharedData::new(0, 1, &Pubkey::default()),
zero_len_account_size(),
true,
),
(
AccountSharedData::new(0, 2, &Pubkey::default()),
zero_len_account_size() + 3,
false,
),
] {
let sum = AtomicUsize::default();
assert_eq!(
result,
Accounts::accumulate_and_check_scan_result_size(
&sum,
&account,
&Some(byte_limit_for_scan)
)
);
// calling a second time should accumulate above the threshold
assert!(Accounts::accumulate_and_check_scan_result_size(
&sum,
&account,
&Some(byte_limit_for_scan)
));
assert!(!Accounts::accumulate_and_check_scan_result_size(
&sum, &account, &None
));
}
}
}

View File

@ -80,12 +80,21 @@ impl ScanConfig {
}
}
/// mark the scan as aborted
pub fn abort(&self) {
if let Some(abort) = self.abort.as_ref() {
abort.store(true, Ordering::Relaxed)
}
}
/// use existing 'abort' if available, otherwise allocate one
pub fn recreate_with_abort(&self) -> Self {
ScanConfig {
abort: Some(self.abort.as_ref().map(Arc::clone).unwrap_or_default()),
collect_all_unsorted: self.collect_all_unsorted,
}
}
/// true if scan should abort
pub fn is_aborted(&self) -> bool {
if let Some(abort) = self.abort.as_ref() {
@ -4320,4 +4329,29 @@ pub mod tests {
config.bins = Some(3);
AccountsIndex::<bool>::new(Some(config));
}
#[test]
fn test_scan_config() {
for collect_all_unsorted in [false, true] {
let config = ScanConfig::new(collect_all_unsorted);
assert_eq!(config.collect_all_unsorted, collect_all_unsorted);
assert!(config.abort.is_none()); // not allocated
assert!(!config.is_aborted());
config.abort(); // has no effect
assert!(!config.is_aborted());
}
let config = ScanConfig::default();
assert!(!config.collect_all_unsorted);
assert!(config.abort.is_none());
let config = config.recreate_with_abort();
assert!(config.abort.is_some());
assert!(!config.is_aborted());
config.abort();
assert!(config.is_aborted());
let config = config.recreate_with_abort();
assert!(config.is_aborted());
}
}