updates rewards at epoch boundary using cached accounts

Loading vote and stake accounts from accounts-db takes a significant
portion of the time spent updating rewards at the epoch boundary.

This commit bypasses accounts-db and instead uses vote and stake
accounts cached in bank stakes:
https://github.com/solana-labs/solana/blob/d2702201c/runtime/src/stakes.rs#L148-L152

These cached accounts are synchronized with accounts-db after each
transaction, and so there should not be any change in the resulting
computation:
https://github.com/solana-labs/solana/blob/d2702201c/runtime/src/bank.rs#L4526

Nevertheless, to avoid any chance of introducing a consensus issue, the
switch to cached accounts is feature-gated.
This commit is contained in:
behzad nouri 2022-04-08 09:45:55 -04:00
parent b4491ff4ba
commit f937fcbd95
4 changed files with 203 additions and 23 deletions

View File

@ -151,13 +151,13 @@ use {
collections::{HashMap, HashSet},
convert::{TryFrom, TryInto},
fmt, mem,
ops::{Div, RangeInclusive},
ops::{Deref, Div, RangeInclusive},
path::PathBuf,
ptr,
rc::Rc,
sync::{
atomic::{
AtomicBool, AtomicU64,
AtomicBool, AtomicU64, AtomicUsize,
Ordering::{AcqRel, Acquire, Relaxed, Release},
},
Arc, LockResult, RwLock, RwLockReadGuard, RwLockWriteGuard,
@ -172,6 +172,7 @@ struct RewardsMetrics {
calculate_points_us: AtomicU64,
store_stake_accounts_us: AtomicU64,
store_vote_accounts_us: AtomicU64,
vote_accounts_cache_miss_count: usize,
}
mod address_lookup_table;
@ -1271,6 +1272,7 @@ struct LoadVoteAndStakeAccountsResult {
vote_with_stake_delegations_map: DashMap<Pubkey, VoteWithStakeDelegations>,
invalid_stake_keys: DashMap<Pubkey, InvalidCacheEntryReason>,
invalid_vote_keys: DashMap<Pubkey, InvalidCacheEntryReason>,
vote_accounts_cache_miss_count: usize,
}
#[derive(Debug, Default)]
@ -1786,7 +1788,7 @@ impl Bank {
"update_epoch_stakes",
);
let metrics = RewardsMetrics::default();
let mut metrics = RewardsMetrics::default();
// After saving a snapshot of stakes, apply stake rewards and commission
let (_, update_rewards_with_thread_pool_time) = Measure::this(
|_| {
@ -1794,7 +1796,7 @@ impl Bank {
parent_epoch,
reward_calc_tracer,
&thread_pool,
&metrics,
&mut metrics,
)
},
(),
@ -1843,6 +1845,11 @@ impl Bank {
metrics.store_vote_accounts_us.load(Relaxed),
i64
),
(
"vote_accounts_cache_miss_count",
metrics.vote_accounts_cache_miss_count,
i64
),
);
} else {
// Save a snapshot of stakes for use in consensus and stake weighted networking
@ -2522,7 +2529,7 @@ impl Bank {
prev_epoch: Epoch,
reward_calc_tracer: Option<impl Fn(&RewardCalculationEvent) + Send + Sync>,
thread_pool: &ThreadPool,
metrics: &RewardsMetrics,
metrics: &mut RewardsMetrics,
) {
let slot_in_year = self.slot_in_year_for_inflation();
let epoch_duration_in_years = self.epoch_duration_in_years(prev_epoch);
@ -2540,6 +2547,9 @@ impl Bank {
(validator_rate * capitalization as f64 * epoch_duration_in_years) as u64;
let old_vote_balance_and_staked = self.stakes_cache.stakes().vote_balance_and_staked();
let update_rewards_from_cached_accounts = self
.feature_set
.is_active(&feature_set::update_rewards_from_cached_accounts::id());
let validator_point_value = self.pay_validator_rewards_with_thread_pool(
prev_epoch,
@ -2548,6 +2558,7 @@ impl Bank {
self.stake_program_advance_activating_credits_observed(),
thread_pool,
metrics,
update_rewards_from_cached_accounts,
);
if !self
@ -2722,6 +2733,122 @@ impl Bank {
vote_with_stake_delegations_map,
invalid_vote_keys,
invalid_stake_keys,
vote_accounts_cache_miss_count: 0,
}
}
/// Builds the vote-account -> stake-delegations join used for rewards from
/// the vote and stake accounts cached in `self.stakes_cache`.
///
/// Steps:
/// 1. Collect the unique voter pubkeys referenced by the cached stake
///    delegations.
/// 2. Resolve each voter pubkey to a vote account (cache first, falling back
///    to `get_account_with_fixed_root` on a cache miss) and record keys that
///    are missing, wrongly owned, or have bad vote state in
///    `invalid_vote_keys`.
/// 3. Push every stake delegation onto the entry of the vote account it
///    delegates to.
///
/// `reward_calc_tracer`, when `Some`, is called once per stake delegation
/// whose vote account made it into the map.
///
/// In the returned value `invalid_stake_keys` is always empty, and
/// `vote_accounts_cache_miss_count` is the number of voter pubkeys that were
/// absent from the cache yet resolved to an account owned by the vote program
/// whose data deserializes as `VoteState`.
///
/// Panics if `stake_account.delegation()` does not yield a value for any
/// cached stake delegation (the result is unwrapped).
fn load_vote_and_stake_accounts<F>(
&self,
thread_pool: &ThreadPool,
reward_calc_tracer: Option<F>,
) -> LoadVoteAndStakeAccountsResult
where
F: Fn(&RewardCalculationEvent) + Send + Sync,
{
let stakes = self.stakes_cache.stakes();
let stake_delegations: Vec<_> = stakes.stake_delegations().iter().collect();
// Obtain all unique voter pubkeys from stake delegations.
// `merge` always extends the larger set with the smaller one: when `acc`
// is the smaller of the two, the arguments are swapped before extending.
fn merge(mut acc: HashSet<Pubkey>, other: HashSet<Pubkey>) -> HashSet<Pubkey> {
if acc.len() < other.len() {
return merge(other, acc);
}
acc.extend(other);
acc
}
// Each fold accumulator is a partial set of voter pubkeys; the partial
// sets are then combined pairwise with `merge`.
let voter_pubkeys = thread_pool.install(|| {
stake_delegations
.par_iter()
.fold(
HashSet::default,
|mut voter_pubkeys, (_stake_pubkey, stake_account)| {
let delegation = stake_account.delegation().unwrap();
voter_pubkeys.insert(delegation.voter_pubkey);
voter_pubkeys
},
)
.reduce(HashSet::default, merge)
});
// Obtain vote-accounts for unique voter pubkeys.
let cached_vote_accounts = stakes.vote_accounts();
let solana_vote_program: Pubkey = solana_vote_program::id();
// Atomic because `get_vote_account` is invoked from the parallel iterator
// further down.
let vote_accounts_cache_miss_count = AtomicUsize::default();
let get_vote_account = |vote_pubkey: &Pubkey| -> Option<VoteAccount> {
if let Some((_stake, vote_account)) = cached_vote_accounts.get(vote_pubkey) {
return Some(vote_account.clone());
}
// If accounts-db contains a valid vote account, then it should
// already have been cached in cached_vote_accounts; so the code
// below is only for sanity check, and can be removed once
// vote_accounts_cache_miss_count is shown to be always zero.
let account = self.get_account_with_fixed_root(vote_pubkey)?;
if account.owner() == &solana_vote_program
&& VoteState::deserialize(account.data()).is_ok()
{
vote_accounts_cache_miss_count.fetch_add(1, Relaxed);
}
// The account is returned whether or not it passed the check above;
// owner and vote state are validated by `make_vote_delegations_entry`.
Some(VoteAccount::from(account))
};
let invalid_vote_keys = DashMap::<Pubkey, InvalidCacheEntryReason>::new();
// Maps a voter pubkey to an entry with an empty delegation list, or
// records why the key is unusable and yields `None`.
let make_vote_delegations_entry = |vote_pubkey| {
let vote_account = match get_vote_account(&vote_pubkey) {
Some(vote_account) => vote_account,
None => {
invalid_vote_keys.insert(vote_pubkey, InvalidCacheEntryReason::Missing);
return None;
}
};
if vote_account.owner() != &solana_vote_program {
invalid_vote_keys.insert(vote_pubkey, InvalidCacheEntryReason::WrongOwner);
return None;
}
let vote_state = match vote_account.vote_state().deref() {
Ok(vote_state) => vote_state.clone(),
Err(_) => {
invalid_vote_keys.insert(vote_pubkey, InvalidCacheEntryReason::BadState);
return None;
}
};
let vote_with_stake_delegations = VoteWithStakeDelegations {
vote_state: Arc::new(vote_state),
vote_account: AccountSharedData::from(vote_account),
delegations: Vec::default(),
};
Some((vote_pubkey, vote_with_stake_delegations))
};
let vote_with_stake_delegations_map: DashMap<Pubkey, VoteWithStakeDelegations> =
thread_pool.install(|| {
voter_pubkeys
.into_par_iter()
.filter_map(make_vote_delegations_entry)
.collect()
});
// Join stake accounts with vote-accounts.
let push_stake_delegation = |(stake_pubkey, stake_account): (&Pubkey, &StakeAccount)| {
let delegation = stake_account.delegation().unwrap();
// Delegations whose voter pubkey has no entry in the map (it was
// rejected above) are skipped; nothing is added to any invalid-keys map.
let mut vote_delegations =
match vote_with_stake_delegations_map.get_mut(&delegation.voter_pubkey) {
Some(vote_delegations) => vote_delegations,
None => return,
};
if let Some(reward_calc_tracer) = reward_calc_tracer.as_ref() {
let delegation =
InflationPointCalculationEvent::Delegation(delegation, solana_vote_program);
let event = RewardCalculationEvent::Staking(stake_pubkey, &delegation);
reward_calc_tracer(&event);
}
let stake_delegation = (*stake_pubkey, stake_account.clone());
vote_delegations.delegations.push(stake_delegation);
};
thread_pool.install(|| {
stake_delegations
.into_par_iter()
.for_each(push_stake_delegation);
});
LoadVoteAndStakeAccountsResult {
vote_with_stake_delegations_map,
invalid_vote_keys,
// This code path never populates invalid stake keys.
invalid_stake_keys: DashMap::default(),
vote_accounts_cache_miss_count: vote_accounts_cache_miss_count.into_inner(),
}
}
@ -2734,7 +2861,8 @@ impl Bank {
reward_calc_tracer: Option<impl Fn(&RewardCalculationEvent) + Send + Sync>,
fix_activating_credits_observed: bool,
thread_pool: &ThreadPool,
metrics: &RewardsMetrics,
metrics: &mut RewardsMetrics,
update_rewards_from_cached_accounts: bool,
) -> f64 {
let stake_history = self.stakes_cache.stakes().history().clone();
let vote_with_stake_delegations_map = {
@ -2743,15 +2871,20 @@ impl Bank {
vote_with_stake_delegations_map,
invalid_stake_keys,
invalid_vote_keys,
} = self.load_vote_and_stake_accounts_with_thread_pool(
thread_pool,
reward_calc_tracer.as_ref(),
);
vote_accounts_cache_miss_count,
} = if update_rewards_from_cached_accounts {
self.load_vote_and_stake_accounts(thread_pool, reward_calc_tracer.as_ref())
} else {
self.load_vote_and_stake_accounts_with_thread_pool(
thread_pool,
reward_calc_tracer.as_ref(),
)
};
m.stop();
metrics
.load_vote_and_stake_accounts_us
.fetch_add(m.as_us(), Relaxed);
metrics.vote_accounts_cache_miss_count += vote_accounts_cache_miss_count;
let evict_invalid_stakes_cache_entries = self
.feature_set
.is_active(&feature_set::evict_invalid_stakes_cache_entries::id());
@ -4567,6 +4700,8 @@ impl Bank {
);
let rent_debits = self.collect_rent(&execution_results, loaded_txs);
// Cached vote and stake accounts are synchronized with accounts-db
// after each transaction.
let mut update_stakes_cache_time = Measure::start("update_stakes_cache_time");
self.update_stakes_cache(sanitized_txs, &execution_results, loaded_txs);
update_stakes_cache_time.stop();
@ -9016,6 +9151,19 @@ pub(crate) mod tests {
#[test]
fn test_bank_update_vote_stake_rewards() {
    // Run the same scenario once per loader: the pre-existing
    // thread-pool loader and the one reading from the stakes cache.
    let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let load_with_thread_pool = |bank: &Bank| {
        bank.load_vote_and_stake_accounts_with_thread_pool(&thread_pool, null_tracer())
    };
    let load_from_stakes_cache =
        |bank: &Bank| bank.load_vote_and_stake_accounts(&thread_pool, null_tracer());
    check_bank_update_vote_stake_rewards(load_with_thread_pool);
    check_bank_update_vote_stake_rewards(load_from_stakes_cache);
}
fn check_bank_update_vote_stake_rewards<F>(load_vote_and_stake_accounts: F)
where
F: Fn(&Bank) -> LoadVoteAndStakeAccountsResult,
{
solana_logger::setup();
// create a bank that ticks really slowly...
@ -9087,9 +9235,7 @@ pub(crate) mod tests {
);
assert!(bank0.rewards.read().unwrap().is_empty());
let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let validator_points: u128 = bank0
.load_vote_and_stake_accounts_with_thread_pool(&thread_pool, null_tracer())
let validator_points: u128 = load_vote_and_stake_accounts(&bank0)
.vote_with_stake_delegations_map
.into_iter()
.map(
@ -15451,6 +15597,28 @@ pub(crate) mod tests {
#[test]
fn test_stake_vote_account_validity() {
    // Both loaders are driven through the same checker; only the first run
    // asserts on the owner-change case.
    let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let load_with_thread_pool = |bank: &Bank| {
        bank.load_vote_and_stake_accounts_with_thread_pool(&thread_pool, null_tracer())
    };
    let load_from_stakes_cache =
        |bank: &Bank| bank.load_vote_and_stake_accounts(&thread_pool, null_tracer());

    let check_owner_change = true;
    check_stake_vote_account_validity(check_owner_change, load_with_thread_pool);

    // TODO: stakes cache should be hardened for the case when the account
    // owner is changed from vote/stake program to something else. see:
    // https://github.com/solana-labs/solana/pull/24200#discussion_r849935444
    let check_owner_change = false;
    check_stake_vote_account_validity(check_owner_change, load_from_stakes_cache);
}
fn check_stake_vote_account_validity<F>(
check_owner_change: bool,
load_vote_and_stake_accounts: F,
) where
F: Fn(&Bank) -> LoadVoteAndStakeAccountsResult,
{
let validator_vote_keypairs0 = ValidatorVoteKeypairs::new_rand();
let validator_vote_keypairs1 = ValidatorVoteKeypairs::new_rand();
let validator_keypairs = vec![&validator_vote_keypairs0, &validator_vote_keypairs1];
@ -15460,10 +15628,8 @@ pub(crate) mod tests {
vec![10_000; 2],
);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let vote_and_stake_accounts = bank
.load_vote_and_stake_accounts_with_thread_pool(&thread_pool, null_tracer())
.vote_with_stake_delegations_map;
let vote_and_stake_accounts =
load_vote_and_stake_accounts(&bank).vote_with_stake_delegations_map;
assert_eq!(vote_and_stake_accounts.len(), 2);
let mut vote_account = bank
@ -15502,11 +15668,12 @@ pub(crate) mod tests {
);
// Accounts must be valid stake and vote accounts
let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let vote_and_stake_accounts = bank
.load_vote_and_stake_accounts_with_thread_pool(&thread_pool, null_tracer())
.vote_with_stake_delegations_map;
assert_eq!(vote_and_stake_accounts.len(), 0);
let vote_and_stake_accounts =
load_vote_and_stake_accounts(&bank).vote_with_stake_delegations_map;
assert_eq!(
vote_and_stake_accounts.len(),
if check_owner_change { 0 } else { 1 }
);
}
#[test]

View File

@ -56,6 +56,10 @@ impl StakesCache {
}
pub fn check_and_store(&self, pubkey: &Pubkey, account: &AccountSharedData) {
// TODO: If the account is already cached as a vote or stake account
// but the owner changes, then this needs to evict the account from
// the cache. see:
// https://github.com/solana-labs/solana/pull/24200#discussion_r849935444
if solana_vote_program::check_id(account.owner()) {
let new_vote_account = if account.lamports() != 0
&& VoteState::is_correct_size_and_initialized(account.data())

View File

@ -56,6 +56,10 @@ impl VoteAccount {
self.0.account.lamports()
}
/// Returns the owner pubkey recorded on the wrapped account.
pub(crate) fn owner(&self) -> &Pubkey {
    let inner = &self.0;
    inner.account.owner()
}
pub fn vote_state(&self) -> RwLockReadGuard<Result<VoteState, InstructionError>> {
let inner = &self.0;
inner.vote_state_once.call_once(|| {

View File

@ -359,6 +359,10 @@ pub mod fix_recent_blockhashes {
solana_sdk::declare_id!("6iyggb5MTcsvdcugX7bEKbHV8c6jdLbpHwkncrgLMhfo");
}
/// Feature gate: when active, epoch-boundary rewards are computed from the
/// vote and stake accounts cached in bank stakes instead of loading them
/// from accounts-db.
pub mod update_rewards_from_cached_accounts {
solana_sdk::declare_id!("28s7i3htzhahXQKqmS2ExzbEoUypg9krwvtK2M9UWXh9");
}
lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@ -443,6 +447,7 @@ lazy_static! {
(drop_redundant_turbine_path::id(), "drop redundant turbine path"),
(executables_incur_cpi_data_cost::id(), "Executables incure CPI data costs"),
(fix_recent_blockhashes::id(), "stop adding hashes for skipped slots to recent blockhashes"),
(update_rewards_from_cached_accounts::id(), "update rewards from cached accounts"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()