Avoid RMWs on shared data inside parallel loops: collect_rent_from_accounts() (#25790)

Brooks Prumo 2022-06-06 13:32:22 -05:00 committed by GitHub
parent 67a11ce4b1
commit ec64d5261f
1 changed file with 156 additions and 66 deletions


@@ -80,7 +80,7 @@ use {
         iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator},
         ThreadPool, ThreadPoolBuilder,
     },
-    solana_measure::measure::Measure,
+    solana_measure::{measure, measure::Measure},
     solana_metrics::{inc_new_counter_debug, inc_new_counter_info},
    solana_program_runtime::{
        accounts_data_meter::MAX_ACCOUNTS_DATA_LEN,
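The only import change: the `measure!` macro is pulled in alongside the existing `Measure` struct, and the hunks below lean on it heavily. As used in this diff, `measure!(expr)` evaluates the expression and returns a `(result, Measure)` pair, with elapsed time available via `as_us()`/`as_ns()`. A minimal stand-in showing the same timing shape (illustrative only; `time_expr!` is a hypothetical macro, not the one in `solana_measure`):

    use std::time::Instant;

    // Hypothetical stand-in for solana_measure's `measure!`: evaluate an
    // expression and return (result, elapsed microseconds).
    macro_rules! time_expr {
        ($expr:expr) => {{
            let start = Instant::now();
            let result = $expr;
            (result, start.elapsed().as_micros() as u64)
        }};
    }

    fn main() {
        let (sum, elapsed_us) = time_expr!((0..1_000_000u64).sum::<u64>());
        println!("sum = {sum}, took {elapsed_us}us");
    }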
@@ -5240,31 +5240,39 @@ impl Bank {
         }
     }
     /// Collect rent from `accounts`
+    ///
+    /// This fn is called inside a parallel loop from `collect_rent_in_partition()`. Avoid adding
+    /// any code that causes contention on shared memory/data (e.g. do not update atomic metrics).
+    ///
+    /// The return value is a struct of computed values that `collect_rent_in_partition()` will
+    /// reduce at the end of its parallel loop. If possible, place data/computation that causes
+    /// contention or takes locks in the return struct, and process it in
+    /// `collect_rent_in_partition()` after reducing the parallel loop.
     fn collect_rent_from_accounts(
         &self,
         mut accounts: Vec<(Pubkey, AccountSharedData, Slot)>,
-        metrics: &RentMetrics,
         just_rewrites: bool,
-    ) {
+    ) -> CollectRentFromAccountsInfo {
         let mut rent_debits = RentDebits::default();
-        let mut total_collected = CollectedInfo::default();
+        let mut total_rent_collected_info = CollectedInfo::default();
         let bank_slot = self.slot();
         let mut rewrites_skipped = Vec::with_capacity(accounts.len());
         let mut accounts_to_store =
             Vec::<(&Pubkey, &AccountSharedData)>::with_capacity(accounts.len());
-        let mut collect_us = 0;
-        let mut hash_skipped_rewrites_us = 0;
+        let mut time_collecting_rent_us = 0;
+        let mut time_hashing_skipped_rewrites_us = 0;
+        let mut time_storing_accounts_us = 0;
         let can_skip_rewrites = self.rc.accounts.accounts_db.skip_rewrites || just_rewrites;
         for (pubkey, account, loaded_slot) in accounts.iter_mut() {
             let old_rent_epoch = account.rent_epoch();
-            let mut time = Measure::start("collect");
-            let collected = self.rent_collector.collect_from_existing_account(
-                pubkey,
-                account,
-                self.rc.accounts.accounts_db.filler_account_suffix.as_ref(),
-            );
-            time.stop();
-            collect_us += time.as_us();
+            let (rent_collected_info, measure) =
+                measure!(self.rent_collector.collect_from_existing_account(
+                    pubkey,
+                    account,
+                    self.rc.accounts.accounts_db.filler_account_suffix.as_ref(),
+                ));
+            time_collecting_rent_us += measure.as_us();
             // only store accounts where we collected rent
             // but get the hash for all these accounts even if collected rent is 0 (= not updated).
             // Also, there's another subtle side-effect from this: this
@@ -5273,48 +5281,45 @@ impl Bank {
             if can_skip_rewrites
                 && Self::skip_rewrite(
                     bank_slot,
-                    collected.rent_amount,
+                    rent_collected_info.rent_amount,
                     *loaded_slot,
                     old_rent_epoch,
                     account.rent_epoch(),
                     account,
                 )
             {
                 // this would have been rewritten previously. Now we skip it.
                 // calculate the hash that we would have gotten if we did the rewrite.
                 // This will be needed to calculate the bank's hash.
-                let mut time = Measure::start("hash_account");
-                let hash =
-                    crate::accounts_db::AccountsDb::hash_account(self.slot(), account, pubkey);
-                time.stop();
-                hash_skipped_rewrites_us += time.as_us();
+                let (hash, measure) = measure!(crate::accounts_db::AccountsDb::hash_account(
+                    self.slot(),
+                    account,
+                    pubkey
+                ));
+                time_hashing_skipped_rewrites_us += measure.as_us();
                 rewrites_skipped.push((*pubkey, hash));
-                assert_eq!(collected, CollectedInfo::default());
+                assert_eq!(rent_collected_info, CollectedInfo::default());
             } else if !just_rewrites {
-                total_collected += collected;
+                total_rent_collected_info += rent_collected_info;
                 accounts_to_store.push((pubkey, account));
             }
-            rent_debits.insert(pubkey, collected.rent_amount, account.lamports());
+            rent_debits.insert(pubkey, rent_collected_info.rent_amount, account.lamports());
         }
-        metrics.collect_us.fetch_add(collect_us, Relaxed);
-        metrics.hash_us.fetch_add(hash_skipped_rewrites_us, Relaxed);
         if !accounts_to_store.is_empty() {
-            let mut time = Measure::start("store_account");
-            self.store_accounts(&accounts_to_store);
-            time.stop();
-            metrics.store_us.fetch_add(time.as_us(), Relaxed);
+            // TODO: Maybe do not call `store_accounts()` here. Instead return `accounts_to_store`
+            // and have `collect_rent_in_partition()` perform all the stores.
+            let (_, measure) = measure!(self.store_accounts(&accounts_to_store));
+            time_storing_accounts_us += measure.as_us();
         }
-        self.remember_skipped_rewrites(rewrites_skipped);
-        self.collected_rent
-            .fetch_add(total_collected.rent_amount, Relaxed);
-        self.rewards
-            .write()
-            .unwrap()
-            .extend(rent_debits.into_unordered_rewards_iter());
-        self.update_accounts_data_size_delta_off_chain(
-            -(total_collected.account_data_len_reclaimed as i64),
-        );
+        CollectRentFromAccountsInfo {
+            rent_collected_info: total_rent_collected_info,
+            rent_rewards: rent_debits.into_unordered_rewards_iter().collect(),
+            rewrites_skipped,
+            time_collecting_rent_us,
+            time_hashing_skipped_rewrites_us,
+            time_storing_accounts_us,
+        }
     }

     /// load accounts with pubkeys in 'partition'
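Taken together, the function now follows a simple discipline: do all per-account work on locals, and hand everything that previously touched shared state (`self.collected_rent`, `self.rewards`, the `RentMetrics` atomics) back to the caller in one plain struct. A self-contained sketch of that shape under rayon (all names here are illustrative, not from the Solana codebase):

    use rayon::prelude::*;

    // Per-worker results, accumulated locally instead of via shared atomics.
    #[derive(Debug, Default)]
    struct ChunkResults {
        items_processed: u64,
        bytes_reclaimed: u64,
    }

    impl ChunkResults {
        // Combine two partial results; the single point where values meet.
        fn reduce(lhs: Self, rhs: Self) -> Self {
            Self {
                items_processed: lhs.items_processed.saturating_add(rhs.items_processed),
                bytes_reclaimed: lhs.bytes_reclaimed.saturating_add(rhs.bytes_reclaimed),
            }
        }
    }

    fn process_all(chunks: &[Vec<u64>]) -> ChunkResults {
        chunks
            .par_iter()
            // map: each worker builds its own ChunkResults on the stack, so
            // the parallel loop never read-modify-writes shared memory
            .map(|chunk| ChunkResults {
                items_processed: chunk.len() as u64,
                bytes_reclaimed: chunk.iter().sum(),
            })
            // reduce: the combination happens once, after the parallel loop
            .reduce(ChunkResults::default, ChunkResults::reduce)
    }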
@@ -5348,10 +5353,9 @@ impl Bank {
             let end_prefix_inclusive = Self::prefix_from_pubkey(subrange_full.end());
             let range = end_prefix_inclusive - start_prefix;
             let increment = range / num_threads;
-            for thread_metrics in (0..num_threads)
+            let mut results = (0..num_threads)
                 .into_par_iter()
                 .map(|chunk| {
-                    let metrics = RentMetrics::default();
                     let offset = |chunk| start_prefix + chunk * increment;
                     let start = offset(chunk);
                     let last = chunk == num_threads - 1;
@@ -5360,8 +5364,7 @@ impl Bank {
                         bound
                     };
                     let start = merge_prefix(start, *subrange_full.start());
-                    let mut load = Measure::start("load");
-                    let accounts = if last {
+                    let (accounts, measure_load_accounts) = measure!(if last {
                         let end = *subrange_full.end();
                         let subrange = start..=end; // IN-clusive
                         self.rc
@@ -5373,32 +5376,44 @@ impl Bank {
                         self.rc
                             .accounts
                             .load_to_collect_rent_eagerly(&self.ancestors, subrange)
-                    };
-                    load.stop();
-                    metrics.load_us.fetch_add(load.as_us(), Relaxed);
-                    self.collect_rent_from_accounts(accounts, &metrics, just_rewrites);
-                    metrics
+                    });
+                    CollectRentInPartitionInfo::new(
+                        self.collect_rent_from_accounts(accounts, just_rewrites),
+                        Duration::from_nanos(measure_load_accounts.as_ns()),
+                    )
                 })
-                .collect::<Vec<_>>()
-            {
-                metrics
-                    .load_us
-                    .fetch_add(thread_metrics.load_us.load(Relaxed), Relaxed);
-                metrics
-                    .store_us
-                    .fetch_add(thread_metrics.store_us.load(Relaxed), Relaxed);
-                metrics
-                    .hash_us
-                    .fetch_add(thread_metrics.hash_us.load(Relaxed), Relaxed);
-                metrics
-                    .collect_us
-                    .fetch_add(thread_metrics.collect_us.load(Relaxed), Relaxed);
-            }
+                .reduce(
+                    CollectRentInPartitionInfo::default,
+                    CollectRentInPartitionInfo::reduce,
+                );
             self.rc
                 .accounts
                 .hold_range_in_memory(&subrange_full, false, thread_pool);
+            self.collected_rent
+                .fetch_add(results.rent_collected, Relaxed);
+            self.update_accounts_data_size_delta_off_chain(
+                -(results.accounts_data_size_reclaimed as i64),
+            );
+            self.rewards
+                .write()
+                .unwrap()
+                .append(&mut results.rent_rewards);
+            self.remember_skipped_rewrites(results.rewrites_skipped);
+            metrics
+                .load_us
+                .fetch_add(results.time_loading_accounts_us, Relaxed);
+            metrics
+                .collect_us
+                .fetch_add(results.time_collecting_rent_us, Relaxed);
+            metrics
+                .hash_us
+                .fetch_add(results.time_hashing_skipped_rewrites_us, Relaxed);
+            metrics
+                .store_us
+                .fetch_add(results.time_storing_accounts_us, Relaxed);
         });
     }
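This hunk is the commit title in miniature: the per-thread `RentMetrics` (whose atomics each worker updated, then a serial loop folded into the outer metrics) is gone, replaced by plain values flowing through rayon's `map`/`reduce`. To see why read-modify-writes on shared atomics inside a parallel loop are worth avoiding, compare the two shapes on a toy sum (illustrative code, not from the codebase; both compute the same result, but the first makes every thread bounce the same cache line):

    use {
        rayon::prelude::*,
        std::sync::atomic::{AtomicU64, Ordering::Relaxed},
    };

    // Shape the commit removes: every element performs a read-modify-write
    // (fetch_add) on one shared atomic, serializing otherwise independent
    // workers on that cache line.
    fn sum_contended(data: &[u64]) -> u64 {
        let total = AtomicU64::new(0);
        data.par_iter().for_each(|x| {
            total.fetch_add(*x, Relaxed);
        });
        total.into_inner()
    }

    // Shape the commit introduces: each worker folds into a private partial
    // sum, and the partials are combined once at the end.
    fn sum_reduced(data: &[u64]) -> u64 {
        data.par_iter().copied().reduce(|| 0, u64::saturating_add)
    }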
@@ -7479,6 +7494,81 @@ impl Bank {
     }
 }

+/// Return the computed values from `collect_rent_from_accounts()`
+///
+/// Since `collect_rent_from_accounts()` runs in parallel, it must not update atomics/shared data
+/// itself; instead it returns those values in this struct for the caller to process once the
+/// parallel loop is done.
+#[derive(Debug, Default)]
+struct CollectRentFromAccountsInfo {
+    rent_collected_info: CollectedInfo,
+    rent_rewards: Vec<(Pubkey, RewardInfo)>,
+    rewrites_skipped: Vec<(Pubkey, Hash)>,
+    time_collecting_rent_us: u64,
+    time_hashing_skipped_rewrites_us: u64,
+    time_storing_accounts_us: u64,
+}
+
+/// Hold the computed values from each iteration of the parallel loop inside
+/// `collect_rent_in_partition()`, which are then combined in its reduce step.
+#[derive(Debug, Default)]
+struct CollectRentInPartitionInfo {
+    rent_collected: u64,
+    accounts_data_size_reclaimed: u64,
+    rent_rewards: Vec<(Pubkey, RewardInfo)>,
+    rewrites_skipped: Vec<(Pubkey, Hash)>,
+    time_loading_accounts_us: u64,
+    time_collecting_rent_us: u64,
+    time_hashing_skipped_rewrites_us: u64,
+    time_storing_accounts_us: u64,
+}
+
+impl CollectRentInPartitionInfo {
+    /// Create a new `CollectRentInPartitionInfo` from the results of loading accounts and
+    /// collecting rent on them.
+    #[must_use]
+    fn new(info: CollectRentFromAccountsInfo, time_loading_accounts: Duration) -> Self {
+        Self {
+            rent_collected: info.rent_collected_info.rent_amount,
+            accounts_data_size_reclaimed: info.rent_collected_info.account_data_len_reclaimed,
+            rent_rewards: info.rent_rewards,
+            rewrites_skipped: info.rewrites_skipped,
+            time_loading_accounts_us: time_loading_accounts.as_micros() as u64,
+            time_collecting_rent_us: info.time_collecting_rent_us,
+            time_hashing_skipped_rewrites_us: info.time_hashing_skipped_rewrites_us,
+            time_storing_accounts_us: info.time_storing_accounts_us,
+        }
+    }
+
+    /// Reduce (i.e. combine) two `CollectRentInPartitionInfo`s into one.
+    ///
+    /// This fn is used by `collect_rent_in_partition()` as the reduce step (of map-reduce) in its
+    /// parallel loop of rent collection.
+    #[must_use]
+    fn reduce(lhs: Self, rhs: Self) -> Self {
+        Self {
+            rent_collected: lhs.rent_collected.saturating_add(rhs.rent_collected),
+            accounts_data_size_reclaimed: lhs
+                .accounts_data_size_reclaimed
+                .saturating_add(rhs.accounts_data_size_reclaimed),
+            rent_rewards: [lhs.rent_rewards, rhs.rent_rewards].concat(),
+            rewrites_skipped: [lhs.rewrites_skipped, rhs.rewrites_skipped].concat(),
+            time_loading_accounts_us: lhs
+                .time_loading_accounts_us
+                .saturating_add(rhs.time_loading_accounts_us),
+            time_collecting_rent_us: lhs
+                .time_collecting_rent_us
+                .saturating_add(rhs.time_collecting_rent_us),
+            time_hashing_skipped_rewrites_us: lhs
+                .time_hashing_skipped_rewrites_us
+                .saturating_add(rhs.time_hashing_skipped_rewrites_us),
+            time_storing_accounts_us: lhs
+                .time_storing_accounts_us
+                .saturating_add(rhs.time_storing_accounts_us),
+        }
+    }
+}
 /// Struct to collect stats when scanning all accounts in `get_total_accounts_stats()`
 #[derive(Debug, Default, Copy, Clone)]
 pub struct TotalAccountsStats {
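A note on the design of the new structs: rayon's `reduce(identity, op)` may invoke `identity` once per split and combine partial results in any grouping, so `CollectRentInPartitionInfo::default` must be a true identity for `reduce`, and `reduce` should be associative (which saturating adds and `Vec` concatenation are). A quick standalone property check on a simplified analogue (illustrative struct, not the real one):

    // Simplified analogue of CollectRentInPartitionInfo, reduced to two fields.
    #[derive(Debug, Default, Clone, PartialEq)]
    struct PartitionInfo {
        rent_collected: u64,
        rent_rewards: Vec<u64>, // stand-in for Vec<(Pubkey, RewardInfo)>
    }

    impl PartitionInfo {
        fn reduce(lhs: Self, rhs: Self) -> Self {
            Self {
                rent_collected: lhs.rent_collected.saturating_add(rhs.rent_collected),
                rent_rewards: [lhs.rent_rewards, rhs.rent_rewards].concat(),
            }
        }
    }

    fn main() {
        let a = PartitionInfo { rent_collected: 1, rent_rewards: vec![10] };
        let b = PartitionInfo { rent_collected: 2, rent_rewards: vec![20] };
        let c = PartitionInfo { rent_collected: 3, rent_rewards: vec![30] };

        // identity law: reduce(default, x) == x
        assert_eq!(PartitionInfo::reduce(PartitionInfo::default(), a.clone()), a);
        // associativity: (a + b) + c == a + (b + c)
        assert_eq!(
            PartitionInfo::reduce(PartitionInfo::reduce(a.clone(), b.clone()), c.clone()),
            PartitionInfo::reduce(a, PartitionInfo::reduce(b, c)),
        );
    }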