Add StakesCache struct to abstract away locking (#21738)

This commit is contained in:
Justin Starry 2021-12-10 10:51:33 -05:00 committed by GitHub
parent 622fd7c7ec
commit fd175c1ea9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 163 additions and 127 deletions

View File

@ -56,7 +56,7 @@ use {
calculate_stake_weighted_timestamp, MaxAllowableDrift, MAX_ALLOWABLE_DRIFT_PERCENTAGE,
MAX_ALLOWABLE_DRIFT_PERCENTAGE_FAST, MAX_ALLOWABLE_DRIFT_PERCENTAGE_SLOW,
},
stakes::Stakes,
stakes::{InvalidCacheEntryReason, Stakes, StakesCache},
status_cache::{SlotDelta, StatusCache},
system_instruction_processor::{get_system_account_kind, SystemAccountKind},
transaction_batch::TransactionBatch,
@ -66,8 +66,6 @@ use {
dashmap::DashMap,
itertools::Itertools,
log::*,
num_derive::ToPrimitive,
num_traits::ToPrimitive,
rayon::{
iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator},
ThreadPool, ThreadPoolBuilder,
@ -772,7 +770,7 @@ pub(crate) struct BankFieldsToSerialize<'a> {
pub(crate) rent_collector: RentCollector,
pub(crate) epoch_schedule: EpochSchedule,
pub(crate) inflation: Inflation,
pub(crate) stakes: &'a RwLock<Stakes>,
pub(crate) stakes: &'a StakesCache,
pub(crate) epoch_stakes: &'a HashMap<Epoch, EpochStakes>,
pub(crate) is_delta: bool,
}
@ -811,7 +809,7 @@ impl PartialEq for Bank {
&& self.rent_collector == other.rent_collector
&& self.epoch_schedule == other.epoch_schedule
&& *self.inflation.read().unwrap() == *other.inflation.read().unwrap()
&& *self.stakes.read().unwrap() == *other.stakes.read().unwrap()
&& *self.stakes_cache.stakes() == *other.stakes_cache.stakes()
&& self.epoch_stakes == other.epoch_stakes
&& self.is_delta.load(Relaxed) == other.is_delta.load(Relaxed)
}
@ -991,7 +989,7 @@ pub struct Bank {
inflation: Arc<RwLock<Inflation>>,
/// cache of vote_account and stake_account state for this fork
stakes: RwLock<Stakes>,
stakes_cache: StakesCache,
/// staked nodes on epoch boundaries, saved off when a bank.slot() is at
/// a leader schedule calculation boundary
@ -1057,17 +1055,10 @@ struct VoteWithStakeDelegations {
delegations: Vec<(Pubkey, (StakeState, AccountSharedData))>,
}
#[derive(Debug, Clone, PartialEq, ToPrimitive)]
enum InvalidReason {
Missing,
BadState,
WrongOwner,
}
struct LoadVoteAndStakeAccountsResult {
vote_with_stake_delegations_map: DashMap<Pubkey, VoteWithStakeDelegations>,
invalid_stake_keys: DashMap<Pubkey, InvalidReason>,
invalid_vote_keys: DashMap<Pubkey, InvalidReason>,
invalid_stake_keys: DashMap<Pubkey, InvalidCacheEntryReason>,
invalid_vote_keys: DashMap<Pubkey, InvalidCacheEntryReason>,
}
#[derive(Debug, Default)]
@ -1170,7 +1161,7 @@ impl Bank {
rent_collector: RentCollector::default(),
epoch_schedule: EpochSchedule::default(),
inflation: Arc::<RwLock<Inflation>>::default(),
stakes: RwLock::<Stakes>::default(),
stakes_cache: StakesCache::default(),
epoch_stakes: HashMap::<Epoch, EpochStakes>::default(),
is_delta: AtomicBool::default(),
builtin_programs: BuiltinPrograms::default(),
@ -1279,7 +1270,7 @@ impl Bank {
// genesis needs stakes for all epochs up to the epoch implied by
// slot = 0 and genesis configuration
{
let stakes = bank.stakes.read().unwrap();
let stakes = bank.stakes_cache.stakes();
for epoch in 0..=bank.get_leader_schedule_epoch(bank.slot) {
bank.epoch_stakes
.insert(epoch, EpochStakes::new(&stakes, epoch));
@ -1399,7 +1390,7 @@ impl Bank {
transaction_entries_count: AtomicU64::new(0),
transactions_per_entry_max: AtomicU64::new(0),
// we will .clone_with_epoch() this soon after stake data update; so just .clone() for now
stakes: RwLock::new(parent.stakes.read().unwrap().clone()),
stakes_cache: StakesCache::new(parent.stakes_cache.stakes().clone()),
epoch_stakes: parent.epoch_stakes.clone(),
parent_hash: parent.hash(),
parent_slot: parent.slot(),
@ -1456,10 +1447,7 @@ impl Bank {
// Add new entry to stakes.stake_history, set appropriate epoch and
// update vote accounts with warmed up stakes before saving a
// snapshot of stakes in epoch stakes
new.stakes
.write()
.unwrap()
.activate_epoch(epoch, &thread_pool);
new.stakes_cache.activate_epoch(epoch, &thread_pool);
// Save a snapshot of stakes for use in consensus and stake weighted networking
let leader_schedule_epoch = epoch_schedule.get_leader_schedule_epoch(slot);
@ -1592,7 +1580,7 @@ impl Bank {
rent_collector: fields.rent_collector.clone_with_epoch(fields.epoch),
epoch_schedule: fields.epoch_schedule,
inflation: Arc::new(RwLock::new(fields.inflation)),
stakes: RwLock::new(fields.stakes),
stakes_cache: StakesCache::new(fields.stakes),
epoch_stakes: fields.epoch_stakes,
is_delta: AtomicBool::new(fields.is_delta),
builtin_programs: new(),
@ -1693,7 +1681,7 @@ impl Bank {
rent_collector: self.rent_collector.clone(),
epoch_schedule: self.epoch_schedule,
inflation: *self.inflation.read().unwrap(),
stakes: &self.stakes,
stakes: &self.stakes_cache,
epoch_stakes: &self.epoch_stakes,
is_delta: self.is_delta.load(Relaxed),
}
@ -1962,12 +1950,11 @@ impl Bank {
});
let new_epoch_stakes =
EpochStakes::new(&self.stakes.read().unwrap(), leader_schedule_epoch);
EpochStakes::new(&self.stakes_cache.stakes(), leader_schedule_epoch);
{
let vote_stakes: HashMap<_, _> = self
.stakes
.read()
.unwrap()
.stakes_cache
.stakes()
.vote_accounts()
.iter()
.map(|(pubkey, (stake, _))| (*pubkey, *stake))
@ -2024,7 +2011,7 @@ impl Bank {
// if I'm the first Bank in an epoch, ensure stake_history is updated
self.update_sysvar_account(&sysvar::stake_history::id(), |account| {
create_account::<sysvar::stake_history::StakeHistory>(
self.stakes.read().unwrap().history(),
self.stakes_cache.stakes().history(),
self.inherit_specially_retained_account_fields(account),
)
});
@ -2097,7 +2084,7 @@ impl Bank {
let validator_rewards =
(validator_rate * capitalization as f64 * epoch_duration_in_years) as u64;
let old_vote_balance_and_staked = self.stakes.read().unwrap().vote_balance_and_staked();
let old_vote_balance_and_staked = self.stakes_cache.stakes().vote_balance_and_staked();
let validator_point_value = self.pay_validator_rewards_with_thread_pool(
prev_epoch,
@ -2120,7 +2107,7 @@ impl Bank {
});
}
let new_vote_balance_and_staked = self.stakes.read().unwrap().vote_balance_and_staked();
let new_vote_balance_and_staked = self.stakes_cache.stakes().vote_balance_and_staked();
let validator_rewards_paid = new_vote_balance_and_staked - old_vote_balance_and_staked;
assert_eq!(
validator_rewards_paid,
@ -2152,7 +2139,7 @@ impl Bank {
.fetch_add(validator_rewards_paid, Relaxed);
let active_stake = if let Some(stake_history_entry) =
self.stakes.read().unwrap().history().get(prev_epoch)
self.stakes_cache.stakes().history().get(prev_epoch)
{
stake_history_entry.effective
} else {
@ -2183,11 +2170,11 @@ impl Bank {
thread_pool: &ThreadPool,
reward_calc_tracer: Option<impl Fn(&RewardCalculationEvent) + Send + Sync>,
) -> LoadVoteAndStakeAccountsResult {
let stakes = self.stakes.read().unwrap();
let stakes = self.stakes_cache.stakes();
let vote_with_stake_delegations_map =
DashMap::with_capacity(stakes.vote_accounts().as_ref().len());
let invalid_stake_keys: DashMap<Pubkey, InvalidReason> = DashMap::new();
let invalid_vote_keys: DashMap<Pubkey, InvalidReason> = DashMap::new();
let invalid_stake_keys: DashMap<Pubkey, InvalidCacheEntryReason> = DashMap::new();
let invalid_vote_keys: DashMap<Pubkey, InvalidCacheEntryReason> = DashMap::new();
thread_pool.install(|| {
stakes
@ -2202,7 +2189,8 @@ impl Bank {
let stake_delegation = match self.get_account_with_fixed_root(stake_pubkey) {
Some(stake_account) => {
if stake_account.owner() != &solana_stake_program::id() {
invalid_stake_keys.insert(*stake_pubkey, InvalidReason::WrongOwner);
invalid_stake_keys
.insert(*stake_pubkey, InvalidCacheEntryReason::WrongOwner);
return;
}
@ -2210,13 +2198,14 @@ impl Bank {
Some(stake_state) => (*stake_pubkey, (stake_state, stake_account)),
None => {
invalid_stake_keys
.insert(*stake_pubkey, InvalidReason::BadState);
.insert(*stake_pubkey, InvalidCacheEntryReason::BadState);
return;
}
}
}
None => {
invalid_stake_keys.insert(*stake_pubkey, InvalidReason::Missing);
invalid_stake_keys
.insert(*stake_pubkey, InvalidCacheEntryReason::Missing);
return;
}
};
@ -2230,13 +2219,14 @@ impl Bank {
Some(vote_account) => {
if vote_account.owner() != &solana_vote_program::id() {
invalid_vote_keys
.insert(*vote_pubkey, InvalidReason::WrongOwner);
.insert(*vote_pubkey, InvalidCacheEntryReason::WrongOwner);
return;
}
vote_account
}
None => {
invalid_vote_keys.insert(*vote_pubkey, InvalidReason::Missing);
invalid_vote_keys
.insert(*vote_pubkey, InvalidCacheEntryReason::Missing);
return;
}
};
@ -2246,7 +2236,8 @@ impl Bank {
{
vote_state.convert_to_current()
} else {
invalid_vote_keys.insert(*vote_pubkey, InvalidReason::BadState);
invalid_vote_keys
.insert(*vote_pubkey, InvalidCacheEntryReason::BadState);
return;
};
@ -2280,51 +2271,6 @@ impl Bank {
}
}
fn handle_invalid_stakes_cache_keys(
&self,
invalid_stake_keys: DashMap<Pubkey, InvalidReason>,
invalid_vote_keys: DashMap<Pubkey, InvalidReason>,
) {
if invalid_stake_keys.is_empty() && invalid_vote_keys.is_empty() {
return;
}
// Prune invalid stake delegations and vote accounts that were
// not properly evicted in normal operation.
let mut maybe_stakes_cache = if self
.feature_set
.is_active(&feature_set::evict_invalid_stakes_cache_entries::id())
{
Some(self.stakes.write().unwrap())
} else {
None
};
for (stake_pubkey, reason) in invalid_stake_keys {
if let Some(stakes_cache) = maybe_stakes_cache.as_mut() {
stakes_cache.remove_stake_delegation(&stake_pubkey);
}
datapoint_warn!(
"bank-stake_delegation_accounts-invalid-account",
("slot", self.slot() as i64, i64),
("stake-address", format!("{:?}", stake_pubkey), String),
("reason", reason.to_i64().unwrap_or_default(), i64),
);
}
for (vote_pubkey, reason) in invalid_vote_keys {
if let Some(stakes_cache) = maybe_stakes_cache.as_mut() {
stakes_cache.remove_vote_account(&vote_pubkey);
}
datapoint_warn!(
"bank-stake_delegation_accounts-invalid-account",
("slot", self.slot() as i64, i64),
("vote-address", format!("{:?}", vote_pubkey), String),
("reason", reason.to_i64().unwrap_or_default(), i64),
);
}
}
/// iterate over all stakes, redeem vote credits for each stake we can
/// successfully load and parse, return the lamport value of one point
fn pay_validator_rewards_with_thread_pool(
@ -2335,7 +2281,7 @@ impl Bank {
fix_activating_credits_observed: bool,
thread_pool: &ThreadPool,
) -> f64 {
let stake_history = self.stakes.read().unwrap().history().clone();
let stake_history = self.stakes_cache.stakes().history().clone();
let vote_with_stake_delegations_map = {
let LoadVoteAndStakeAccountsResult {
vote_with_stake_delegations_map,
@ -2346,7 +2292,15 @@ impl Bank {
reward_calc_tracer.as_ref(),
);
self.handle_invalid_stakes_cache_keys(invalid_stake_keys, invalid_vote_keys);
let evict_invalid_stakes_cache_entries = self
.feature_set
.is_active(&feature_set::evict_invalid_stakes_cache_entries::id());
self.stakes_cache.handle_invalid_keys(
invalid_stake_keys,
invalid_vote_keys,
evict_invalid_stakes_cache_entries,
self.slot(),
);
vote_with_stake_delegations_map
};
@ -2771,9 +2725,8 @@ impl Bank {
// highest staked node is the first collector
self.collector_id = self
.stakes
.read()
.unwrap()
.stakes_cache
.stakes()
.highest_staked_node()
.unwrap_or_default();
@ -4779,13 +4732,11 @@ impl Bank {
.accounts
.store_slow_cached(self.slot(), pubkey, account);
if Stakes::is_stake(account) {
self.stakes.write().unwrap().store(
pubkey,
account,
self.stakes_remove_delegation_if_inactive_enabled(),
);
}
self.stakes_cache.check_and_store(
pubkey,
account,
self.stakes_remove_delegation_if_inactive_enabled(),
);
}
pub fn force_flush_accounts_cache(&self) {
@ -5524,11 +5475,10 @@ impl Bank {
let message = tx.message();
let loaded_transaction = raccs.as_ref().unwrap();
for (_i, (pubkey, account)) in (0..message.account_keys_len())
.zip(loaded_transaction.accounts.iter())
.filter(|(_i, (_pubkey, account))| (Stakes::is_stake(account)))
for (_i, (pubkey, account)) in
(0..message.account_keys_len()).zip(loaded_transaction.accounts.iter())
{
self.stakes.write().unwrap().store(
self.stakes_cache.check_and_store(
pubkey,
account,
self.stakes_remove_delegation_if_inactive_enabled(),
@ -5538,19 +5488,19 @@ impl Bank {
}
pub fn staked_nodes(&self) -> Arc<HashMap<Pubkey, u64>> {
self.stakes.read().unwrap().staked_nodes()
self.stakes_cache.stakes().staked_nodes()
}
/// current vote accounts for this bank along with the stake
/// attributed to each account
pub fn vote_accounts(&self) -> Arc<HashMap<Pubkey, (/*stake:*/ u64, VoteAccount)>> {
let stakes = self.stakes.read().unwrap();
let stakes = self.stakes_cache.stakes();
Arc::from(stakes.vote_accounts())
}
/// Vote account for the given vote account pubkey along with the stake.
pub fn get_vote_account(&self, vote_account: &Pubkey) -> Option<(/*stake:*/ u64, VoteAccount)> {
let stakes = self.stakes.read().unwrap();
let stakes = self.stakes_cache.stakes();
stakes.vote_accounts().get(vote_account).cloned()
}
@ -6290,7 +6240,7 @@ pub(crate) mod tests {
impl Bank {
fn cloned_stake_delegations(&self) -> StakeDelegations {
self.stakes.read().unwrap().stake_delegations().clone()
self.stakes_cache.stakes().stake_delegations().clone()
}
}
@ -10577,10 +10527,10 @@ pub(crate) mod tests {
// Non-builtin loader accounts can not be used for instruction processing
{
let stakes = bank.stakes.read().unwrap();
let stakes = bank.stakes_cache.stakes();
assert!(stakes.vote_accounts().as_ref().is_empty());
}
assert!(bank.stakes.read().unwrap().stake_delegations().is_empty());
assert!(bank.stakes_cache.stakes().stake_delegations().is_empty());
assert_eq!(bank.calculate_capitalization(true), bank.capitalization());
let ((vote_id, vote_account), (stake_id, stake_account)) =
@ -10590,19 +10540,19 @@ pub(crate) mod tests {
bank.store_account(&vote_id, &vote_account);
bank.store_account(&stake_id, &stake_account);
{
let stakes = bank.stakes.read().unwrap();
let stakes = bank.stakes_cache.stakes();
assert!(!stakes.vote_accounts().as_ref().is_empty());
}
assert!(!bank.stakes.read().unwrap().stake_delegations().is_empty());
assert!(!bank.stakes_cache.stakes().stake_delegations().is_empty());
assert_eq!(bank.calculate_capitalization(true), bank.capitalization());
bank.add_builtin("mock_program1", &vote_id, mock_ix_processor);
bank.add_builtin("mock_program2", &stake_id, mock_ix_processor);
{
let stakes = bank.stakes.read().unwrap();
let stakes = bank.stakes_cache.stakes();
assert!(stakes.vote_accounts().as_ref().is_empty());
}
assert!(bank.stakes.read().unwrap().stake_delegations().is_empty());
assert!(bank.stakes_cache.stakes().stake_delegations().is_empty());
assert_eq!(bank.calculate_capitalization(true), bank.capitalization());
assert_eq!(
"mock_program1",
@ -10622,10 +10572,10 @@ pub(crate) mod tests {
let new_hash = bank.get_accounts_hash();
assert_eq!(old_hash, new_hash);
{
let stakes = bank.stakes.read().unwrap();
let stakes = bank.stakes_cache.stakes();
assert!(stakes.vote_accounts().as_ref().is_empty());
}
assert!(bank.stakes.read().unwrap().stake_delegations().is_empty());
assert!(bank.stakes_cache.stakes().stake_delegations().is_empty());
assert_eq!(bank.calculate_capitalization(true), bank.capitalization());
assert_eq!(
"mock_program1",

View File

@ -2,9 +2,9 @@
use solana_frozen_abi::abi_example::IgnoreAsHelper;
use {
super::{common::UnusedAccounts, *},
crate::ancestors::AncestorsForSerialization,
crate::{ancestors::AncestorsForSerialization, stakes::StakesCache},
solana_measure::measure::Measure,
std::cell::RefCell,
std::{cell::RefCell, sync::RwLock},
};
type AccountsDbFields = super::AccountsDbFields<SerializableAccountStorageEntry>;
@ -42,7 +42,6 @@ impl From<&AccountStorageEntry> for SerializableAccountStorageEntry {
}
}
use std::sync::RwLock;
// Deserializable version of Bank which need not be serializable,
// because it's handled by SerializableVersionedBank.
// So, sync fields with it!
@ -153,7 +152,7 @@ pub(crate) struct SerializableVersionedBank<'a> {
pub(crate) rent_collector: RentCollector,
pub(crate) epoch_schedule: EpochSchedule,
pub(crate) inflation: Inflation,
pub(crate) stakes: &'a RwLock<Stakes>,
pub(crate) stakes: &'a StakesCache,
pub(crate) unused_accounts: UnusedAccounts,
pub(crate) epoch_stakes: &'a HashMap<Epoch, EpochStakes>,
pub(crate) is_delta: bool,

View File

@ -312,7 +312,7 @@ mod test_bank_serialize {
// This some what long test harness is required to freeze the ABI of
// Bank's serialization due to versioned nature
#[frozen_abi(digest = "Fv5AFJSnZi9sssiE7Jn8bH2iTPnqu3UNc3np62r1sTsr")]
#[frozen_abi(digest = "EuYcD3JCEWRnQaFHW1CAy2bBqLkakc88iLJtZH6kYeVF")]
#[derive(Serialize, AbiExample)]
pub struct BankAbiTestWrapperFuture {
#[serde(serialize_with = "wrapper_future")]

View File

@ -6,13 +6,16 @@ use {
stake_history::StakeHistory,
vote_account::{VoteAccount, VoteAccounts, VoteAccountsHashMap},
},
dashmap::DashMap,
num_derive::ToPrimitive,
num_traits::ToPrimitive,
rayon::{
iter::{IntoParallelRefIterator, ParallelIterator},
ThreadPool,
},
solana_sdk::{
account::{AccountSharedData, ReadableAccount},
clock::Epoch,
clock::{Epoch, Slot},
pubkey::Pubkey,
stake::{
self,
@ -21,9 +24,99 @@ use {
},
solana_stake_program::stake_state,
solana_vote_program::vote_state::VoteState,
std::{collections::HashMap, sync::Arc},
std::{
collections::HashMap,
sync::{Arc, RwLock, RwLockReadGuard},
},
};
#[derive(Debug, Clone, PartialEq, ToPrimitive)]
pub enum InvalidCacheEntryReason {
Missing,
BadState,
WrongOwner,
}
#[derive(Default, Debug, Deserialize, Serialize, AbiExample)]
pub struct StakesCache(RwLock<Stakes>);
impl StakesCache {
pub fn new(stakes: Stakes) -> Self {
Self(RwLock::new(stakes))
}
pub fn stakes(&self) -> RwLockReadGuard<Stakes> {
self.0.read().unwrap()
}
pub fn is_stake(account: &AccountSharedData) -> bool {
solana_vote_program::check_id(account.owner())
|| stake::program::check_id(account.owner())
&& account.data().len() >= std::mem::size_of::<StakeState>()
}
pub fn check_and_store(
&self,
pubkey: &Pubkey,
account: &AccountSharedData,
remove_delegation_on_inactive: bool,
) {
if Self::is_stake(account) {
let mut stakes = self.0.write().unwrap();
stakes.store(pubkey, account, remove_delegation_on_inactive)
}
}
pub fn activate_epoch(&self, next_epoch: Epoch, thread_pool: &ThreadPool) {
let mut stakes = self.0.write().unwrap();
stakes.activate_epoch(next_epoch, thread_pool)
}
pub fn handle_invalid_keys(
&self,
invalid_stake_keys: DashMap<Pubkey, InvalidCacheEntryReason>,
invalid_vote_keys: DashMap<Pubkey, InvalidCacheEntryReason>,
should_evict_invalid_entries: bool,
current_slot: Slot,
) {
if invalid_stake_keys.is_empty() && invalid_vote_keys.is_empty() {
return;
}
// Prune invalid stake delegations and vote accounts that were
// not properly evicted in normal operation.
let mut maybe_stakes = if should_evict_invalid_entries {
Some(self.0.write().unwrap())
} else {
None
};
for (stake_pubkey, reason) in invalid_stake_keys {
if let Some(stakes) = maybe_stakes.as_mut() {
stakes.remove_stake_delegation(&stake_pubkey);
}
datapoint_warn!(
"bank-stake_delegation_accounts-invalid-account",
("slot", current_slot as i64, i64),
("stake-address", format!("{:?}", stake_pubkey), String),
("reason", reason.to_i64().unwrap_or_default(), i64),
);
}
for (vote_pubkey, reason) in invalid_vote_keys {
if let Some(stakes) = maybe_stakes.as_mut() {
stakes.remove_vote_account(&vote_pubkey);
}
datapoint_warn!(
"bank-stake_delegation_accounts-invalid-account",
("slot", current_slot as i64, i64),
("vote-address", format!("{:?}", vote_pubkey), String),
("reason", reason.to_i64().unwrap_or_default(), i64),
);
}
}
}
#[derive(Default, Clone, PartialEq, Debug, Deserialize, Serialize, AbiExample)]
pub struct Stakes {
/// vote accounts
@ -144,12 +237,6 @@ impl Stakes {
+ self.vote_accounts.iter().map(get_lamports).sum::<u64>()
}
pub fn is_stake(account: &AccountSharedData) -> bool {
solana_vote_program::check_id(account.owner())
|| stake::program::check_id(account.owner())
&& account.data().len() >= std::mem::size_of::<StakeState>()
}
pub fn remove_vote_account(&mut self, vote_pubkey: &Pubkey) {
self.vote_accounts.remove(vote_pubkey);
}