Optimize stakes cache and rewards at epoch boundaries (#20432)

* Optimize stakes cache and rewards at epoch boundaries

* Fetch from accounts db

* Add cli flag for disabling epoch boundary optimization
This commit is contained in:
Justin Starry 2021-10-06 00:53:26 -04:00 committed by GitHub
parent 48d3627c8b
commit 129716f3f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 731 additions and 41 deletions

View File

@ -43,7 +43,7 @@ use {
},
solana_runtime::{
accounts_background_service::AbsRequestSender, bank::Bank, bank::ExecuteTimings,
bank_forks::BankForks, commitment::BlockCommitmentCache,
bank::NewBankOptions, bank_forks::BankForks, commitment::BlockCommitmentCache,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{
@ -132,6 +132,7 @@ pub struct ReplayStageConfig {
pub wait_for_vote_to_start_leader: bool,
pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender,
pub tower_storage: Arc<dyn TowerStorage>,
pub disable_epoch_boundary_optimization: bool,
}
#[derive(Default)]
@ -341,6 +342,7 @@ impl ReplayStage {
wait_for_vote_to_start_leader,
ancestor_hashes_replay_update_sender,
tower_storage,
disable_epoch_boundary_optimization,
} = config;
trace!("replay stage");
@ -769,6 +771,7 @@ impl ReplayStage {
&retransmit_slots_sender,
&mut skipped_slots_info,
has_new_vote_been_rooted,
disable_epoch_boundary_optimization,
);
let poh_bank = poh_recorder.lock().unwrap().bank();
@ -1341,6 +1344,7 @@ impl ReplayStage {
}
}
#[allow(clippy::too_many_arguments)]
fn maybe_start_leader(
my_pubkey: &Pubkey,
bank_forks: &Arc<RwLock<BankForks>>,
@ -1351,6 +1355,7 @@ impl ReplayStage {
retransmit_slots_sender: &RetransmitSlotsSender,
skipped_slots_info: &mut SkippedSlotsInfo,
has_new_vote_been_rooted: bool,
disable_epoch_boundary_optimization: bool,
) {
// all the individual calls to poh_recorder.lock() are designed to
// increase granularity, decrease contention
@ -1468,7 +1473,10 @@ impl ReplayStage {
root_slot,
my_pubkey,
rpc_subscriptions,
vote_only_bank,
NewBankOptions {
vote_only_bank,
disable_epoch_boundary_optimization,
},
);
let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank);
@ -2794,7 +2802,7 @@ impl ReplayStage {
forks.root(),
&leader,
rpc_subscriptions,
false,
NewBankOptions::default(),
);
let empty: Vec<Pubkey> = vec![];
Self::update_fork_propagated_threshold_from_votes(
@ -2821,10 +2829,10 @@ impl ReplayStage {
root_slot: u64,
leader: &Pubkey,
rpc_subscriptions: &Arc<RpcSubscriptions>,
vote_only_bank: bool,
new_bank_options: NewBankOptions,
) -> Bank {
rpc_subscriptions.notify_slot(slot, parent.slot(), root_slot);
Bank::new_from_parent_with_vote_only(parent, leader, slot, vote_only_bank)
Bank::new_from_parent_with_options(parent, leader, slot, new_bank_options)
}
fn record_rewards(bank: &Bank, rewards_recorder_sender: &Option<RewardsRecorderSender>) {

View File

@ -94,6 +94,7 @@ pub struct TvuConfig {
pub rocksdb_max_compaction_jitter: Option<u64>,
pub wait_for_vote_to_start_leader: bool,
pub accounts_shrink_ratio: AccountShrinkThreshold,
pub disable_epoch_boundary_optimization: bool,
}
impl Tvu {
@ -282,6 +283,7 @@ impl Tvu {
wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader,
ancestor_hashes_replay_update_sender,
tower_storage: tower_storage.clone(),
disable_epoch_boundary_optimization: tvu_config.disable_epoch_boundary_optimization,
};
let (voting_sender, voting_receiver) = channel();

View File

@ -161,6 +161,7 @@ pub struct ValidatorConfig {
pub validator_exit: Arc<RwLock<Exit>>,
pub no_wait_for_vote_to_start_leader: bool,
pub accounts_shrink_ratio: AccountShrinkThreshold,
pub disable_epoch_boundary_optimization: bool,
}
impl Default for ValidatorConfig {
@ -221,6 +222,7 @@ impl Default for ValidatorConfig {
no_wait_for_vote_to_start_leader: true,
accounts_shrink_ratio: AccountShrinkThreshold::default(),
accounts_db_config: None,
disable_epoch_boundary_optimization: false,
}
}
}
@ -820,6 +822,7 @@ impl Validator {
rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval,
wait_for_vote_to_start_leader,
accounts_shrink_ratio: config.accounts_shrink_ratio,
disable_epoch_boundary_optimization: config.disable_epoch_boundary_optimization,
},
&max_slots,
&cost_model,

View File

@ -60,6 +60,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
no_wait_for_vote_to_start_leader: config.no_wait_for_vote_to_start_leader,
accounts_shrink_ratio: config.accounts_shrink_ratio,
accounts_db_config: config.accounts_db_config.clone(),
disable_epoch_boundary_optimization: config.disable_epoch_boundary_optimization,
}
}

View File

@ -1098,7 +1098,9 @@ fn stake_weighted_credits_observed(
// utility function, used by runtime
// returns a tuple of (stakers_reward,voters_reward)
pub fn redeem_rewards(
#[doc(hidden)]
#[deprecated(note = "remove after optimize_epoch_boundary_updates feature is active")]
pub fn redeem_rewards_slow(
rewarded_epoch: Epoch,
stake_account: &mut AccountSharedData,
vote_account: &mut AccountSharedData,
@ -1147,7 +1149,9 @@ pub fn redeem_rewards(
}
// utility function, used by runtime
pub fn calculate_points(
#[doc(hidden)]
#[deprecated(note = "remove after optimize_epoch_boundary_updates feature is active")]
pub fn calculate_points_slow(
stake_account: &AccountSharedData,
vote_account: &AccountSharedData,
stake_history: Option<&StakeHistory>,
@ -1167,6 +1171,74 @@ pub fn calculate_points(
}
}
// utility function, used by runtime
// returns a tuple of (stakers_reward,voters_reward)
#[doc(hidden)]
pub fn redeem_rewards(
rewarded_epoch: Epoch,
stake_state: StakeState,
stake_account: &mut AccountSharedData,
vote_state: &VoteState,
point_value: &PointValue,
stake_history: Option<&StakeHistory>,
inflation_point_calc_tracer: Option<impl Fn(&InflationPointCalculationEvent)>,
fix_activating_credits_observed: bool,
) -> Result<(u64, u64), InstructionError> {
if let StakeState::Stake(meta, mut stake) = stake_state {
if let Some(inflation_point_calc_tracer) = inflation_point_calc_tracer.as_ref() {
inflation_point_calc_tracer(
&InflationPointCalculationEvent::EffectiveStakeAtRewardedEpoch(
stake.stake(rewarded_epoch, stake_history),
),
);
inflation_point_calc_tracer(&InflationPointCalculationEvent::RentExemptReserve(
meta.rent_exempt_reserve,
));
inflation_point_calc_tracer(&InflationPointCalculationEvent::Commission(
vote_state.commission,
));
}
if let Some((stakers_reward, voters_reward)) = redeem_stake_rewards(
rewarded_epoch,
&mut stake,
point_value,
vote_state,
stake_history,
inflation_point_calc_tracer,
fix_activating_credits_observed,
) {
stake_account.checked_add_lamports(stakers_reward)?;
stake_account.set_state(&StakeState::Stake(meta, stake))?;
Ok((stakers_reward, voters_reward))
} else {
Err(StakeError::NoCreditsToRedeem.into())
}
} else {
Err(InstructionError::InvalidAccountData)
}
}
// utility function, used by runtime
#[doc(hidden)]
pub fn calculate_points(
stake_state: &StakeState,
vote_state: &VoteState,
stake_history: Option<&StakeHistory>,
) -> Result<u128, InstructionError> {
if let StakeState::Stake(_meta, stake) = stake_state {
Ok(calculate_stake_points(
stake,
vote_state,
stake_history,
null_tracer(),
))
} else {
Err(InstructionError::InvalidAccountData)
}
}
// utility function, used by Split
//This emulates current Rent math in order to preserve backward compatibility. In the future, and
//to support variable rent, the Split instruction should pass in the Rent sysvar instead.

View File

@ -61,9 +61,13 @@ use crate::{
vote_account::VoteAccount,
};
use byteorder::{ByteOrder, LittleEndian};
use dashmap::DashMap;
use itertools::Itertools;
use log::*;
use rayon::ThreadPool;
use rayon::{
iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator},
ThreadPool, ThreadPoolBuilder,
};
use solana_measure::measure::Measure;
use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_info};
use solana_program_runtime::{ExecuteDetailsTimings, Executors, InstructionProcessor};
@ -105,7 +109,6 @@ use solana_sdk::{
signature::{Keypair, Signature},
slot_hashes::SlotHashes,
slot_history::SlotHistory,
stake::{self, state::Delegation},
system_transaction,
sysvar::{self},
timing::years_as_slots,
@ -113,7 +116,9 @@ use solana_sdk::{
Result, SanitizedTransaction, Transaction, TransactionError, VersionedTransaction,
},
};
use solana_stake_program::stake_state::{self, InflationPointCalculationEvent, PointValue};
use solana_stake_program::stake_state::{
self, Delegation, InflationPointCalculationEvent, PointValue, StakeState,
};
use solana_vote_program::{
vote_instruction::VoteInstruction,
vote_state::{VoteState, VoteStateVersions},
@ -996,6 +1001,18 @@ impl Default for BlockhashQueue {
}
}
struct VoteWithStakeDelegations {
vote_state: Arc<VoteState>,
vote_account: AccountSharedData,
delegations: Vec<(Pubkey, (StakeState, AccountSharedData))>,
}
#[derive(Debug, Default)]
pub struct NewBankOptions {
pub vote_only_bank: bool,
pub disable_epoch_boundary_optimization: bool,
}
impl Bank {
pub fn default_for_tests() -> Self {
Self::default_with_accounts(Accounts::default_for_tests())
@ -1228,16 +1245,22 @@ impl Bank {
/// Create a new bank that points to an immutable checkpoint of another bank.
pub fn new_from_parent(parent: &Arc<Bank>, collector_id: &Pubkey, slot: Slot) -> Self {
Self::_new_from_parent(parent, collector_id, slot, null_tracer(), false)
Self::_new_from_parent(
parent,
collector_id,
slot,
null_tracer(),
NewBankOptions::default(),
)
}
pub fn new_from_parent_with_vote_only(
pub fn new_from_parent_with_options(
parent: &Arc<Bank>,
collector_id: &Pubkey,
slot: Slot,
vote_only_bank: bool,
new_bank_options: NewBankOptions,
) -> Self {
Self::_new_from_parent(parent, collector_id, slot, null_tracer(), vote_only_bank)
Self::_new_from_parent(parent, collector_id, slot, null_tracer(), new_bank_options)
}
pub fn new_from_parent_with_tracer(
@ -1246,7 +1269,13 @@ impl Bank {
slot: Slot,
reward_calc_tracer: impl Fn(&RewardCalculationEvent) + Send + Sync,
) -> Self {
Self::_new_from_parent(parent, collector_id, slot, Some(reward_calc_tracer), false)
Self::_new_from_parent(
parent,
collector_id,
slot,
Some(reward_calc_tracer),
NewBankOptions::default(),
)
}
fn _new_from_parent(
@ -1254,8 +1283,13 @@ impl Bank {
collector_id: &Pubkey,
slot: Slot,
reward_calc_tracer: Option<impl Fn(&RewardCalculationEvent) + Send + Sync>,
vote_only_bank: bool,
new_bank_options: NewBankOptions,
) -> Self {
let NewBankOptions {
vote_only_bank,
disable_epoch_boundary_optimization,
} = new_bank_options;
parent.freeze();
assert_ne!(slot, parent.slot());
@ -1369,6 +1403,45 @@ impl Bank {
new.apply_feature_activations(false, false);
}
let optimize_epoch_boundary_updates = !disable_epoch_boundary_optimization
&& new
.feature_set
.is_active(&feature_set::optimize_epoch_boundary_updates::id());
if optimize_epoch_boundary_updates {
if parent_epoch < new.epoch() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
// Add new entry to stakes.stake_history, set appropriate epoch and
// update vote accounts with warmed up stakes before saving a
// snapshot of stakes in epoch stakes
new.stakes
.write()
.unwrap()
.activate_epoch(epoch, &thread_pool);
// Save a snapshot of stakes for use in consensus and stake weighted networking
let leader_schedule_epoch = epoch_schedule.get_leader_schedule_epoch(slot);
new.update_epoch_stakes(leader_schedule_epoch);
// After saving a snapshot of stakes, apply stake rewards and commission
new.update_rewards_with_thread_pool(parent_epoch, reward_calc_tracer, &thread_pool);
} else {
// Save a snapshot of stakes for use in consensus and stake weighted networking
let leader_schedule_epoch = epoch_schedule.get_leader_schedule_epoch(slot);
new.update_epoch_stakes(leader_schedule_epoch);
}
// Update sysvars before processing transactions
new.update_slot_hashes();
new.update_stake_history(Some(parent_epoch));
new.update_clock(Some(parent_epoch));
new.update_fees();
return new;
}
#[allow(deprecated)]
let cloned = new.stakes.read().unwrap().clone_with_epoch(epoch);
*new.stakes.write().unwrap() = cloned;
@ -1961,6 +2034,7 @@ impl Bank {
let old_vote_balance_and_staked = self.stakes.read().unwrap().vote_balance_and_staked();
#[allow(deprecated)]
let validator_point_value = self.pay_validator_rewards(
prev_epoch,
validator_rewards,
@ -2034,11 +2108,110 @@ impl Bank {
);
}
// update rewards based on the previous epoch
fn update_rewards_with_thread_pool(
&mut self,
prev_epoch: Epoch,
reward_calc_tracer: Option<impl Fn(&RewardCalculationEvent) + Send + Sync>,
thread_pool: &ThreadPool,
) {
let slot_in_year = self.slot_in_year_for_inflation();
let epoch_duration_in_years = self.epoch_duration_in_years(prev_epoch);
let (validator_rate, foundation_rate) = {
let inflation = self.inflation.read().unwrap();
(
(*inflation).validator(slot_in_year),
(*inflation).foundation(slot_in_year),
)
};
let capitalization = self.capitalization();
let validator_rewards =
(validator_rate * capitalization as f64 * epoch_duration_in_years) as u64;
let old_vote_balance_and_staked = self.stakes.read().unwrap().vote_balance_and_staked();
let validator_point_value = self.pay_validator_rewards_with_thread_pool(
prev_epoch,
validator_rewards,
reward_calc_tracer,
self.stake_program_advance_activating_credits_observed(),
thread_pool,
);
if !self
.feature_set
.is_active(&feature_set::deprecate_rewards_sysvar::id())
{
// this sysvar can be retired once `pico_inflation` is enabled on all clusters
self.update_sysvar_account(&sysvar::rewards::id(), |account| {
create_account(
&sysvar::rewards::Rewards::new(validator_point_value),
self.inherit_specially_retained_account_fields(account),
)
});
}
let new_vote_balance_and_staked = self.stakes.read().unwrap().vote_balance_and_staked();
let validator_rewards_paid = new_vote_balance_and_staked - old_vote_balance_and_staked;
assert_eq!(
validator_rewards_paid,
u64::try_from(
self.rewards
.read()
.unwrap()
.iter()
.map(|(_address, reward_info)| {
match reward_info.reward_type {
RewardType::Voting | RewardType::Staking => reward_info.lamports,
_ => 0,
}
})
.sum::<i64>()
)
.unwrap()
);
// verify that we didn't pay any more than we expected to
assert!(validator_rewards >= validator_rewards_paid);
info!(
"distributed inflation: {} (rounded from: {})",
validator_rewards_paid, validator_rewards
);
self.capitalization
.fetch_add(validator_rewards_paid, Relaxed);
let active_stake = if let Some(stake_history_entry) =
self.stakes.read().unwrap().history().get(&prev_epoch)
{
stake_history_entry.effective
} else {
0
};
datapoint_warn!(
"epoch_rewards",
("slot", self.slot, i64),
("epoch", prev_epoch, i64),
("validator_rate", validator_rate, f64),
("foundation_rate", foundation_rate, f64),
("epoch_duration_in_years", epoch_duration_in_years, f64),
("validator_rewards", validator_rewards_paid, i64),
("active_stake", active_stake, i64),
("pre_capitalization", capitalization, i64),
("post_capitalization", self.capitalization(), i64)
);
}
/// map stake delegations into resolved (pubkey, account) pairs
/// returns a map (has to be copied) of loaded
/// ( Vec<(staker info)> (voter account) ) keyed by voter pubkey
///
/// Filters out invalid pairs
#[deprecated(note = "remove after optimize_epoch_boundary_updates feature is active")]
fn stake_delegation_accounts(
&self,
reward_calc_tracer: Option<impl Fn(&RewardCalculationEvent) + Send + Sync>,
@ -2069,7 +2242,7 @@ impl Bank {
if self
.feature_set
.is_active(&feature_set::filter_stake_delegation_accounts::id())
&& (stake_account.owner() != &stake::program::id()
&& (stake_account.owner() != &solana_stake_program::id()
|| vote_account.owner() != &solana_vote_program::id())
{
datapoint_warn!(
@ -2096,8 +2269,116 @@ impl Bank {
accounts
}
/// map stake delegations into resolved (pubkey, account) pairs
/// returns a map (has to be copied) of loaded
/// ( Vec<(staker info)> (voter account) ) keyed by voter pubkey
///
/// Filters out invalid pairs
fn load_vote_and_stake_accounts_with_thread_pool(
&self,
thread_pool: &ThreadPool,
reward_calc_tracer: Option<impl Fn(&RewardCalculationEvent) + Send + Sync>,
) -> DashMap<Pubkey, VoteWithStakeDelegations> {
let filter_stake_delegation_accounts = self
.feature_set
.is_active(&feature_set::filter_stake_delegation_accounts::id());
let stakes = self.stakes.read().unwrap();
let accounts = DashMap::with_capacity(stakes.vote_accounts().as_ref().len());
thread_pool.install(|| {
stakes
.stake_delegations()
.par_iter()
.for_each(|(stake_pubkey, delegation)| {
let vote_pubkey = &delegation.voter_pubkey;
let stake_account = match self.get_account_with_fixed_root(stake_pubkey) {
Some(stake_account) => stake_account,
None => return,
};
// fetch vote account from stakes cache if it hasn't been cached locally
let fetched_vote_account = if !accounts.contains_key(vote_pubkey) {
let vote_account = match self.get_account_with_fixed_root(vote_pubkey) {
Some(vote_account) => vote_account,
None => return,
};
let vote_state: VoteState =
match StateMut::<VoteStateVersions>::state(&vote_account) {
Ok(vote_state) => vote_state.convert_to_current(),
Err(err) => {
debug!(
"failed to deserialize vote account {}: {}",
vote_pubkey, err
);
return;
}
};
Some((vote_state, vote_account))
} else {
None
};
let fetched_vote_account_owner = fetched_vote_account
.as_ref()
.map(|(_vote_state, vote_account)| vote_account.owner());
if let Some(reward_calc_tracer) = reward_calc_tracer.as_ref() {
reward_calc_tracer(&RewardCalculationEvent::Staking(
stake_pubkey,
&InflationPointCalculationEvent::Delegation(
*delegation,
fetched_vote_account_owner
.cloned()
.unwrap_or_else(solana_vote_program::id),
),
));
}
// filter invalid delegation accounts
if filter_stake_delegation_accounts
&& (stake_account.owner() != &solana_stake_program::id()
|| (fetched_vote_account_owner.is_some()
&& fetched_vote_account_owner != Some(&solana_vote_program::id())))
{
datapoint_warn!(
"bank-stake_delegation_accounts-invalid-account",
("slot", self.slot() as i64, i64),
("stake-address", format!("{:?}", stake_pubkey), String),
("vote-address", format!("{:?}", vote_pubkey), String),
);
return;
}
let stake_delegation = match stake_account.state().ok() {
Some(stake_state) => (*stake_pubkey, (stake_state, stake_account)),
None => return,
};
if let Some((vote_state, vote_account)) = fetched_vote_account {
accounts
.entry(*vote_pubkey)
.or_insert_with(|| VoteWithStakeDelegations {
vote_state: Arc::new(vote_state),
vote_account,
delegations: vec![],
});
}
if let Some(mut stake_delegation_accounts) = accounts.get_mut(vote_pubkey) {
stake_delegation_accounts.delegations.push(stake_delegation);
}
});
});
accounts
}
/// iterate over all stakes, redeem vote credits for each stake we can
/// successfully load and parse, return the lamport value of one point
#[deprecated(note = "remove after optimize_epoch_boundary_updates feature is active")]
fn pay_validator_rewards(
&mut self,
rewarded_epoch: Epoch,
@ -2107,6 +2388,7 @@ impl Bank {
) -> f64 {
let stake_history = self.stakes.read().unwrap().history().clone();
#[allow(deprecated)]
let mut stake_delegation_accounts =
self.stake_delegation_accounts(reward_calc_tracer.as_ref());
@ -2118,8 +2400,13 @@ impl Bank {
.map(move |(_stake_pubkey, stake_account)| (stake_account, vote_account))
})
.map(|(stake_account, vote_account)| {
stake_state::calculate_points(stake_account, vote_account, Some(&stake_history))
.unwrap_or(0)
#[allow(deprecated)]
stake_state::calculate_points_slow(
stake_account,
vote_account,
Some(&stake_history),
)
.unwrap_or(0)
})
.sum();
@ -2155,7 +2442,8 @@ impl Bank {
outer(&RewardCalculationEvent::Staking(&stake_pubkey, inner_event))
}
});
let redeemed = stake_state::redeem_rewards(
#[allow(deprecated)]
let redeemed = stake_state::redeem_rewards_slow(
rewarded_epoch,
stake_account,
vote_account,
@ -2210,6 +2498,176 @@ impl Bank {
point_value.rewards as f64 / point_value.points as f64
}
/// iterate over all stakes, redeem vote credits for each stake we can
/// successfully load and parse, return the lamport value of one point
fn pay_validator_rewards_with_thread_pool(
&mut self,
rewarded_epoch: Epoch,
rewards: u64,
reward_calc_tracer: Option<impl Fn(&RewardCalculationEvent) + Send + Sync>,
fix_activating_credits_observed: bool,
thread_pool: &ThreadPool,
) -> f64 {
let stake_history = self.stakes.read().unwrap().history().clone();
let vote_and_stake_accounts = self.load_vote_and_stake_accounts_with_thread_pool(
thread_pool,
reward_calc_tracer.as_ref(),
);
let points: u128 = thread_pool.install(|| {
vote_and_stake_accounts
.par_iter()
.map(|entry| {
let VoteWithStakeDelegations {
vote_state,
delegations,
..
} = entry.value();
delegations
.par_iter()
.map(|(_stake_pubkey, (stake_state, _stake_account))| {
stake_state::calculate_points(
stake_state,
vote_state,
Some(&stake_history),
)
.unwrap_or(0)
})
.sum::<u128>()
})
.sum()
});
if points == 0 {
return 0.0;
}
// pay according to point value
let point_value = PointValue { rewards, points };
let vote_account_rewards: DashMap<Pubkey, (AccountSharedData, u8, u64, bool)> =
DashMap::with_capacity(vote_and_stake_accounts.len());
let stake_delegation_iterator = vote_and_stake_accounts.into_par_iter().flat_map(
|(
vote_pubkey,
VoteWithStakeDelegations {
vote_state,
vote_account,
delegations,
},
)| {
vote_account_rewards
.insert(vote_pubkey, (vote_account, vote_state.commission, 0, false));
delegations
.into_par_iter()
.map(move |delegation| (vote_pubkey, Arc::clone(&vote_state), delegation))
},
);
let mut stake_rewards = thread_pool.install(|| {
stake_delegation_iterator
.filter_map(
|(
vote_pubkey,
vote_state,
(stake_pubkey, (stake_state, mut stake_account)),
)| {
// curry closure to add the contextual stake_pubkey
let reward_calc_tracer = reward_calc_tracer.as_ref().map(|outer| {
// inner
move |inner_event: &_| {
outer(&RewardCalculationEvent::Staking(&stake_pubkey, inner_event))
}
});
let redeemed = stake_state::redeem_rewards(
rewarded_epoch,
stake_state,
&mut stake_account,
&vote_state,
&point_value,
Some(&stake_history),
reward_calc_tracer.as_ref(),
fix_activating_credits_observed,
);
if let Ok((stakers_reward, voters_reward)) = redeemed {
// track voter rewards
if let Some((
_vote_account,
_commission,
vote_rewards_sum,
vote_needs_store,
)) = vote_account_rewards.get_mut(&vote_pubkey).as_deref_mut()
{
*vote_needs_store = true;
*vote_rewards_sum = vote_rewards_sum.saturating_add(voters_reward);
}
// store stake account even if stakers_reward is 0
// because credits observed has changed
self.store_account(&stake_pubkey, &stake_account);
if stakers_reward > 0 {
return Some((
stake_pubkey,
RewardInfo {
reward_type: RewardType::Staking,
lamports: stakers_reward as i64,
post_balance: stake_account.lamports(),
commission: Some(vote_state.commission),
},
));
}
} else {
debug!(
"stake_state::redeem_rewards() failed for {}: {:?}",
stake_pubkey, redeemed
);
}
None
},
)
.collect()
});
let mut vote_rewards = vote_account_rewards
.into_iter()
.filter_map(
|(vote_pubkey, (mut vote_account, commission, vote_rewards, vote_needs_store))| {
if let Err(err) = vote_account.checked_add_lamports(vote_rewards) {
debug!("reward redemption failed for {}: {:?}", vote_pubkey, err);
return None;
}
if vote_needs_store {
self.store_account(&vote_pubkey, &vote_account);
}
if vote_rewards > 0 {
Some((
vote_pubkey,
RewardInfo {
reward_type: RewardType::Voting,
lamports: vote_rewards as i64,
post_balance: vote_account.lamports(),
commission: Some(commission),
},
))
} else {
None
}
},
)
.collect();
{
let mut rewards = self.rewards.write().unwrap();
rewards.append(&mut vote_rewards);
rewards.append(&mut stake_rewards);
}
point_value.rewards as f64 / point_value.points as f64
}
fn update_recent_blockhashes_locked(&self, locked_blockhash_queue: &BlockhashQueue) {
#[allow(deprecated)]
self.update_sysvar_account(&sysvar::recent_blockhashes::id(), |account| {
@ -7831,17 +8289,28 @@ pub(crate) mod tests {
}
bank0.store_account_and_update_capitalization(&vote_id, &vote_account);
let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let validator_points: u128 = bank0
.stake_delegation_accounts(null_tracer())
.iter()
.flat_map(|(_vote_pubkey, (stake_group, vote_account))| {
stake_group
.iter()
.map(move |(_stake_pubkey, stake_account)| (stake_account, vote_account))
})
.map(|(stake_account, vote_account)| {
stake_state::calculate_points(stake_account, vote_account, None).unwrap_or(0)
})
.load_vote_and_stake_accounts_with_thread_pool(&thread_pool, null_tracer())
.into_iter()
.map(
|(
_vote_pubkey,
VoteWithStakeDelegations {
vote_state,
delegations,
..
},
)| {
delegations
.iter()
.map(move |(_stake_pubkey, (stake_state, _stake_account))| {
stake_state::calculate_points(stake_state, &vote_state, None)
.unwrap_or(0)
})
.sum::<u128>()
},
)
.sum();
// put a child bank in epoch 1, which calls update_rewards()...
@ -13857,8 +14326,10 @@ pub(crate) mod tests {
vec![10_000; 2],
);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let stake_delegation_accounts = bank.stake_delegation_accounts(null_tracer());
assert_eq!(stake_delegation_accounts.len(), 2);
let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let vote_and_stake_accounts =
bank.load_vote_and_stake_accounts_with_thread_pool(&thread_pool, null_tracer());
assert_eq!(vote_and_stake_accounts.len(), 2);
let mut vote_account = bank
.get_account(&validator_vote_keypairs0.vote_keypair.pubkey())
@ -13896,8 +14367,10 @@ pub(crate) mod tests {
);
// Accounts must be valid stake and vote accounts
let stake_delegation_accounts = bank.stake_delegation_accounts(null_tracer());
assert_eq!(stake_delegation_accounts.len(), 0);
let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let vote_and_stake_accounts =
bank.load_vote_and_stake_accounts_with_thread_pool(&thread_pool, null_tracer());
assert_eq!(vote_and_stake_accounts.len(), 0);
}
#[test]

View File

@ -1,14 +1,18 @@
//! Stakes serve as a cache of stake and vote accounts to derive
//! node stakes
use {
crate::vote_account::{VoteAccount, VoteAccounts},
crate::vote_account::{VoteAccount, VoteAccounts, VoteAccountsHashMap},
rayon::{
iter::{IntoParallelRefIterator, ParallelIterator},
ThreadPool,
},
solana_sdk::{
account::{AccountSharedData, ReadableAccount},
clock::Epoch,
pubkey::Pubkey,
stake::{
self,
state::{Delegation, StakeState},
state::{Delegation, StakeActivationStatus, StakeState},
},
stake_history::StakeHistory,
},
@ -39,6 +43,8 @@ impl Stakes {
pub fn history(&self) -> &StakeHistory {
&self.stake_history
}
#[deprecated(note = "remove after optimize_epoch_boundary_updates feature is active")]
pub fn clone_with_epoch(&self, next_epoch: Epoch) -> Self {
let prev_epoch = self.epoch;
if prev_epoch == next_epoch {
@ -81,6 +87,73 @@ impl Stakes {
}
}
pub fn activate_epoch(&mut self, next_epoch: Epoch, thread_pool: &ThreadPool) {
let prev_epoch = self.epoch;
self.epoch = next_epoch;
thread_pool.install(|| {
let stake_delegations = &self.stake_delegations;
let stake_history = &mut self.stake_history;
let vote_accounts: &VoteAccountsHashMap = self.vote_accounts.as_ref();
// construct map of vote pubkey -> list of stake delegations
let vote_delegations: HashMap<Pubkey, Vec<&Delegation>> = {
let mut vote_delegations = HashMap::with_capacity(vote_accounts.len());
stake_delegations
.iter()
.for_each(|(_stake_pubkey, delegation)| {
let vote_pubkey = &delegation.voter_pubkey;
vote_delegations
.entry(*vote_pubkey)
.and_modify(|delegations: &mut Vec<_>| delegations.push(delegation))
.or_insert_with(|| vec![delegation]);
});
vote_delegations
};
// wrap up the prev epoch by adding new stake history entry for the prev epoch
{
let stake_history_entry = vote_delegations
.par_iter()
.map(|(_vote_pubkey, delegations)| {
delegations
.par_iter()
.map(|delegation| {
delegation.stake_activating_and_deactivating(
prev_epoch,
Some(stake_history),
)
})
.reduce(StakeActivationStatus::default, |a, b| a + b)
})
.reduce(StakeActivationStatus::default, |a, b| a + b);
stake_history.add(prev_epoch, stake_history_entry);
}
// refresh the stake distribution of vote accounts for the next epoch, using new stake history
let vote_accounts_for_next_epoch: VoteAccountsHashMap = vote_accounts
.par_iter()
.map(|(vote_pubkey, (_stake, vote_account))| {
let delegated_stake = vote_delegations
.get(vote_pubkey)
.map(|delegations| {
delegations
.par_iter()
.map(|delegation| delegation.stake(next_epoch, Some(stake_history)))
.sum()
})
.unwrap_or_default();
(*vote_pubkey, (delegated_stake, vote_account.clone()))
})
.collect();
// overwrite vote accounts so that staked nodes singleton is reset
self.vote_accounts = VoteAccounts::from(Arc::new(vote_accounts_for_next_epoch));
});
}
// sum the stakes that point to the given voter_pubkey
fn calculate_stake(
&self,
@ -228,6 +301,7 @@ impl Stakes {
#[cfg(test)]
pub mod tests {
use super::*;
use rayon::ThreadPoolBuilder;
use solana_sdk::{account::WritableAccount, pubkey::Pubkey, rent::Rent};
use solana_stake_program::stake_state;
use solana_vote_program::vote_state::{self, VoteState, VoteStateVersions};
@ -512,6 +586,7 @@ pub mod tests {
assert_eq!(vote_accounts.get(&vote_pubkey).unwrap().0, 20);
}
}
#[test]
fn test_clone_with_epoch() {
let mut stakes = Stakes::default();
@ -530,6 +605,7 @@ pub mod tests {
stake.stake(stakes.epoch, Some(&stakes.stake_history))
);
}
#[allow(deprecated)]
let stakes = stakes.clone_with_epoch(3);
{
let vote_accounts = stakes.vote_accounts();
@ -540,6 +616,35 @@ pub mod tests {
}
}
#[test]
fn test_activate_epoch() {
let mut stakes = Stakes::default();
let ((vote_pubkey, vote_account), (stake_pubkey, stake_account)) =
create_staked_node_accounts(10);
stakes.store(&vote_pubkey, &vote_account, true, true);
stakes.store(&stake_pubkey, &stake_account, true, true);
let stake = stake_state::stake_from(&stake_account).unwrap();
{
let vote_accounts = stakes.vote_accounts();
assert_eq!(
vote_accounts.get(&vote_pubkey).unwrap().0,
stake.stake(stakes.epoch, Some(&stakes.stake_history))
);
}
let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
stakes.activate_epoch(3, &thread_pool);
{
let vote_accounts = stakes.vote_accounts();
assert_eq!(
vote_accounts.get(&vote_pubkey).unwrap().0,
stake.stake(stakes.epoch, Some(&stakes.stake_history))
);
}
}
#[test]
fn test_stakes_not_delegate() {
let mut stakes = Stakes {
@ -600,8 +705,9 @@ pub mod tests {
assert_eq!(stakes.vote_balance_and_staked(), 11);
assert_eq!(stakes.vote_balance_and_warmed_staked(), 1);
let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
for (epoch, expected_warmed_stake) in ((genesis_epoch + 1)..=3).zip(&[2, 3, 4]) {
stakes = stakes.clone_with_epoch(epoch);
stakes.activate_epoch(epoch, &thread_pool);
// vote_balance_and_staked() always remain to return same lamports
// while vote_balance_and_warmed_staked() gradually increases
assert_eq!(stakes.vote_balance_and_staked(), 11);

View File

@ -29,9 +29,11 @@ struct VoteAccountInner {
vote_state_once: Once,
}
pub type VoteAccountsHashMap = HashMap<Pubkey, (/*stake:*/ u64, VoteAccount)>;
#[derive(Debug, AbiExample)]
pub struct VoteAccounts {
vote_accounts: Arc<HashMap<Pubkey, (/*stake:*/ u64, VoteAccount)>>,
vote_accounts: Arc<VoteAccountsHashMap>,
// Inner Arc is meant to implement copy-on-write semantics as opposed to
// sharing mutations (hence RwLock<Arc<...>> instead of Arc<RwLock<...>>).
staked_nodes: RwLock<
@ -46,8 +48,12 @@ pub struct VoteAccounts {
}
impl VoteAccount {
pub fn account(&self) -> &Account {
&self.0.account
}
pub(crate) fn lamports(&self) -> u64 {
self.0.account.lamports
self.account().lamports
}
pub fn vote_state(&self) -> RwLockReadGuard<Result<VoteState, InstructionError>> {
@ -192,6 +198,12 @@ impl From<Account> for VoteAccount {
}
}
impl AsRef<VoteAccountInner> for VoteAccount {
fn as_ref(&self) -> &VoteAccountInner {
&self.0
}
}
impl From<AccountSharedData> for VoteAccountInner {
fn from(account: AccountSharedData) -> Self {
Self::from(Account::from(account))
@ -261,8 +273,6 @@ impl PartialEq<VoteAccounts> for VoteAccounts {
}
}
type VoteAccountsHashMap = HashMap<Pubkey, (/*stake:*/ u64, VoteAccount)>;
impl From<Arc<VoteAccountsHashMap>> for VoteAccounts {
fn from(vote_accounts: Arc<VoteAccountsHashMap>) -> Self {
Self {

View File

@ -229,6 +229,10 @@ pub mod prevent_calling_precompiles_as_programs {
solana_sdk::declare_id!("4ApgRX3ud6p7LNMJmsuaAcZY5HWctGPr5obAsjB3A54d");
}
pub mod optimize_epoch_boundary_updates {
solana_sdk::declare_id!("265hPS8k8xJ37ot82KEgjRunsUp5w4n4Q4VwwiN9i9ps");
}
lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@ -281,6 +285,7 @@ lazy_static! {
(stakes_remove_delegation_if_inactive::id(), "remove delegations from stakes cache when inactive"),
(do_support_realloc::id(), "support account data reallocation"),
(prevent_calling_precompiles_as_programs::id(), "Prevent calling precompiles as programs"),
(optimize_epoch_boundary_updates::id(), "Optimize epoch boundary updates"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()

View File

@ -1443,6 +1443,14 @@ pub fn main() {
.help("Allow contacting private ip addresses")
.hidden(true),
)
.arg(
Arg::with_name("disable_epoch_boundary_optimization")
.long("disable-epoch-boundary-optimization")
.takes_value(false)
.help("Disables epoch boundary optimization and overrides the \
optimize_epoch_boundary_updates feature switch if enabled.")
.hidden(true),
)
.after_help("The default subcommand is run")
.subcommand(
SubCommand::with_name("exit")
@ -2072,6 +2080,8 @@ pub fn main() {
tpu_coalesce_ms,
no_wait_for_vote_to_start_leader: matches.is_present("no_wait_for_vote_to_start_leader"),
accounts_shrink_ratio,
disable_epoch_boundary_optimization: matches
.is_present("disable_epoch_boundary_optimization"),
..ValidatorConfig::default()
};