improves Stakes::activate_epoch performance (#24068)

Tested with mainnet stakes obtained from the ledger at 5 recent epoch
boundaries, this code is ~30% faster than current master.

Current code:
  epoch: 289, elapsed: 82901us
  epoch: 290, elapsed: 80525us
  epoch: 291, elapsed: 79122us
  epoch: 292, elapsed: 79961us
  epoch: 293, elapsed: 78965us

This commit:
  epoch: 289, elapsed: 61710us
  epoch: 290, elapsed: 55721us
  epoch: 291, elapsed: 55886us
  epoch: 292, elapsed: 55399us
  epoch: 293, elapsed: 56803us
This commit is contained in:
behzad nouri 2022-04-02 22:48:51 +00:00 committed by GitHub
parent 0ca5a0ec68
commit fa7eb7f30c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 47 additions and 66 deletions

View File

@ -3,16 +3,13 @@
use {
crate::{
stake_history::StakeHistory,
vote_account::{VoteAccount, VoteAccounts, VoteAccountsHashMap},
vote_account::{VoteAccount, VoteAccounts},
},
dashmap::DashMap,
im::HashMap as ImHashMap,
num_derive::ToPrimitive,
num_traits::ToPrimitive,
rayon::{
iter::{IntoParallelRefIterator, ParallelIterator},
ThreadPool,
},
rayon::{prelude::*, ThreadPool},
solana_sdk::{
account::{AccountSharedData, ReadableAccount},
clock::{Epoch, Slot},
@ -26,6 +23,7 @@ use {
solana_vote_program::vote_state::VoteState,
std::{
collections::HashMap,
ops::Add,
sync::{Arc, RwLock, RwLockReadGuard},
},
};
@ -168,71 +166,54 @@ impl Stakes {
&self.stake_history
}
pub fn activate_epoch(&mut self, next_epoch: Epoch, thread_pool: &ThreadPool) {
let prev_epoch = self.epoch;
self.epoch = next_epoch;
thread_pool.install(|| {
let stake_delegations = &self.stake_delegations;
let stake_history = &mut self.stake_history;
let vote_accounts: &VoteAccountsHashMap = self.vote_accounts.as_ref();
// construct map of vote pubkey -> list of stake delegations
let vote_delegations: HashMap<Pubkey, Vec<&Delegation>> = {
let mut vote_delegations = HashMap::with_capacity(vote_accounts.len());
stake_delegations
.iter()
.for_each(|(_stake_pubkey, delegation)| {
let vote_pubkey = &delegation.voter_pubkey;
vote_delegations
.entry(*vote_pubkey)
.and_modify(|delegations: &mut Vec<_>| delegations.push(delegation))
.or_insert_with(|| vec![delegation]);
});
vote_delegations
};
// wrap up the prev epoch by adding new stake history entry for the prev epoch
{
let stake_history_entry = vote_delegations
.par_iter()
.map(|(_vote_pubkey, delegations)| {
delegations
.par_iter()
.map(|delegation| {
delegation.stake_activating_and_deactivating(
prev_epoch,
Some(stake_history),
)
})
.reduce(StakeActivationStatus::default, |a, b| a + b)
})
.reduce(StakeActivationStatus::default, |a, b| a + b);
stake_history.add(prev_epoch, stake_history_entry);
fn activate_epoch(&mut self, next_epoch: Epoch, thread_pool: &ThreadPool) {
type StakesHashMap = HashMap</*voter:*/ Pubkey, /*stake:*/ u64>;
fn merge(mut acc: StakesHashMap, other: StakesHashMap) -> StakesHashMap {
if acc.len() < other.len() {
return merge(other, acc);
}
// refresh the stake distribution of vote accounts for the next epoch, using new stake history
let vote_accounts_for_next_epoch: VoteAccountsHashMap = vote_accounts
for (key, stake) in other {
*acc.entry(key).or_default() += stake;
}
acc
}
let stake_delegations: Vec<_> = self.stake_delegations.values().collect();
// Wrap up the prev epoch by adding new stake history entry for the
// prev epoch.
let stake_history_entry = thread_pool.install(|| {
stake_delegations
.par_iter()
.map(|(vote_pubkey, (_stake, vote_account))| {
let delegated_stake = vote_delegations
.get(vote_pubkey)
.map(|delegations| {
delegations
.par_iter()
.map(|delegation| delegation.stake(next_epoch, Some(stake_history)))
.sum()
})
.unwrap_or_default();
(*vote_pubkey, (delegated_stake, vote_account.clone()))
.fold(StakeActivationStatus::default, |acc, delegation| {
acc + delegation
.stake_activating_and_deactivating(self.epoch, Some(&self.stake_history))
})
.collect();
// overwrite vote accounts so that staked nodes singleton is reset
self.vote_accounts = VoteAccounts::from(Arc::new(vote_accounts_for_next_epoch));
.reduce(StakeActivationStatus::default, Add::add)
});
self.stake_history.add(self.epoch, stake_history_entry);
self.epoch = next_epoch;
// Refresh the stake distribution of vote accounts for the next epoch,
// using new stake history.
let delegated_stakes = thread_pool.install(|| {
stake_delegations
.par_iter()
.fold(HashMap::default, |mut delegated_stakes, delegation| {
let entry = delegated_stakes.entry(delegation.voter_pubkey).or_default();
*entry += delegation.stake(self.epoch, Some(&self.stake_history));
delegated_stakes
})
.reduce(HashMap::default, merge)
});
self.vote_accounts = self
.vote_accounts
.iter()
.map(|(&vote_pubkey, (_ /*stake*/, vote_account))| {
let delegated_stake = delegated_stakes
.get(&vote_pubkey)
.copied()
.unwrap_or_default();
(vote_pubkey, (delegated_stake, vote_account.clone()))
})
.collect();
}
/// Sum the stakes that point to the given voter_pubkey