diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 5162ca1a99..d4497e7ce6 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -1,5 +1,6 @@ use crate::{ cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, + consensus::VOTE_THRESHOLD_SIZE, crds_value::CrdsValueLabel, poh_recorder::PohRecorder, result::{Error, Result}, @@ -15,7 +16,10 @@ use log::*; use solana_ledger::bank_forks::BankForks; use solana_metrics::inc_new_counter_debug; use solana_perf::packet::{self, Packets}; -use solana_runtime::{bank::Bank, epoch_stakes::EpochAuthorizedVoters}; +use solana_runtime::{ + bank::Bank, + epoch_stakes::{EpochAuthorizedVoters, EpochStakes}, +}; use solana_sdk::{ clock::{Epoch, Slot}, epoch_schedule::EpochSchedule, @@ -44,6 +48,7 @@ pub type VerifiedVoteTransactionsReceiver = CrossbeamReceiver>; pub struct SlotVoteTracker { voted: HashSet>, updates: Option>>, + total_stake: u64, } impl SlotVoteTracker { @@ -376,12 +381,14 @@ impl ClusterInfoVoteListener { let root_bank = bank_forks.read().unwrap().root_bank().clone(); vote_tracker.process_new_root_bank(&root_bank); + let epoch_stakes = root_bank.epoch_stakes(root_bank.epoch()); if let Err(e) = Self::get_and_process_votes( &vote_txs_receiver, &vote_tracker, root_bank.slot(), subscriptions.clone(), + epoch_stakes, ) { match e { Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => { @@ -403,7 +410,13 @@ impl ClusterInfoVoteListener { last_root: Slot, subscriptions: Arc, ) -> Result<()> { - Self::get_and_process_votes(vote_txs_receiver, vote_tracker, last_root, subscriptions) + Self::get_and_process_votes( + vote_txs_receiver, + vote_tracker, + last_root, + subscriptions, + None, + ) } fn get_and_process_votes( @@ -411,13 +424,20 @@ impl ClusterInfoVoteListener { vote_tracker: &Arc, last_root: Slot, subscriptions: Arc, + epoch_stakes: Option<&EpochStakes>, ) -> Result<()> { let timer = Duration::from_millis(200); let mut vote_txs = vote_txs_receiver.recv_timeout(timer)?; while let Ok(new_txs) = vote_txs_receiver.try_recv() { vote_txs.extend(new_txs); } - Self::process_votes(vote_tracker, vote_txs, last_root, subscriptions); + Self::process_votes( + vote_tracker, + vote_txs, + last_root, + subscriptions, + epoch_stakes, + ); Ok(()) } @@ -426,6 +446,7 @@ impl ClusterInfoVoteListener { vote_txs: Vec, root: Slot, subscriptions: Arc, + epoch_stakes: Option<&EpochStakes>, ) { let mut diff: HashMap>> = HashMap::new(); { @@ -521,15 +542,35 @@ impl ClusterInfoVoteListener { if w_slot_tracker.updates.is_none() { w_slot_tracker.updates = Some(vec![]); } - for pk in slot_diff { - w_slot_tracker.voted.insert(pk.clone()); - w_slot_tracker.updates.as_mut().unwrap().push(pk); + let mut current_stake = 0; + for pubkey in slot_diff { + Self::sum_stake(&mut current_stake, epoch_stakes, &pubkey); + + w_slot_tracker.voted.insert(pubkey.clone()); + w_slot_tracker.updates.as_mut().unwrap().push(pubkey); } + Self::notify_for_stake_change( + current_stake, + w_slot_tracker.total_stake, + &subscriptions, + epoch_stakes, + slot, + ); + w_slot_tracker.total_stake += current_stake; } else { - let voted: HashSet<_> = slot_diff.into_iter().collect(); + let mut total_stake = 0; + let voted: HashSet<_> = slot_diff + .into_iter() + .map(|pubkey| { + Self::sum_stake(&mut total_stake, epoch_stakes, &pubkey); + pubkey + }) + .collect(); + Self::notify_for_stake_change(total_stake, 0, &subscriptions, epoch_stakes, slot); let new_slot_tracker = SlotVoteTracker { voted: voted.clone(), updates: Some(voted.into_iter().collect()), + total_stake, }; vote_tracker .slot_vote_trackers @@ -539,6 +580,31 @@ impl ClusterInfoVoteListener { } } } + + fn notify_for_stake_change( + current_stake: u64, + previous_stake: u64, + subscriptions: &Arc, + epoch_stakes: Option<&EpochStakes>, + slot: Slot, + ) { + if let Some(stakes) = epoch_stakes { + let supermajority_stake = (stakes.total_stake() as f64 * VOTE_THRESHOLD_SIZE) as u64; + if previous_stake < supermajority_stake + && (previous_stake + current_stake) > supermajority_stake + { + subscriptions.notify_gossip_subscribers(slot); + } + } + } + + fn sum_stake(sum: &mut u64, epoch_stakes: Option<&EpochStakes>, pubkey: &Pubkey) { + if let Some(stakes) = epoch_stakes { + if let Some(vote_account) = stakes.stakes().vote_accounts().get(pubkey) { + *sum += vote_account.0; + } + } + } } #[cfg(test)] @@ -749,6 +815,7 @@ mod tests { &vote_tracker, 0, subscriptions, + None, ) .unwrap(); for vote_slot in vote_slots { @@ -798,6 +865,7 @@ mod tests { &vote_tracker, 0, subscriptions, + None, ) .unwrap(); for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() { @@ -916,7 +984,13 @@ mod tests { &validator0_keypairs.vote_keypair, )]; - ClusterInfoVoteListener::process_votes(&vote_tracker, vote_tx, 0, subscriptions.clone()); + ClusterInfoVoteListener::process_votes( + &vote_tracker, + vote_tx, + 0, + subscriptions.clone(), + None, + ); let ref_count = Arc::strong_count( &vote_tracker .keys @@ -966,7 +1040,7 @@ mod tests { }) .collect(); - ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0, subscriptions); + ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0, subscriptions, None); let ref_count = Arc::strong_count( &vote_tracker