stores stake-accounts in parallel after epoch rewards calculations (#32633)

This commit is contained in:
behzad nouri 2023-07-29 20:47:28 +00:00 committed by GitHub
parent 3cc97c75e6
commit ad4ddd3cb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 114 additions and 57 deletions

View File

@ -94,6 +94,7 @@ use {
percentage::Percentage,
rayon::{
iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator},
slice::ParallelSlice,
ThreadPool, ThreadPoolBuilder,
},
solana_bpf_loader_program::syscalls::create_program_runtime_environment,
@ -2855,7 +2856,7 @@ impl Bank {
);
}
self.store_stake_accounts(&stake_rewards, metrics);
self.store_stake_accounts(thread_pool, &stake_rewards, metrics);
let vote_rewards = self.store_vote_accounts(vote_account_rewards, metrics);
self.update_reward_history(stake_rewards, vote_rewards);
}
@ -3306,15 +3307,30 @@ impl Bank {
(vote_account_rewards, stake_rewards)
}
fn store_stake_accounts(&self, stake_rewards: &[StakeReward], metrics: &mut RewardsMetrics) {
fn store_stake_accounts(
&self,
thread_pool: &ThreadPool,
stake_rewards: &[StakeReward],
metrics: &mut RewardsMetrics,
) {
// store stake account even if stake_reward is 0
// because credits observed has changed
let (_, measure) = measure!({
self.store_accounts((self.slot(), stake_rewards, self.include_slot_in_hash()))
let now = Instant::now();
let slot = self.slot();
let include_slot_in_hash = self.include_slot_in_hash();
self.stakes_cache
.update_stake_accounts(thread_pool, stake_rewards);
assert!(!self.freeze_started());
thread_pool.install(|| {
stake_rewards.par_chunks(512).for_each(|chunk| {
self.rc
.accounts
.store_accounts_cached((slot, chunk, include_slot_in_hash))
})
});
metrics
.store_stake_accounts_us
.fetch_add(measure.as_us(), Relaxed);
.fetch_add(now.elapsed().as_micros() as u64, Relaxed);
}
/// store stake rewards in partition

View File

@ -55,13 +55,16 @@ impl StakeAccount<Delegation> {
}
}
impl TryFrom<AccountSharedData> for StakeAccount<()> {
impl TryFrom<AccountSharedData> for StakeAccount<Delegation> {
type Error = Error;
fn try_from(account: AccountSharedData) -> Result<Self, Self::Error> {
if account.owner() != &solana_stake_program::id() {
return Err(Error::InvalidOwner(*account.owner()));
}
let stake_state = account.state()?;
let stake_state: StakeState = account.state()?;
if stake_state.delegation().is_none() {
return Err(Error::InvalidDelegation(Box::new(stake_state)));
}
Ok(Self {
account,
stake_state,
@ -70,23 +73,6 @@ impl TryFrom<AccountSharedData> for StakeAccount<()> {
}
}
impl TryFrom<AccountSharedData> for StakeAccount<Delegation> {
type Error = Error;
fn try_from(account: AccountSharedData) -> Result<Self, Self::Error> {
let stake_account = StakeAccount::<()>::try_from(account)?;
if stake_account.stake_state.delegation().is_none() {
return Err(Error::InvalidDelegation(Box::new(
stake_account.stake_state,
)));
}
Ok(Self {
account: stake_account.account,
stake_state: stake_account.stake_state,
_phantom: PhantomData,
})
}
}
impl<T> From<StakeAccount<T>> for (AccountSharedData, StakeState) {
#[inline]
fn from(stake_account: StakeAccount<T>) -> Self {

View File

@ -4,6 +4,7 @@ use {
crate::{
stake_account,
stake_history::StakeHistory,
stake_rewards::StakeReward,
vote_account::{VoteAccount, VoteAccounts},
},
dashmap::DashMap,
@ -21,7 +22,7 @@ use {
},
std::{
collections::{HashMap, HashSet},
ops::Add,
ops::{Add, Deref},
sync::{Arc, RwLock, RwLockReadGuard},
},
thiserror::Error,
@ -122,6 +123,17 @@ impl StakesCache {
stakes.activate_epoch(next_epoch, thread_pool)
}
pub(crate) fn update_stake_accounts(
&self,
thread_pool: &ThreadPool,
stake_rewards: &[StakeReward],
) {
self.0
.write()
.unwrap()
.update_stake_accounts(thread_pool, stake_rewards)
}
pub(crate) fn handle_invalid_keys(
&self,
invalid_vote_keys: DashMap<Pubkey, InvalidCacheEntryReason>,
@ -261,16 +273,6 @@ impl Stakes<StakeAccount> {
}
fn activate_epoch(&mut self, next_epoch: Epoch, thread_pool: &ThreadPool) {
type StakesHashMap = HashMap</*voter:*/ Pubkey, /*stake:*/ u64>;
fn merge(mut acc: StakesHashMap, other: StakesHashMap) -> StakesHashMap {
if acc.len() < other.len() {
return merge(other, acc);
}
for (key, stake) in other {
*acc.entry(key).or_default() += stake;
}
acc
}
let stake_delegations: Vec<_> = self.stake_delegations.values().collect();
// Wrap up the prev epoch by adding new stake history entry for the
// prev epoch.
@ -288,28 +290,13 @@ impl Stakes<StakeAccount> {
self.epoch = next_epoch;
// Refresh the stake distribution of vote accounts for the next epoch,
// using new stake history.
let delegated_stakes = thread_pool.install(|| {
stake_delegations
.par_iter()
.fold(HashMap::default, |mut delegated_stakes, stake_account| {
let delegation = stake_account.delegation();
let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default();
*entry += delegation.stake(self.epoch, Some(&self.stake_history));
delegated_stakes
})
.reduce(HashMap::default, merge)
});
self.vote_accounts = self
.vote_accounts
.iter()
.map(|(&vote_pubkey, vote_account)| {
let delegated_stake = delegated_stakes
.get(&vote_pubkey)
.copied()
.unwrap_or_default();
(vote_pubkey, (delegated_stake, vote_account.clone()))
})
.collect();
self.vote_accounts = refresh_vote_accounts(
thread_pool,
self.epoch,
&self.vote_accounts,
&stake_delegations,
&self.stake_history,
);
}
/// Sum the stakes that point to the given voter_pubkey
@ -382,6 +369,33 @@ impl Stakes<StakeAccount> {
}
}
fn update_stake_accounts(&mut self, thread_pool: &ThreadPool, stake_rewards: &[StakeReward]) {
let stake_delegations: Vec<_> = thread_pool.install(|| {
stake_rewards
.into_par_iter()
.filter_map(|stake_reward| {
let stake_account = StakeAccount::try_from(stake_reward.stake_account.clone());
Some((stake_reward.stake_pubkey, stake_account.ok()?))
})
.collect()
});
self.stake_delegations = std::mem::take(&mut self.stake_delegations)
.into_iter()
.chain(stake_delegations)
.collect::<HashMap<Pubkey, StakeAccount>>()
.into_iter()
.filter(|(_, account)| account.lamports() != 0u64)
.collect();
let stake_delegations: Vec<_> = self.stake_delegations.values().collect();
self.vote_accounts = refresh_vote_accounts(
thread_pool,
self.epoch,
&self.vote_accounts,
&stake_delegations,
&self.stake_history,
);
}
pub(crate) fn stake_delegations(&self) -> &ImHashMap<Pubkey, StakeAccount> {
&self.stake_delegations
}
@ -489,6 +503,47 @@ pub(crate) mod serde_stakes_enum_compat {
}
}
fn refresh_vote_accounts(
thread_pool: &ThreadPool,
epoch: Epoch,
vote_accounts: &VoteAccounts,
stake_delegations: &[&StakeAccount],
stake_history: &StakeHistory,
) -> VoteAccounts {
type StakesHashMap = HashMap</*voter:*/ Pubkey, /*stake:*/ u64>;
fn merge(mut stakes: StakesHashMap, other: StakesHashMap) -> StakesHashMap {
if stakes.len() < other.len() {
return merge(other, stakes);
}
for (pubkey, stake) in other {
*stakes.entry(pubkey).or_default() += stake;
}
stakes
}
let stake_history = Some(stake_history.deref());
let delegated_stakes = thread_pool.install(|| {
stake_delegations
.par_iter()
.fold(HashMap::default, |mut delegated_stakes, stake_account| {
let delegation = stake_account.delegation();
let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default();
*entry += delegation.stake(epoch, stake_history);
delegated_stakes
})
.reduce(HashMap::default, merge)
});
vote_accounts
.iter()
.map(|(&vote_pubkey, vote_account)| {
let delegated_stake = delegated_stakes
.get(&vote_pubkey)
.copied()
.unwrap_or_default();
(vote_pubkey, (delegated_stake, vote_account.clone()))
})
.collect()
}
#[cfg(test)]
pub(crate) mod tests {
use {