Refactor reward payout code - part 4 (#31159)

extract redeem_reward fn
This commit is contained in:
HaoranYi 2023-04-12 10:20:21 -05:00 committed by GitHub
parent 944b9d574a
commit 1db0683a50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 105 additions and 87 deletions

View File

@ -2889,94 +2889,22 @@ impl Bank {
metrics,
);
if point_value.is_none() {
return;
if let Some(point_value) = point_value {
let (vote_account_rewards, stake_rewards) = self.redeem_rewards(
vote_with_stake_delegations_map,
rewarded_epoch,
point_value,
credits_auto_rewind,
&stake_history,
thread_pool,
reward_calc_tracer.as_ref(),
metrics,
);
self.store_stake_accounts(&stake_rewards, metrics);
let vote_rewards = self.store_vote_accounts(vote_account_rewards, metrics);
self.update_reward_history(stake_rewards, vote_rewards);
}
// pay according to point value
let point_value = point_value.unwrap();
let vote_account_rewards: DashMap<Pubkey, (AccountSharedData, u8, u64, bool)> =
DashMap::with_capacity(vote_with_stake_delegations_map.len());
let stake_delegation_iterator = vote_with_stake_delegations_map.into_par_iter().flat_map(
|(
vote_pubkey,
VoteWithStakeDelegations {
vote_state,
vote_account,
delegations,
},
)| {
vote_account_rewards
.insert(vote_pubkey, (vote_account, vote_state.commission, 0, false));
delegations
.into_par_iter()
.map(move |delegation| (vote_pubkey, Arc::clone(&vote_state), delegation))
},
);
let mut m = Measure::start("redeem_rewards");
let stake_rewards: StakeRewards = thread_pool.install(|| {
stake_delegation_iterator
.filter_map(|(vote_pubkey, vote_state, (stake_pubkey, stake_account))| {
// curry closure to add the contextual stake_pubkey
let reward_calc_tracer = reward_calc_tracer.as_ref().map(|outer| {
// inner
move |inner_event: &_| {
outer(&RewardCalculationEvent::Staking(&stake_pubkey, inner_event))
}
});
let (mut stake_account, stake_state) =
<(AccountSharedData, StakeState)>::from(stake_account);
let redeemed = stake_state::redeem_rewards(
rewarded_epoch,
stake_state,
&mut stake_account,
&vote_state,
&point_value,
Some(&stake_history),
reward_calc_tracer.as_ref(),
credits_auto_rewind,
);
if let Ok((stakers_reward, voters_reward)) = redeemed {
// track voter rewards
if let Some((
_vote_account,
_commission,
vote_rewards_sum,
vote_needs_store,
)) = vote_account_rewards.get_mut(&vote_pubkey).as_deref_mut()
{
*vote_needs_store = true;
*vote_rewards_sum = vote_rewards_sum.saturating_add(voters_reward);
}
let post_balance = stake_account.lamports();
return Some(StakeReward {
stake_pubkey,
stake_reward_info: RewardInfo {
reward_type: RewardType::Staking,
lamports: i64::try_from(stakers_reward).unwrap(),
post_balance,
commission: Some(vote_state.commission),
},
stake_account,
});
} else {
debug!(
"stake_state::redeem_rewards() failed for {}: {:?}",
stake_pubkey, redeemed
);
}
None
})
.collect()
});
m.stop();
metrics.redeem_rewards_us += m.as_us();
self.store_stake_accounts(&stake_rewards, metrics);
let vote_rewards = self.store_vote_accounts(vote_account_rewards, metrics);
self.update_reward_history(stake_rewards, vote_rewards);
}
fn load_vote_and_stake_accounts(
@ -3059,6 +2987,96 @@ impl Bank {
(points > 0).then_some(PointValue { rewards, points })
}
fn redeem_rewards(
&self,
vote_with_stake_delegations_map: DashMap<Pubkey, VoteWithStakeDelegations>,
rewarded_epoch: Epoch,
point_value: PointValue,
credits_auto_rewind: bool,
stake_history: &StakeHistory,
thread_pool: &ThreadPool,
reward_calc_tracer: Option<impl RewardCalcTracer>,
metrics: &mut RewardsMetrics,
) -> (VoteRewards, StakeRewards) {
let vote_account_rewards: VoteRewards =
DashMap::with_capacity(vote_with_stake_delegations_map.len());
let stake_delegation_iterator = vote_with_stake_delegations_map.into_par_iter().flat_map(
|(
vote_pubkey,
VoteWithStakeDelegations {
vote_state,
vote_account,
delegations,
},
)| {
vote_account_rewards
.insert(vote_pubkey, (vote_account, vote_state.commission, 0, false));
delegations
.into_par_iter()
.map(move |delegation| (vote_pubkey, Arc::clone(&vote_state), delegation))
},
);
let (stake_rewards, measure) = measure!(thread_pool.install(|| {
stake_delegation_iterator
.filter_map(|(vote_pubkey, vote_state, (stake_pubkey, stake_account))| {
// curry closure to add the contextual stake_pubkey
let reward_calc_tracer = reward_calc_tracer.as_ref().map(|outer| {
// inner
move |inner_event: &_| {
outer(&RewardCalculationEvent::Staking(&stake_pubkey, inner_event))
}
});
let (mut stake_account, stake_state) =
<(AccountSharedData, StakeState)>::from(stake_account);
let redeemed = stake_state::redeem_rewards(
rewarded_epoch,
stake_state,
&mut stake_account,
&vote_state,
&point_value,
Some(stake_history),
reward_calc_tracer.as_ref(),
credits_auto_rewind,
);
if let Ok((stakers_reward, voters_reward)) = redeemed {
// track voter rewards
if let Some((
_vote_account,
_commission,
vote_rewards_sum,
vote_needs_store,
)) = vote_account_rewards.get_mut(&vote_pubkey).as_deref_mut()
{
*vote_needs_store = true;
*vote_rewards_sum = vote_rewards_sum.saturating_add(voters_reward);
}
let post_balance = stake_account.lamports();
return Some(StakeReward {
stake_pubkey,
stake_reward_info: RewardInfo {
reward_type: RewardType::Staking,
lamports: i64::try_from(stakers_reward).unwrap(),
post_balance,
commission: Some(vote_state.commission),
},
stake_account,
});
} else {
debug!(
"stake_state::redeem_rewards() failed for {}: {:?}",
stake_pubkey, redeemed
);
}
None
})
.collect()
}));
metrics.redeem_rewards_us += measure.as_us();
(vote_account_rewards, stake_rewards)
}
fn store_stake_accounts(&self, stake_rewards: &[StakeReward], metrics: &mut RewardsMetrics) {
// store stake account even if stake_reward is 0
// because credits observed has changed