diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 3825422f8c..ea898bfb17 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -43,7 +43,7 @@ use { }, solana_runtime::{ accounts_background_service::AbsRequestSender, bank::Bank, bank::ExecuteTimings, - bank_forks::BankForks, commitment::BlockCommitmentCache, + bank::NewBankOptions, bank_forks::BankForks, commitment::BlockCommitmentCache, vote_sender_types::ReplayVoteSender, }, solana_sdk::{ @@ -132,6 +132,7 @@ pub struct ReplayStageConfig { pub wait_for_vote_to_start_leader: bool, pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender, pub tower_storage: Arc, + pub disable_epoch_boundary_optimization: bool, } #[derive(Default)] @@ -341,6 +342,7 @@ impl ReplayStage { wait_for_vote_to_start_leader, ancestor_hashes_replay_update_sender, tower_storage, + disable_epoch_boundary_optimization, } = config; trace!("replay stage"); @@ -769,6 +771,7 @@ impl ReplayStage { &retransmit_slots_sender, &mut skipped_slots_info, has_new_vote_been_rooted, + disable_epoch_boundary_optimization, ); let poh_bank = poh_recorder.lock().unwrap().bank(); @@ -1341,6 +1344,7 @@ impl ReplayStage { } } + #[allow(clippy::too_many_arguments)] fn maybe_start_leader( my_pubkey: &Pubkey, bank_forks: &Arc>, @@ -1351,6 +1355,7 @@ impl ReplayStage { retransmit_slots_sender: &RetransmitSlotsSender, skipped_slots_info: &mut SkippedSlotsInfo, has_new_vote_been_rooted: bool, + disable_epoch_boundary_optimization: bool, ) { // all the individual calls to poh_recorder.lock() are designed to // increase granularity, decrease contention @@ -1468,7 +1473,10 @@ impl ReplayStage { root_slot, my_pubkey, rpc_subscriptions, - vote_only_bank, + NewBankOptions { + vote_only_bank, + disable_epoch_boundary_optimization, + }, ); let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank); @@ -2794,7 +2802,7 @@ impl ReplayStage { forks.root(), &leader, rpc_subscriptions, - false, + NewBankOptions::default(), ); let empty: Vec = vec![]; Self::update_fork_propagated_threshold_from_votes( @@ -2821,10 +2829,10 @@ impl ReplayStage { root_slot: u64, leader: &Pubkey, rpc_subscriptions: &Arc, - vote_only_bank: bool, + new_bank_options: NewBankOptions, ) -> Bank { rpc_subscriptions.notify_slot(slot, parent.slot(), root_slot); - Bank::new_from_parent_with_vote_only(parent, leader, slot, vote_only_bank) + Bank::new_from_parent_with_options(parent, leader, slot, new_bank_options) } fn record_rewards(bank: &Bank, rewards_recorder_sender: &Option) { diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 45810818fb..cf3ae2e1e7 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -94,6 +94,7 @@ pub struct TvuConfig { pub rocksdb_max_compaction_jitter: Option, pub wait_for_vote_to_start_leader: bool, pub accounts_shrink_ratio: AccountShrinkThreshold, + pub disable_epoch_boundary_optimization: bool, } impl Tvu { @@ -282,6 +283,7 @@ impl Tvu { wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader, ancestor_hashes_replay_update_sender, tower_storage: tower_storage.clone(), + disable_epoch_boundary_optimization: tvu_config.disable_epoch_boundary_optimization, }; let (voting_sender, voting_receiver) = channel(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 5bdf99edcc..70784ada06 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -161,6 +161,7 @@ pub struct ValidatorConfig { pub validator_exit: Arc>, pub no_wait_for_vote_to_start_leader: bool, pub accounts_shrink_ratio: AccountShrinkThreshold, + pub disable_epoch_boundary_optimization: bool, } impl Default for ValidatorConfig { @@ -221,6 +222,7 @@ impl Default for ValidatorConfig { no_wait_for_vote_to_start_leader: true, accounts_shrink_ratio: AccountShrinkThreshold::default(), accounts_db_config: None, + disable_epoch_boundary_optimization: false, } } } @@ -820,6 +822,7 @@ impl Validator { rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval, wait_for_vote_to_start_leader, accounts_shrink_ratio: config.accounts_shrink_ratio, + disable_epoch_boundary_optimization: config.disable_epoch_boundary_optimization, }, &max_slots, &cost_model, diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 50a5943ac1..6a93842585 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -60,6 +60,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { no_wait_for_vote_to_start_leader: config.no_wait_for_vote_to_start_leader, accounts_shrink_ratio: config.accounts_shrink_ratio, accounts_db_config: config.accounts_db_config.clone(), + disable_epoch_boundary_optimization: config.disable_epoch_boundary_optimization, } } diff --git a/programs/stake/src/stake_state.rs b/programs/stake/src/stake_state.rs index cff0e0ebb7..8b07e0c1e2 100644 --- a/programs/stake/src/stake_state.rs +++ b/programs/stake/src/stake_state.rs @@ -1098,7 +1098,9 @@ fn stake_weighted_credits_observed( // utility function, used by runtime // returns a tuple of (stakers_reward,voters_reward) -pub fn redeem_rewards( +#[doc(hidden)] +#[deprecated(note = "remove after optimize_epoch_boundary_updates feature is active")] +pub fn redeem_rewards_slow( rewarded_epoch: Epoch, stake_account: &mut AccountSharedData, vote_account: &mut AccountSharedData, @@ -1147,7 +1149,9 @@ pub fn redeem_rewards( } // utility function, used by runtime -pub fn calculate_points( +#[doc(hidden)] +#[deprecated(note = "remove after optimize_epoch_boundary_updates feature is active")] +pub fn calculate_points_slow( stake_account: &AccountSharedData, vote_account: &AccountSharedData, stake_history: Option<&StakeHistory>, @@ -1167,6 +1171,74 @@ pub fn calculate_points( } } +// utility function, used by runtime +// returns a tuple of (stakers_reward,voters_reward) +#[doc(hidden)] +pub fn redeem_rewards( + rewarded_epoch: Epoch, + stake_state: StakeState, + stake_account: &mut AccountSharedData, + vote_state: &VoteState, + point_value: &PointValue, + stake_history: Option<&StakeHistory>, + inflation_point_calc_tracer: Option, + fix_activating_credits_observed: bool, +) -> Result<(u64, u64), InstructionError> { + if let StakeState::Stake(meta, mut stake) = stake_state { + if let Some(inflation_point_calc_tracer) = inflation_point_calc_tracer.as_ref() { + inflation_point_calc_tracer( + &InflationPointCalculationEvent::EffectiveStakeAtRewardedEpoch( + stake.stake(rewarded_epoch, stake_history), + ), + ); + inflation_point_calc_tracer(&InflationPointCalculationEvent::RentExemptReserve( + meta.rent_exempt_reserve, + )); + inflation_point_calc_tracer(&InflationPointCalculationEvent::Commission( + vote_state.commission, + )); + } + + if let Some((stakers_reward, voters_reward)) = redeem_stake_rewards( + rewarded_epoch, + &mut stake, + point_value, + vote_state, + stake_history, + inflation_point_calc_tracer, + fix_activating_credits_observed, + ) { + stake_account.checked_add_lamports(stakers_reward)?; + stake_account.set_state(&StakeState::Stake(meta, stake))?; + + Ok((stakers_reward, voters_reward)) + } else { + Err(StakeError::NoCreditsToRedeem.into()) + } + } else { + Err(InstructionError::InvalidAccountData) + } +} + +// utility function, used by runtime +#[doc(hidden)] +pub fn calculate_points( + stake_state: &StakeState, + vote_state: &VoteState, + stake_history: Option<&StakeHistory>, +) -> Result { + if let StakeState::Stake(_meta, stake) = stake_state { + Ok(calculate_stake_points( + stake, + vote_state, + stake_history, + null_tracer(), + )) + } else { + Err(InstructionError::InvalidAccountData) + } +} + // utility function, used by Split //This emulates current Rent math in order to preserve backward compatibility. In the future, and //to support variable rent, the Split instruction should pass in the Rent sysvar instead. diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index a787fb2bce..85f07bdcba 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -61,9 +61,13 @@ use crate::{ vote_account::VoteAccount, }; use byteorder::{ByteOrder, LittleEndian}; +use dashmap::DashMap; use itertools::Itertools; use log::*; -use rayon::ThreadPool; +use rayon::{ + iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}, + ThreadPool, ThreadPoolBuilder, +}; use solana_measure::measure::Measure; use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_info}; use solana_program_runtime::{ExecuteDetailsTimings, Executors, InstructionProcessor}; @@ -105,7 +109,6 @@ use solana_sdk::{ signature::{Keypair, Signature}, slot_hashes::SlotHashes, slot_history::SlotHistory, - stake::{self, state::Delegation}, system_transaction, sysvar::{self}, timing::years_as_slots, @@ -113,7 +116,9 @@ use solana_sdk::{ Result, SanitizedTransaction, Transaction, TransactionError, VersionedTransaction, }, }; -use solana_stake_program::stake_state::{self, InflationPointCalculationEvent, PointValue}; +use solana_stake_program::stake_state::{ + self, Delegation, InflationPointCalculationEvent, PointValue, StakeState, +}; use solana_vote_program::{ vote_instruction::VoteInstruction, vote_state::{VoteState, VoteStateVersions}, @@ -996,6 +1001,18 @@ impl Default for BlockhashQueue { } } +struct VoteWithStakeDelegations { + vote_state: Arc, + vote_account: AccountSharedData, + delegations: Vec<(Pubkey, (StakeState, AccountSharedData))>, +} + +#[derive(Debug, Default)] +pub struct NewBankOptions { + pub vote_only_bank: bool, + pub disable_epoch_boundary_optimization: bool, +} + impl Bank { pub fn default_for_tests() -> Self { Self::default_with_accounts(Accounts::default_for_tests()) @@ -1228,16 +1245,22 @@ impl Bank { /// Create a new bank that points to an immutable checkpoint of another bank. pub fn new_from_parent(parent: &Arc, collector_id: &Pubkey, slot: Slot) -> Self { - Self::_new_from_parent(parent, collector_id, slot, null_tracer(), false) + Self::_new_from_parent( + parent, + collector_id, + slot, + null_tracer(), + NewBankOptions::default(), + ) } - pub fn new_from_parent_with_vote_only( + pub fn new_from_parent_with_options( parent: &Arc, collector_id: &Pubkey, slot: Slot, - vote_only_bank: bool, + new_bank_options: NewBankOptions, ) -> Self { - Self::_new_from_parent(parent, collector_id, slot, null_tracer(), vote_only_bank) + Self::_new_from_parent(parent, collector_id, slot, null_tracer(), new_bank_options) } pub fn new_from_parent_with_tracer( @@ -1246,7 +1269,13 @@ impl Bank { slot: Slot, reward_calc_tracer: impl Fn(&RewardCalculationEvent) + Send + Sync, ) -> Self { - Self::_new_from_parent(parent, collector_id, slot, Some(reward_calc_tracer), false) + Self::_new_from_parent( + parent, + collector_id, + slot, + Some(reward_calc_tracer), + NewBankOptions::default(), + ) } fn _new_from_parent( @@ -1254,8 +1283,13 @@ impl Bank { collector_id: &Pubkey, slot: Slot, reward_calc_tracer: Option, - vote_only_bank: bool, + new_bank_options: NewBankOptions, ) -> Self { + let NewBankOptions { + vote_only_bank, + disable_epoch_boundary_optimization, + } = new_bank_options; + parent.freeze(); assert_ne!(slot, parent.slot()); @@ -1369,6 +1403,45 @@ impl Bank { new.apply_feature_activations(false, false); } + let optimize_epoch_boundary_updates = !disable_epoch_boundary_optimization + && new + .feature_set + .is_active(&feature_set::optimize_epoch_boundary_updates::id()); + + if optimize_epoch_boundary_updates { + if parent_epoch < new.epoch() { + let thread_pool = ThreadPoolBuilder::new().build().unwrap(); + + // Add new entry to stakes.stake_history, set appropriate epoch and + // update vote accounts with warmed up stakes before saving a + // snapshot of stakes in epoch stakes + new.stakes + .write() + .unwrap() + .activate_epoch(epoch, &thread_pool); + + // Save a snapshot of stakes for use in consensus and stake weighted networking + let leader_schedule_epoch = epoch_schedule.get_leader_schedule_epoch(slot); + new.update_epoch_stakes(leader_schedule_epoch); + + // After saving a snapshot of stakes, apply stake rewards and commission + new.update_rewards_with_thread_pool(parent_epoch, reward_calc_tracer, &thread_pool); + } else { + // Save a snapshot of stakes for use in consensus and stake weighted networking + let leader_schedule_epoch = epoch_schedule.get_leader_schedule_epoch(slot); + new.update_epoch_stakes(leader_schedule_epoch); + } + + // Update sysvars before processing transactions + new.update_slot_hashes(); + new.update_stake_history(Some(parent_epoch)); + new.update_clock(Some(parent_epoch)); + new.update_fees(); + + return new; + } + + #[allow(deprecated)] let cloned = new.stakes.read().unwrap().clone_with_epoch(epoch); *new.stakes.write().unwrap() = cloned; @@ -1961,6 +2034,7 @@ impl Bank { let old_vote_balance_and_staked = self.stakes.read().unwrap().vote_balance_and_staked(); + #[allow(deprecated)] let validator_point_value = self.pay_validator_rewards( prev_epoch, validator_rewards, @@ -2034,11 +2108,110 @@ impl Bank { ); } + // update rewards based on the previous epoch + fn update_rewards_with_thread_pool( + &mut self, + prev_epoch: Epoch, + reward_calc_tracer: Option, + thread_pool: &ThreadPool, + ) { + let slot_in_year = self.slot_in_year_for_inflation(); + let epoch_duration_in_years = self.epoch_duration_in_years(prev_epoch); + + let (validator_rate, foundation_rate) = { + let inflation = self.inflation.read().unwrap(); + ( + (*inflation).validator(slot_in_year), + (*inflation).foundation(slot_in_year), + ) + }; + + let capitalization = self.capitalization(); + let validator_rewards = + (validator_rate * capitalization as f64 * epoch_duration_in_years) as u64; + + let old_vote_balance_and_staked = self.stakes.read().unwrap().vote_balance_and_staked(); + + let validator_point_value = self.pay_validator_rewards_with_thread_pool( + prev_epoch, + validator_rewards, + reward_calc_tracer, + self.stake_program_advance_activating_credits_observed(), + thread_pool, + ); + + if !self + .feature_set + .is_active(&feature_set::deprecate_rewards_sysvar::id()) + { + // this sysvar can be retired once `pico_inflation` is enabled on all clusters + self.update_sysvar_account(&sysvar::rewards::id(), |account| { + create_account( + &sysvar::rewards::Rewards::new(validator_point_value), + self.inherit_specially_retained_account_fields(account), + ) + }); + } + + let new_vote_balance_and_staked = self.stakes.read().unwrap().vote_balance_and_staked(); + let validator_rewards_paid = new_vote_balance_and_staked - old_vote_balance_and_staked; + assert_eq!( + validator_rewards_paid, + u64::try_from( + self.rewards + .read() + .unwrap() + .iter() + .map(|(_address, reward_info)| { + match reward_info.reward_type { + RewardType::Voting | RewardType::Staking => reward_info.lamports, + _ => 0, + } + }) + .sum::() + ) + .unwrap() + ); + + // verify that we didn't pay any more than we expected to + assert!(validator_rewards >= validator_rewards_paid); + + info!( + "distributed inflation: {} (rounded from: {})", + validator_rewards_paid, validator_rewards + ); + + self.capitalization + .fetch_add(validator_rewards_paid, Relaxed); + + let active_stake = if let Some(stake_history_entry) = + self.stakes.read().unwrap().history().get(&prev_epoch) + { + stake_history_entry.effective + } else { + 0 + }; + + datapoint_warn!( + "epoch_rewards", + ("slot", self.slot, i64), + ("epoch", prev_epoch, i64), + ("validator_rate", validator_rate, f64), + ("foundation_rate", foundation_rate, f64), + ("epoch_duration_in_years", epoch_duration_in_years, f64), + ("validator_rewards", validator_rewards_paid, i64), + ("active_stake", active_stake, i64), + ("pre_capitalization", capitalization, i64), + ("post_capitalization", self.capitalization(), i64) + ); + } + /// map stake delegations into resolved (pubkey, account) pairs /// returns a map (has to be copied) of loaded /// ( Vec<(staker info)> (voter account) ) keyed by voter pubkey /// /// Filters out invalid pairs + #[deprecated(note = "remove after optimize_epoch_boundary_updates feature is active")] fn stake_delegation_accounts( &self, reward_calc_tracer: Option, @@ -2069,7 +2242,7 @@ impl Bank { if self .feature_set .is_active(&feature_set::filter_stake_delegation_accounts::id()) - && (stake_account.owner() != &stake::program::id() + && (stake_account.owner() != &solana_stake_program::id() || vote_account.owner() != &solana_vote_program::id()) { datapoint_warn!( @@ -2096,8 +2269,116 @@ impl Bank { accounts } + /// map stake delegations into resolved (pubkey, account) pairs + /// returns a map (has to be copied) of loaded + /// ( Vec<(staker info)> (voter account) ) keyed by voter pubkey + /// + /// Filters out invalid pairs + fn load_vote_and_stake_accounts_with_thread_pool( + &self, + thread_pool: &ThreadPool, + reward_calc_tracer: Option, + ) -> DashMap { + let filter_stake_delegation_accounts = self + .feature_set + .is_active(&feature_set::filter_stake_delegation_accounts::id()); + + let stakes = self.stakes.read().unwrap(); + let accounts = DashMap::with_capacity(stakes.vote_accounts().as_ref().len()); + + thread_pool.install(|| { + stakes + .stake_delegations() + .par_iter() + .for_each(|(stake_pubkey, delegation)| { + let vote_pubkey = &delegation.voter_pubkey; + let stake_account = match self.get_account_with_fixed_root(stake_pubkey) { + Some(stake_account) => stake_account, + None => return, + }; + + // fetch vote account from stakes cache if it hasn't been cached locally + let fetched_vote_account = if !accounts.contains_key(vote_pubkey) { + let vote_account = match self.get_account_with_fixed_root(vote_pubkey) { + Some(vote_account) => vote_account, + None => return, + }; + + let vote_state: VoteState = + match StateMut::::state(&vote_account) { + Ok(vote_state) => vote_state.convert_to_current(), + Err(err) => { + debug!( + "failed to deserialize vote account {}: {}", + vote_pubkey, err + ); + return; + } + }; + + Some((vote_state, vote_account)) + } else { + None + }; + + let fetched_vote_account_owner = fetched_vote_account + .as_ref() + .map(|(_vote_state, vote_account)| vote_account.owner()); + + if let Some(reward_calc_tracer) = reward_calc_tracer.as_ref() { + reward_calc_tracer(&RewardCalculationEvent::Staking( + stake_pubkey, + &InflationPointCalculationEvent::Delegation( + *delegation, + fetched_vote_account_owner + .cloned() + .unwrap_or_else(solana_vote_program::id), + ), + )); + } + + // filter invalid delegation accounts + if filter_stake_delegation_accounts + && (stake_account.owner() != &solana_stake_program::id() + || (fetched_vote_account_owner.is_some() + && fetched_vote_account_owner != Some(&solana_vote_program::id()))) + { + datapoint_warn!( + "bank-stake_delegation_accounts-invalid-account", + ("slot", self.slot() as i64, i64), + ("stake-address", format!("{:?}", stake_pubkey), String), + ("vote-address", format!("{:?}", vote_pubkey), String), + ); + return; + } + + let stake_delegation = match stake_account.state().ok() { + Some(stake_state) => (*stake_pubkey, (stake_state, stake_account)), + None => return, + }; + + if let Some((vote_state, vote_account)) = fetched_vote_account { + accounts + .entry(*vote_pubkey) + .or_insert_with(|| VoteWithStakeDelegations { + vote_state: Arc::new(vote_state), + vote_account, + delegations: vec![], + }); + } + + if let Some(mut stake_delegation_accounts) = accounts.get_mut(vote_pubkey) { + stake_delegation_accounts.delegations.push(stake_delegation); + } + }); + }); + + accounts + } + /// iterate over all stakes, redeem vote credits for each stake we can /// successfully load and parse, return the lamport value of one point + #[deprecated(note = "remove after optimize_epoch_boundary_updates feature is active")] fn pay_validator_rewards( &mut self, rewarded_epoch: Epoch, @@ -2107,6 +2388,7 @@ impl Bank { ) -> f64 { let stake_history = self.stakes.read().unwrap().history().clone(); + #[allow(deprecated)] let mut stake_delegation_accounts = self.stake_delegation_accounts(reward_calc_tracer.as_ref()); @@ -2118,8 +2400,13 @@ impl Bank { .map(move |(_stake_pubkey, stake_account)| (stake_account, vote_account)) }) .map(|(stake_account, vote_account)| { - stake_state::calculate_points(stake_account, vote_account, Some(&stake_history)) - .unwrap_or(0) + #[allow(deprecated)] + stake_state::calculate_points_slow( + stake_account, + vote_account, + Some(&stake_history), + ) + .unwrap_or(0) }) .sum(); @@ -2155,7 +2442,8 @@ impl Bank { outer(&RewardCalculationEvent::Staking(&stake_pubkey, inner_event)) } }); - let redeemed = stake_state::redeem_rewards( + #[allow(deprecated)] + let redeemed = stake_state::redeem_rewards_slow( rewarded_epoch, stake_account, vote_account, @@ -2210,6 +2498,176 @@ impl Bank { point_value.rewards as f64 / point_value.points as f64 } + /// iterate over all stakes, redeem vote credits for each stake we can + /// successfully load and parse, return the lamport value of one point + fn pay_validator_rewards_with_thread_pool( + &mut self, + rewarded_epoch: Epoch, + rewards: u64, + reward_calc_tracer: Option, + fix_activating_credits_observed: bool, + thread_pool: &ThreadPool, + ) -> f64 { + let stake_history = self.stakes.read().unwrap().history().clone(); + let vote_and_stake_accounts = self.load_vote_and_stake_accounts_with_thread_pool( + thread_pool, + reward_calc_tracer.as_ref(), + ); + + let points: u128 = thread_pool.install(|| { + vote_and_stake_accounts + .par_iter() + .map(|entry| { + let VoteWithStakeDelegations { + vote_state, + delegations, + .. + } = entry.value(); + + delegations + .par_iter() + .map(|(_stake_pubkey, (stake_state, _stake_account))| { + stake_state::calculate_points( + stake_state, + vote_state, + Some(&stake_history), + ) + .unwrap_or(0) + }) + .sum::() + }) + .sum() + }); + + if points == 0 { + return 0.0; + } + + // pay according to point value + let point_value = PointValue { rewards, points }; + let vote_account_rewards: DashMap = + DashMap::with_capacity(vote_and_stake_accounts.len()); + let stake_delegation_iterator = vote_and_stake_accounts.into_par_iter().flat_map( + |( + vote_pubkey, + VoteWithStakeDelegations { + vote_state, + vote_account, + delegations, + }, + )| { + vote_account_rewards + .insert(vote_pubkey, (vote_account, vote_state.commission, 0, false)); + delegations + .into_par_iter() + .map(move |delegation| (vote_pubkey, Arc::clone(&vote_state), delegation)) + }, + ); + + let mut stake_rewards = thread_pool.install(|| { + stake_delegation_iterator + .filter_map( + |( + vote_pubkey, + vote_state, + (stake_pubkey, (stake_state, mut stake_account)), + )| { + // curry closure to add the contextual stake_pubkey + let reward_calc_tracer = reward_calc_tracer.as_ref().map(|outer| { + // inner + move |inner_event: &_| { + outer(&RewardCalculationEvent::Staking(&stake_pubkey, inner_event)) + } + }); + let redeemed = stake_state::redeem_rewards( + rewarded_epoch, + stake_state, + &mut stake_account, + &vote_state, + &point_value, + Some(&stake_history), + reward_calc_tracer.as_ref(), + fix_activating_credits_observed, + ); + if let Ok((stakers_reward, voters_reward)) = redeemed { + // track voter rewards + if let Some(( + _vote_account, + _commission, + vote_rewards_sum, + vote_needs_store, + )) = vote_account_rewards.get_mut(&vote_pubkey).as_deref_mut() + { + *vote_needs_store = true; + *vote_rewards_sum = vote_rewards_sum.saturating_add(voters_reward); + } + + // store stake account even if stakers_reward is 0 + // because credits observed has changed + self.store_account(&stake_pubkey, &stake_account); + + if stakers_reward > 0 { + return Some(( + stake_pubkey, + RewardInfo { + reward_type: RewardType::Staking, + lamports: stakers_reward as i64, + post_balance: stake_account.lamports(), + commission: Some(vote_state.commission), + }, + )); + } + } else { + debug!( + "stake_state::redeem_rewards() failed for {}: {:?}", + stake_pubkey, redeemed + ); + } + None + }, + ) + .collect() + }); + + let mut vote_rewards = vote_account_rewards + .into_iter() + .filter_map( + |(vote_pubkey, (mut vote_account, commission, vote_rewards, vote_needs_store))| { + if let Err(err) = vote_account.checked_add_lamports(vote_rewards) { + debug!("reward redemption failed for {}: {:?}", vote_pubkey, err); + return None; + } + + if vote_needs_store { + self.store_account(&vote_pubkey, &vote_account); + } + + if vote_rewards > 0 { + Some(( + vote_pubkey, + RewardInfo { + reward_type: RewardType::Voting, + lamports: vote_rewards as i64, + post_balance: vote_account.lamports(), + commission: Some(commission), + }, + )) + } else { + None + } + }, + ) + .collect(); + + { + let mut rewards = self.rewards.write().unwrap(); + rewards.append(&mut vote_rewards); + rewards.append(&mut stake_rewards); + } + + point_value.rewards as f64 / point_value.points as f64 + } + fn update_recent_blockhashes_locked(&self, locked_blockhash_queue: &BlockhashQueue) { #[allow(deprecated)] self.update_sysvar_account(&sysvar::recent_blockhashes::id(), |account| { @@ -7831,17 +8289,28 @@ pub(crate) mod tests { } bank0.store_account_and_update_capitalization(&vote_id, &vote_account); + let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); let validator_points: u128 = bank0 - .stake_delegation_accounts(null_tracer()) - .iter() - .flat_map(|(_vote_pubkey, (stake_group, vote_account))| { - stake_group - .iter() - .map(move |(_stake_pubkey, stake_account)| (stake_account, vote_account)) - }) - .map(|(stake_account, vote_account)| { - stake_state::calculate_points(stake_account, vote_account, None).unwrap_or(0) - }) + .load_vote_and_stake_accounts_with_thread_pool(&thread_pool, null_tracer()) + .into_iter() + .map( + |( + _vote_pubkey, + VoteWithStakeDelegations { + vote_state, + delegations, + .. + }, + )| { + delegations + .iter() + .map(move |(_stake_pubkey, (stake_state, _stake_account))| { + stake_state::calculate_points(stake_state, &vote_state, None) + .unwrap_or(0) + }) + .sum::() + }, + ) .sum(); // put a child bank in epoch 1, which calls update_rewards()... @@ -13857,8 +14326,10 @@ pub(crate) mod tests { vec![10_000; 2], ); let bank = Arc::new(Bank::new_for_tests(&genesis_config)); - let stake_delegation_accounts = bank.stake_delegation_accounts(null_tracer()); - assert_eq!(stake_delegation_accounts.len(), 2); + let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let vote_and_stake_accounts = + bank.load_vote_and_stake_accounts_with_thread_pool(&thread_pool, null_tracer()); + assert_eq!(vote_and_stake_accounts.len(), 2); let mut vote_account = bank .get_account(&validator_vote_keypairs0.vote_keypair.pubkey()) @@ -13896,8 +14367,10 @@ pub(crate) mod tests { ); // Accounts must be valid stake and vote accounts - let stake_delegation_accounts = bank.stake_delegation_accounts(null_tracer()); - assert_eq!(stake_delegation_accounts.len(), 0); + let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + let vote_and_stake_accounts = + bank.load_vote_and_stake_accounts_with_thread_pool(&thread_pool, null_tracer()); + assert_eq!(vote_and_stake_accounts.len(), 0); } #[test] diff --git a/runtime/src/stakes.rs b/runtime/src/stakes.rs index d22fc415ba..b303033fc8 100644 --- a/runtime/src/stakes.rs +++ b/runtime/src/stakes.rs @@ -1,14 +1,18 @@ //! Stakes serve as a cache of stake and vote accounts to derive //! node stakes use { - crate::vote_account::{VoteAccount, VoteAccounts}, + crate::vote_account::{VoteAccount, VoteAccounts, VoteAccountsHashMap}, + rayon::{ + iter::{IntoParallelRefIterator, ParallelIterator}, + ThreadPool, + }, solana_sdk::{ account::{AccountSharedData, ReadableAccount}, clock::Epoch, pubkey::Pubkey, stake::{ self, - state::{Delegation, StakeState}, + state::{Delegation, StakeActivationStatus, StakeState}, }, stake_history::StakeHistory, }, @@ -39,6 +43,8 @@ impl Stakes { pub fn history(&self) -> &StakeHistory { &self.stake_history } + + #[deprecated(note = "remove after optimize_epoch_boundary_updates feature is active")] pub fn clone_with_epoch(&self, next_epoch: Epoch) -> Self { let prev_epoch = self.epoch; if prev_epoch == next_epoch { @@ -81,6 +87,73 @@ impl Stakes { } } + pub fn activate_epoch(&mut self, next_epoch: Epoch, thread_pool: &ThreadPool) { + let prev_epoch = self.epoch; + self.epoch = next_epoch; + + thread_pool.install(|| { + let stake_delegations = &self.stake_delegations; + let stake_history = &mut self.stake_history; + let vote_accounts: &VoteAccountsHashMap = self.vote_accounts.as_ref(); + + // construct map of vote pubkey -> list of stake delegations + let vote_delegations: HashMap> = { + let mut vote_delegations = HashMap::with_capacity(vote_accounts.len()); + stake_delegations + .iter() + .for_each(|(_stake_pubkey, delegation)| { + let vote_pubkey = &delegation.voter_pubkey; + vote_delegations + .entry(*vote_pubkey) + .and_modify(|delegations: &mut Vec<_>| delegations.push(delegation)) + .or_insert_with(|| vec![delegation]); + }); + vote_delegations + }; + + // wrap up the prev epoch by adding new stake history entry for the prev epoch + { + let stake_history_entry = vote_delegations + .par_iter() + .map(|(_vote_pubkey, delegations)| { + delegations + .par_iter() + .map(|delegation| { + delegation.stake_activating_and_deactivating( + prev_epoch, + Some(stake_history), + ) + }) + .reduce(StakeActivationStatus::default, |a, b| a + b) + }) + .reduce(StakeActivationStatus::default, |a, b| a + b); + + stake_history.add(prev_epoch, stake_history_entry); + } + + // refresh the stake distribution of vote accounts for the next epoch, using new stake history + let vote_accounts_for_next_epoch: VoteAccountsHashMap = vote_accounts + .par_iter() + .map(|(vote_pubkey, (_stake, vote_account))| { + let delegated_stake = vote_delegations + .get(vote_pubkey) + .map(|delegations| { + delegations + .par_iter() + .map(|delegation| delegation.stake(next_epoch, Some(stake_history))) + .sum() + }) + .unwrap_or_default(); + + (*vote_pubkey, (delegated_stake, vote_account.clone())) + }) + .collect(); + + // overwrite vote accounts so that staked nodes singleton is reset + self.vote_accounts = VoteAccounts::from(Arc::new(vote_accounts_for_next_epoch)); + }); + } + // sum the stakes that point to the given voter_pubkey fn calculate_stake( &self, @@ -228,6 +301,7 @@ impl Stakes { #[cfg(test)] pub mod tests { use super::*; + use rayon::ThreadPoolBuilder; use solana_sdk::{account::WritableAccount, pubkey::Pubkey, rent::Rent}; use solana_stake_program::stake_state; use solana_vote_program::vote_state::{self, VoteState, VoteStateVersions}; @@ -512,6 +586,7 @@ pub mod tests { assert_eq!(vote_accounts.get(&vote_pubkey).unwrap().0, 20); } } + #[test] fn test_clone_with_epoch() { let mut stakes = Stakes::default(); @@ -530,6 +605,7 @@ pub mod tests { stake.stake(stakes.epoch, Some(&stakes.stake_history)) ); } + #[allow(deprecated)] let stakes = stakes.clone_with_epoch(3); { let vote_accounts = stakes.vote_accounts(); @@ -540,6 +616,35 @@ pub mod tests { } } + #[test] + fn test_activate_epoch() { + let mut stakes = Stakes::default(); + + let ((vote_pubkey, vote_account), (stake_pubkey, stake_account)) = + create_staked_node_accounts(10); + + stakes.store(&vote_pubkey, &vote_account, true, true); + stakes.store(&stake_pubkey, &stake_account, true, true); + let stake = stake_state::stake_from(&stake_account).unwrap(); + + { + let vote_accounts = stakes.vote_accounts(); + assert_eq!( + vote_accounts.get(&vote_pubkey).unwrap().0, + stake.stake(stakes.epoch, Some(&stakes.stake_history)) + ); + } + let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); + stakes.activate_epoch(3, &thread_pool); + { + let vote_accounts = stakes.vote_accounts(); + assert_eq!( + vote_accounts.get(&vote_pubkey).unwrap().0, + stake.stake(stakes.epoch, Some(&stakes.stake_history)) + ); + } + } + #[test] fn test_stakes_not_delegate() { let mut stakes = Stakes { @@ -600,8 +705,9 @@ pub mod tests { assert_eq!(stakes.vote_balance_and_staked(), 11); assert_eq!(stakes.vote_balance_and_warmed_staked(), 1); + let thread_pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); for (epoch, expected_warmed_stake) in ((genesis_epoch + 1)..=3).zip(&[2, 3, 4]) { - stakes = stakes.clone_with_epoch(epoch); + stakes.activate_epoch(epoch, &thread_pool); // vote_balance_and_staked() always remain to return same lamports // while vote_balance_and_warmed_staked() gradually increases assert_eq!(stakes.vote_balance_and_staked(), 11); diff --git a/runtime/src/vote_account.rs b/runtime/src/vote_account.rs index 7e7e54a7fb..f3b3d59f21 100644 --- a/runtime/src/vote_account.rs +++ b/runtime/src/vote_account.rs @@ -29,9 +29,11 @@ struct VoteAccountInner { vote_state_once: Once, } +pub type VoteAccountsHashMap = HashMap; + #[derive(Debug, AbiExample)] pub struct VoteAccounts { - vote_accounts: Arc>, + vote_accounts: Arc, // Inner Arc is meant to implement copy-on-write semantics as opposed to // sharing mutations (hence RwLock> instead of Arc>). staked_nodes: RwLock< @@ -46,8 +48,12 @@ pub struct VoteAccounts { } impl VoteAccount { + pub fn account(&self) -> &Account { + &self.0.account + } + pub(crate) fn lamports(&self) -> u64 { - self.0.account.lamports + self.account().lamports } pub fn vote_state(&self) -> RwLockReadGuard> { @@ -192,6 +198,12 @@ impl From for VoteAccount { } } +impl AsRef for VoteAccount { + fn as_ref(&self) -> &VoteAccountInner { + &self.0 + } +} + impl From for VoteAccountInner { fn from(account: AccountSharedData) -> Self { Self::from(Account::from(account)) @@ -261,8 +273,6 @@ impl PartialEq for VoteAccounts { } } -type VoteAccountsHashMap = HashMap; - impl From> for VoteAccounts { fn from(vote_accounts: Arc) -> Self { Self { diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index e65ff489d4..85d1f7ad98 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -229,6 +229,10 @@ pub mod prevent_calling_precompiles_as_programs { solana_sdk::declare_id!("4ApgRX3ud6p7LNMJmsuaAcZY5HWctGPr5obAsjB3A54d"); } +pub mod optimize_epoch_boundary_updates { + solana_sdk::declare_id!("265hPS8k8xJ37ot82KEgjRunsUp5w4n4Q4VwwiN9i9ps"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -281,6 +285,7 @@ lazy_static! { (stakes_remove_delegation_if_inactive::id(), "remove delegations from stakes cache when inactive"), (do_support_realloc::id(), "support account data reallocation"), (prevent_calling_precompiles_as_programs::id(), "Prevent calling precompiles as programs"), + (optimize_epoch_boundary_updates::id(), "Optimize epoch boundary updates"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter() diff --git a/validator/src/main.rs b/validator/src/main.rs index 6dce7b8bc4..e4e499f887 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1443,6 +1443,14 @@ pub fn main() { .help("Allow contacting private ip addresses") .hidden(true), ) + .arg( + Arg::with_name("disable_epoch_boundary_optimization") + .long("disable-epoch-boundary-optimization") + .takes_value(false) + .help("Disables epoch boundary optimization and overrides the \ + optimize_epoch_boundary_updates feature switch if enabled.") + .hidden(true), + ) .after_help("The default subcommand is run") .subcommand( SubCommand::with_name("exit") @@ -2072,6 +2080,8 @@ pub fn main() { tpu_coalesce_ms, no_wait_for_vote_to_start_leader: matches.is_present("no_wait_for_vote_to_start_leader"), accounts_shrink_ratio, + disable_epoch_boundary_optimization: matches + .is_present("disable_epoch_boundary_optimization"), ..ValidatorConfig::default() };