Optimize stake accounts store (#26004)

* reduce threadpool for stake processing

* wip

* Revert "reduce threadpool for stake processing"

This reverts commit 004a4f872ea7a3ef53e38d145b6350c3f57c680c.

* batch update stake_cache

* fix deadlock

* add test

* code review feedbacks

* more review feedbacks

* clippy

* update comments

* fix conflicts

* remove batch store, no atomic for redeem timing

* refactor stake_reward struct
This commit is contained in:
HaoranYi 2022-06-22 10:45:58 -05:00 committed by GitHub
parent 955302b953
commit 5624ab0d4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 38 additions and 17 deletions

View File

@ -173,6 +173,7 @@ use {
struct RewardsMetrics {
load_vote_and_stake_accounts_us: AtomicU64,
calculate_points_us: AtomicU64,
redeem_rewards_us: u64,
store_stake_accounts_us: AtomicU64,
store_vote_accounts_us: AtomicU64,
invalid_cached_vote_accounts: usize,
@ -1948,6 +1949,7 @@ impl Bank {
metrics.calculate_points_us.load(Relaxed),
i64
),
("redeem_rewards_us", metrics.redeem_rewards_us, i64),
(
"store_stake_accounts_us",
metrics.store_stake_accounts_us.load(Relaxed),
@ -2989,7 +2991,7 @@ impl Bank {
}
/// iterate over all stakes, redeem vote credits for each stake we can
/// successfully load and parse, return the lamport value of one point
/// successfully load and parse, return the lamport value of one point
fn pay_validator_rewards_with_thread_pool(
&mut self,
rewarded_epoch: Epoch,
@ -3000,6 +3002,12 @@ impl Bank {
metrics: &mut RewardsMetrics,
update_rewards_from_cached_accounts: bool,
) -> f64 {
struct StakeReward {
stake_pubkey: Pubkey,
stake_reward_info: RewardInfo,
stake_account: AccountSharedData,
}
let stake_history = self.stakes_cache.stakes().history().clone();
let vote_with_stake_delegations_map = {
let mut m = Measure::start("load_vote_and_stake_accounts_us");
@ -3087,7 +3095,7 @@ impl Bank {
);
let mut m = Measure::start("redeem_rewards");
let mut stake_rewards = thread_pool.install(|| {
let stake_rewards: Vec<StakeReward> = thread_pool.install(|| {
stake_delegation_iterator
.filter_map(|(vote_pubkey, vote_state, (stake_pubkey, stake_account))| {
// curry closure to add the contextual stake_pubkey
@ -3122,21 +3130,17 @@ impl Bank {
*vote_rewards_sum = vote_rewards_sum.saturating_add(voters_reward);
}
// store stake account even if stakers_reward is 0
// because credits observed has changed
self.store_account(&stake_pubkey, &stake_account);
if stakers_reward > 0 {
return Some((
stake_pubkey,
RewardInfo {
reward_type: RewardType::Staking,
lamports: stakers_reward as i64,
post_balance: stake_account.lamports(),
commission: Some(vote_state.commission),
},
));
}
let post_balance = stake_account.lamports();
return Some(StakeReward {
stake_pubkey,
stake_reward_info: RewardInfo {
reward_type: RewardType::Staking,
lamports: i64::try_from(stakers_reward).unwrap(),
post_balance,
commission: Some(vote_state.commission),
},
stake_account,
});
} else {
debug!(
"stake_state::redeem_rewards() failed for {}: {:?}",
@ -3148,10 +3152,27 @@ impl Bank {
.collect()
});
m.stop();
metrics.redeem_rewards_us += m.as_us();
// store stake account even if stakers_reward is 0
// because credits observed has changed
let mut m = Measure::start("store_stake_account");
let accounts_to_store = stake_rewards
.iter()
.map(|x| (&x.stake_pubkey, &x.stake_account))
.collect::<Vec<_>>();
self.store_accounts(&accounts_to_store);
m.stop();
metrics
.store_stake_accounts_us
.fetch_add(m.as_us(), Relaxed);
let mut stake_rewards = stake_rewards
.into_iter()
.filter(|x| x.stake_reward_info.lamports > 0)
.map(|x| (x.stake_pubkey, x.stake_reward_info))
.collect();
let mut m = Measure::start("store_vote_accounts");
let mut vote_rewards = vote_account_rewards
.into_iter()